In [None]:
# Question 4: Symbolic Regression (20%)

# This is an implementation of m-HGPPSO (section 3) with additional features A, B, C (section 2)
# A: hillclimbing, B: vertical shift fitness term (weight 0.01), C: elitism


# Packages
import time                    # For timing five minutes
import numpy as np             # For mathematics
import random as rand          # For random numbers
from copy import deepcopy      # For cloning individuals
from inspect import signature  # For handling functions with one or two arguments

# Let numpy throw errors
np.seterr(all='raise')

# Read input
vals = np.array([np.array([float(v) for v in s.split(" ")]) for s in open("datafile.txt", "r").read().splitlines()])
X = vals[:,0:2]
Y = vals[:,2]

# Constants
d = len(X[0]) # Dimension of unknown f
D = len(X)    # Number of datapoints          
trials = 10   # Number of times PSO is run    (+) Re-seed the population as much as possible
POP = 200     # Population size               (+) POP size informed by section 2 performance
XOR = 0.8     # Crossover rate                (+) Very frequent crossover to encourage exploration 
MUR = 1/32    # Grow Mutation rate            (+) Encourages tree depth of log_2(32) = 5, bit of a random guess.
PMR = 0.1     # Point mutation rate           (+) Very frequent mutations to encourage exploration
GEN = 30      # Maximum number of generations (+) Don't waste too much time before re-seeding
w_B = 0.01    # Weight of v-shift fitness term
MINDEPTH = 2  # Minimum initial tree depth
MAXDEPTH = 5  # Maximum initial tree depth
MAXDHARD = 9  # Hard limit on depth           (+) Assume there is no need for very deep trees.

# Termination criterion | Unimportant, 0.01 is close enough.
def epsilon_close(x, y):
    return abs(x - y) < 0.01

# Node values - one-argument non-terminals read LEFT node only.
def add(x, y): return x + y
def mul(x, y): return x * y
def sub(x, y): return x - y
def div(x, y): return x / y
def sin(x):    return np.sin(x)
def cos(x):    return np.cos(x)
def exp(x):    return np.exp(x)
def pwr(x, n): return np.power(x, n)
def rut(x, n): return np.power(x, 1 / n)
F = [add, mul, sub, div, sin, cos, exp, pwr, rut]
T = ['x', 'y', 1, 2, 3, 4, 10, 100, np.pi]

