In [18]:
from utils import *
import pygad

In [19]:
X = load_data("dataset.csv")
X = normalize_data(X)

In [20]:
import numpy as np
from utils import *

class DESWSA:

    def __init__(self, X, num_elephants, p, t_max, w_min, w_max, num_clusters):
        self.num_elephants = num_elephants
        self.p = p
        self.X = X
        self.t_max = t_max
        self.w_min = w_min
        self.w_max = w_max
        self.num_clusters = num_clusters
        self.num_clusters_max = 10
        self.best_global_fitness = -np.inf
        self.best_global_solution = None
        self.best_local_fitness = np.full(num_elephants, -np.inf)
        self.best_local_solution = [None] * num_elephants

    def initialize_positions(self, num_clusters):
        positions = np.zeros((self.num_elephants, 2, self.num_clusters_max))
        for i in range(self.num_elephants):
            ids = np.random.choice(self.X.shape[0], size=self.num_clusters_max, replace=False)
            positions[i, 0, :self.num_clusters_max] = ids
            ones_ids = np.random.choice(self.num_clusters_max, size=num_clusters, replace=False)
            positions[i, 1, ones_ids] = 1
        return positions
    
    def initialize_velocities(self):
        velocities = np.zeros((1,self.num_elephants))
        for i in range(self.num_elephants):
           velocities[0, i] = np.random.randint(1, self.num_clusters_max) 
        return velocities
    
    def update_position(self, position, velocity, num_clusters, num_elephant):
        indices = np.arange(num_clusters)
        for _ in range(round(velocity[0, num_elephant])):
            id1, id2 = np.random.choice(indices, size=2, replace=False)
            position[num_elephant, 1, id1], position[num_elephant, 1, id2] = position[num_elephant, 1, id2], position[num_elephant, 1, id1]
        return position
    
    def update_velocity(self, velocity, position, local_best_position, global_best_position, p,i):
        w = self.w_max - ((self.w_max - self.w_min) / self.t_max)
        rand=np.random.uniform(1, self.num_clusters_max)
        if rand > p:
                velocity[0][i] = velocity[0][i] * w +  rand * np.linalg.norm(global_best_position-position[i][1])%self.num_clusters_max
        else:
                velocity[0][i] = velocity[0][i] * w  + rand * np.linalg.norm(local_best_position[i][1]-position[i][1])%self.num_clusters_max
        return velocity
    
    def create_clusters(self, centroids, distance_method):
        distances = {}
        for data_point in self.X:
            min_distance = float('inf')  
            closest_centroid = None
            for centroid in centroids:
                dist = distance_method(centroid, data_point)
                if dist < min_distance:
                    min_distance = dist
                    closest_centroid = tuple(centroid)  
            if closest_centroid in distances:
                distances[closest_centroid].append(data_point)
            else:
                distances[closest_centroid] = [data_point]  
        return distances
    
    def calculate_fitness (self, clusters, distance_method):
        dist =0
        for centroid in clusters.keys():
            for datapoint in clusters[centroid]:
                dist += distance_method(centroid, datapoint)
        return 1/dist
    
    def get_centroids(self, position):
        ids = np.where(position[1] == 1)[0]
        ids_centroids = position[0, ids]
        centroids = self.X[ids_centroids.astype(int), :]
        return centroids
    
    def fitness (self, position, distance_method):
        centroids = self.get_centroids(position)
        clusters = self.create_clusters(centroids, distance_method) 
        fitness_score = self.calculate_fitness(clusters, distance_method)
        return fitness_score
    
    def run(self):
        best_solutions = {}
        for i in range(self.num_elephants):
                positions = self.initialize_positions(self.num_clusters)
                velocities = self.initialize_velocities()
                fitness = self.fitness(positions[i],distance_manhattan)
                self.best_local_fitness[i] = fitness
                self.best_local_solution[i] = positions[i].copy()
                if fitness > self.best_global_fitness:
                   self.best_global_fitness = fitness
                   self.best_global_solution = positions[i].copy()
        for _ in range(self.t_max):
                for i in range(self.num_elephants):
                    velocities = self.update_velocity(velocities, positions, self.best_local_solution, self.best_global_solution, self.p,i)
                    positions = self.update_position(positions, velocities, self.num_clusters, i)
                    fitness = self.fitness(positions[i],distance_manhattan)
                    if fitness > self.best_local_fitness[i]:
                        self.best_local_fitness[i] = fitness
                        self.best_local_solution[i] = positions[i].copy()
                    if fitness > self.best_global_fitness:
                        self.best_global_fitness = fitness
                        self.best_global_solution = positions[i].copy()
        best_solutions[self.num_clusters] = (self.best_global_solution, self.best_global_fitness)
        return best_solutions

