In [1]:
NUM_PARTICLES = 30
NUM_ITERATIONS = 100
INERTIA = 0.5
COGNITIVE = 2 
SOCIAL = 2
BOUNDS = [[-10,10],[-10,10]]
CONFIG_FILENAME = 'pso_config.json'
DEFAULT_CONFIG_NAMES = ["num_particles","num_iterations","inertia","cognitive","social","bounds"]
DEFAULT_PARTICLE = [NUM_PARTICLES, NUM_ITERATIONS,INERTIA, COGNITIVE, SOCIAL, BOUNDS]

In [3]:
import numpy as np 
import json 
from constants import *
import os
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import numpy as np

In [14]:


class Particle:
    def __init__(self, bounds):
        self.position = np.array([np.random.uniform(low, high) for low, high in bounds])
        self.velocity = np.zeros_like(self.position)
        self.best_position = np.copy(self.position)
        self.best_value = float('inf')

    def update_velocity(self, global_best_position, inertia, cognitive, social):
        r1 = np.random.random(len(self.position))
        r2 = np.random.random(len(self.position))

        cognitive_velocity = cognitive * r1 * (self.best_position - self.position)
        social_velocity = social * r2 * (global_best_position - self.position)
        self.velocity = inertia * self.velocity + cognitive_velocity + social_velocity

    def update_position(self, bounds):
        self.position += self.velocity
        for i in range(len(self.position)):
            if self.position[i] < bounds[i][0]:
                self.position[i] = bounds[i][0]
            if self.position[i] > bounds[i][1]:
                self.position[i] = bounds[i][1]


class ParticleSwarmOptimizer:
    def __init__(self, particle_class, objective_function, config_filename = None):
        
        self.particle_class = particle_class  # The particle class to use
        if config_filename is None:
            self.config = self.load_or_create_config()
        else:
            self.config = self.load_or_create_config(config_filename)
        self.objective_function = objective_function
        self.bounds = self.config['bounds']
        self.num_particles = self.config['num_particles']
        self.num_iterations = self.config['num_iterations']
        self.inertia = self.config['inertia']
        self.cognitive = self.config['cognitive']
        self.social = self.config['social']
        self.swarm = [self.particle_class(self.bounds) for _ in range(self.num_particles)]
        self.global_best_position = np.copy(self.swarm[0].position)
        self.global_best_value = float('inf')


    def load_or_create_config(self,filename=CONFIG_FILENAME):
        if os.path.exists(filename):
            try:
                with open(filename, 'r') as f:
                    config = json.load(f)
                    print("Loaded configuration from", filename)
            except json.JSONDecodeError as e:
                print(f"加载 {filename} 时出现 JSON 错误: {e}")
            # 如果 JSON 无效，创建新的默认配置
            config = self.create_default_config(filename)
        else:
            # 如果找不到文件，创建带有默认值的配置文件
            config = self.create_default_config(filename)
        return config

        
    def optimize(self, save_iterations=False, print_iterations=False):
        # Initialize a list to store the best values and positions at each iteration if required
        if save_iterations:
            best_values_per_iteration = np.zeros(self.num_iterations)
            best_positions_per_iteration = np.zeros((self.num_iterations, len(self.bounds)))
    
        for iteration in range(self.num_iterations):
            for particle in self.swarm:
                # Evaluate the objective function
                value = self.objective_function(particle.position)
                
                # Update personal best
                if value < particle.best_value:
                    particle.best_value = value
                    particle.best_position = np.copy(particle.position)
    
                # Update global best
                if value < self.global_best_value:
                    self.global_best_value = value
                    self.global_best_position = np.copy(particle.position)
    
            # Save the current global best value and position for this iteration if required
            if save_iterations:
                best_values_per_iteration[iteration] = self.global_best_value
                best_positions_per_iteration[iteration] = np.copy(self.global_best_position)
    
            # Update velocity and position of each particle
            for particle in self.swarm:
                particle.update_velocity(self.global_best_position, self.inertia, self.cognitive, self.social)
                particle.update_position(self.bounds)
    
            if print_iterations:
                print(f"Iteration {iteration+1}/{self.num_iterations}, Global Best Value: {self.global_best_value}")
        
        # Return the best values and positions per iteration if save_iterations is True
        if save_iterations:
            self.best_values_per_iteration = best_values_per_iteration
            self.best_positions_per_iteration = best_positions_per_iteration
            return self.global_best_position, self.global_best_value, self.best_values_per_iteration, self.best_positions_per_iteration
        else:
            return self.global_best_position, self.global_best_value, None, None



    
    def plot_pso_convergence(self, animated=False, gif_name="pso_convergence.gif"):
        """
        Plots the convergence of the PSO algorithm with a contour plot of the objective function.
        
        Parameters:
        - objective_function: the objective function to plot as a contour.
        - animated: if True, creates an animated GIF showing the convergence process with particle movement.
        - gif_name: name of the GIF file to save if animated=True.
        """
        objective_function = self.objective_function
        iteration_values = self.best_values_per_iteration
        iteration_positions = self.best_positions_per_iteration
        
        # Create a grid of points to evaluate the objective function for contour plotting
        x = np.linspace(self.bounds[0][0], self.bounds[0][1], 100)
        y = np.linspace(self.bounds[1][0], self.bounds[1][1], 100)
        X, Y = np.meshgrid(x, y)
        Z = np.array([[objective_function([x_val, y_val]) for x_val in x] for y_val in y])
        
        if animated:
            # Create the animated particle movement GIF with global best position
            fig, ax = plt.subplots()
    
            # Plot the contour of the objective function
            contour = ax.contourf(X, Y, Z, levels=50, cmap='viridis')
            plt.colorbar(contour, ax=ax)
    
            # Initialize a scatter plot for particle positions
            particles = ax.scatter([], [], c='blue', label="Particles", s=50)
            global_best = ax.scatter([], [], c='red', label="Global Best", s=100, marker='x')
    
            ax.set_title("PSO Particle Movement and Convergence")
            ax.set_xlabel("X Position")
            ax.set_ylabel("Y Position")
    
            def init():
                particles.set_offsets([])
                global_best.set_offsets([])
                return particles, global_best
    
            def update(frame):
                # Update particle positions (in this case, just the global best position)
                particle_positions = iteration_positions[frame]
                particles.set_offsets(particle_positions)
                
                # Update global best position (red cross)
                global_best_position = iteration_positions[frame]
                global_best.set_offsets(global_best_position)
    
                return particles, global_best
    
            ani = animation.FuncAnimation(fig, update, frames=len(iteration_positions), init_func=init, blit=True)
    
            # Save the animation as GIF
            ani.save(gif_name, writer='imagemagick', fps=5)
            print(f"Animation saved as {gif_name}")
    
        else:
            # Static plot with contour and global best value convergence over iterations
            fig, ax = plt.subplots()
    
            # Plot the contour of the objective function
            contour = ax.contourf(X, Y, Z, levels=50, cmap='viridis')
            plt.colorbar(contour, ax=ax)
    
            # Plot the global best values over iterations
            plt.plot(iteration_values, marker='o', linestyle='-', color='b')
            plt.title("PSO Convergence Plot")
            plt.xlabel("Iterations")
            plt.ylabel("Global Best Value")
            plt.grid(True)
            plt.show()