# Genetic programme class, adapted from tutorial [1]
class GP: 
    
    # Constructor: 
    def __init__(self, p = None, pos = None, v = None, l = None, r = None): 
        self.p = p        # parent function
        self.pos = pos    # left/right
        self.v = v        # node value
        self.l = l        # left arg
        self.r = r        # right arg
        
    # String repr. of tree root node
    def to_str(self): 
        if (self.v in F): return self.v.__name__
        else: return str(self.v)
    
    # Print whole tree
    def tree_print(self, prefix=""): # Print program tree
        print("%s%s" % (prefix, self.to_str()))        
        if self.l: self.l.tree_print(prefix + "  ")
        if self.r: self.r.tree_print(prefix + "  ")

    # Run the program on input
    def apply(self, x, y):
        if self.v in F:  
            try:
                if len(signature(self.v).parameters) == 2:                # Two arguments - add, mul, sub, div, pwr, rut
                    return self.v(self.l.apply(x, y), self.r.apply(x, y))
                else: return self.v(self.l.apply(x, y))                   # One argument  - sin, cos, exp
                
            # Catch numerical or closure errors
            except FloatingPointError: return float("inf") # ex- x**np.pi
            except ZeroDivisionError:  return float("inf") # ex- x / 0
            except ValueError:         return float("inf") # ex- x**(-1)
        elif self.v == 'x': return x
        elif self.v == 'y': return y
        else: return self.v
    
    # Constrained choices for error avoidance
    def __constrained(self, N):
        # Error type - negative value to fractional power, ex: (-1)^np.pi
        if self.pos == 1 and self.p == pwr: return ['x', 'y', 1, 2, 3, 4, 10, 100]
        else: return N
    
    # Generate random tree, "full" or "grow" method
    def random_tree(self, grow, max_depth, depth = 0):
        if depth < MINDEPTH or (depth < max_depth and not grow): 
            self.v = rand.choice(self.__constrained(F))
        elif depth >= max_depth:   
            self.v = rand.choice(self.__constrained(T))
        else:
            if rand.uniform(0, 1) > 0.5:
                self.v = rand.choice(self.__constrained(T))
            else: self.v = rand.choice(self.__constrained(F))
        if self.v in F:
            self.l = GP(p = self.v, pos = 0)
            self.l.random_tree(grow, max_depth, depth = depth + 1)    
            if len(signature(self.v).parameters) == 2:
                self.r = GP(p = self.v, pos = 1)
                self.r.random_tree(grow, max_depth, depth = depth + 1)
    
    # Grow mutate
    def mutate(self): 
        if rand.uniform(0, 1) < MUR:
            self.random_tree(grow = True, max_depth = 2)
        elif self.l: self.l.mutate()
        elif self.r: self.r.mutate() 
            
    # Point mutate
    def point_mutate(self):
        if rand.uniform(0, 1) < PMR:
            if self.v in T: self.v = rand.choice(self.__constrained(T))
            else: 
                if len(signature(self.v).parameters) == 2:
                    self.v = rand.choice([add, mul, sub, div, pwr, rut])
                else: self.v = rand.choice([sin, cos, exp])
        if self.l: self.l.point_mutate()
        if self.r: self.r.point_mutate()
            
     # Tree depth
    def depth(self, curr_depth=0):
        below = curr_depth
        if self.l: below = max(below, self.l.depth(curr_depth + 1))
        if self.r: below = max(below, self.r.depth(curr_depth + 1))
        return below
    
    # Tree size in nodes
    def size(self): 
        if self.v in T: return 1
        l = self.l.size() if self.l else 0
        r = self.r.size() if self.r else 0
        return 1 + l + r

    # Builds an identical tree
    def build_subtree(self): 
        t = GP()
        t.v = self.v     # Function
        t.p = self.p     # Parent
        t.pos = self.pos # Position
        if self.l: t.l = self.l.build_subtree()
        if self.r: t.r = self.r.build_subtree()
        return t
    
    # Scan tree
    def scan_tree(self, count, second): 
        count[0] -= 1
        if count[0] <= 1: 
            if not second:
                return self.build_subtree()
            else:
                self.v = second.v
                self.l = second.l
                self.r = second.r
        else:  
            ret = None              
            if self.l and count[0] > 1: ret = self.l.scan_tree(count, second)  
            if self.r and count[0] > 1: ret = self.r.scan_tree(count, second)  
            return 
        
    # Crossover two trees at random
    def crossover(self, other):
        if rand.random() < XOR:
            second = other.scan_tree([rand.randint(1, other.size())], None) # 2nd random subtree
            self.scan_tree([rand.randint(1, self.size())], second) # 2nd subtree "glued" inside 1st tree

# Raw fitness
def fitness(gp):
    try:
        Z = [gp.apply(X[i][0], X[i][1]) for i in range(D)]
        
        # The function fits the data well
        DIS = 1 / (1 + np.sum([abs(Z[i] - Y[i]) for i in range(D)]))
        # The function is a vertical-shifted version the correct function
        LIN = 1 / (1 + np.std([Z[i] - Y[i] for i in range(D)]))
        return (1- w_B) * DIS + w_B * LIN
    except FloatingPointError: 
        return 0.0
            
# Selection
def max_select(gps): 
    fitnesses = np.array([fitness(gp) for gp in gps])
    return deepcopy(gps[np.argmax(fitnesses)])

# Roulette-wheel selection
def roulette_select(gps):
    fitnesses = np.array([fitness(gp) for gp in gps])
    sum_fitness = np.sum(fitnesses)
    if sum_fitness == 0.0:
        return deepcopy(rand.choice(gps))
    else:
        norm_fitnesses = fitnesses / sum_fitness
        choice = rand.choices(gps, weights=norm_fitnesses, k=1)[0]
        return deepcopy(choice)

