In [23]:
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

In [4]:
# Define objective function to minimize (Rastrigin function)
def rastrigin(x):
    A = 10
    return A * len(x) + sum([(xi ** 2 - A * np.cos(2 * np.pi * xi)) for xi in x])

In [24]:
# Particle Swarm Optimization implementation (baseline)
class Particle:
    def __init__(self, dim, bounds, objective=rastrigin):
        self.position = np.array([random.uniform(bounds[0], bounds[1]) for _ in range(dim)])
        self.velocity = np.zeros(dim)
        self.best_position = np.copy(self.position)
        self.best_value = objective(self.position)

class PSO:
    def __init__(self, dim, bounds, num_particles=30, num_iter=100, objective=rastrigin):
        self.dim = dim
        self.bounds = bounds
        self.num_particles = num_particles
        self.num_iter = num_iter
        self.objective = objective
        self.swarm = [Particle(dim, bounds, objective) for _ in range(num_particles)]
        self.global_best_position = np.copy(self.swarm[0].position)
        self.global_best_value = objective(self.global_best_position)

    def optimize(self):
        w = 0.5
        c1, c2 = 1.5, 1.5
        for _ in range(self.num_iter):
            for particle in self.swarm:
                r1, r2 = random.random(), random.random()
                particle.velocity = (w * particle.velocity +
                                     c1 * r1 * (particle.best_position - particle.position) +
                                     c2 * r2 * (self.global_best_position - particle.position))
                particle.position = np.clip(particle.position + particle.velocity, self.bounds[0], self.bounds[1])
                value = self.objective(particle.position)
                if value < particle.best_value:
                    particle.best_position, particle.best_value = particle.position, value
                if value < self.global_best_value:
                    self.global_best_position, self.global_best_value = particle.position, value
        return self.global_best_position, self.global_best_value

In [25]:
# Policy Network for the RL component
class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_size=16):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.action_mean = nn.Linear(hidden_size, action_dim)
        # Learnable log_std parameter for the Gaussian distribution
        self.log_std = nn.Parameter(torch.zeros(action_dim))

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        mean = self.action_mean(x)
        std = torch.exp(self.log_std)
        return mean, std

    def select_action(self, state):
        mean, std = self.forward(state)
        dist = torch.distributions.Normal(mean, std)
        action = dist.sample()
        log_prob = dist.log_prob(action).sum()  # Sum log probabilities across dimensions
        # Clamp actions to a plausible range for c1 and c2 (e.g., [1.0, 2.0])
        action = torch.clamp(action, 1.0, 2.0)
        return action, log_prob

# RL-enhanced PSO using the policy network to select acceleration coefficients c1 and c2
class RLPSO(PSO):
    def __init__(self, dim, bounds, num_particles=30, num_iter=100, objective=rastrigin, lr=1e-2):
        super().__init__(dim, bounds, num_particles, num_iter, objective)
        # Define a simple state: [iteration_ratio, global_best_value]
        self.state_dim = 2
        self.action_dim = 2  # one for c1 and one for c2
        self.policy_net = PolicyNetwork(self.state_dim, self.action_dim)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.objective = objective

    def get_state(self, iteration):
        # Normalize iteration to [0, 1]
        iter_norm = iteration / self.num_iter
        # Use the current global best value (optionally, further normalization can be applied)
        state = np.array([iter_norm, self.global_best_value], dtype=np.float32)
        return torch.tensor(state)

    def optimize(self):
        w = 0.5
        for it in tqdm(range(1, self.num_iter + 1), desc="Training progress"):
            state = self.get_state(it)
            action, log_prob = self.policy_net.select_action(state)
            c1, c2 = action.detach().numpy()  # use the selected coefficients for this iteration

            # Store the old global best for reward computation
            old_global_best = self.global_best_value

            for particle in self.swarm:
                r1, r2 = random.random(), random.random()
                particle.velocity = (w * particle.velocity +
                                     c1 * r1 * (particle.best_position - particle.position) +
                                     c2 * r2 * (self.global_best_position - particle.position))
                particle.position = np.clip(particle.position + particle.velocity, self.bounds[0], self.bounds[1])
                value = self.objective(particle.position)
                if value < particle.best_value:
                    particle.best_position, particle.best_value = particle.position, value
                if value < self.global_best_value:
                    self.global_best_position, self.global_best_value = particle.position, value

            # Reward: improvement in global best value
            reward = old_global_best - self.global_best_value
            reward_tensor = torch.tensor(reward, dtype=torch.float32)

            # Update policy network using the REINFORCE rule: maximize reward
            loss = -log_prob * reward_tensor
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        return self.global_best_position, self.global_best_value


In [66]:
# Testing both the baseline PSO and the RL-enhanced PSO
dim = 20
bounds = (-5.12, 5.12)
num_particles = 30
num_iter = 100

print("Running baseline PSO...")
pso_solver = PSO(dim, bounds, num_particles, num_iter)
best_position_pso, best_value_pso = pso_solver.optimize()
print("\nBaseline PSO solution:")
print("Best position:", best_position_pso)
print("Best value:", best_value_pso)

print("\nRunning RL-enhanced PSO (RLPSO)...")
rl_pso_solver = RLPSO(dim, bounds, num_particles, num_iter)
best_position_rl, best_value_rl = rl_pso_solver.optimize()
print("\nRLPSO solution:")
print("Best position:", best_position_rl)
print("Best value:", best_value_rl)

Running baseline PSO...

Baseline PSO solution:
Best position: [ 1.0235498   0.95176271 -2.15361613  2.96116039  4.02970897  0.02861689
 -2.96370063  0.09227535  0.95874     1.0464803   1.03747529 -0.05790331
  3.00524706  1.02219839  1.87580394  0.01068383  1.99013958  0.02099836
 -0.09674651 -0.93212351]
Best value: 76.81647433027369

Running RL-enhanced PSO (RLPSO)...


Training progress: 100%|██████████| 100/100 [00:00<00:00, 224.76it/s]


RLPSO solution:
Best position: [ 0.05631228 -0.1991127  -0.96856322 -2.92018528  2.98497274 -0.06295822
  0.98502784  0.91351776 -1.14774585 -0.97165242  1.05866453 -0.02145037
 -1.14531822 -0.84982772  0.82602287  0.95595066 -0.06202925  0.1184671
 -3.04233934 -0.02878297]
Best value: 70.35736406913466



