# Evolving a Lunar Lander with differentiable Genetic Programming

# Setup

In [1]:
import gymnasium as gym
import numpy as np

from genepro.node_impl import *
from genepro.evo import Evolution
from genepro.node_impl import Constant

import torch
import torch.optim as optim

import random
import os
import copy
from collections import namedtuple, deque

import matplotlib.pyplot as plt
from matplotlib import animation

In [2]:
env = gym.make("LunarLander-v2", render_mode="rgb_array")

In [3]:
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):
    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

    def __iadd__(self, other):
      self.memory += other.memory
      return self 

    def __add__(self, other):
      self.memory = self.memory + other.memory 
      return self

In [4]:
frames = []

### Creating the leaf nodes
The gym environment returns states that represent where the lunar lander is at the current frame. These states are used to calculate the next move of the lander. For each state, we create a leaf node that can be used in the tree that represent the solution.

In [5]:
# Create environment to find how many features it returns
env = gym.make("LunarLander-v2", continuous=True)
num_features = env.observation_space.shape[0]

# Create a feature for each value the evironment returns
leaf_nodes = [Feature(i) for i in range(num_features)]
# Add random constants to the features
leaf_nodes = leaf_nodes + [Constant()] 


# Evolution

## Utility functions

In [6]:
#new_x = np.multiply(x,np.array([-1,1,-1,1,-1,-1,1,1,1,-1]))
def transform_input(x):
    if x[0,0]>= 0:
        return x
    else:
         new_x = np.multiply(x,np.array([-1,1,-1,1,-1,-1,1,1]))
         new_x[0,6] = x[0,7]
         new_x[0,7] = x[0,6]
    return new_x

def transform_full_input(x):
    if x[0,0]>= 0:
        return x
    else:
         new_x = np.multiply(x,np.array([-1,1,-1,1,-1,-1,1,1,1,-1]))
         new_x[0,6] = x[0,7]
         new_x[0,7] = x[0,6]
    return new_x

def transform_action(x,action):
    if x[0,0]>= 0:
        return action
    else:
        return -action

## Fitness Function

Here you get to be creative. The default setup evaluates 5 episodes of 300 frames. Think of what action to pick and what fitness function to use. The Multi-tree takes an input of $n \times d$ where $n$ is a batch of size 1.

In [7]:
# def fitness_function_pt(multitree, num_episodes=5, episode_duration=300, render=False, ignore_done=False):
#   memory = ReplayMemory(10000)
#   rewards = []

#   for _ in range(num_episodes):
#     # get initial state of the environment
#     observation = env.reset()
#     observation = observation[0]
#     action = np.array([0,0])

#     for _ in range(episode_duration):
#       if render:
#         frames.append(env.render())

#       input_sample = torch.from_numpy(observation.reshape((1,-1))).float()
      
#       # Improvement symmetry
#       # Mirror the input, and use that as the input
#       input = observation.reshape((1,-1))       
#       mirrored_input = transform_input(input)
#       # outputs = multitree.get_child_outputs(mirrored_input) # Get the outputs for the mirrored input, does this work? --W
#       outputs = multitree.get_output_pt(mirrored_input) # Get the outputs for the mirrored input, does this work? --W
#       # outputs = multitree.get_child_outputs(input)
#       outputs = multitree.get_output_pt(input)
#       # outputs = multitree.get_child_outputs(outputs)

#       main_engine = np.squeeze((outputs[0]))            
#       side_engine = np.squeeze((outputs[1]))

#       # action = torch.argmax(multitree.get_output_pt(input_sample))
#       # action = np.array([main_engine, mirrored_side_engine])
      
#       # Transform the side engine back from the mirrored one, and use that for the action
#       mirrored_side_engine = transform_action(input, side_engine)
#       action = np.array([main_engine, mirrored_side_engine])
#       # action = torch.argmax(np.array([main_engine, mirrored_side_engine]))
#       # action to tensor 
#       action = torch.from_numpy(action).float()
#       # observation, reward, terminated, truncated, info = env.step(action.item())
#       # observation, reward, done, _ = env.step(action)
#       observation, reward, terminated, truncated, info =env.step(action.item())

#       rewards.append(reward)
#       output_sample = torch.from_numpy(observation.reshape((1,-1))).float()
#       memory.push(input_sample, torch.tensor([[action.item()]]), output_sample, torch.tensor([reward]))
#       if (terminated or truncated) and not ignore_done:
#         break

#   fitness = np.sum(rewards)
  
#   return fitness, memory

In [8]:
def fitness_function_pt(multitree, num_episodes=5, episode_duration=300, render=False, ignore_done=False):
  memory = ReplayMemory(10000)
  rewards = []

  for _ in range(num_episodes):
    # get initial state of the environment
    observation = env.reset()
    observation = observation[0]
    action = np.array([0,0])

    for _ in range(episode_duration):
      if render:
        frames.append(env.render())

      input_sample = torch.from_numpy(observation.reshape((1,-1))).float()
      
      # Improvement symmetry
      # Mirror the input, and use that as the input
      input = observation.reshape((1,-1))       
      outputs = multitree.get_output_pt(torch.from_numpy(transform_input(input))) # Get the outputs for the mirrored input, does this work? --W
      print(outputs.shape)

      # !!! Transform the output from [1, 4] dimension tensor to [4] dimension array
      [ do_nothing, left_thruster, main_thruster, right_thruster ] = np.squeeze(outputs.tolist())
      
      # !!! Transform the orientation of the right thruster based on location, and use that for the action???
      mirrored_right_thruster = transform_action(input, right_thruster)

      # Here we store the new action values, with the mirrored value replacing the right thruster.
      actions = np.array([[do_nothing, left_thruster, main_thruster, mirrored_right_thruster]])
        
      # !!! Here you need some logic to pick the action
      action = torch.argmax(torch.from_numpy(actions))

      # !!! I tried the default way of picking actions, which also produces an error...
      #action = torch.argmax(multitree.get_output_pt(input_sample))
        
      observation, reward, terminated, truncated, info =env.step(action.item())

      rewards.append(reward)
      output_sample = torch.from_numpy(observation.reshape((1,-1))).float()
      memory.push(input_sample, torch.tensor([[action.item()]]), output_sample, torch.tensor([reward]))
      if (terminated or truncated) and not ignore_done:
        break

  fitness = np.sum(rewards)
  
  return fitness, memory


