In [1]:
%load_ext autoreload
%autoreload 2

In [1]:
import os
import numpy as np 
from tqdm import tqdm
import operator
import random 
from typing import Tuple, List
import operator 

In [2]:
# deap-specific 
from deap import base, creator, tools, algorithms, gp
from deap import benchmarks

## Symbolic Regression

Genereate/search a mathematical expression that best fits a given dataset 

In [3]:
pop_size = 100
max_gen = 50
cx_rate = 1.0
mut_rate = 0.2

In [4]:
import math
def protect_div(a, b): return 1 if b == 0 else a/b
def protect_sqrt(a): return math.sqrt(abs(a))

In [5]:
## generate primtives, the building block
#pset = gp.PrimitiveSetTyped("main", [float], float)
#
#pset.addPrimitive(np.add, [float,float], float, name = "add") # pset.addPrimitive(, )
#pset.addPrimitive(np.subtract, [float,float], float, name = "sub")
#pset.addPrimitive(np.multiply, [float,float], float, name = "mul")
#pset.addPrimitive(np.negative, [float,float], float, name = "gp_neg")
#pset.addPrimitive(protect_div, [float,float], float, name = "gp_div")
#pset.addPrimitive(protect_sqrt, [float], float, name = "gp_sqrt")
#pset.addPrimitive(math.cos, [float], float, name = 'cos')
#pset.addPrimitive(math.sin, [float], float, name = 'sin')
#
#pset.addTerminal(1.0, float)
#pset.addEphemeralConstant('rand101', lambda: random.randint(-1,1), float)

In [5]:
pset = gp.PrimitiveSet("main", 1)

pset.addPrimitive(operator.add, 2)
pset.addPrimitive(operator.sub, 2)
pset.addPrimitive(operator.mul, 2)
pset.addPrimitive(protect_div, 2)
pset.addPrimitive(operator.neg, 1)
pset.addPrimitive(math.cos, 1)
pset.addPrimitive(math.sin, 1)
pset.addEphemeralConstant("rand101", lambda: random.randint(-1,1))
pset.renameArguments(ARG0='x')

In [6]:
# define types 
creator.create("FitnessMin", base.Fitness, weights = (-1.0,))
creator.create("Individual", gp.PrimitiveTree, fitness = creator.FitnessMin)

In [7]:
# initialisation 
toolbox = base.Toolbox() # toolbox container that contains all sort of functions to be used later
toolbox.register("expr", gp.genHalfAndHalf, pset = pset, min_ = 1, max_ = 2)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr) 
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset = pset)

In [8]:
# define & register operators
def evalSymbReg(individual, points):
    # Transform the tree expression in a callable function
    func = toolbox.compile(expr=individual)
    # Evaluate the mean squared error between the expression
    # and the real function : x**4 + x**3 + x**2 + x
    sqerrors = ((func(x) - x**4 - x**3 - x**2 - x)**2 for x in points)
    return math.fsum(sqerrors) / len(points),


toolbox.register("evaluate", evalSymbReg,  points=[x/10. for x in range(-10,10)]) # [-1,1] # => not the best form 
# for (parent) selection, mutation, crossover
toolbox.register("select", tools.selTournament, tournsize = 3) # k can be 2 or pop_size, # tools.selRoulette
toolbox.register("mate", gp.cxOnePoint) # toolx.cxOnePoint
toolbox.register("mut_expr", gp.genFull, min_ = 0, max_ = 2)
toolbox.register("mutate", gp.mutUniform, expr = toolbox.mut_expr, pset = pset) # or toolbox.expr

# decorate:e.g., set limit
toolbox.decorate("mate", gp.staticLimit(key = operator.attrgetter("height"), max_value = 17))
toolbox.decorate("mutate", gp.staticLimit(key = operator.attrgetter("height"), max_value = 17))

In [11]:
def check_dup(ind, pop):
    import operator
    for p in pop:
        if str(ind) == str(p):
            return True 
    return False 

