<a href="https://colab.research.google.com/github/masoud-rafiee/PSO_KSP/blob/main/PSO_KSP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PSO - 0/1 Knapsack problem
Source: https://www.programmersought.com/article/18927400398/

In [23]:
import numpy as np
import random
import math
import matplotlib.pyplot as plt
from time import *
import itertools
from scipy.stats import qmc
import seaborn as sns
import pandas as pd #for tables and manage dataset as speradsheet
from sklearn.neighbors import KNeighborsClassifier #to classify diabets (o or 1) using selected features
# CROSS validation: split data into parts and train/test -> for accuracy of features selected
from sklearn.model_selection import cross_val_score#for scaling features / to make features comparable too
from sklearn.preprocessing import StandardScaler#combining classify and scaling together
from sklearn.pipeline import make_pipeline#uploading the dataset

data=pd.read_csv('diabetes.csv')
X=data.drop('Outcome',axis=1) #input as featurres
y=data['Outcome'] #0 or 1
N=20 #num of particles
#num of features
D = X.shape[1]
T=50 #iterations
C1=1.5 #self-best known
C2=1.5 #global best known
V_max=10 #velocity upper bound
V_min=-10 #velocity lower bound
##### PSO FUNCTION  #####
def init_x(n,d):
    #to fiil the population with random 0 or 1
    return [[random.randint(0,1) for _ in range(d)] for _ in range(n)]

def init_v(n,d,V_max,V_min):
    #to set random velocities per feature per particels
    return [[random.random()*(V_max-V_min) + V_min for _ in range(d)] for _ in range(n)]

def fitness(p,n,d,X,y):
    fitVal=[] #storing each particles score
    for i in range (n):
        selected = [j for j in range(d) if p[i][j]==1]
        if len(selected)==0:
            fitVal.append(0) #penalize empty feature set
        else:
            X_selected=X.iloc[:,selected] #to get the datav from chosen features
            #testing and a+scaling accordingly
            classifier = make_pipeline(StandardScaler(), KNeighborsClassifier(n_neighbors=5))
            scores = cross_val_score(classifier, X_selected,y,cv =5) #test subset accuracy
            fitVal.append(scores.mean())
    return fitVal

def update_pbest(p, fitvalue, pbest, px, m):
    """
         Update individual optimal
         :param p: current population
         :param fitvalue: current fitness of each particle
         :param pbest: Individual optimal before update
         :param px: Individual optimal solution before update
         :param m: number of particles
         :return: updated individual optimal value, individual optimal solution
    """
    pb = pbest
    for i in range (m):
        if fitvalue[i] > pbest[i]:
            pbest[i] = fitvalue[i]
            px[i] = p[i]
    return pb, px


def update_gbest (p, pbest, gbest, g, m):
    """
         Update the global optimal solution
         :param p: particle swarm
         :param pbest: Individual fitness (individual best)
         :param gbest: global optimal
         :param g: global optimal solution
         :param m: number of particles
         :return: gbest global optimal, the global optimal solution corresponding to g
    """
    gb=gbest
    for i in range (m):
        if pbest[i] > gb:
            gb = pbest[i]
            g = p[i]
    return gb, g


def update_v(v, x, m, n, pbest, g, c1, c2, vmax, vmin):
    """
         refresh rate
         :param v: speed before update
         :param x: position before update
         :param m: number of particles
         :param n: particle dimension
         :param pbest: Individual optimal solution (two-dimensional list)
         :param g: global optimal solution (one-dimensional list)
         :param c1: acceleration factor
         :param c2: acceleration factor
         :param vmax: maximum speed
         :param vmin: minimum speed
         :return: Two-dimensional list of updated speeds
    """
    for i in range (m):
        a = random.random()
        b = random.random()
        for j in range (n):
            v[i][j]=v[i][j]+c1*a*(pbest[i][j]-x[i][j]) + c2*b*(g[j]-x[i][j])
            if v[i][j] < vmin:
                v[i][j] = vmin
            if v[i][j] > vmax:
                v[i][j] = vmax
    return v


