In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
!pip install deap
!pip install gymnasium[classic-control]
!pip install pygame # for animations

In [None]:
import numpy
import random
import gymnasium as gym
import operator
import matplotlib.pyplot as plt
import math
from operator import attrgetter

from deap import algorithms
from deap import base
from deap import creator
from deap import tools
from deap import gp

import multiprocessing

In [None]:
def protectedDiv(left, right):
    try: return truncate(left, 8) / truncate(right, 8)
    except ZeroDivisionError: return 0
    
def if_then_else(input, output1, output2):
    if input: return output1
    else: return output2

def limit(input, minimum, maximum):
    return min(max(input,minimum), maximum)

# helper function to limit decimal places
def truncate(number, decimals=0):
    if not isinstance(decimals, int):
        raise TypeError("decimal places must be an integer.")
    elif decimals < 0:
        raise ValueError("decimal places has to be 0 or more.")
    elif decimals == 0:
        return math.trunc(number)
    factor = 10.0 ** decimals
    return math.trunc(number * factor) / factor

In [None]:
# Set up primitives and terminals using DEAP syntax
obs_size = 6 # Acrobot has 6 variables in each observation
pset = gp.PrimitiveSet("MAIN", obs_size) 
pset.addPrimitive(operator.add, 2)
pset.addPrimitive(operator.sub, 2)
pset.addPrimitive(operator.mul, 2)
pset.addPrimitive(protectedDiv, 2)
pset.addPrimitive(math.sin, 1)
pset.addPrimitive(limit, 3)

In [None]:
env_noviz = gym.make("Acrobot-v1")
env_viz = gym.make("Acrobot-v1", render_mode="human")

In [None]:
def action_wrapper(action):
    if action < 0:
        return 0 #left force
    elif action > 0:
        return 2 #right force
    else:
        return 1 #0 force

In [None]:
# evaluates the fitness of an individual policy
def evalRL(policy, vizualize=False):
    env = env_viz if vizualize else env_noviz
    num_episode = 20
    # transform expression tree to functional Python code
    get_action = gp.compile(policy, pset)
    fitness = 0
    for x in range(0, num_episode):
        done = False
        truncated = False
        # reset environment and get first observation
        observation = env.reset()
        observation = observation[0]
        episode_reward = 0
        num_steps = 0
        # evaluation episode
        while not (done or truncated):
            # use the expression tree to compute action
            action = get_action(observation[0], observation[1], observation[2], observation[3], observation[4], observation[5])
            action = action_wrapper(action)
            try:
                observation, reward, done, truncated, info = env.step(action)
            except:
                return (0,)
            episode_reward += reward
            num_steps += 1
        fitness += episode_reward
    return (fitness / num_episode,)

In [None]:
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)

toolbox = base.Toolbox()
toolbox.register("expr", gp.genFull, pset=pset, min_=2, max_=3)
toolbox.register("individual", tools.initIterate, creator.Individual,
                 toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate", evalRL)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("mate", gp.cxOnePoint)
toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)
toolbox.decorate("mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17))
toolbox.decorate("mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17))

In [None]:
random.seed(42)
# set to the number of cpu cores available
num_parallel_evals = 4

population_size = 48
num_generations = 100
prob_xover = 0.9
prob_mutate = 0.1

pop = toolbox.population(n=population_size)

# HallOfFame archives the best individuals found so far,
# even if they are deleted from the population.
hof = tools.HallOfFame(1)  # We keep the single best.

# configues what stats we want to track
stats_fit = tools.Statistics(lambda ind: ind.fitness.values)
stats_size = tools.Statistics(len)
mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
mstats.register("avg", numpy.mean)
mstats.register("std", numpy.std)
mstats.register("min", numpy.min)
mstats.register("max", numpy.max)

#setup parallel evaluations
pool = multiprocessing.Pool(processes=num_parallel_evals)
toolbox.register("map", pool.map)

# run the evolutionary algorithm
pop, log = algorithms.eaSimple(
    pop,
    toolbox,
    prob_xover,
    prob_mutate,
    num_generations,
    stats=mstats,
    halloffame=hof,
    verbose=True
)

pool.close()

best_fits = log.chapters["fitness"].select("max")
best_fit = truncate(hof[0].fitness.values[0], 0)

print("Best fitness: " + str(best_fit))
print(hof[0])

In [None]:
plt.plot(best_fits)
plt.xlabel('Generation')
plt.ylabel('Best Fitness')
plt.show()

In [None]:
evalRL(policy=hof[0], vizualize=True)