# Libs

In [1]:
import time
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation

from enum import Enum
from IPython.display import HTML
from typing import Callable, Tuple

SEED = 50
np.random.seed(SEED)

%matplotlib notebook

In [77]:
# Position type
PosType = Tuple[float, float]

# Velocity type
VelType = Tuple[float, float]

# Memory type
MemType = Tuple[PosType, float]

# History type
HistType = Tuple[PosType, PosType, PosType, VelType]

MEM_TAL_INDEX = 0
MEM_COST_INDEX = 1

class PSO:
    def __init__(self, num_particles: int = 10, c1 = 0.5, c2 = 0.5, speed_limit: int = 1,
                 position_scaller: int = 1, speed_scaller: int = 1, cost_function: Callable = None,
                 bubble_radius: int = 1, swarm_radius: int = 0.25, k: float = 0.01, p: float = 0.9,
                 mms: int = 10, mpr: float = 0.1, md: float = 0.1, mpp1: float = 0.1, mpp2: float = 0.1,
                 FFthr: float = 0.0) -> None:
        
        # Classic PSO parameters
        self.num_particles = num_particles
        self.cost_function = cost_function
        self.c1 = c1
        self.c2 = c2

        # Alternative PSO parameters
        self.initialize_aor_variables(k, p)
        self.initialize_mem_tool(mms, mpr, md, mpp1, mpp2, FFthr)

        # Initializations
        self.initialize_positions(scaller=position_scaller)
        self.initialize_velocities(scaller=speed_scaller)

        # Global control variables
        cost = [self.cost_function(x) for x in self.X]
        self.gbest = self.pbest.copy()[np.argmin(cost)]

        # History for plotting
        self.initialize_history()

        # Simulation control variables
        self.speed_limit = speed_limit
        self.walls = []
        self.bubble_radius = bubble_radius
        self.swarm_radius = swarm_radius

    def euclidean_distance(self, a: np.ndarray, b: np.ndarray) -> float:
        return np.sqrt((a[0] - b[0])**2 + (a[1] - b[1])**2)
    
    def get_history(self) -> HistType:
        return np.array(self.X_history), np.array(self.pbest_history), np.array(self.gbest_history), np.array(self.V_history)

    def initialize_history(self) -> None:
        self.X_history = []
        self.V_history = []
        self.pbest_history = []
        self.gbest_history = []

    def initialize_positions(self, scaller: int = 1) -> None:
        self.X = np.random.rand(self.num_particles, 2) * scaller -1
        self.pbest = self.X.copy()

    def initialize_velocities(self, scaller: int = 1) -> None:
        self.V = np.random.rand(self.num_particles, 2) * scaller 

    def initialize_aor_variables(self, k: float = 0.01, p: float = 0.9) -> None:
        # theta -> Angle of rotation (AoR)
        self.theta = np.zeros(self.num_particles, dtype=np.float16) 
        # s -> Dynamic change step for AoR
        self.s = np.array([1 if np.random.rand() > 0.5 else -1 for _ in range(self.num_particles)], dtype=np.float16)   
        # dc -> Probability of changing s each interaction
        self.dc = np.zeros(self.num_particles, dtype=np.float16)
        # temperature -> Prevent theta decreasing too quickly
        self.temperature = np.zeros(self.num_particles, dtype=np.float16)
        # Trr -> Temperature reduction rate
        self.Trr = np.zeros(self.num_particles, dtype=np.float16)

        # Constants
        self.k = k
        self.p = p

    def initialize_mem_tool(self, mms: int = 10, mpr: float = 0.1, md: float = 0.1, mpp1: float = 0.1, mpp2: float = 0.1, FFthr: float = 0.0) -> None:
        self.mem: list[list[MemType]] = [[] for _ in range(self.num_particles)]
        self.mms = mms # Maximum Memory Size
        self.mpr = mpr # Memory Point Radius
        self.md = md # Minimum Displacement
        self.mpp1 = mpp1 # Memory Point Penalty 1
        self.mpp2 = mpp2 # Memory Point Penalty 2
        self.FFthr = FFthr # Threshold

    def has_collision_with_particle(self, pos: tuple[int, int]) -> bool:
        for _, agent in enumerate(self.X):
            if np.linalg.norm(agent - pos) < self.swarm_radius:
                return True
        return False
    
    def has_collision_with_wall(self, pos: tuple[int, int]) -> bool:
        bubble = (pos[0], pos[1], self.bubble_radius, self.bubble_radius)
        for wall in self.walls:
            # if pos[0] > wall[0] and pos[0] < wall[0] + wall[2] and pos[1] > wall[1] and pos[1] < wall[1] + wall[3]:
            #     return True
            if bubble[0] < wall[0] + wall[2] and bubble[0] + bubble[2] > wall[0] \
                and bubble[1] < wall[1] + wall[3] and bubble[1] + bubble[3] > wall[1]:
                return True
        return False
    
    def update_velocity(self, index, n1=1.0, n2=1.0) -> None:
        # Randomly set r1 and r2
        r1 = np.random.rand()
        r2 = np.random.rand()

        # Update velocity as classic PSO [2]
        self.V[index] = self.V[index] + self.c1 * n1 * r1 * (self.pbest[index] - self.X[index]) + self.c2 * n2* r2 * (self.gbest - self.X[index])

        # Update velocity using AoR concept [7]
        RV = [[np.cos(self.theta[index]), -np.sin(self.theta[index])], [np.sin(self.theta[index]), np.cos(self.theta[index])]]
        self.V[index] = np.matmul(RV, self.V[index])

        # Update theta with temperature [8]
        self.theta[index] = self.temperature[index] * self.theta[index]

        # Update temperature [9]
        self.temperature[index] = self.Trr[index] * self.temperature[index]

        # Limit velocity between arbitrary limits
        if np.abs(self.V[index][0]) > self.speed_limit:
            self.V[index][0] = np.sign(self.V[index][0])
        if np.abs(self.V[index][1]) > self.speed_limit:
            self.V[index][1] = np.sign(self.V[index][1])

    def update_position(self, index) -> None:
        next_pos = self.X[index] + self.V[index]

        if self.has_collision_with_wall(next_pos) or self.has_collision_with_particle(next_pos):
            # Change theta [4]
            self.theta[index] += + self.s[index]

            # Set temperature to maximun
            self.temperature[index] = 1.0

            # Increase Dc [5]
            self.dc[index] *= + self.k

            # Randomly modify s and dc (Maybe add a scaller here)
            if np.random.rand() < self.dc[index]:
                print(f"Agent {index} modified. Cost: {self.cost_function(self.X[index]):.2f} Pos: {self.X[index]}")
                self.s[index] *= -1
                self.dc[index] = 0.0

            # Blocking condition to not pass through walls or particles
            next_pos = self.X[index]
        else:
            # Decrease dc [6]
                self.dc[index] *= self.p

        self.X[index] = next_pos

    def evaluate_memory(self, index: int = None) -> None:
        if index is None:
            return
        
        current_position = self.X[index]
        
        # If a particle memory is empty, then start it
        if 0 == len(self.mem[index]):
            self.mem[index].append([current_position, self.cost_function(current_position)])
            return
        
        # For each memory registered in the memory of a particle
        for i in range(len(self.mem[index])):

            # Take the τ and ν from the memory
            tal, val = self.mem[index][i]
            
            # if Pbest distance to any τ in the memory is less than MPR
            distance = self.euclidean_distance(current_position, tal)
            if distance < self.mpr:
                # Reduce corresponding ν inside the Memory by MPP1
                self.mem[index][i][1] = val - self.mpp1

                # Select the pair with minimum ν from the memory and set its τ as Pit
                best_cost_index = np.argmax(self.mem[index][i][:][1])
                self.mem[index][best_cost_index][MEM_TAL_INDEX] = current_position
            else:
                # if size of Memory i > MMS then 
                if len(self.mem[index]) > self.mms:
                    # Remove the oldest Memory i member
                    self.mem[index].pop(0)

                # Insert new xi into Memory i
                self.mem[index].append([current_position, self.cost_function(current_position)])

    def evaluate_epoch(self, index: int = None, epoch: int = 0) -> None:
        if index is None:
            return
        
        pbest_cost = self.cost_function(self.pbest[index])
        local_cost = self.cost_function(self.X[index])

        # Reduce ν for τ near to Pit1 inside the Memoryit by MPP2  
        if len(self.mem[index]) > 0 and self.md > self.euclidean_distance(local_cost, self.mem[index][-1][MEM_TAL_INDEX]):
            self.mem[index][-1][MEM_COST_INDEX] -= self.mpp2

        # Current x has better fitness than pbest
        if local_cost < pbest_cost:
            self.evaluate_memory(index)

            # Update the set of Personal-Bests it with P it
            self.pbest[index] = self.X.copy()[index]

            # Select the best element of Personal-Bests it as G i
            global_cost = self.cost_function(self.gbest)
            if pbest_cost < global_cost:
                print(f"Epoch {epoch}. Global best improved. Cost: {self.cost_function(self.gbest):.2f} Pos: {self.gbest}")
                self.gbest = self.pbest[index]

    def register_history(self) -> None:
        self.X_history.append(self.X.copy())
        self.V_history.append(self.V.copy())
        self.pbest_history.append(self.pbest.copy())
        self.gbest_history.append(self.gbest)

    def run(self, epochs: int = 300, end_condition_checker: Callable = None, timeout_epochs: int = 1000) -> None:      
        try:
            MAX_EPOCHS = timeout_epochs
            while epochs or end_condition_checker is not None:
                conter = epochs if end_condition_checker is None else MAX_EPOCHS-timeout_epochs

                for i in range(self.num_particles):
                    self.update_position(i)
                    self.evaluate_epoch(i, conter)
                    
                    n1 = 1 if self.cost_function(self.pbest[i]) > self.FFthr else 0
                    n2 = 1 if self.cost_function(self.gbest) > self.FFthr else 0

                    if n1 == 0 and n2 == 0:
                        self.mem: list[list[MemType]] = [[] for _ in range(self.num_particles)]
                    else:
                        self.update_velocity(i, n1, n2)

                self.register_history()

                if end_condition_checker is not None and end_condition_checker(self.gbest, self.pbest):
                    print(f"End condition reached in {conter} epochs. Best cost: {self.cost_function(self.gbest)}")
                    break
                elif end_condition_checker is None:
                    epochs -= 1
                elif end_condition_checker is not None and timeout_epochs > 0:
                    timeout_epochs -= 1
                elif timeout_epochs == 0:
                    print(f"Timeout reached in {conter} epochs. Best cost: {self.cost_function(self.gbest)}")
                    break
                
        except KeyboardInterrupt:
            print("Stopped by user")

    def animate(self, i):
        X_history, pbest_history, gbest_history, V_history = self.get_history()
        if i > len(X_history):
            return [*self.X_plots, *self.pbest_plots, *self.V_plots, self.gbest_plot]
        
        if X_history is None:
            raise Exception("Run simulation first")
        
        for j in range(len(self.X_plots)):
            self.X_plots[j].set_offsets(X_history[i][j])
            self.V_plots[j].set_offsets(X_history[i][j])
            self.V_plots[j].set_UVC(V_history[i][j][0], V_history[i][j][1])

        for j in range(len(self.pbest_plots)):
            self.pbest_plots[j].set_offsets(pbest_history[i][j])

        self.gbest_plot.set_offsets(gbest_history[i])

        self.plot_title.set_text(f"PSO Epoch: {i}")
        
        return [*self.X_plots, *self.pbest_plots, *self.V_plots, self.gbest_plot]

    def plot(self, ax: plt.Axes) -> None:
        self.plot_title = ax.set_title("PSO")
        ax.grid(visible=True, which='both', axis='both', color='gray', linestyle='--')
        ax.set_xlabel('X')
        ax.set_ylabel('Y')

        for wall in self.walls:
           ax.add_patch(plt.Rectangle((wall[0], wall[1]), wall[2], wall[3], color='black', fill=True, alpha=0.5))
        
        pbest_list = self.get_history()[1][0]
        X_list = self.get_history()[0][0]
        gbest = self.get_history()[2][0]
        V_list = self.get_history()[3][0]

        self.X_plots = [
            ax.scatter(x[0], x[1], color='blue', marker='o') for x in X_list
        ]
        
        self.V_plots = [
            ax.quiver(x[0], x[1], v[0], v[1], angles='xy', scale_units='xy', scale=1, width=0.005, alpha=0.3) for x, v in zip(X_list, V_list)
        ]
        
        self.pbest_plots = [
            ax.scatter(pbest[0], pbest[1], color='orange', marker='x', alpha=0.5) for pbest in pbest_list
        ]

        self.gbest_plot = ax.scatter(gbest[0], gbest[1], color='green', marker='x')

    def display(self, fig: plt.Figure, ax: plt.Axes, interval: int = 100) -> None:
        self.plot(ax)

        anim = animation.FuncAnimation(
            fig,
            self.animate,
            frames=len(self.X_history),
            interval=interval,
            blit=True
        )

        return anim
    
    def add_virtual_wall(self, x, y, width, height) -> None:
        self.walls.append((x, y, width, height))