# Local search of terminal values
def hillclimb(root, gp):
    if gp.v in T:
        if rand.uniform(0, 1) > 0.5:
            ter = gp.v
            for t in T:
                f1 = fitness(root)
                gp.v = t
                f2 = fitness(root)
                if f1 > f2: gp.v = ter
    if gp.l: hillclimb(root, gp.l)
    if gp.r: hillclimb(root, gp.r)
    
# Generation initial population, ramped half-and-half, may be seeded
def init_population(POP, seed=None): 
    if seed == None:
        pop = []
        for md in range(MINDEPTH+1, MAXDEPTH+1):
            for i in range(int(POP/6)):
                t = GP()
                t.random_tree(grow = True, max_depth = md) # Grow method
                pop.append(t)
            for i in range(int(POP/6)):
                t = GP()
                t.random_tree(grow = False, max_depth = md) # Full method
                pop.append(t)
    else:
        pop = [seed for i in range(POP//2)]
        for md in range(MINDEPTH+1,MAXDEPTH+1):
            for i in range(int(POP/12)):
                t = GP()
                t.random_tree(grow = True, max_depth = md) # Full method
                pop.append(t)
            for i in range(int(POP/12)):
                t = GP()
                t.random_tree(grow = False, max_depth = md) # Grow method
                pop.append(t)
    return pop

# Records
f_best = 0.0   # Best ever fitness
gp_best = None # Best ever program
GPs = []       # All programs

# Restarts
for trial in range(trials):
    # 1. Generate an initial population of GPs
    gp_g  = None                                          # Trial-best programme 
    GPs   = init_population(POP, gp_g)                    # Population
    GPs_p = [deepcopy(gp) for gp in GPs]                  # Personal program
    F_p   = [fitness(GPs[i]) for i in range(POP)]         # Personal fitness
    f_g   = 0.0                                           # Trial-best fitness
    
    # Begin counting
    tick = time.perf_counter()
    
    # 2. Run 
    for gen in range(GEN):
        # Evaluate fitnesses, replace personal and global bests
        for i in range(POP):
            f_i = fitness(GPs[i])
            if f_i > F_p[i]: # Personal best
                GPs_p[i] = deepcopy(GPs[i])
                F_p[i] = f_i
            if f_i > f_g:   # Global best
                gp_g = deepcopy(GPs[i])
                f_g = f_i
        
        # (+) Local search the best of the generation to adjust terminals.
        hillclimb(gp_g, gp_g)
        f_g = fitness(gp_g)
        
        # Check for termination criterion
        if epsilon_close(f_g, 1.0): break # Desired precision achieved.
        # 2c Create new population
        for i in range(POP):
            if time.perf_counter() - tick > 295: break  # Time-out.
            
            parent1a = deepcopy(GPs[i]); parent1b = deepcopy(GPs_p[i])
            parent2a = deepcopy(GPs[i]); parent2b = deepcopy(gp_g)
            
            # Crossover, mutation, point_mutation
            parent1a.crossover(parent1b); parent2a.crossover(parent2b); 
            parent1a.mutate(); parent1b.mutate();
            parent2a.mutate(); parent2b.mutate();
            parent1a.point_mutate(); parent1b.point_mutate();
            parent2a.point_mutate(); parent2b.point_mutate(); 
        
            # Select best children of OLDxPB, and OLDxGB.
            gp_new1 = max_select([parent1a, parent1b]) # Knowledge about PB
            gp_new2 = max_select([parent2a, parent2b]) # Knowledge about GB
            
            # Roulette best among [old, select(old x PB), select(old x GB)]
            gp_choice = roulette_select([GPs[i], gp_new1, gp_new2])

            # (+) Bloat control: throw out children that grow too large
            if gp_choice.depth() <= MAXDHARD: GPs[i] = gp_choice
                
        if time.perf_counter() - tick > 295: break  # Time-out.
    # Check for best of trials
    if f_g > f_best:
        f_best = f_g
        gp_best = deepcopy(gp_g)
        print("________________________")
        print("trial:", trial, ", best-so-far fitness:", f_best, ", best-so-far GP:") 
        gp_best.tree_print()
    print("END OF RUN (T = ", time.perf_counter() - tick, "seconds)")
    
# References
# [1] M. Herrmann. Natural Computing Tutorial 2: Particle Swarm Optimisation. INFR1161, University of Edinburgh, 2021.