<a href="https://colab.research.google.com/github/muditsatija08/genetic-evolution/blob/main/ExcerQ2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install DEAP.

In [None]:
!pip install deap



In [None]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

Import our tools.

In [None]:
import operator
import math
import random

import numpy

from deap import algorithms
from deap import base
from deap import creator
from deap import tools
from deap import gp

import csv
import pandas as pd
import matplotlib.pyplot as plt

import itertools
import networkx as nx

Set our Genetic Programming parameters, one of which is now the number of runs.

In [None]:
# Genetic Programming constants:
POPULATION_SIZE = 50
P_CROSSOVER = 0.9
P_MUTATION = 0.01
MAX_GENERATIONS = 50
HALL_OF_FAME_SIZE = 10

N_RUNS = 30

Set the random seed.

In [None]:
RANDOM_SEED = 412
random.seed(RANDOM_SEED)

GP-Specific constants.

In [None]:
MIN_TREE_HEIGHT = 3
MAX_TREE_HEIGHT = 5
LIMIT_TREE_HEIGHT = 17
MUT_MIN_TREE_HEIGHT = 0
MUT_MAX_TREE_HEIGHT = 2

Reading the data

In [None]:
data = pd.read_csv('Input_File.csv',names = ['Input','Target'])
data.head()

Define our fitness function. In this fitness function we are calculating mean square error using


In [None]:
def evalSymbReg(individual,data):
    # Transform the tree expression in a callable function
    func = toolbox.compile(expr=individual)
    # Evaluate the sum of squared difference between the expression and the target values
    # diff = sum((func(*row[:-1]) - row[-1])**2 for row in data.values)

    target_data = data['Target']
    predicted_data = np.array([func(x) for x in data['Input']])

    # Calculate the mean squared error between predicted and target data
    mse = sum((predicted - target) ** 2 for predicted, target in zip(predicted_data, target_data))

    error = mse/len(data)

    if (error>10):
        error=10

    #return error, individual.height
    nodes, edges, labels = gp.graph(individual)
    return error, len(nodes)

Because GP can mix and match operators and operands in an unconstrained way, we need to protect some our operators to prevent them from causing a crash.

In [None]:
# Protected functions
def protectedDiv(left, right):
    try:
        return left / right
    except ZeroDivisionError:
        return 1

Add our functions and terminals.

In [None]:
pset = gp.PrimitiveSet("MAIN", 1) # number of inputs!!!
pset.addPrimitive(operator.add, 2)
pset.addPrimitive(operator.sub, 2)
pset.addPrimitive(operator.mul, 2)
pset.addPrimitive(protectedDiv, 2)
pset.addPrimitive(operator.neg, 1)
pset.addPrimitive(math.cos, 1)
pset.addPrimitive(math.sin, 1)
pset.addEphemeralConstant("rand101", lambda: random.random())


Create our toolbox.

In [None]:
toolbox = base.Toolbox()

creator.create("FitnessMin", base.Fitness, weights=(-1.0,1.0))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMin)

toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=2)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset=pset)

toolbox.register("evaluate", evalSymbReg,data = data)
#toolbox.register("select", tools.selNSGA2)
toolbox.register("select", tools.selTournament, tournsize=5)

toolbox.register("mate", gp.cxOnePoint)
toolbox.register("expr_mut", gp.genFull, min_=0, max_=5)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)

toolbox.decorate("mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17))
toolbox.decorate("mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17))

Create our statistics. These are a bit more complex than the GA ones because we want to keep track of fitness and size for all runs.

In [None]:
maxListFitness = []
avgListFitness = []
minListFitness = []
stdListFitness = []

maxListSize = []
avgListSize = []
minListSize = []
stdListSize = []

Now the magic happens and we run **N_RUNS** times. Always start with a small number of runs and generations to make sure that everything is working properly before you commit to a larger number. That way, if something goes horribly wrong, Python won't replicate it 30 times before giving you back control!

In [None]:
best_individuals = []
for r in range(0, N_RUNS):
    population = toolbox.population(n=POPULATION_SIZE)
    # define the hall-of-fame object:
    hof = tools.HallOfFame(HALL_OF_FAME_SIZE)


    # Create our statistics
    stats_fit = tools.Statistics(lambda ind: ind.fitness.values)
    stats_size = tools.Statistics(len)
    mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
    mstats.register("avg", numpy.mean)
    mstats.register("std", numpy.std)
    mstats.register("min", numpy.min)
    mstats.register("max", numpy.max)


    # Which run are we on?
    print("\n\nCurrently on run", r, "of",N_RUNS)


    # It's usually a good idea to turn off verbose when conducting multiple runs
    population, logbook = algorithms.eaSimple(population,
                                                  toolbox,
                                                  cxpb=P_CROSSOVER,
                                                  mutpb=P_MUTATION,
                                                  ngen=MAX_GENERATIONS,
                                                  stats=mstats,
                                                  halloffame=hof,
                                                  verbose=False)

    #maxFitnessValues, meanFitnessValues = logbook.chapters['fitness'].select("min", "avg")
    meanFitnessValues, stdFitnessValues, minFitnessValues, maxFitnessValues  = logbook.chapters['fitness'].select("avg", "std", "min", "max")
    meanSizeValues, stdSizeValues, minSizeValues, maxSizeValues  = logbook.chapters['size'].select("avg", "std", "min", "max")


    # Save statistics for this run:
    avgListFitness.append(meanFitnessValues)
    stdListFitness.append(stdFitnessValues)
    minListFitness.append(minFitnessValues)
    maxListFitness.append(maxFitnessValues)

    avgListSize.append(meanSizeValues)
    stdListSize.append(stdSizeValues)
    minListSize.append(minSizeValues)
    maxListSize.append(maxSizeValues)

    # print info for best solution found:
    best = hof.items[0]
    best_individuals.append(best)
    print("-- Best Individual = ", best)
    print("-- length={}, height={}".format(len(best), best.height))
    print("-- Best Fitness = ", best.fitness.values[0])


In [None]:
best_ind = min(best_individuals, key=lambda x: x.fitness.values[0])
best_ind_func = toolbox.compile(best_ind)
print(f"The best individual is:\n {best_ind}")
print(f"Best Fitness:\n {best_ind.fitness.values[0]}")

In [None]:
predictions = [best_ind_func(x) for x in data.iloc[:, 0]]
plt.scatter(data.iloc[:, 0], data.iloc[:, 1], label="Expected curve", c = 'red')
plt.scatter(data.iloc[:, 0], predictions, label="Predicted curve", c = 'green')
plt.show()