## Swarm Intelligence: Formalizing Problem-Solving in Complex Systems

Swarm intelligence (SI) is a subfield of artificial intelligence that draws inspiration from the collective behavior of decentralized, self-organized systems in nature. These systems, often composed of simple agents, exhibit complex global behaviors through local interactions. Examples include ant colonies, bird flocks, and fish schools.

**Key Concepts in Swarm Intelligence**

* **Agents:** The basic units of a swarm system. Agents have limited individual capabilities.

* **Local Interactions:** Agents interact directly only with their immediate neighbors.

* **Decentralization:** Control is distributed among agents, with no central authority.

* **Self-Organization:** Global patterns emerge from local interactions, without explicit planning.

* **Emergence:** Complex, high-level behavior arises from simple, low-level rules.
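
As a rough illustration of how a global pattern can emerge from purely local rules, the sketch below places agents on a ring and lets each one repeatedly average its value with its two immediate neighbors. The ring topology, the averaging rule, and the names `local_averaging_step` and `values` are assumptions chosen for illustration; they are not taken from any specific SI algorithm.

```python
import numpy as np

def local_averaging_step(values):
    """Each agent replaces its value with the mean of itself and its two ring neighbors."""
    left = np.roll(values, 1)    # value held by the neighbor on one side
    right = np.roll(values, -1)  # value held by the neighbor on the other side
    return (left + values + right) / 3.0

rng = np.random.default_rng(0)
values = rng.uniform(0.0, 100.0, size=20)  # hypothetical initial agent states
initial_mean = values.mean()
initial_spread = values.max() - values.min()

# No agent ever sees the whole swarm, only its two neighbors.
for _ in range(500):
    values = local_averaging_step(values)

final_spread = values.max() - values.min()
print(f"spread before: {initial_spread:.3f}, after: {final_spread:.6f}")
```

No agent is told the swarm-wide average, yet all agents drift toward it: agreement is a global outcome of a local rule.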

**Common Swarm Intelligence Algorithms**

1.  **Ant Colony Optimization (ACO)**

    * Inspired by the foraging behavior of ants.

    * Ants deposit pheromone trails to indicate favorable paths.

    * Other ants follow these trails, reinforcing them over time.

    * Used for combinatorial optimization problems.

    * Formalization:

        Let $G = (V, E)$ be a graph, where $V$ is the set of vertices (nodes) and $E$ is the set of edges. Let $m$ be the number of ants. Let $\tau_{ij}(t)$ be the amount of pheromone on edge $(i, j)$ at time $t$. Let $\eta_{ij}$ be the heuristic desirability of edge $(i, j)$ (e.g., the inverse of its length). The probability $p_{ij}^k(t)$ that ant $k$ moves from node $i$ to node $j$ at time $t$ is given by:

        $$
        p_{ij}^k(t) =
        \begin{cases}
        \frac{[\tau_{ij}(t)]^\alpha [\eta_{ij}]^\beta}{\sum_{l \in N_i^k} [\tau_{il}(t)]^\alpha [\eta_{il}]^\beta}, & \text{if } j \in N_i^k \\
        0,              & \text{otherwise}
        \end{cases}
        $$

        where:

        * $N_i^k$ is the set of neighbors of node $i$ not yet visited by ant $k$.

        * $\alpha$ and $\beta$ are parameters controlling the influence of pheromone and desirability.
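
        The transition rule is usually paired with a pheromone update that is applied once all ants have completed their tours. One common form, written here with the assumed symbols $\rho$ (evaporation rate), $Q$ (deposit constant), and $L_k$ (length of the tour built by ant $k$), is:

        $$
        \tau_{ij}(t+1) = (1 - \rho)\,\tau_{ij}(t) + \sum_{k=1}^{m} \Delta\tau_{ij}^k(t),
        \qquad
        \Delta\tau_{ij}^k(t) =
        \begin{cases}
        \frac{Q}{L_k}, & \text{if ant } k \text{ used edge } (i, j) \\
        0, & \text{otherwise}
        \end{cases}
        $$

        Evaporation lets the colony forget poor early choices, while the $Q / L_k$ deposit rewards edges that belong to shorter tours.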

In [None]:
import numpy as np
import random
import time
import multiprocessing as mp

In [None]:
# Problem setup
num_cities = 100
cities = np.random.randint(-100, 100, size=(num_cities, 2))

num_ants = 100
num_iterations = 200
evaporation_rate = 0.5
alpha = 1  # Influence of pheromone
beta = 5  # Influence of distance
Q = 10  # Pheromone deposit constant
num_processes = mp.cpu_count()  # Use all available cores
print(num_processes)

In [None]:
# Pre-compute pairwise Euclidean distances between cities (vectorized)
diff = cities[:, np.newaxis, :] - cities[np.newaxis, :, :]
distances = np.sqrt((diff ** 2).sum(axis=-1))

# Initialize pheromone levels on each path.  The pheromone matrix
# has dimensions num_cities x num_cities.  pheromone[i][j]
# represents the amount of pheromone on the path from city i to city j.
pheromone = np.ones((num_cities, num_cities))
best_path = None
best_distance = float('inf')

In [None]:
# Parallel processing setup using multiprocessing.  These structures
# are used to coordinate the work of the worker processes.
task_queue = mp.Queue()       # Queue for sending tasks to worker processes
result_queue = mp.Queue()     # Queue for receiving results from worker processes
processes = []              # List to store the process objects

In [None]:
def ant_worker(task_queue, result_queue, distances, alpha, beta, num_cities):
    """
    Worker function run by each worker process.  Each worker builds tours for
    its assigned ants.

    Args:
        task_queue (mp.Queue): Queue for tasks.  Each task is a tuple
            (n_ants, pheromone); None tells the worker to stop.
        result_queue (mp.Queue): Queue for results (list of paths).
        distances (numpy.ndarray): A 2D numpy array representing the distance matrix.
        alpha (float): The influence of pheromone.
        beta (float): The influence of distance.
        num_cities (int): The number of cities
    """
    while True:
        task = task_queue.get()  # Get a task from the queue
        if task is None:
            break  # Sentinel value indicating no more tasks
        # A worker process only holds a copy of the parent's data, so the
        # current pheromone matrix is sent along with every task.
        n_ants, pheromone = task
        paths = []
        for _ in range(n_ants):
            path = []
            visited = [False] * num_cities
            path.append(random.randint(0, num_cities - 1))  # Start at a random city
            visited[path[0]] = True

            for _ in range(num_cities - 1):
                current_city = path[-1]
                probabilities = []
                for next_city in range(num_cities):
                    if not visited[next_city]:
                        pheromone_level = pheromone[current_city][next_city]
                        # Guard against division by zero when two cities coincide
                        distance = max(distances[current_city][next_city], 1e-10)
                        probability = (pheromone_level ** alpha) * ((1 / distance) ** beta)
                        probabilities.append((next_city, probability))

                if not probabilities:
                    break

                total_probability = sum(p for _, p in probabilities)
                normalized_probabilities = [(city, p / total_probability) for city, p in probabilities]

                # Roulette-wheel selection of the next city
                rand = random.random()
                cumulative_probability = 0
                next_city = -1
                for city, probability in normalized_probabilities:
                    cumulative_probability += probability
                    if rand <= cumulative_probability:
                        next_city = city
                        break
                if next_city == -1:
                    next_city = probabilities[-1][0]  # Fallback for floating-point round-off
                path.append(next_city)
                visited[next_city] = True
            paths.append(path)
        result_queue.put(paths)  # Put the result paths into the queue

In [None]:
# Start worker processes.  The worker processes will run the ant_worker function.
for _ in range(num_processes):
    p = mp.Process(target=ant_worker, args=(task_queue, result_queue, distances, alpha, beta, num_cities))
    processes.append(p)
    p.start()  # Start the process

In [None]:
# Main ACO loop.  This loop iterates for the specified number of iterations.
for iteration in range(num_iterations):
    start_time = time.time()  # Time the start of the iteration

    # Divide ants among processes.  Each process gets a roughly equal
    # number of ants to process.
    ants_per_process = num_ants // num_processes
    remaining_ants = num_ants % num_processes
    for i in range(num_processes):
        n_ants = ants_per_process + (1 if i < remaining_ants else 0)
        # Send the ant count together with a snapshot of the current pheromone matrix
        task_queue.put((n_ants, pheromone.copy()))

    all_paths = []
    for _ in range(num_processes):
        paths = result_queue.get()  # Get the paths from a worker process
        all_paths.extend(paths)  # Add the paths to the list of all paths

    # Update pheromone levels based on the paths taken by the ants
    pheromone *= (1 - evaporation_rate)  # Evaporate existing pheromone
    for path in all_paths:
        path_distance = 0
        for i in range(len(path)):
            j = (i + 1) % len(path)
            path_distance += distances[path[i]][path[j]]
        for i in range(len(path)):
            j = (i + 1) % len(path)
            pheromone[path[i]][path[j]] += Q / path_distance
            pheromone[path[j]][path[i]] += Q / path_distance

    # Find the best path found in this iteration
    for path in all_paths:
        path_distance = 0
        for i in range(len(path)):
            j = (i + 1) % len(path)
            path_distance += distances[path[i]][path[j]]
        if path_distance < best_distance:
            best_distance = path_distance
            best_path = path

    end_time = time.time()  # Time the end of the iteration
    print(f"Iteration {iteration + 1}/{num_iterations}, Best distance: {best_distance:.2f}, Time: {end_time - start_time:.2f} seconds", end="\r")

In [None]:
# Stop worker processes.  Send a None task to each worker to signal that it should terminate.
for _ in range(num_processes):
    task_queue.put(None)
for p in processes:
    p.join()  # Wait for the process to finish

# Print the best path and its distance
print("Best Path:", best_path)
print("Best Distance:", best_distance)

2.  **Particle Swarm Optimization (PSO)**

    * Inspired by the social behavior of bird flocks or fish schools.

    * Particles move through a search space, adjusting their positions based on their own best-known position and the best-known position of their neighbors.

    * Used for continuous optimization problems.

    * Formalization:

        Let $x_i(t)$ be the position of particle $i$ at time $t$. Let $v_i(t)$ be the velocity of particle $i$ at time $t$. Let $p_i$ be the personal best position of particle $i$. Let $g$ be the global best position of the swarm. The update equations for position and velocity are:

        $$
        v_i(t+1) = \omega v_i(t) + c_1 r_1 (p_i - x_i(t)) + c_2 r_2 (g - x_i(t))
        $$

        $$
        x_i(t+1) = x_i(t) + v_i(t+1)
        $$

        where:

        * $\omega$ is the inertia weight.

        * $c_1$ and $c_2$ are acceleration coefficients.

        * $r_1$ and $r_2$ are random numbers drawn uniformly from $[0, 1]$.
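
The two update equations can be tried out in a single process before adding any parallelism. The following is a minimal sketch under stated assumptions: the `sphere` objective, the helper name `pso_step`, the swarm size, and the seed are illustrative choices, and $r_1$, $r_2$ are drawn independently for every particle and dimension.

```python
import numpy as np

def sphere(x):
    """Simple convex test objective with its minimum of 0 at the origin."""
    return float(np.sum(x ** 2))

def pso_step(positions, velocities, pbest, pbest_values, gbest, rng,
             omega=0.7, c1=1.4, c2=1.4):
    """Apply one velocity and position update to every particle (in place)."""
    n_particles, dim = positions.shape
    r1 = rng.random((n_particles, dim))
    r2 = rng.random((n_particles, dim))
    # v(t+1) = omega * v(t) + c1 * r1 * (p - x) + c2 * r2 * (g - x)
    velocities[:] = (omega * velocities
                     + c1 * r1 * (pbest - positions)
                     + c2 * r2 * (gbest - positions))
    # x(t+1) = x(t) + v(t+1)
    positions += velocities
    # Refresh each particle's personal best where the new position is better.
    values = np.array([sphere(p) for p in positions])
    improved = values < pbest_values
    pbest[improved] = positions[improved]
    pbest_values[improved] = values[improved]

rng = np.random.default_rng(42)
positions = rng.uniform(-10, 10, size=(30, 2))
velocities = rng.uniform(-1, 1, size=(30, 2))
pbest = positions.copy()
pbest_values = np.array([sphere(p) for p in positions])
initial_best = pbest_values.min()

for _ in range(100):
    gbest = pbest[np.argmin(pbest_values)].copy()  # best personal best so far
    pso_step(positions, velocities, pbest, pbest_values, gbest, rng)

final_best = pbest_values.min()
print(f"best value before: {initial_best:.4f}, after: {final_best:.6g}")
```

Because personal bests are only ever replaced by better positions, the best value found cannot get worse from one step to the next.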

In [None]:
# Problem definition
dim = 10  # Dimensionality of the problem
num_particles = 1000
num_iterations = 1000
bounds = [(-10, 10)] * dim  # Define the bounds for each dimension
w = 0.7  # Inertia weight
c1 = 1.4  # Cognitive coefficient
c2 = 1.4  # Social coefficient
num_processes = mp.cpu_count()  # Use all available cores

In [None]:
def objective_function(x):
    """
    A sample objective function.  This is the function that PSO will try to minimize.
    In this case, it's a simple quadratic function.
    """
    return np.sum(x**2)

In [None]:
# Initialize particles
particles = []
for _ in range(num_particles):
    position = np.random.uniform(low=[b[0] for b in bounds], high=[b[1] for b in bounds], size=dim)
    velocity = np.random.uniform(low=-1, high=1, size=dim)
    particles.append({
        'position': position,
        'velocity': velocity,
        'pbest': position.copy(),
        'pbest_value': objective_function(position),
    })

# Initialize global best
gbest = particles[0]['pbest'].copy()
gbest_value = particles[0]['pbest_value']
for particle in particles:
    if particle['pbest_value'] < gbest_value:
        gbest = particle['pbest'].copy()
        gbest_value = particle['pbest_value']

# Create queues for communication
task_queue = mp.Queue()
result_queue = mp.Queue()
processes = []

In [None]:
def particle_worker(task_queue, result_queue, objective_function, dim, bounds, w, c1, c2):
    """
    Worker function run by each worker process.  Each worker updates the
    particles it receives from the task queue.

    Args:
        task_queue (mp.Queue): Queue for tasks.  Each task is a tuple
            (particle_index, position, velocity, pbest, pbest_value, gbest).
        result_queue (mp.Queue): Queue for results (a tuple containing updated particle info).
        objective_function (function): The objective function to minimize.
        dim (int): The dimensionality of the search space.
        bounds (list): A list of tuples, where each tuple represents the (min, max) bounds
                       for a dimension.
        w (float): Inertia weight.
        c1 (float): Cognitive coefficient.
        c2 (float): Social coefficient.
    """
    # Give each worker its own generator; forked processes would otherwise
    # inherit the same NumPy random state and draw identical numbers.
    rng = np.random.default_rng()

    while True:
        task = task_queue.get()
        if task is None:
            break  # Sentinel value indicating no more tasks

        # The global best is sent with the task because a worker process
        # does not see updates made to variables in the parent process.
        particle_index, position, velocity, pbest, pbest_value, gbest = task

        r1 = rng.random(dim)
        r2 = rng.random(dim)

        # Velocity update equation
        velocity = (w * velocity) + (c1 * r1 * (pbest - position)) + (c2 * r2 * (gbest - position))
        # Position update equation
        position = position + velocity

        # Handle boundary constraints
        for i in range(dim):
            if position[i] < bounds[i][0]:
                position[i] = bounds[i][0]
                velocity[i] = 0
            elif position[i] > bounds[i][1]:
                position[i] = bounds[i][1]
                velocity[i] = 0

        current_value = objective_function(position)

        if current_value < pbest_value:
            pbest = position.copy()
            pbest_value = current_value

        result_queue.put((particle_index, position, velocity, pbest, pbest_value, current_value))

In [None]:
# Create and start the worker processes
for _ in range(num_processes):
    p = mp.Process(target=particle_worker, args=(task_queue, result_queue, objective_function, dim, bounds, w, c1, c2))
    processes.append(p)
    p.start()

# Main PSO loop
for iteration in range(num_iterations):
    start_time = time.time()

    # Send tasks to workers, including the current global best
    for i, particle in enumerate(particles):
        task_queue.put((i, particle['position'], particle['velocity'], particle['pbest'], particle['pbest_value'], gbest))

    # Collect results from workers and update particles
    for _ in range(num_particles):
        result = result_queue.get()
        particle_index, position, velocity, pbest, pbest_value, _ = result
        particles[particle_index]['position'] = position
        particles[particle_index]['velocity'] = velocity
        particles[particle_index]['pbest'] = pbest
        particles[particle_index]['pbest_value'] = pbest_value

        # Update global best
        if pbest_value < gbest_value:
            gbest = pbest.copy()
            gbest_value = pbest_value

    end_time = time.time()
    print(f"Iteration {iteration + 1}/{num_iterations}, Best value: {gbest_value:.4f}, Time: {end_time - start_time:.2f} seconds", end="\r")

# Terminate workers
for _ in range(num_processes):
    task_queue.put(None)
for p in processes:
    p.join()

print("Global Best Position:", gbest)
print("Global Best Value:", gbest_value)

3.  **Artificial Bee Colony (ABC)**

    * Inspired by the foraging behavior of honey bees.

    * Employed bees search for food sources (solutions) and share information with onlooker bees.

    * Onlooker bees select food sources based on the information provided and exploit them further.

    * Scout bees abandon poor food sources and search for new ones.

    * Used for global optimization problems.
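
The three bee roles can be sketched in a few dozen lines. This is a minimal, illustrative version and not a reference implementation: the function name `abc_minimize`, the parameters `n_sources`, `limit`, and `n_cycles`, the `sphere` test objective, and the fitness mapping `1 / (1 + cost)` (which assumes non-negative costs) are all assumptions made for the example.

```python
import numpy as np

def sphere(x):
    """Simple test objective with its minimum of 0 at the origin."""
    return float(np.sum(x ** 2))

def abc_minimize(objective, bounds, n_sources=20, limit=30, n_cycles=200, seed=0):
    rng = np.random.default_rng(seed)
    low, high = np.array(bounds, dtype=float).T
    dim = low.size
    sources = rng.uniform(low, high, size=(n_sources, dim))  # food sources
    costs = np.array([objective(s) for s in sources])
    trials = np.zeros(n_sources, dtype=int)  # consecutive failed improvements

    def try_neighbor(i):
        """Perturb one coordinate of source i relative to a random other source."""
        k = rng.choice([s for s in range(n_sources) if s != i])
        j = rng.integers(dim)
        candidate = sources[i].copy()
        candidate[j] += rng.uniform(-1.0, 1.0) * (sources[i, j] - sources[k, j])
        candidate = np.clip(candidate, low, high)
        cost = objective(candidate)
        if cost < costs[i]:  # greedy selection: keep the better of the two
            sources[i], costs[i], trials[i] = candidate, cost, 0
        else:
            trials[i] += 1

    best_idx = int(np.argmin(costs))
    best, best_cost = sources[best_idx].copy(), float(costs[best_idx])
    for _ in range(n_cycles):
        # Employed bees: each one explores around its own food source.
        for i in range(n_sources):
            try_neighbor(i)
        # Onlooker bees: pick sources with probability proportional to fitness.
        fitness = 1.0 / (1.0 + costs)  # assumes non-negative costs
        for i in rng.choice(n_sources, size=n_sources, p=fitness / fitness.sum()):
            try_neighbor(i)
        # Remember the best source before scouts may discard it.
        idx = int(np.argmin(costs))
        if costs[idx] < best_cost:
            best, best_cost = sources[idx].copy(), float(costs[idx])
        # Scout bees: abandon sources that failed to improve for too long.
        for i in np.flatnonzero(trials > limit):
            sources[i] = rng.uniform(low, high)
            costs[i] = objective(sources[i])
            trials[i] = 0
    return best, best_cost

best, best_cost = abc_minimize(sphere, [(-5.0, 5.0)] * 2)
print("Best source:", best)
print("Best cost:", best_cost)
```

The `limit` counter is what separates scouts from the other two roles: exploitation continues only while a source keeps improving.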

# 🧬 Differential Evolution (DE)

## 💡 What Is Differential Evolution?

**Differential Evolution (DE)** is a **stochastic, population-based optimization algorithm** that improves a set of candidate solutions using only objective-function evaluations, with no gradient information required. It is especially well-suited for:

- Non-linear, non-differentiable functions  
- High-dimensional search spaces  
- Multi-modal (many local optima) problems  

Originally proposed by **Storn and Price (1997)**, DE has become a go-to tool for global optimization when gradient-based methods fail.

---

## ⚙️ How Differential Evolution Works

DE evolves a population of candidate solutions using three main operations:

### 1. **Mutation**

For each individual $ x_i $ in the population, a mutant vector $ v_i $ is generated:

$$
v_i = x_{r1} + F \cdot (x_{r2} - x_{r3})
$$

Where:
- $ x_{r1}, x_{r2}, x_{r3} $ are randomly chosen distinct individuals from the population (different from $ x_i $)
- $ F \in [0, 2] $ is a **scaling factor** that controls the amplification of the differential variation

---

### 2. **Crossover**

The mutant vector $ v_i $ is mixed with the original vector $ x_i $ to form a **trial vector** $ u_i $:

$$
u_{ij} = 
\begin{cases}
v_{ij}, & \text{if } rand_j \leq CR \text{ or } j = j_{\text{rand}} \\
x_{ij}, & \text{otherwise}
\end{cases}
$$

Where:
- $ CR \in [0, 1] $ is the **crossover rate**
- $ j_{\text{rand}} $ ensures that at least one component comes from $ v_i $

---

### 3. **Selection**

The trial vector $ u_i $ is compared to the original vector $ x_i $. The one with the **better fitness** (lower cost for minimization) survives to the next generation:

$$
x_i^{(t+1)} = 
\begin{cases}
u_i, & \text{if } f(u_i) < f(x_i) \\
x_i, & \text{otherwise}
\end{cases}
$$
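
The three operations above can be written out directly in NumPy. This is a minimal sketch of one common variant (often called DE/rand/1/bin), not a replacement for a library implementation: the function name `de_minimize`, the default values of `F`, `CR`, and `pop_size`, the `sphere` test objective, and the clipping of mutants to the bounds are assumptions made for the example.

```python
import numpy as np

def sphere(x):
    """Simple test objective with its minimum of 0 at the origin."""
    return float(np.sum(x ** 2))

def de_minimize(objective, bounds, pop_size=20, F=0.8, CR=0.9,
                n_generations=100, seed=0):
    rng = np.random.default_rng(seed)
    low, high = np.array(bounds, dtype=float).T
    dim = low.size
    population = rng.uniform(low, high, size=(pop_size, dim))
    costs = np.array([objective(x) for x in population])

    for _ in range(n_generations):
        new_population = population.copy()
        new_costs = costs.copy()
        for i in range(pop_size):
            # Mutation: v_i = x_r1 + F * (x_r2 - x_r3), with r1, r2, r3 distinct and != i
            candidates = [idx for idx in range(pop_size) if idx != i]
            r1, r2, r3 = rng.choice(candidates, size=3, replace=False)
            mutant = population[r1] + F * (population[r2] - population[r3])
            mutant = np.clip(mutant, low, high)  # assumption: keep mutants inside the bounds

            # Crossover: take v_ij where rand_j <= CR, and always at index j_rand
            j_rand = rng.integers(dim)
            mask = rng.random(dim) <= CR
            mask[j_rand] = True
            trial = np.where(mask, mutant, population[i])

            # Selection: the trial replaces x_i only if it is strictly better
            trial_cost = objective(trial)
            if trial_cost < costs[i]:
                new_population[i] = trial
                new_costs[i] = trial_cost
        population, costs = new_population, new_costs

    best_idx = int(np.argmin(costs))
    return population[best_idx].copy(), float(costs[best_idx])

best, best_cost = de_minimize(sphere, [(-5.0, 5.0)] * 3)
print("Best solution:", best)
print("Best objective value:", best_cost)
```

Writing into `new_population` rather than `population` keeps every mutant within a generation based on the same parent generation, matching the $x_i^{(t+1)}$ notation in the selection rule.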

---

In [None]:
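import numpy as np
from scipy.optimize import differential_evolution

def rastrigin(x):
    # Rastrigin test function: highly multi-modal, global minimum of 0 at the origin
    x = np.asarray(x)
    return 10 * x.size + np.sum(x ** 2 - 10 * np.cos(2 * np.pi * x))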

bounds = [(-5.12, 5.12)] * 2

result = differential_evolution(rastrigin, bounds)
print("Best solution:", result.x)
print("Best objective value:", result.fun)

# 🧬 Genetic Algorithms (GAs)

## 💡 What Are Genetic Algorithms?

**Genetic Algorithms (GAs)** are a class of **evolutionary algorithms** inspired by the process of **natural selection** and **genetic evolution** in biology. They are used to **solve optimization and search problems** by evolving a population of candidate solutions over time.

---

## 🔁 How Genetic Algorithms Work

GAs simulate the process of evolution through these main steps:

### 1. **Initialization**
- Generate an initial population of individuals (solutions) randomly.
- Each individual is typically encoded as a **chromosome** (e.g., binary string, list of numbers).

### 2. **Evaluation**
- Compute the **fitness** of each individual using a **fitness function** (objective function).

### 3. **Selection**
- Choose individuals for reproduction based on their fitness.
- Better individuals have a **higher probability** of being selected (e.g., roulette wheel, tournament).

### 4. **Crossover (Recombination)**
- Combine genetic material from two parents to create offspring.
- Simulates biological reproduction.

Example (one-point crossover for binary strings):

```
Parent 1: 101|011  
Parent 2: 111|000  
Child:    101000
```
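
The same example can be reproduced in a couple of lines of Python. The helper name `one_point_crossover` and the explicit `point` argument are illustrative; in practice the cut point is usually drawn at random.

```python
def one_point_crossover(parent1, parent2, point):
    """Take parent1 up to `point` and parent2 from `point` onward."""
    return parent1[:point] + parent2[point:]

child = one_point_crossover("101011", "111000", point=3)
print(child)  # 101000
```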

### 5. **Mutation**
- Randomly flip bits or tweak parameters to maintain genetic diversity.
- Helps explore new areas of the search space.

### 6. **Replacement**
- Replace part or all of the population with the new offspring.
- Repeat the process for a fixed number of generations or until convergence.
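
The two selection schemes named in step 3 can be sketched as follows. The function names `tournament_select` and `roulette_select`, the tournament size `k`, and the small bit-list population are assumptions made for illustration; roulette selection as written also assumes non-negative fitness values.

```python
import random

def tournament_select(population, fitness, k=3, rng=random):
    """Pick k individuals uniformly at random and return the fittest of them."""
    contenders = rng.sample(population, k)
    return max(contenders, key=fitness)

def roulette_select(population, fitness, rng=random):
    """Pick one individual with probability proportional to its fitness."""
    weights = [fitness(ind) for ind in population]
    if sum(weights) == 0:
        # All weights are zero, so fall back to a uniform choice.
        return rng.choice(population)
    return rng.choices(population, weights=weights, k=1)[0]

# Hypothetical population of 4-bit individuals; fitness is the number of ones.
population = [[0, 0, 0, 1], [1, 1, 0, 0], [1, 1, 1, 1], [0, 1, 0, 1]]
rng = random.Random(0)

winner = tournament_select(population, sum, k=len(population), rng=rng)
picked = roulette_select(population, sum, rng=rng)
print("Tournament winner:", winner)
print("Roulette pick:", picked)
```

A larger `k` increases selection pressure: with `k` equal to the population size, the tournament always returns the fittest individual.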

---

## 🧠 Example: Binary Maximization Problem

Maximize the number of ones in a binary string of length 8:

$$
f(x) = \sum_{i=1}^{8} x_i
$$

---

In [None]:
import random

def fitness(individual):
    return sum(individual)

def create_individual(length=8):
    return [random.randint(0, 1) for _ in range(length)]

def crossover(parent1, parent2):
    point = random.randint(1, len(parent1) - 1)
    return parent1[:point] + parent2[point:]

def mutate(individual, prob=0.1):
    return [bit if random.random() > prob else 1 - bit for bit in individual]

# GA Parameters
population = [create_individual() for _ in range(10)]
generations = 20

for gen in range(generations):
    population = sorted(population, key=fitness, reverse=True)
    next_gen = population[:2]  # Elitism
    while len(next_gen) < len(population):
        p1, p2 = random.sample(population[:5], 2)  # Two different parents from the top five
        child = mutate(crossover(p1, p2))
        next_gen.append(child)
    population = next_gen

best = max(population, key=fitness)
print("Best individual:", best)
print("Fitness:", fitness(best))