In [4]:
import numpy as np
import random
from collections import defaultdict

# Monkeypatch
import math
np.math = math


class QLearningAssistedPSO:
    def __init__(self, dim, pop_size=30, max_iter=100):
        self.dim = dim
        self.pop_size = pop_size
        self.max_iter = max_iter
        
        # PSO parameters
        self.w = 0.9  # inertia weight
        self.c1 = 2.0  # cognitive parameter
        self.c2 = 2.0  # social parameter
        
        # Q-learning parameters
        self.alpha = 0.1  # learning rate
        self.gamma = 0.9  # discount factor
        self.epsilon = 0.1  # exploration rate
        
        # Q-table for state-action pairs
        self.q_table = defaultdict(lambda: defaultdict(float))
        
        # Define local search operations
        self.local_search_operations = [
            'gaussian_mutation',
            'levy_flight',
            'cauchy_mutation',
            'uniform_crossover',
            'differential_mutation',
            'opposition_based_learning'
        ]
        
        # Initialize particles
        self.particles = []
        self.velocities = []
        self.personal_best = []
        self.personal_best_fitness = []
        self.global_best = None
        self.global_best_fitness = float('inf')
        
    def initialize_population(self, bounds):
        """Initialize PSO population"""
        for i in range(self.pop_size):
            particle = np.random.uniform(bounds[0], bounds[1], self.dim)
            velocity = np.random.uniform(-1, 1, self.dim)
            
            self.particles.append(particle)
            self.velocities.append(velocity)
            self.personal_best.append(particle.copy())
            self.personal_best_fitness.append(float('inf'))
    
    def get_state(self, iteration, diversity, improvement_rate):
        """Define state representation for Q-learning"""
        # Discretize continuous values into states
        iter_state = "early" if iteration < self.max_iter * 0.3 else \
                    "middle" if iteration < self.max_iter * 0.7 else "late"
        
        div_state = "high" if diversity > 0.5 else \
                   "medium" if diversity > 0.2 else "low"
        
        imp_state = "good" if improvement_rate > 0.1 else \
                   "moderate" if improvement_rate > 0.01 else "poor"
        
        return f"{iter_state}_{div_state}_{imp_state}"
    
    def calculate_diversity(self):
        """Calculate population diversity"""
        if len(self.particles) < 2:
            return 0.0
        
        center = np.mean(self.particles, axis=0)
        distances = [np.linalg.norm(p - center) for p in self.particles]
        return np.mean(distances) / np.sqrt(self.dim)
    
    def select_local_search_operation(self, state):
        """Q-learning based operation selection"""
        if random.random() < self.epsilon:
            # Exploration: random selection
            return random.choice(self.local_search_operations)
        else:
            # Exploitation: select best action based on Q-values
            q_values = self.q_table[state]
            if not q_values:
                return random.choice(self.local_search_operations)
            return max(q_values.keys(), key=lambda k: q_values[k])
    
    def apply_local_search_operation(self, particle, operation):
        """Apply selected local search operation"""
        new_particle = particle.copy()
        
        if operation == 'gaussian_mutation':
            noise = np.random.normal(0, 0.1, self.dim)
            new_particle += noise
            
        elif operation == 'levy_flight':
            beta = 1.5
            sigma = (np.math.gamma(1 + beta) * np.sin(np.pi * beta / 2) / 
                    (np.math.gamma((1 + beta) / 2) * beta * (2 ** ((beta - 1) / 2)))) ** (1 / beta)
            u = np.random.normal(0, sigma, self.dim)
            v = np.random.normal(0, 1, self.dim)
            levy = u / (np.abs(v) ** (1 / beta))
            new_particle += 0.01 * levy
            
        elif operation == 'cauchy_mutation':
            cauchy_noise = np.random.standard_cauchy(self.dim) * 0.1
            new_particle += cauchy_noise
            
        elif operation == 'uniform_crossover':
            if len(self.particles) > 1:
                partner = random.choice(self.particles)
                mask = np.random.random(self.dim) < 0.5
                new_particle = np.where(mask, particle, partner)
                
        elif operation == 'differential_mutation':
            if len(self.particles) >= 3:
                indices = random.sample(range(len(self.particles)), 3)
                r1, r2, r3 = [self.particles[i] for i in indices]
                F = 0.5
                new_particle = r1 + F * (r2 - r3)
                
        elif operation == 'opposition_based_learning':
            # Assuming bounds are [-10, 10] for simplicity
            bounds_low, bounds_high = -10, 10
            new_particle = bounds_low + bounds_high - particle
        
        return new_particle
    
    def update_q_table(self, state, action, reward, next_state):
        """Update Q-table using Q-learning update rule"""
        current_q = self.q_table[state][action]
        
        # Find maximum Q-value for next state
        next_q_values = self.q_table[next_state]
        max_next_q = max(next_q_values.values()) if next_q_values else 0
        
        # Q-learning update
        new_q = current_q + self.alpha * (reward + self.gamma * max_next_q - current_q)
        self.q_table[state][action] = new_q
    
    def calculate_reward(self, old_fitness, new_fitness, diversity_change):
        """Calculate reward for Q-learning"""
        fitness_improvement = old_fitness - new_fitness
        
        # Reward based on fitness improvement
        if fitness_improvement > 0:
            reward = 1.0 + fitness_improvement * 10  # Scale improvement
        else:
            reward = -0.1  # Small penalty for no improvement
        
        # Additional reward for maintaining diversity
        if diversity_change > 0:
            reward += 0.2
        
        return reward
    
    def optimize(self, objective_function, bounds, verbose=True):
        """Main optimization loop with Q-learning assistance"""
        self.initialize_population(bounds)
        
        # Evaluate initial population
        for i, particle in enumerate(self.particles):
            fitness = objective_function(particle)
            self.personal_best_fitness[i] = fitness
            
            if fitness < self.global_best_fitness:
                self.global_best_fitness = fitness
                self.global_best = particle.copy()
        
        prev_best_fitness = self.global_best_fitness
        prev_diversity = self.calculate_diversity()
        
        for iteration in range(self.max_iter):
            current_diversity = self.calculate_diversity()
            improvement_rate = (prev_best_fitness - self.global_best_fitness) / max(abs(prev_best_fitness), 1e-10)
            
            # Get current state for Q-learning
            current_state = self.get_state(iteration, current_diversity, improvement_rate)
            
            for i in range(self.pop_size):
                # Standard PSO velocity and position update
                r1, r2 = random.random(), random.random()
                
                self.velocities[i] = (self.w * self.velocities[i] + 
                                    self.c1 * r1 * (self.personal_best[i] - self.particles[i]) +
                                    self.c2 * r2 * (self.global_best - self.particles[i]))
                
                self.particles[i] += self.velocities[i]
                
                # Apply bounds
                self.particles[i] = np.clip(self.particles[i], bounds[0], bounds[1])
                
                # Q-learning assisted local search operation selection
                selected_operation = self.select_local_search_operation(current_state)
                
                # Apply local search operation
                old_fitness = objective_function(self.particles[i])
                enhanced_particle = self.apply_local_search_operation(self.particles[i], selected_operation)
                enhanced_particle = np.clip(enhanced_particle, bounds[0], bounds[1])
                
                new_fitness = objective_function(enhanced_particle)
                
                # Accept improvement
                if new_fitness < old_fitness:
                    self.particles[i] = enhanced_particle
                    fitness = new_fitness
                else:
                    fitness = old_fitness
                
                # Update personal best
                if fitness < self.personal_best_fitness[i]:
                    self.personal_best_fitness[i] = fitness
                    self.personal_best[i] = self.particles[i].copy()
                
                # Update global best
                if fitness < self.global_best_fitness:
                    self.global_best_fitness = fitness
                    self.global_best = self.particles[i].copy()
                
                # Calculate reward and update Q-table
                diversity_change = current_diversity - prev_diversity
                reward = self.calculate_reward(old_fitness, new_fitness, diversity_change)
                
                next_diversity = self.calculate_diversity()
                next_improvement_rate = (self.global_best_fitness - new_fitness) / max(abs(self.global_best_fitness), 1e-10)
                next_state = self.get_state(iteration + 1, next_diversity, next_improvement_rate)
                
                self.update_q_table(current_state, selected_operation, reward, next_state)
            
            # Update tracking variables
            prev_best_fitness = self.global_best_fitness
            prev_diversity = current_diversity
            
            if verbose and iteration % 20 == 0:
                print(f"Iteration {iteration}: Best fitness = {self.global_best_fitness:.6f}")
        
        return self.global_best, self.global_best_fitness