## Baseline

In [9]:
np.random.seed(42)

num_features = env.observation_space.shape[0]
leaf_nodes = [Feature(i) for i in range(num_features)] + [Constant() for _ in range(1)]
internal_nodes = [Plus(), Minus(), Times(), Div(), Square(), Sqrt(), Log(), Sin(), Cos(), Max(), Min()] #Add your own operators here

# Run a few times to collect Data
for _ in range(5):
  evo = Evolution(
    fitness_function_pt, internal_nodes, leaf_nodes,
    4,
    pop_size=16,
    max_gens=2,
    max_tree_size=31,
    n_jobs=1,
    # pop_size=256,
    # max_gens=50,
    # max_tree_size=31,
    # n_jobs=8,
    log_data=True,
    verbose=True)
  
  evo.evolve()

Generation: 1
	Best - 	Fitness: 0.000, Size: 12
	Population - 	Average Fitness: 0.0
	mean = 0.00, 	median = 0.00, 	best_median = 0.00

torch.Size([1, 4])


IndexError: invalid index to scalar variable.

## Save models

In [None]:
import pickle

with open('baseline.pickle', 'wb') as file:
    # Pickle the 'data' dictionary using the highest protocol available.
    pickle.dump(evo, file, pickle.HIGHEST_PROTOCOL)

# Test

In [None]:
def get_test_score(tree):
    rewards = []

    for i in range(10):
      # get initial state
      observation = env.reset(seed=i)
      observation = observation[0]

      for _ in range(500):    
        # build up the input sample for GP
        input_sample = torch.from_numpy(observation.reshape((1,-1))).float()
        # get output (squeezing because it is encapsulated in an array)
        output = tree.get_output_pt(input_sample)
        action = torch.argmax(output)
        observation, reward, terminated, truncated, info = env.step(action.item())
        rewards.append(reward)

        output_sample = torch.from_numpy(observation.reshape((1,-1))).float()
        if (terminated or truncated):
            break

    fitness = np.sum(rewards)
    
    return fitness

best = evo.best_of_gens[-1]

print(best.get_readable_repr())
print(get_test_score(best))

## Make an animation
Here the best evolved individual is selected and one episode is rendered. Make sure to save your lunar landers over time to track progress and make comparisons.

In [None]:
frames = []

# gist to save gif from https://gist.github.com/botforge/64cbb71780e6208172bbf03cd9293553
def save_frames_as_gif(frames, path='./', filename='evolved_lander.gif'):
  plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi=72)
  patch = plt.imshow(frames[0])
  plt.axis('off')
  def animate(i):
      patch.set_data(frames[i])
  anim = animation.FuncAnimation(plt.gcf(), animate, frames = len(frames), interval=50)
  anim.save(path + filename, writer='imagemagick', fps=60)

frames = []
fitness_function_pt(best, num_episodes=1, episode_duration=500, render=True, ignore_done=False)
env.close()
save_frames_as_gif(frames)

## Play animation

<img src="evolved_lander.gif" width="750">

## Optimisation
The coefficients in the multi-tree aren't optimised. Here Q-learning (taken from https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html) is used to optimise the weights further. Incorporate coefficient optimisation in training your agent(s). Coefficient Optimisation can be expensive. Think about how often you want to optimise, when, which individuals etc.

In [None]:
batch_size = 128
GAMMA = 0.99

constants = best.get_subtrees_consts()

if len(constants)>0:
  optimizer = optim.AdamW(constants, lr=1e-3, amsgrad=True)

for _ in range(500):

  if len(constants)>0 and len(evo.memory)>batch_size:
    target_tree = copy.deepcopy(best)

    transitions = evo.memory.sample(batch_size)
    batch = Transition(*zip(*transitions))
    
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                        batch.next_state)), dtype=torch.bool)

    non_final_next_states = torch.cat([s for s in batch.next_state
                                               if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    state_action_values = best.get_output_pt(state_batch).gather(1, action_batch)
    next_state_values = torch.zeros(batch_size, dtype=torch.float)
    with torch.no_grad():
      next_state_values[non_final_mask] = target_tree.get_output_pt(non_final_next_states).max(1)[0].float()

    expected_state_action_values = (next_state_values * GAMMA) + reward_batch
    
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
   
    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_value_(constants, 100)
    optimizer.step()

print(best.get_readable_repr())
print(get_test_score(best))

In [None]:
frames = []
fitness_function_pt(best, num_episodes=1, episode_duration=500, render=True, ignore_done=False)
env.close()
save_frames_as_gif(frames, filename='evolved_lander_RL.gif')

<img src="evolved_lander_RL.gif" width="750">