<a href="https://colab.research.google.com/github/vothane/whistling-birds/blob/master/whistling_birds.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Replacing back-propagation with PSO (Particle Swarm Optimization)

In [0]:
from keras import backend as K
from __future__ import print_function, division
import numpy as np
import copy

class ParticleSwarmOptimizedNN():
  """Optimize the weights of a neural network with Particle Swarm Optimization.

  Every particle wraps one model produced by `model_builder`. On each
  generation a particle's kernel/bias arrays are moved by a velocity made of
  an inertia term, a pull towards the particle's own best position and a pull
  towards the best position found by the whole swarm.

  Best positions are stored as copies of the weight arrays. Keeping references
  to the live layers (or a shallow copy of the particle) would make every
  "best" follow the particle as it moves, which cancels the cognitive term and
  leaves the returned model out of sync with the reported fitness.

  Parameters:
    population_size: number of particles (models) in the swarm.
    model_builder: callable invoked as `model_builder(n_inputs=..., n_outputs=...)`
      returning a compiled model exposing `layers` and `test_on_batch(X, y)`
      that returns `(loss, accuracy)`. Each layer must provide
      `get_weights()` / `set_weights()` with the kernel at index 0 and the
      bias at index 1.
    inertia_weight: factor applied to the previous velocity.
    cognitive_weight: factor applied to the pull towards the personal best.
    social_weight: factor applied to the pull towards the global best.
    max_velocity: velocities are clipped to [-max_velocity, max_velocity].
  """

  class _Particle:
    """Plain attribute holder for one swarm member."""
    pass

  def __init__(self, population_size, model_builder, inertia_weight=0.8, cognitive_weight=2, social_weight=2, max_velocity=20):
    self.population_size = population_size
    self.model_builder = model_builder
    self.best_individual = None
    # Parameters used to update velocity
    self.cognitive_w = cognitive_weight
    self.inertia_w = inertia_weight
    self.social_w = social_weight
    self.min_v = -max_velocity
    self.max_v = max_velocity

  @staticmethod
  def _snapshot_weights(model):
    """ Returns independent copies of the weight arrays of every layer """
    return [[np.array(w, copy=True) for w in layer.get_weights()]
            for layer in model.layers]

  @staticmethod
  def _load_weights(model, snapshot):
    """ Writes a snapshot made by _snapshot_weights back into the model """
    for layer, layer_weights in zip(model.layers, snapshot):
      layer.set_weights([np.array(w, copy=True) for w in layer_weights])

  def _build_model(self, id):
    """ Returns a new particle with zero velocity and its start position as personal best """
    pso = self._Particle()
    pso.model = self.model_builder(n_inputs=self.X.shape[1], n_outputs=self.y.shape[1])
    pso.id = id
    pso.fitness = 0
    pso.highest_fitness = 0
    pso.accuracy = 0
    # Set initial best as the current initialization (copied, not referenced)
    pso.best_weights = self._snapshot_weights(pso.model)

    # Set initial velocity to zero, one kernel/bias pair per layer
    pso.velocity = [
      {"W": np.zeros_like(layer_weights[0]), "w0": np.zeros_like(layer_weights[1])}
      for layer_weights in pso.best_weights
    ]

    return pso

  def _initialize_population(self):
    """ Initialization of the neural networks forming the population"""
    self.population = [self._build_model(id=i) for i in range(self.population_size)]

  def _update_weights(self, individual):
    """ Calculate the new velocity and update weights for each layer """
    # Two random parameters used to update the velocity; shared by all layers
    r1 = np.random.uniform()
    r2 = np.random.uniform()
    global_best = self.best_individual.best_weights

    for i, layer in enumerate(individual.model.layers):
      layer_weights = layer.get_weights()

      # (velocity key, index in the layer's weight list): kernel, then bias
      for key, idx in (("W", 0), ("w0", 1)):
        current = layer_weights[idx]
        inertia = self.inertia_w * individual.velocity[i][key]
        cognitive = self.cognitive_w * r1 * (individual.best_weights[i][idx] - current)
        social = self.social_w * r2 * (global_best[i][idx] - current)
        velocity = np.clip(inertia + cognitive + social, self.min_v, self.max_v)
        individual.velocity[i][key] = velocity
        # Move the parameter along its new velocity
        layer_weights[idx] = current + velocity

      layer.set_weights(layer_weights)

  def _calculate_fitness(self, individual):
    """ Evaluate the individual on the data passed to optimize() to get fitness scores """
    loss, acc = individual.model.test_on_batch(self.X, self.y)
    # Lower loss => higher fitness; the epsilon guards against division by zero
    individual.fitness = 1 / (loss + 1e-8)
    individual.accuracy = acc

  def optimize(self, X, y, n_generations):
    """ Will evolve the population for n_generations based on dataset X and labels y

    Returns the global-best particle. Its `model` holds the weights that
    produced its `fitness` and `accuracy`, and is not shared with any member
    of the population.
    """
    self.X, self.y = X, y
    self._initialize_population()

    # The best individual of the population is initialized as population's first ind.
    # It gets a model of its own so later moves of that particle cannot alter it.
    first = self.population[0]
    best = self._build_model(id=first.id)
    best.best_weights = self._snapshot_weights(first.model)
    self._load_weights(best.model, best.best_weights)
    self.best_individual = best

    for epoch in range(n_generations):
      for individual in self.population:
        # Calculate new velocity and update the NN weights
        self._update_weights(individual)
        # Calculate the fitness of the updated individual
        self._calculate_fitness(individual)

        # If the current fitness is higher than the individual's previous highest
        # => update the individual's best weights
        if individual.fitness > individual.highest_fitness:
          individual.best_weights = self._snapshot_weights(individual.model)
          individual.highest_fitness = individual.fitness

        # If the individual's fitness is higher than the highest recorded fitness for the
        # whole population => update the best individual
        if individual.fitness > best.fitness:
          best.id = individual.id
          best.fitness = individual.fitness
          best.highest_fitness = individual.fitness
          best.accuracy = individual.accuracy
          best.best_weights = self._snapshot_weights(individual.model)
          self._load_weights(best.model, best.best_weights)

      print ("[%d Best Individual - ID: %d Fitness: %.5f, Accuracy: %.1f%%]" % 
             (epoch,
              self.best_individual.id,
              self.best_individual.fitness,
              100*float(self.best_individual.accuracy)))
      
    return self.best_individual