Particle（） 构建单个粒子。从随机位置开始，然后具有 update_velocity（） 和 update_position（） 功能，以根据我们上面所说的内容更新速度和位置（您可以检查以确保我没有撒谎）。
ParticleSwarmOptimizer（） 需要两个对象：Particle（） 类，要构建一个类（如果需要，可以通过向粒子添加特征或修改更新特征来使算法更加复杂），objective_function是我们要最小化的黑盒函数。

https://miro.medium.com/v2/resize:fit:1100/format:webp/0*2eAaUvFBp2ksHI3n.png

In [15]:
import numpy as np
import json
import os
def quadratic_objective_function(x):
    return x[0] ** 2 + x[1] ** 2 + 50 
pso = ParticleSwarmOptimizer(Particle,quadratic_objective_function) 
best_position,best_value,iteration_values,iteration_position =  pso.optimize(save_iterations=True,print_iterations=False) 
print("Best Position:",best_position) 
print("Best Value:",best_value) 
pso.plot_pso_convergence(animated=True)

AttributeError: 'ParticleSwarmOptimizer' object has no attribute 'create_default_config'

In [13]:
def complex_objective_function(x):
    # A combination of a quadratic term, a sine wave, and an exponential decay
    term1 = (x[0]**2 + x[1]**2)  # Quadratic term
    term2 = 10 * np.sin(x[0]) * np.sin(4*x[1])  # Sine wave introducing non-linearity
    term3 = np.exp(-0.1 * (x[0]**2 + x[1]**2))  # Exponential decay to create local minima
    return term1 - term2 + 5 * term3 + 20  # The 20 is just to shift the entire surface upward

pso = ParticleSwarmOptimizer(Particle,complex_objective_function)
best_position, best_value, iteration_values,iteration_positions = pso.optimize(save_iterations=True,print_iterations=False)
print("Best Position:", best_position)
print("Best Value:", best_value)
pso.plot_pso_convergence(animated=True)

JSONDecodeError: Expecting value: line 1 column 1 (char 0)