# one-max ga algorithm 
def main(
    toolbox, 
    creator, 
    pop_size, 
    cx_rate, 
    mut_rate
) -> list:
    # init population 
    outs = []
    pop = toolbox.population(n = pop_size)
    # fitness computation 
    fitnesses = map(toolbox.evaluate, pop)
    for ind,fit in zip(pop, fitnesses):
        ind.fitness.values = fit 
    
    best = tools.HallOfFame(maxsize = 1)#, similar = ) 
    best.update(pop)
    
    # statistics -> average, max, min
    stats = tools.Statistics(lambda ind: ind.fitness.values[0])
    stats.register("mean", np.mean, axis = 0)
    stats.register("max", np.max, axis = 0)
    stats.register("min", np.min, axis = 0)
    stats_result = stats.compile(pop)
    
    # for logging
    logbook = tools.Logbook()
    logbook.record(
        gen = 0, 
        max = stats_result["max"],
        mean = stats_result["mean"], 
        min = stats_result["min"]
    )

    # loop 
    for idx in range(max_gen):
        next_pop = [] 
        # crossover & mutate  
        while len(next_pop) < pop_size: 
            p1, p2 = toolbox.select(pop, k = 2) 
            offsprings = algorithms.varAnd([p1, p2], toolbox, cx_rate, mut_rate) 
            for offspring in offsprings: 
                if len(next_pop) < pop_size and not check_dup(offspring, next_pop): 
                    next_pop.append(offspring)  

        # fitness evaluation 
        fitnesses = map(toolbox.evaluate, next_pop)
        for ind,fit in zip(next_pop, fitnesses):
            ind.fitness.values = fit 
        fitness_vs = [ind.fitness.values[0] for ind in next_pop]
        
        # next gen population selection 
        pop = tools.selBest(pop + next_pop, k = pop_size) 
        #pop[:] = next_pop # replacing
        
        best.update(pop) 
        #print (f"Gen {idx}, \n\tmin:{np.min(fitness_vs)}, max:{np.max(fitness_vs)}, avg:{np.round(np.mean(fitness_vs))}")
        			#Logging current status the pop
        stats_result = stats.compile(pop)
        max_v, min_v, mean_v = stats_result["max"], stats_result["min"], stats_result["mean"]
        logbook.record(gen = idx, max = max_v, mean = mean_v, min = min_v)
        print (logbook)
        
        print (f"\tGeneration {idx}: {max_v}(Max), {min_v}(AVG), {mean_v}(MIN)")
        #print ("\t\tbest individual: {} ({})".format(str(best[0]),best[0].fitness.values[0]))
        outs.append(fitness_vs)
        if np.max(fitness_vs) == len(pop[0]):
            return best, outs

    return best, outs 

founded, outs = main(
    toolbox, 
    creator, 
    pop_size, 
    cx_rate, 
    mut_rate
)

gen	max    	mean   	min     
0  	14.4187	1.87259	0.403128
0  	1.15624	0.88712	0.403128
	Generation 0: 1.1562428329999999(Max), 0.4031275340904375(AVG), 0.887119950809819(MIN)
gen	max     	mean    	min     
0  	14.4187 	1.87259 	0.403128
0  	1.15624 	0.88712 	0.403128
1  	0.924508	0.671553	0.328818
	Generation 1: 0.9245079952640655(Max), 0.32881776467725155(AVG), 0.671553313310067(MIN)
gen	max     	mean    	min     
0  	14.4187 	1.87259 	0.403128
0  	1.15624 	0.88712 	0.403128
1  	0.924508	0.671553	0.328818
2  	0.647008	0.552702	0.328818
	Generation 2: 0.647007772974688(Max), 0.32881776467725155(AVG), 0.5527022920821951(MIN)
gen	max     	mean    	min     
0  	14.4187 	1.87259 	0.403128
0  	1.15624 	0.88712 	0.403128
1  	0.924508	0.671553	0.328818
2  	0.647008	0.552702	0.328818
3  	0.584027	0.519179	0.328818
	Generation 3: 0.5840266079868149(Max), 0.32881776467725155(AVG), 0.5191791571454748(MIN)
gen	max     	mean    	min     
0  	14.4187 	1.87259 	0.403128
0  	1.15624 	0.88712 	0.403128

In [14]:
str(founded[0])

'add(mul(add(x, add(0, mul(add(mul(x, x), mul(mul(x, x), protect_div(1, x))), add(x, 0)))), x), x)'

In [10]:
#pop = toolbox.population(n=300)
#hof = tools.HallOfFame(1)
#pop, log = algorithms.eaSimple(pop, toolbox, 0.5, 0.1, 40,  halloffame=hof, verbose=True)

TypeError: <lambda>() missing 3 required positional arguments: 'ARG1', 'ARG2', and 'ARG3'