In [78]:
def maze(add_virtual_walls: Callable, shape: tuple[int, int] = (50, 50)) -> None:

    add_virtual_walls(-5, -5, 3, shape[1])
    add_virtual_walls(-5, -5, shape[0], 3)
    add_virtual_walls(shape[0]-8, -5, 3, shape[1])
    add_virtual_walls(-5, shape[1]-8, shape[0], 3)

    add_virtual_walls(shape[0]/5 - 5, -5, 3, shape[1]/1.5)
    add_virtual_walls(shape[0]/5 - 5, shape[1]/1.5 - 8, shape[1]/3, 3)

    add_virtual_walls(shape[0]/2 - 7, 3.5, 3, shape[1]/2)


In [79]:
TARGET = (30, 10)
COST_TARGET = 0.5
NUM_PARTICLES = 10

def euclidean_distance(a: np.ndarray, b: np.ndarray) -> float:
    return np.sqrt((a[0] - b[0])**2 + (a[1] - b[1])**2)

def cost_function(X: np.ndarray) -> float:
    return euclidean_distance(X, np.array(TARGET))

def end_condition(gbest: np.ndarray, pbest: np.ndarray) -> bool:
    gbest_contition = cost_function(gbest) < COST_TARGET
    costs = np.array([cost_function(x) for x in pbest], dtype=np.float16)
    pbest_condition = np.count_nonzero(costs < COST_TARGET) >= NUM_PARTICLES

    return gbest_contition and pbest_condition

