# Research Experiments on the Effect of Selection on Generalization for Symbolic Regression in GP

* Masterseminar: SoSe 2022
* JGU Mainz
* FB 03 Rechts- und Wirtschaftswissenschaften
* Lehrstuhl für Wirtschaftsinformatik und BWL

## Dependencies

In [100]:
import numpy as np
import pandas as pd
import operator
import os
import math
from copy import deepcopy
from deap import gp, tools, creator, base, algorithms
from sklearn.model_selection import train_test_split
from typing import Tuple, Dict, Callable
from random import randint
from sys import stderr

## Energy efficiency Data Set

Source: https://archive.ics.uci.edu/ml/datasets/energy+efficiency

In [21]:
# Download the UCI energy-efficiency workbook only if it is not already present.
# The existence check has to look at the location wget saves to (-P ./data);
# the previous check of "./ENB2012_data.xlsx" never matched, so the file was
# fetched again on every run of this cell.
if not os.path.exists("./data/ENB2012_data.xlsx"):
    os.system("wget https://archive.ics.uci.edu/ml/machine-learning-databases/00242/ENB2012_data.xlsx -P ./data")

--2022-05-29 11:36:50--  https://archive.ics.uci.edu/ml/machine-learning-databases/00242/ENB2012_data.xlsx
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 76189 (74K) [application/x-httpd-php]
Saving to: ‘./data/ENB2012_data.xlsx’

     0K .......... .......... .......... .......... .......... 67%  122K 0s
    50K .......... .......... ....                            100%  133M=0,4s

2022-05-29 11:36:51 (182 KB/s) - ‘./data/ENB2012_data.xlsx’ saved [76189/76189]