In [21]:
def fitness_func(ga_instance, solution, solution_idx):
        num_elephants = round(solution[0])
        if num_elephants < 0:
            num_elephants = -num_elephants
        if num_elephants < 2:
           num_elephants = 2
        elif num_elephants > 5:
           num_elephants = 5

        p = round(solution[1], 1)
        if p < 0:
            p = 0
        if p > 1:
           p = 1

        t_max = round(solution[2])
        if t_max < 0:
            t_max = -t_max
        if t_max < 2:
           t_max = 2
        elif t_max > 5:
           t_max = 5

        w_min = round(solution[3], 1)
        if w_min < 0:
            w_min = 0
        if w_min > 1:
           w_min = 1

        w_max = round(solution[4], 1)
        if w_max < 0:
            w_max = 0
        if w_max > 1:
           w_max = 1

        if w_max < w_min:
           w_min, w_max = w_max, w_min

        num_clusters = int(round(solution[5]))
        if num_clusters < 0:
            num_clusters = -num_clusters
        if num_clusters < 2:
           num_clusters = 2
        elif num_clusters > 5:
           num_clusters = 5

        deswsa = DESWSA(X, num_elephants, p, t_max, w_min, w_max, num_clusters)
        best_solutions = deswsa.run()
        best_fitness = best_solutions[num_clusters][1]
        return best_fitness

def transform_solution(solution):
    solution[0] = round(solution[0])
    if solution[0] < 0:
        solution[0] = -solution[0]
    if solution[0] < 2:
        solution[0] = 2
    elif solution[0] > 5:
        solution[0] = 5

    solution[1] = round(solution[1], 1)
    if solution[1] < 0:
        solution[1] = 0
    if solution[1] > 1:
        solution[1] = 1

    solution[2] = round(solution[2])
    if solution[2] < 0:
        solution[2] = -solution[2]
    if solution[2] < 2:
        solution[2] = 2
    elif solution[2] > 5:
        solution[2] = 5

    solution[3] = round(solution[3], 1)
    if solution[3] < 0:
        solution[3] = 0
    if solution[3] > 1:
        solution[3] = 1

    solution[4] = round(solution[4], 1)
    if solution[4] < 0:
        solution[4] = 0
    if solution[4] > 1:
        solution[4] = 1

    if solution[4] < solution[3]:
        solution[3], solution[4] = solution[4], solution[3]

    solution[5] = int(round(solution[5]))
    if solution[5] < 0:
        solution[5] = -solution[5]
    if solution[5] < 2:
        solution[5] = 2
    elif solution[5] > 5:
        solution[5] = 5

    return solution

In [22]:
ga_instance = pygad.GA(num_generations=20,
                       sol_per_pop=10,
                       num_parents_mating=5,
                       keep_parents=2,
                       num_genes=6,
                       fitness_func=fitness_func,
                       suppress_warnings=True,
                       init_range_low=[2, 0, 2, 0, 0, 2],
                       init_range_high=[5, 1, 5, 1, 1, 5])  

ga_instance.run()

best_solution, best_solution_fitness, best_solution_idx = ga_instance.best_solution()
best_solution = transform_solution(best_solution)
print("Best solution is {bs}".format(bs=best_solution))
print("Fitness of the best solution is {bsf}".format(bsf=best_solution_fitness))
print("Best solution found after {gen} generations".format(gen=ga_instance.best_solution_generation))

Best solution is [4.  0.3 5.  0.6 0.8 5. ]
Fitness of the best solution is 0.005615061926050703
Best solution found after 1 generations
