## GA Class Allowing for Standard, EIGA, RIGA, HIGA and Step by Step Simulation

In [8]:
import time 
import random
import numpy as np
import itertools
import networkx as nx
from statistics import mean
import matplotlib.pyplot as plt
import heapq

In [109]:
class GA:
    # CLASS WIDE PROPERTIES 
    # Reproduction rate (half of population used as parents)
    R_RATE = 0.5 
    
    # ***
    # Constructor 
    def __init__(self, ITERATIONS = 30, THRESHOLD = 12, 
                    P_SIZE = 100, M_RATE = 0.03):
        self.ITERATIONS = ITERATIONS
        self.THRESHOLD = THRESHOLD 
        self.P_SIZE = P_SIZE
        self.M_RATE = M_RATE
        
        # Flag which immigrant scheme to use 
        self.flag_riga = False
        self.flag_eiga = False
        
        # Immigrant scheme variables 
        self.RI = 0.1   # Proportion of population replaced with Random Immigrants
        self.EI = 0.1   # Proportion of population replaced with Elitist Immigrants 
        self.IM = 0.8   # Probability to mutate elitist immigrant 
        
        # Graph
        self.G = None 
        self.src = 0 
        self.dst = 0
        
        # Generation Variables 
        self.reset_generation() 
    
    
    # ***
    # Setter methods for RIGA/EIGA/HIGA configuration 
    def set_riga(self):
        self.flag_riga, self.flag_eiga = True, False 
    
    def set_eiga(self):
        self.flag_riga, self.flag_eiga = False, True 
        
    def set_higa(self):
        self.flag_eiga, self.flag_riga = True, True 
    
    def set_standard(self):
        self.flag_eiga, self.flag_riga = False, False 
    
    
    # ***
    def path_length(self, p):
        return sum(self.G[i][j]['weight'] for i, j in zip(p, p[1:]))
    
    # ***
    # Return fitness given path and graph 
    def fit(self, p):
        # Zero fitness for invalid path 
        if not nx.is_path(self.G, p):
            return 0

        # Fitness for valid path 
        return self.path_length(p) ** (-1)
    
    
    # ***
    # Function to define a single new random path from node s to node r
    # in a network supplied as a Networkx graph 
    # G: Graph / network as networkx
    # s: Start node 
    # r: Destination node 
    def gen_path(self, s, r):
        path = [s]

        current = s
        while current != r: 
            # Neighbours exluding nodes in path 
            neighbours = [n for n in self.G[current] if n not in path]

            # Sometimes, neighbours is empty 
            # In which case, retry 
            if not neighbours:
                return self.gen_path(s, r)

            # Select neighbour node with uniform probability  
            nxt = np.random.choice(neighbours) 

            # List and visit next node 
            path.append(nxt)
            current = nxt 

        return path
    
    
    # ***
    # Function to generate a population of n random paths s->r
    # G: Graph as networkx
    # s: Start node
    # r: Destination node
    # n: Number of paths / size of population 
    def gen_population(self, size):
        population = []
        for i in range(size):
            # Generate a new candidate path
            candidate = self.gen_path(self.src, self.dst)
            # Generate a new candidate 
            population.append(candidate)
        
        return population 
    
    
    # ***
    # Perform crossover between two paths s->r
    # The two paths must have at least one common element 
    # From the set of common elements, one is selected, v 
    # The paths are redifined as s -> v, v -> r  
    # The path segments are recombined to create two children
    # a: The first path
    # b: The second path 
    def crossover(self, a, b):
        common = [] 
        for ind_a, i in enumerate(a):
            for ind_b, j in enumerate(b):
                if i == j:
                    common.append((ind_a, ind_b))

        # Randomly select a one common element to exchange
        target = np.random.choice(len(common))

        # Retrieve indexes
        ind_a, ind_b = common[target]

        # Create new arrays
        chd_a = a[:ind_a] + b[ind_b:]
        chd_b = b[:ind_b] + a[ind_a:]

        return chd_a, chd_b
    
    
    # ***
    def mutate(self, path):
        # Select a random element in path, v
        ind = np.random.choice(len(path) - 1)

        # Create a new random path from v to destination 
        v = path[ind]
        rnd_path = self.gen_path(v, self.dst)

        return path[:ind] + rnd_path
    
    
    # ***
    # Compute the fitness value of each hypotheses 
    # p: Population
    # g: Graph 
    # Return: fitness list, maximum fitness value, sum of fitness values 
    def evaluate(self, p):
        # Evaluate the fitness of the population 
        fitness = [self.fit(h) for h in p]

        return fitness, max(fitness), sum(fitness), mean(fitness)
    
    
    # ***
    # Stochastic universal selection 
    #     - Uses a single random value to sample all solutions 
    # n: Number of hypotheses to select
    # population: Population of hypotheses
    # fitness: Fitness array
    # ftotal: Sum of fitness values 
    # target: Target array to append selected hypotheses 
    def stochastic_universal(self, n, target):
        step = self.ftotal / n                       # Size of selection bins 
        start = random.uniform(0, step)     # Random in range [0, step)

        # Each pointer value indicates a selection threshold
        # When the sum of probabilities exceeds a pointer, the current hypotheses is selected 
        pointers = [(start + (i * step)) for i in range(0, n)]

        for ptr in pointers:
            ind = 0               # Current index 
            running_sum = 0.0       # Sum of probabilities (fit(h) / total_fitness)

            # Until the threshold is exceeded 
            while running_sum < ptr: 
                running_sum += self.fitness[ind]
                ind += 1 

            target.append(self.population[ind - 1])
    
    
    # ***
    # Set all generation parameters to zero/empty/none before
    # initialisation and beginning a new run 
    def reset_generation(self):
        self.start, self.end = 0, 0
        
        # GA Variables 
        self.elite = []
        self.fitness = []
        self.population = [] 
        
        # Generation stats
        self.generation_count = 0 
        self.num_stable_generations = 0 
        self.fmax, self.favg, self.ftotal = 0, 0, 0

        self.history = {"max": [], "avg": []}
    
    
    # ***
    # Function to initialise the population 
    # with starting graph G, src and destination 
    def initialise(self, G, src, dst):
        self.G = G
        self.src = src
        self.dst = dst
        
        # Reset stats
        self.reset_generation()
        
        # Generate a random population 
        # Graph; start node; dest node; size of population 
        self.population = self.gen_population(self.P_SIZE) 
        self.fitness, self.fmax, self.ftotal, self.favg = self.evaluate(self.population)
        
        # Initialise previous generation elite at P(0) 
        self.elite = self.population[np.argmax(self.fitness)]
    
    
    # ***
    # Return the best candidate in current population 
    def get_best_candidate(self): 
        if (not self.fitness) or (not self.population):
            return [] 
        
        ind = np.argmax(self.fitness)
        best = self.population[ind]
        return (self.path_length(best), 
                self.fitness[ind], best)
        
        
    # ***
    # Get performance results from current generation 
    def get_results(self):
        # Return stats and performance 
        length, fit, best = self.get_best_candidate()
        runtime = self.end - self.start 
        
        results = {"generations": self.generation_count, 
                   "runtime": runtime,
                   "history": self.history, 
                   "solution": best, 
                   "length": length, 
                   "fitness": fit}
        
        return results 
    
    
    # *** 
    # Generate elitist immigrants from P(t-1)
    def gen_ei(self, gen):
        ei_list = []
        n_ei = int(self.EI * self.P_SIZE)

        # Generate ei*p elitist immigrants 
        for i in range(n_ei):
            # Mutate elite from p(t - 1)
            if random.random() < self.IM:
                ei_list.append(self.mutate(self.elite)) 
            else:
                ei_list.append(self.elite)

        return ei_list 
    
    
    # ***
    # Generate random immigrants
    def gen_ri(self, gen):
        n_ri = int(self.RI * self.P_SIZE)
        return self.gen_population(n_ri)
    
    
    # ***
    # Run a single generation of GA 
    def run_generation(self):
         # Append to training history 
        self.history["max"].append(self.fmax)
        self.history["avg"].append(self.favg)
        
        # Create new generation 
        gen = [] 
        
        # Select ((1 - r) * p) hypotheses to persist into the next generation
        # Hypotheses are selected with fitness proportional probability 
        # Pr(hi) = fit(hi) / sumj(fit(hj))
        num_hypotheses = int((1 - self.R_RATE) * self.P_SIZE)
        self.stochastic_universal(num_hypotheses, gen)
        
        # Select (r * p) / 2 pairs of parents for crossover 
        # Parent hypotheses are chosen with fitness proportional probability 
        # Pr(hi) = fit(hi) / sumj(fit(hj))
        parents = [] 
        num_parents = int(self.R_RATE * self.P_SIZE) 
        self.stochastic_universal(num_parents, parents)
        
        # Crossover pairs of parents 
        pairs = zip(parents, parents[1:])  # Zip into pairs [1, 2, 3] -> (1, 2), (2, 3)
        for a, b in pairs:
            e, f = self.crossover(a, b)   # Crossover parents a,b to get children c,d 
            gen.append(e) 
            gen.append(f)
            
        # Choose m percent of new population uniformly to mutate 
        m_size = int(self.M_RATE * self.P_SIZE)
        for i in np.random.choice(len(gen), m_size):
            gen[i] = self.mutate(gen[i]) 
            
        # *** BEGIN HIGA SCHEME ***
        
        # Evaluate interim population 'gen'
        (gf, gmax, gtotal, gavg) = self.evaluate(gen)
        
        immigrants = [] 
        
        # If EIGA is true 
        if self.flag_eiga:
            immigrants = immigrants + self.gen_ei(immigrants)
        
        if self.flag_riga:
            immigrants = immigrants + self.gen_ri(immigrants)
    
        # Replace the ri*p worst candidates with immigrants 
        # O(n log k) to find k smallest elements in array of n 
        #   - Use min heap of size k, compare each element to heap min, i
        #        if smaller then remove max and replace 
        n_immigrants = len(immigrants)
        if n_immigrants:
            heap = []
            for i in range(n_immigrants):
                # Tuple (fitness, index) and order heap on first element
                heapq.heappush(heap, (-(gf[i]), i)) 
            heapq.heapify(heap)

            # Add smaller elements and maintian heap 
            for i in range(n_immigrants, len(gen)):
                if -(gf[i]) < heap[0][0]:
                    heap[0] = heap[-1]
                    heap.pop()
                    heap.append(((-gf[i]), i))
                    heapq.heapify(heap)

            # Replace worst candidates 
            for i in range(n_immigrants):
                ind = heap[i][1]
                gen[ind] = immigrants[i]
    
            # Set elite 
            # Elite is best of P(t - 1), 
            # Each generation G(t) works on P(t) to Generate P(t + 1)
            self.elite = self.population[np.argmax(self.fitness)]
    
        # *** END of HI SCHEME *** 
        
        # Population equal to copy of generation 
        self.population = list(gen) 
        
        # Evaluate the new population fitness
        _fmax = self.fmax
        self.fitness, self.fmax, self.ftotal, self.favg = self.evaluate(self.population)
        
        # Check if maximum fitness solution has changed 
        if _fmax == self.fmax:
            self.num_stable_generations += 1 
        else:
            # Reset when max fitness changes 
            self.num_stable_generations = 0 
        
        # Increment generation count
        self.generation_count += 1 
        
    
    # ***
    def get_generation_stats():
        return (self.fitness, self.fmax, self.favg)
        
        
    # ***
    # Run full GA to ITERATIONS or THRESHOLD 
    def run(self):
        self.start = time.time()
        while (self.num_stable_generations < self.THRESHOLD) and (self.generation_count < self.ITERATIONS):
            self.run_generation()
        self.end = time.time()
        # Return final results
        return self.get_results() 
    
    
    # ***
    def set_iterations(itr):
        self.ITERATIONS = itr 
    
    
    # ***
    def set_threshold(thr):
        self.THRESHOLD = thr

**Testing This GA**

In [117]:
# Try a simple run and see what break
G = nx.erdos_renyi_graph(250, 0.25)

for (i, j) in G.edges:
    G[i][j]['weight'] = np.random.randint(0, 100)

ga = GA()

In [118]:
ga.M_RATE = 0.03
ga.set_riga()
ga.initialise(G, 0, 69)
results = ga.run()

In [119]:
results

{'generations': 12,
 'runtime': 2.5925395488739014,
 'history': {'max': [0.00641025641025641,
   0.00641025641025641,
   0.00641025641025641,
   0.00641025641025641,
   0.00641025641025641,
   0.00641025641025641,
   0.00641025641025641,
   0.00641025641025641,
   0.00641025641025641,
   0.00641025641025641,
   0.00641025641025641,
   0.00641025641025641],
  'avg': [0.0004536648932897524,
   0.002313397321879178,
   0.004491237987957272,
   0.0052435080636817075,
   0.005432891927171749,
   0.005601664887188991,
   0.005712451910118738,
   0.005665373402369671,
   0.005676371678564946,
   0.005708208763501877,
   0.005741244571190307,
   0.005799304603056402]},
 'solution': [0, 214, 69],
 'length': 156,
 'fitness': 0.00641025641025641}