In [10]:
import pandas as pd

# Demand prediction problem
class DemandPrediction:
    N_DEMAND_INDICATORS = 13;
    # Parameters consist of a bias (intercept) for the sum and one weight for
    # each demand indicator.
    N_PARAMETERS = N_DEMAND_INDICATORS + 1;

    # Construct a Demand prediction problem instance.
    # The parameter "dataset_name" specifies which dataset to use. Valid values
    # are "train" or "test".
    def __init__(self,dataset_name):
      #Load the specified dataset
      if(dataset_name == "train"):
        self.__X, self.__y = DemandPrediction.__load_dataset("data/train.csv");
      elif(dataset_name == "test"):
        self.__X, self.__y = DemandPrediction.__load_dataset("data/test.csv");
      else:
        raise Exception("Only permitted arguments for " +
          "DemandPrediction::__init__ are train and test.")

    # Rectangular bounds on the search space.
    # Returns a 2D array b such that b[i][0] is the minimum permissible value
    # of the ith solution component and b[i][1] is the maximum.
    def bounds():
        return [[-100,100] for i in range(DemandPrediction.N_PARAMETERS)]

    # Check whether the function parameters (weights) lie within the
    # problem's feasible region.
    # There should be the correct number of weights for the predictor function.
    # Each weight should lie within the range specified by the bounds.
    def is_valid(self, parameters):
        if(len(parameters) != DemandPrediction.N_PARAMETERS):
          return False
        #All weights lie within the bounds.
        b = DemandPrediction.bounds();
        for i in range(len(b)):
          if(parameters[i] < b[i][0] or parameters[i] > b[i][1] ):
            return False
        return True;

    # Evaluate a set of parameters on the dataset used by the class instance
    # (train/test).
    # @param parameters An containing the bias and weights to be used to
    #   predict demand.
    # @return The mean absolute error of the predictions on the selected
    # dataset.
    def evaluate(self, parameters):
        abs_error = 0.0;
        for (x, y) in zip(self.__X,self.__y):
            #print(list(self.__X.values))
            #print(x)
            #print(y)
            #print('--')
            y_pred = DemandPrediction.__predict(x,parameters);
            abs_error += abs(y-y_pred);
        abs_error /= len(self.__X);
        return abs_error;

    def __load_dataset(filename):
        if "train.csv" in filename:
            df = pd.read_csv(filename)
            df = df.iloc[:, 1:]
        else:
            df = pd.read_csv(filename,header=None)
            # print(df.head())
        y = df.iloc[:,0].values
        X = df.iloc[:,1:].values
        return X, y

    # Predicts demand based on a weighted sum of demand indicators. You may
    # replace this with something more complex, but will likely have to change
    # the form of the parameters array as well.
    def __predict(demand_indicators, parameters):
        prediction = parameters[0];

        for i in range(1, len(demand_indicators)):
            prediction += demand_indicators[i] * parameters[i];

        return prediction;

In [11]:
import numpy as np
#from DemandPrediction_v2 import DemandPrediction

# Define cost function
def cost_function(x):
    training_problem = DemandPrediction("train")
    error = training_problem.evaluate(x)
    return error

def pso(cost_function, bounds, n_particles, n_iterations, c1, c2, w, verbose = True):
    dimensions = len(bounds)
    
    # Initialize particle positions and velocities
    positions = np.array([np.random.uniform(bounds[:, 0], bounds[:, 1], dimensions) for _ in range(n_particles)])
    velocities = np.array([np.random.uniform(-1, 1, dimensions) for _ in range(n_particles)])
    
    # Initialize personal best and global best
    personal_best_positions = np.copy(positions)
    personal_best_costs = np.array([cost_function(p) for p in personal_best_positions])
    global_best_position = personal_best_positions[np.argmin(personal_best_costs)]
    global_best_cost = np.min(personal_best_costs)

    error_list = []
    # PSO main loop
    for t in range(n_iterations):
        for i in range(n_particles):
            # Update particle velocity
            r1, r2 = np.random.rand(2)
            velocities[i] = w * velocities[i] + c1 * r1 * (personal_best_positions[i] - positions[i]) + c2 * r2 * (global_best_position - positions[i])

            # Update particle position
            positions[i] += velocities[i]

            # Update personal best
            current_cost = cost_function(positions[i])
            if current_cost < personal_best_costs[i]:
                personal_best_positions[i] = positions[i]
                personal_best_costs[i] = current_cost

                # Update global best
                if current_cost < global_best_cost:
                    global_best_position = positions[i]
                    global_best_cost = current_cost

        # Optional: print progress
        if verbose:
            print("Iteration {}/{}, Global Best Cost: {}".format(t + 1, n_iterations, global_best_cost))
        error_list.append(global_best_cost)

    return global_best_cost, global_best_position, error_list




In [12]:
import numpy as np
#from DemandPrediction_v2 import DemandPrediction

def cost_function(x):
    training_problem = DemandPrediction("train")
    error = training_problem.evaluate(x)
    return error

def initialize_population(pop_size, bounds):
    dimensions = len(bounds)
    return np.array([np.random.uniform(bounds[:, 0], bounds[:, 1], dimensions) for _ in range(pop_size)])

def selection(population, fitness, num_parents):
    parents = np.empty((num_parents, population.shape[1]))
    for i in range(num_parents):
        min_index = np.argmin(fitness)
        parents[i] = population[min_index]
        fitness[min_index] = float('inf')
    return parents