In [23]:
def get_datasets(random_state=None) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Read the .xlsx dataset at <D_PATH> and return two randomly split DFs for training/testing.

    Parameters:
        random_state:
            Optional seed forwarded to train_test_split. Pass an int to get the
            same train/test partition on every run; the default (None) keeps the
            previous behaviour of an unseeded split.

    Returns:
        (train_df, test_df): the two halves of the dataset; each holds
        TRAINING_D_SPLITSIZE resp. 1 - TRAINING_D_SPLITSIZE of the rows.
    """

    D_PATH = "data/ENB2012_data.xlsx"
    TRAINING_D_SPLITSIZE = 0.5

    df = pd.read_excel(D_PATH)

    train_df, test_df = train_test_split(
        df,
        train_size=TRAINING_D_SPLITSIZE,
        test_size=(1 - TRAINING_D_SPLITSIZE),
        random_state=random_state,
    )
    return train_df, test_df


trainDF, testDF = get_datasets()

In [24]:
trainDF.head()

Unnamed: 0,X1,X2,X3,X4,X5,X6,X7,X8,Y1,Y2
354,0.79,637.0,343.0,147.0,7.0,4,0.25,2,39.97,36.77
111,0.82,612.5,318.5,147.0,7.0,5,0.1,2,22.79,28.79
425,0.64,784.0,343.0,220.5,3.5,3,0.25,3,16.93,20.03
141,0.62,808.5,367.5,220.5,3.5,3,0.1,2,13.0,14.57
737,0.79,637.0,343.0,147.0,7.0,3,0.4,5,41.96,37.7


In [25]:
trainDF.describe()

Unnamed: 0,X1,X2,X3,X4,X5,X6,X7,X8,Y1,Y2
count,384.0,384.0,384.0,384.0,384.0,384.0,384.0,384.0,384.0,384.0
mean,0.767917,668.071615,319.074219,174.498698,5.341146,3.492188,0.238672,2.796875,22.968594,25.139792
std,0.104626,86.655605,44.239605,45.04631,1.749905,1.095894,0.13264,1.539859,10.417982,9.812793
min,0.62,514.5,245.0,110.25,3.5,2.0,0.0,0.0,6.04,11.17
25%,0.69,588.0,294.0,122.5,3.5,3.0,0.1,1.0,12.9675,15.4625
50%,0.76,661.5,318.5,147.0,7.0,4.0,0.25,3.0,23.63,25.02
75%,0.86,735.0,343.0,220.5,7.0,4.0,0.4,4.0,32.135,33.79
max,0.98,808.5,416.5,220.5,7.0,5.0,0.4,5.0,43.1,47.59


## Data Visualization

In [28]:
from matplotlib import pyplot as plt
import networkx as nx
import pygraphviz as pgv

%matplotlib inline

def plot_exprTree(expr_tree, title:str) -> None:
    """Draw the given expression tree with a 'dot' layout and show it under `title`."""
    # gp.graph returns three values that are used as nodes, edges and node labels
    tree_nodes, tree_edges, node_labels = gp.graph(expr_tree)

    graph = nx.Graph()
    graph.add_nodes_from(tree_nodes)
    graph.add_edges_from(tree_edges)

    # node positions computed by graphviz' "dot" program
    layout = nx.drawing.nx_agraph.graphviz_layout(graph, prog="dot")

    # nodes first, then edges, then the labels on top
    for draw_layer in (nx.draw_networkx_nodes, nx.draw_networkx_edges):
        draw_layer(graph, layout)
    nx.draw_networkx_labels(graph, layout, node_labels)

    plt.title(title)
    plt.show()

## Implementing protected functions for GP

Source:
J. Koza, Genetic Programming: On the Programming of Computers by Means of Natural Selection (MIT Press, Cambridge, 1992)

In [107]:
def pdiv(lhs: float, rhs: float) -> float:
    """
    Protected division (Koza style) [@Koza2005]

    Returns lhs / rhs, except for a zero divisor, where the result is defined as 1.
    """
    return lhs / rhs if rhs != 0 else 1

def plog(x: float) -> float:
    """
    Protected natural logarithm (Koza style) [@Koza2005]

    Returns ln(|x|), except for x == 0, where the result is defined as 0.
    """
    if x != 0:
        # the sign is dropped so that negative inputs stay inside the log's domain
        return math.log(abs(x))
    return 0
    

def psqrt(x: float) -> float:
    """
    Protected square root (Koza style) [@Koza2005]

    Returns the square root of |x|, so negative inputs are valid.
    """
    magnitude = abs(x)
    return math.sqrt(magnitude)


def ppow(base: float, power: float) -> float:
    """
    Protected power operator (adjusted implementation) [@fsets_generalisation]

    Returns |base| ** power. A zero base with a non-zero exponent yields 0
    without evaluating the power; 0 ** 0 is evaluated normally.
    """
    # guard clause: zero base combined with any exponent other than zero
    if base == 0 and power != 0:
        return 0
    return abs(base) ** power

## GP System Setup

### Primitive set

In [30]:
# Maps DEAP's default argument names (ARG0..ARG7) to the feature columns of the dataset
UVS = {
    "ARG0" : "X1",
    "ARG1" : "X2",
    "ARG2" : "X3",
    "ARG3" : "X4",
    "ARG4" : "X5",
    "ARG5" : "X6",
    "ARG6" : "X7",
    "ARG7" : "X8",
}

# register the Primitive Set (one program input per entry in UVS)
# The set has to be bound to the lower-case name `pset`, because that is the name
# every later statement (primitives, toolbox, training) refers to.
pset = gp.PrimitiveSet("MAIN", arity=len(UVS))
# upper-case alias kept so that code using the original name keeps working
PSET = pset

# rename ARGS to match the dataset
# renameArguments takes keyword arguments of the form ARGn="new name"; passing
# `arg=des` would look for an argument literally called "arg" and rename nothing.
pset.renameArguments(**UVS)



# adding to pset

# (function, arity) pairs; every entry needs its trailing comma, otherwise Python
# tries to call the preceding tuple with the next one as argument.
operators = (
    (operator.add, 2),
    (operator.sub, 2),
    (operator.mul, 2),
    (math.sin, 1),
    (math.cos, 1),
    (operator.neg, 1),
    (pdiv, 2),
    (plog, 1),
    (psqrt, 1),
    (ppow, 2),
)

for (func, arity) in operators:
    pset.addPrimitive(func, arity)

# ephemeral constant: a random integer from {-1, 0, 1}
pset.addEphemeralConstant("rand1", lambda: randint(-1,1))


# min fitness object
# objective: minimize mse/mae for y1^/y2^
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))

# individuals program
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMin)



# TODO: research optimal configuration from literature
toolbox = base.Toolbox()
toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=2)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset=pset)

## Fitness Functions

In [109]:
def evaluate_single_case(func:Callable, case:pd.Series, target_var:str, err_metric:str) -> float:
    """
    Evaluates a compiled individual for a single fitness case (= one row of the
    dataframe) and returns the error between its prediction and the observed
    value of target_var.

    Parameters:

        func:
            compiled program; it is called with the first eight entries of
            `case` as positional arguments

        case:
            one dataset row; positions 0-7 are used as program inputs,
            position 8 as the "y1" value and position 9 as the "y2" value

        target_var (case-insensitive):
            "y1" (heating load)
            "y2" (cooling load)

        err_metric (case-insensitive):
            "squared" (error)
            "absolute" (error)

    Returns:
        (prediction - value) ** 2 for "squared", abs(prediction - value) for "absolute"

    Raises:
        AssertionError: target_var is neither "y1" nor "y2"
        ValueError: err_metric is neither "squared" nor "absolute"
    """
    target = target_var.lower()
    assert target in ("y1", "y2"), 'target_var must be "y1" or "y2"'

    # compute individual with case variables
    prediction = func(*case.iloc[0:8].values)

    # observed value: `case` is a one-dimensional row, so the target is read
    # directly by position (indexing `case.values[0][8]` would index into a scalar)
    value = case.iloc[8] if target == "y1" else case.iloc[9]

    # compute and return error as defined by err_metric
    metric = err_metric.lower()
    if metric == "squared":
        return (prediction - value) ** 2
    if metric == "absolute":
        return abs(prediction - value)

    # the message travels with the exception instead of being printed separately
    raise ValueError('invalid input for err_metric! Must be "squared" or "absolute"')


In [108]:
# fitness function for all fitness case:
def evaluate_all_cases (individual:creator.Individual, df:pd.core.frame.DataFrame, target_var:str, err_metric:str) -> tuple[float]:
    """
    Fitness function over a whole dataframe: compiles the individual, evaluates it
    on every row (= fitness case) via evaluate_single_case and returns the mean of
    the per-case errors as a one-element tuple.

    target_var and err_metric are passed through unchanged to evaluate_single_case,
    so the result is the mean of whichever error err_metric selects there.
    """
    # turn the expression tree into a callable
    program = toolbox.compile(expr=individual)

    # sum the per-case errors in row order, starting from 0.0
    total_error = sum(
        (
            evaluate_single_case(func=program, case=row, target_var=target_var, err_metric=err_metric)
            for _, row in df.iterrows()
        ),
        0.0,
    )

    # one-element tuple holding the mean error
    return (total_error / len(df),)


In [111]:
#TODO: test fitness functions

## Statistics

In [None]:
# One statistics object keyed on the fitness values and one keyed on len(individual),
# combined so both are reported side by side.
stats_fit = tools.Statistics(operator.attrgetter("fitness.values"))
stats_size = tools.Statistics(len)
mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)

# register the summary functions under their report names
for _stat_name, _stat_func in (
    ("avg", np.mean),
    ("std", np.std),
    ("min", np.min),
    ("max", np.max),
):
    mstats.register(_stat_name, _stat_func)

In [None]:
# GP system setup



def train_tournament(target_val:str, err_metric_str, pop_size:int=300, n_gen:int=40, cxpb:float=0.5, mutpb:float=0.1, tournsize:int=3):
    """
    Runs a GP with tournament selection on trainDF and reports the error of the
    best individual on testDF.

    Parameters:
        target_val:     target variable, forwarded to evaluate_all_cases as target_var
        err_metric_str: error metric, forwarded to evaluate_all_cases as err_metric
        pop_size:       number of individuals in the population
        n_gen:          number of generations passed to algorithms.eaSimple
        cxpb:           crossover probability passed to algorithms.eaSimple
        mutpb:          mutation probability passed to algorithms.eaSimple
        tournsize:      tournament size used by tools.selTournament

    Side effects:
        (re-)registers "evaluate", "select", "mate", "expr_mut" and "mutate" on the
        module-level toolbox, prints and plots the best individual and prints its
        mean error on testDF.
    """
    # the fitness function needs the target and the metric; without them every
    # evaluation fails because of the missing arguments
    toolbox.register("evaluate", evaluate_all_cases, df=trainDF, target_var=target_val, err_metric=err_metric_str)

    # registration:
    toolbox.register("select", tools.selTournament, tournsize=tournsize)
    toolbox.register("mate", gp.cxOnePoint)
    toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
    toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)

    # decoration: limit the height of trees produced by crossover and mutation
    toolbox.decorate("mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17))
    toolbox.decorate("mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17))


    pop = toolbox.population(n=pop_size)
    hof = tools.HallOfFame(1)
    algorithms.eaSimple(pop, toolbox, cxpb, mutpb, n_gen, stats=mstats, halloffame=hof, verbose=True)

    # the hall of fame was created with size 1, so its first entry is the best individual
    winner = hof[0]
    print (winner)
    plot_exprTree(winner, "Best Solution")

    # evaluate the winner on the held-out data with the same target and metric
    # that were used for training (previously fixed to column 8 and absolute error)
    test_error, = evaluate_all_cases(winner, df=testDF, target_var=target_val, err_metric=err_metric_str)

    print(f"Mean {err_metric_str.lower()} error for unknown Dataset = ", test_error)

In [1]:
mstats?

Object `mstats` not found.