# Example usage
def sphere_function(x):
    """Simple sphere function for testing"""
    return np.sum(x**2)

def rastrigin_function(x):
    """Rastrigin function - multimodal test function"""
    A = 10
    n = len(x)
    return A * n + np.sum(x**2 - A * np.cos(2 * np.pi * x))

# Test the Q-learning assisted PSO
if __name__ == "__main__":
    # Initialize optimizer
    ql_pso = QLearningAssistedPSO(dim=10, pop_size=30, max_iter=200)
    
    # Optimize sphere function
    print("Optimizing Sphere Function:")
    best_solution, best_fitness = ql_pso.optimize(
        sphere_function, 
        bounds=(-10, 10), 
        verbose=True
    )
    
    print(f"\nFinal Results:")
    print(f"Best solution: {best_solution}")
    print(f"Best fitness: {best_fitness}")
    
    # Display learned Q-table insights
    print(f"\nLearned Q-table has {len(ql_pso.q_table)} states")
    for state, actions in list(ql_pso.q_table.items())[:5]:
        print(f"State {state}:")
        for action, q_value in actions.items():
            print(f"  {action}: {q_value:.4f}")


Optimizing Sphere Function:
Iteration 0: Best fitness = 22.744421
Iteration 20: Best fitness = 17.883321
Iteration 40: Best fitness = 15.446013
Iteration 60: Best fitness = 15.446013
Iteration 80: Best fitness = 14.456458
Iteration 100: Best fitness = 14.456458
Iteration 120: Best fitness = 14.456458
Iteration 140: Best fitness = 11.804027
Iteration 160: Best fitness = 10.579282
Iteration 180: Best fitness = 10.579282

Final Results:
Best solution: [ 0.16943576 -1.87804194  0.96189846  1.01658864 -0.29824828  0.96479517
  0.24910888 -0.20657894 -0.43963382  0.82075816]
Best fitness: 7.505884817151466

Learned Q-table has 3 states
State early_high_poor:
  uniform_crossover: 5902.8241
  cauchy_mutation: 6435.3949
  gaussian_mutation: 5967.6910
  differential_mutation: 6464.7960
  levy_flight: 5814.1263
  opposition_based_learning: 6313.9384
State middle_high_poor:
  uniform_crossover: 11470.2163
  differential_mutation: 11877.0376
  opposition_based_learning: 11138.1915
  gaussian_mutati