In [0]:
import keras
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

import numpy as np
from sklearn import datasets
from sklearn import preprocessing
from sklearn.model_selection import train_test_split

from __future__ import print_function
import matplotlib.pyplot as plt

def main():
  """Fit a small dense classifier to the Iris data set with PSO and print its test accuracy.

  The data is one-hot encoded, standardized and split 50/50 into train and
  test halves. The swarm is optimized on the training half; the best particle
  found is then evaluated on the held-out half.
  """
  iris = datasets.load_iris()
  X = iris['data']
  y = iris['target']

  # One hot encoding
  enc = preprocessing.OneHotEncoder()
  Y = enc.fit_transform(y[:, np.newaxis]).toarray()

  # Scale data to have mean 0 and variance 1,
  # which is important for convergence of the neural network
  scaler = preprocessing.StandardScaler()
  X_scaled = scaler.fit_transform(X)
  X_train, X_test, y_train, y_test = train_test_split(X_scaled, Y, test_size=0.5, random_state=2)

  n_features = X.shape[1]
  n_classes = Y.shape[1]

  # Model builder
  def model_builder(n_inputs, n_outputs):
    """Return a compiled two-hidden-layer classifier sized by n_inputs / n_outputs."""
    model = Sequential()
    model.add(Dense(10, input_shape=(n_inputs,), activation='relu'))
    model.add(Dense(10, activation='relu'))
    model.add(Dense(n_outputs, activation='softmax'))

    # Adam optimizer with learning rate of 0.001. It is only needed so the
    # model can be compiled: this script never calls fit(), the weights are
    # moved by the swarm instead.
    optimizer = Adam(lr=0.001)
    model.compile(optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    return model

  # Print the model summary of a individual in the population
  print ("")
  model_builder(n_inputs=n_features, n_outputs=n_classes).summary()

  population_size = 100
  n_generations = 10

  inertia_weight = 0.8
  cognitive_weight = 0.8
  social_weight = 0.8

  print ("Population Size: %d" % population_size)
  print ("Generations: %d" % n_generations)
  print ("")
  print ("Inertia Weight: %.2f" % inertia_weight)
  print ("Cognitive Weight: %.2f" % cognitive_weight)
  print ("Social Weight: %.2f" % social_weight)
  print ("")

  pso = ParticleSwarmOptimizedNN(
          population_size=population_size, 
          inertia_weight=inertia_weight,
          cognitive_weight=cognitive_weight,
          social_weight=social_weight,
          max_velocity=5,
          model_builder=model_builder)

  # optimize() returns the best particle, not the optimizer itself
  best_individual = pso.optimize(X_train, y_train, n_generations=n_generations)

  loss, accuracy = best_individual.model.test_on_batch(X_test, y_test)

  print ("Accuracy: %.1f%%" % float(100*accuracy))

  # Predicted class per test sample, kept for the plotting step below.
  # NOTE: the Plot helper is not defined in this notebook, so the plot stays disabled.
  y_pred = np.argmax(best_individual.model.predict(X_test), axis=1)
  #Plot().plot_in_2d(X_test, y_pred, title="Particle Swarm Optimized Neural Network", accuracy=accuracy, legend_labels=range(y.shape[1]))


if __name__ == "__main__":
  main()

In case you used a LabelEncoder before this OneHotEncoder to convert the categories to integers, then you can now use the OneHotEncoder directly.



Model: "sequential_312"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_934 (Dense)            (None, 10)                50        
_________________________________________________________________
dense_935 (Dense)            (None, 10)                110       
_________________________________________________________________
dense_936 (Dense)            (None, 3)                 33        
Total params: 193
Trainable params: 193
Non-trainable params: 0
_________________________________________________________________
Population Size: 100
Generations: 10

Inertia Weight: 0.80
Cognitive Weight: 0.80
Social Weight: 0.80

[0 Best Individual - ID: 99 Fitness: 1.03064, Accuracy: 58.7%]
[1 Best Individual - ID: 91 Fitness: 1.19143, Accuracy: 53.3%]
[2 Best Individual - ID: 76 Fitness: 1.25379, Accuracy: 62.7%]
[3 Best Individual - ID: 3 Fitness: 1.50093, Accuracy: 62.7%]
[4 Best Individual - ID: 62 F

# Summary
The best individual in generation 6 got an accuracy score of __82%__ which is pretty good. The whole process takes a while since we are using just the CPU.

In [1]:
import random
import functools

def search(epochs, search_space, num_bees, num_sites, best_sites, patch_size, elites, workers):
    """Run the bees search loop and return the fittest (lowest-fitness) bee evaluated.

    Each epoch scores the whole population, keeps the `num_sites` best bees as
    sites, replaces every site by one bee picked from its neighborhood (the
    first `best_sites` sites get `elites` neighbors, the rest get `workers`),
    and fills the remainder of the population with random scouts. The
    neighborhood radius `patch_size` shrinks by 2% after every epoch.
    """
    best = None
    population = [create_random_bee(search_space) for _ in range(num_bees)]

    for _ in range(epochs):
        # Score everyone, then rank ascending: lower fitness is better
        for member in population:
            member['fitness'] = objective_fn(member['vector'])
        population.sort(key=lambda member: member['fitness'])

        leader = population[0]
        if best is None or leader['fitness'] < best['fitness']:
            best = leader

        # Local search around the selected sites
        next_gen = []
        for rank, site in enumerate(population[:num_sites]):
            if rank < best_sites:
                recruits = elites
            else:
                recruits = workers
            next_gen.append(search_neighbor(site, recruits, patch_size, search_space))

        # Remaining bees explore the search space at random
        population = next_gen + create_scout_bees(search_space, (num_bees - num_sites))
        patch_size = patch_size * 0.98

    return best


def objective_fn(X):
    """Return the sum of squares of the components of X (0.0 for an empty X).

    Every component is squared, including the first one; folding without a
    start value would use the first component unsquared as the initial total.
    """
    return sum((x ** 2.0 for x in X), 0.0)

def create_random_bee(search_space):
    """Return a new bee dict whose 'vector' is a random point of search_space."""
    bee = {}
    bee['vector'] = random_vector(search_space)
    return bee

def random_vector(minmax):
    """Return one random coordinate per [low, high] pair in minmax."""
    vector = []
    for bounds in minmax:
        low = bounds[0]
        high = bounds[1]
        vector.append(low + ((high - low) * random.random()))
    return vector

def create_scout_bees(search_space, num_scouts):
    """Return a list of num_scouts freshly created random bees."""
    scouts = []
    for _ in range(num_scouts):
        scouts.append(create_random_bee(search_space))
    return scouts

def search_neighbor(parent, neighbor_size, patch_size, search_space):
    """Return the best bee among neighbor_size random neighbors of parent.

    "Best" means lowest fitness, matching the ascending ranking used by the
    search loop. When no neighbor is generated (neighbor_size <= 0) the parent
    is returned unchanged instead of failing on an empty list.
    """
    neighbors = [
        create_neighbor_bee(parent['vector'], patch_size, search_space)
        for _ in range(neighbor_size)
    ]
    if not neighbors:
        return parent

    for bee in neighbors:
        bee['fitness'] = objective_fn(bee['vector'])

    # min() picks the lowest fitness; popping the tail of an ascending sort
    # would hand back the worst neighbor
    return min(neighbors, key=lambda bee: bee['fitness'])

def create_neighbor_bee(space, patch_size, search_space):
    """Return a bee whose vector is `space` randomly perturbed within patch_size.

    Each coordinate is shifted up or down (equal probability) by a random
    amount in [0, patch_size) and then clamped to the [low, high] pair at the
    same index of search_space. Coordinates that are already inside the bounds
    are kept as they are.
    """
    vector = []
    for i, value in enumerate(space):
        low, high = search_space[i][0], search_space[i][1]
        # Draw the direction first, then the step size
        move_up = random.random() < 0.5
        step = random.random() * patch_size
        value = value + step if move_up else value - step
        # Clamp only when the move left the search space
        vector.append(min(max(value, low), high))

    return {'vector': vector}

# Demo run: minimize objective_fn over the unit cube and print the best bee found.
if __name__ == "__main__":
    problem_size = 3  # number of coordinates per bee
    search_space = [[0, 1] for _ in range(problem_size)]  # one [low, high] pair per coordinate

    epochs = 500      # iterations of the search loop
    num_bees = 45     # population size
    num_sites = 3     # top-ranked bees whose neighborhoods are searched each epoch
    best_sites = 1    # how many of those sites receive `elites` neighbors
    patch_size = 3.0  # initial neighborhood radius; search() shrinks it by 2% per epoch
    elites = 7        # neighbors generated around each of the best sites
    workers = 2       # neighbors generated around each remaining site
    best_bee = search(epochs, search_space, num_bees, num_sites, best_sites, patch_size, elites, workers)
    print(best_bee)

{'vector': [0, 0, 0], 'fitness': 0.0}


The best bee should always have a fitness value of 0, since we are optimizing (minimizing) the function *sum + (x ** 2.0)*. Later we will train the weights of an artificial neural network, as we did with PSO.


>.    
>.  
>.  
>.  
>.  
>.  
>.  
>.  

> __them__: *Knock Knock*  
> __me__: *\"Who's there\?"*  
> __them__: *\"Couple of Ruby dudes checking out your shitty code and negatively criticizing it. Because we're experts on everything. EVERYTHING\"*  
> __me__: 

![](https://media.giphy.com/media/j6qnuNv4HoxrlUcjSR/giphy-downsized.gif)



# ABC Artificial Bee Colony