pso = PSO(num_particles=NUM_PARTICLES, c1=0.3, c2=0.7, position_scaller=2, speed_scaller=0.2, cost_function=cost_function,
          bubble_radius=1, swarm_radius=0.01, speed_limit=0.5, k=0.005, p=0.9, mpr=0.25, mms=20, md=0.1, mpp1=0.2, mpp2=0.1)
maze(pso.add_virtual_wall, shape=(50, 50))
pso.run(end_condition_checker=end_condition, timeout_epochs=700)
# pso.run(epochs=500)

best_cost = [cost_function(x) for x in pso.X]
best_index = np.argmin(best_cost)
print(f"Best particle cost: {best_cost[best_index]}")
print(f"Best particle position: {pso.X[best_index]}")

IndexError: invalid index to scalar variable.

In [80]:
pso.mem

[[[array([1.03995554, 0.09407325]), np.float64(30.73546059385123)]],
 [[array([-0.25494528,  0.70001901]), np.float64(31.652035642954477)]],
 [[array([ 0.93922912, -0.60188664]), np.float64(30.934259399731783)]],
 [[array([ 1.07074244, -0.10294376]), np.float64(30.642640480986575)]],
 [[array([-0.68582541, -0.01060526]), np.float64(32.2774239803266)]],
 [[array([-0.78913259, -0.88723914]), np.float64(32.657352336325296)]],
 [[array([-0.98278874,  0.42566164]), np.float64(32.42840041014409)]],
 [[array([ 1.06886884, -0.06080517]), np.float64(30.630542777236247)]],
 [[array([-0.14200458,  0.35035715]), np.float64(31.648950174693486)]],
 [[array([0.66620039, 0.12177569]), np.float64(30.952400801613603)]]]

In [73]:
fig, ax = plt.subplots(figsize=(10, 10))
ax.scatter(TARGET[0], TARGET[1], color='red', marker='x')
ax.set_xlim(-10, 100)
ax.set_ylim(-10, 100)

# pso.plot(ax)
# plt.savefig('pso.png')

anim = pso.display(fig, ax, interval=10)
anim.save('pso.gif', writer='pillow')

<IPython.core.display.Javascript object>