def crossover(parents, offspring_size):
    offspring = np.empty(offspring_size)
    crossover_point = np.uint32(offspring_size[1] / 2)
    for k in range(offspring_size[0]):
        parent1_index = k % parents.shape[0]
        parent2_index = (k + 1) % parents.shape[0]
        offspring[k, 0:crossover_point] = parents[parent1_index, 0:crossover_point]
        offspring[k, crossover_point:] = parents[parent2_index, crossover_point:]
    return offspring

def mutation(offspring, bounds, mutation_rate):
    for i in range(offspring.shape[0]):
        for j in range(offspring.shape[1]):
            if np.random.rand() < mutation_rate:
                random_value = np.random.uniform(bounds[j, 0], bounds[j, 1])
                offspring[i, j] = random_value
    return offspring

def ga(cost_function, bounds, pop_size, num_generations, mutation_rate, verbose = True):
    # GA parameters
    # pop_size = 100
    # num_generations = 1000
    num_parents = int(pop_size / 2)
    # mutation_rate = 0.5

    # bounds = np.array(DemandPrediction.bounds())

    # Initialize population
    population = initialize_population(pop_size, bounds)

    error_list = []
    # Run GA optimization
    for generation in range(num_generations):
        fitness = np.array([cost_function(individual) for individual in population])
        parents = selection(population, fitness.copy(), num_parents)
        offspring = crossover(parents, offspring_size=(pop_size - num_parents, bounds.shape[0]))
        offspring = mutation(offspring, bounds, mutation_rate)
        population[0:parents.shape[0]] = parents
        population[parents.shape[0]:] = offspring

        # Optional: print progress
        best_individual = population[np.argmin(fitness)]
        best_fitness = np.min(fitness)
        if verbose:
            print("Generation {}/{}, Best Fitness: {}".format(generation + 1, num_generations, best_fitness))
        error_list.append(best_fitness)

    # Get the best solution
    best_solution = population[np.argmin(fitness)]
    best_cost = np.min(fitness)

    print("Best training error after {} generations: {}".format(num_generations, best_cost))

    # Check the MAE of the best solution on the test problem
    test_problem = DemandPrediction("test")
    test_error = test_problem.evaluate(best_solution)
    print("Test error of best solution found while training: {}".format(test_error))
    return best_solution, best_cost, error_list


In [20]:
import numpy as np
import multiprocessing as mp
from functools import partial
#from DemandPrediction_v2 import DemandPrediction
from scipy.stats import f_oneway
#from Main_PSO_v4 import pso
#from Main_Genetic import ga

bounds = np.array(DemandPrediction.bounds())

# Define cost function
def cost_function(x):
    training_problem = DemandPrediction("train")
    error = training_problem.evaluate(x)
    return error

def run_pso(params):
    cost_function, bounds, n_particles, n_iterations, c1, c2, w = params
    best_cost, best_position ,err = pso(cost_function, bounds, n_particles, n_iterations, c1, c2, w, verbose=False)
    test_problem = DemandPrediction("test")
    test_error = test_problem.evaluate(best_position)
    return test_error

def run_genetic(params):
    cost_function, bounds, pop_size, num_generations, mutation_rate = params
    best_solution, best_cost , err = ga(cost_function, bounds, pop_size, num_generations, mutation_rate, verbose=False)
    test_problem = DemandPrediction("test")
    test_error = test_problem.evaluate(best_solution)
    return test_error

def compare_models(num_runs, pso_params, genetic_params):
    #pool = mp.Pool(mp.cpu_count())

    #pso_results = pool.map(run_pso, [pso_params] * num_runs)
    #genetic_results = pool.map(run_genetic, [genetic_params] * num_runs)
    genetic_results=[]    
    pso_results=[]
    for param in range(num_runs):
        pso_results.append(run_pso(pso_params))
    for param in range(num_runs):
        genetic_results.append(run_genetic(genetic_params))
    
    #pool.close()
    #pool.join()

    f_stat, p_value = f_oneway(pso_results, genetic_results)
    
    print("PSO Results: ", pso_results)
    print("Genetic Algorithm Results: ", genetic_results)
    print("ANOVA F Statistic: ", f_stat)
    print("ANOVA P Value: ", p_value)

In [23]:
if __name__ == '__main__':
    #mp.set_start_method('spawn')
    
    #num_runs = 30
    num_runs = 5
    #pso_params = (cost_function, bounds, 100, 1000, 0.7, 0.7, 0.7)
    #genetic_params = (cost_function, bounds, 150, 1500, 0.2)
    
    pso_params = (cost_function, bounds, 100, 2, 0.7, 0.7, 0.7)
    genetic_params = (cost_function, bounds, 150, 2, 0.2)

    compare_models(num_runs, pso_params, genetic_params)


Best training error after 2 generations: 386.1773025277074
Test error of best solution found while training: 6461.819628666617
Best training error after 2 generations: 677.3095807441819
Test error of best solution found while training: 3445.7168319644593
Best training error after 2 generations: 685.6034412532098
Test error of best solution found while training: 8228.152009170208
Best training error after 2 generations: 858.4638311661172
Test error of best solution found while training: 20748.259760565634
Best training error after 2 generations: 800.3790944965068
Test error of best solution found while training: 2936.6133702349516
PSO Results:  [1212.486042358312, 1402.334719363462, 5369.377301168657, 556.9886884262561, 4004.4320489236256]
Genetic Algorithm Results:  [6461.819628666617, 3445.7168319644593, 8228.152009170208, 20748.259760565634, 2936.6133702349516]
ANOVA F Statistic:  3.009910903408915
ANOVA P Value:  0.12097650692546556