def update_x(x, v, m, n):
    """
         Update x
         :param x: x before update
         :param v: updated v
         :param m: number of particles
         :param n: particle dimension
         :return: updated x
    """
    for i in range (m):
        for j in range(n):
            a = random.random()
            x[i][j] = 1/(1+math.exp(-v[i][j]))
            if x[i][j] > a:
                x[i][j] = 1
            else:
                x[i][j] = 0
    return x

# New Section

In [None]:
#to test different seeds
from time import time
import itertools
#lhs initialization
def lhs_initialization(N, D):
    sampler = qmc.LatinHypercube(d=D)
    lhs_samples = sampler.random(n=N)
    x = [[1 if lhs_samples[i][j] > 0.5 else 0 for j in range(D)] for i in range(N)]
    return x

def pso_feature_selection(seed, initialization_method, N, T, C1, C2, V_max):
    random.seed(seed)
    np.random.seed(seed)

    begin_time=time()
    #particle Initialization
    D = X.shape[1]
    if initialization_method == 'uniform':
        x = init_x(N, D)
    elif initialization_method == 'lhs':
        x = lhs_initialization(N, D)
    v = init_v(N, D, V_max, -V_max) #velocity initialization
    ##############################
    fv = fitness(x, N, D, X, y)
    ##individual best
    pb, px=fv, [particle.copy() for particle in x]
    gb, g= update_gbest(x, pb, 0, [], N) #global best
    item = [gb] #fitness hitstroy
    itemg = [g.copy] #solution history
    #main loop
    for i in range(T):
        fv = fitness(x, N, D, X, y)
        pb, px = update_pbest(x, fv, pb, px, N)
        gb, g = update_gbest(x, pb, gb, g, N)
        item.append(gb)
        itemg.append(g.copy())
        v = update_v(v, x, N, D, px, g, C1, C2, V_max, V_min)
        x = update_x(x, v, N, D)
    end_time = time()
    run_time = end_time - begin_time
    #extracting the indexes of selected features
    selected_features = [i for i, val in enumerate(g) if val == 1]
    #printing results
    print(f"Seed: {seed}")
    print(f"Best Fitness (Accuracy): {gb:.4f}")  # 4 decimals of classification accuracy
    print(f"Selected Features (INDEXES): {selected_features}")
    print(f"Runtime: {run_time:.2f} seconds")  # Runtime in seconds (2 decimals)
    #plotting the fitness history
    plt.plot(item)
    plt.xlabel("Iteration")
    plt.ylabel("Accuracy")
    plt.title(f"PSO Feature Selection (Seed {seed})")
    plt.show()
    return gb, g, run_time


#hyperparameter tuning
hyperparameter_grid = {
    'N': [10, 20, 30],
    'T': [30, 50, 70],
    'C1': [1.0, 1.5, 2.0],
    'C2': [1.0, 1.5, 2.0],
    'V_max': [5, 10, 15],
    'initialization_method': ['uniform', 'lhs']
}

keys = hyperparameter_grid.keys()
combinations = [dict(zip(keys, values)) for values in itertools.product(*hyperparameter_grid.values())]

results = []
for params in combinations:
    for seed in range(5):
        gb, g, run_time = pso_feature_selection(seed, **params)
        results.append({
            'seed': seed,
            **params,
            'accuracy': gb,
            'runtime': run_time,
            'selected_features': g  # Added to track features
        })

results_df = pd.DataFrame(results)

# Analysis
grouped_results = results_df.groupby(['N', 'T', 'C1', 'C2', 'V_max', 'initialization_method']).agg({
    'accuracy': 'mean',
    'runtime': 'mean'
}).reset_index()

best_params = grouped_results.loc[grouped_results['accuracy'].idxmax()]
print("Best Hyperparameters:")
print(best_params)

# Visualization
sns.lineplot(data=results_df, x='N', y='accuracy', hue='initialization_method')
plt.title("Accuracy vs. Number of Particles")
plt.show()

In [None]:
#defingin hyperparameters and initialization method
N = 20        #Number of particles
T = 50        #Number of iterations
C1 = 1.5      #Cognitive coefficient
C2 = 1.5      #Social coefficient
V_max = 10    #Maximum velocity
initialization_method = 'uniform'
for seed in range(3):  #testing with 3 seed (0-1-2 times)
    print(f"\nRunning PSO with seed {seed}")
    pso_feature_selection(seed, initialization_method, N, T, C1, C2, V_max)