# Linear Genetic Programming

This notebook implements Linear Genetic Programming (LGP) to solve the LunarLander reinforcement learning environment. 
We'll evolve interpretable control policies that map observations to actions without using complex neural networks.

## Introduction to Linear Genetic Programming

Linear Genetic Programming (LGP) is a variant of Genetic Programming where programs are represented as sequences of 
instructions that operate on registers. Each instruction typically follows the format:

```
r[destination] = r[source1] operation r[source2]
```

Unlike tree-based Genetic Programming where programs are represented as parse trees, LGP uses a linear sequence of 
instructions, making it more similar to actual computer programs.

Benefits of LGP include:
- More natural representation for imperative programming constructs
- Efficient execution due to sequential processing
- Ability to evolve more compact, interpretable solutions
- Natural support for information reuse through register values

## Implementation Overview

Our implementation consists of several key components:
1. The core LGP representation (instructions and programs)
2. The evolutionary algorithm framework
3. Environment interaction and evaluation
4. Visualization and analysis

Let's start by importing the necessary libraries:

In [None]:
import gymnasium as gym
import numpy as np
import random
import matplotlib.pyplot as plt
from typing import List, Tuple, Dict, Any, Callable
import copy
import time

In [None]:
# Set random seeds for reproducibility
np.random.seed(42)
random.seed(42)

## 1. LGP Core Representation

### 1.1 LGP Instruction

The basic building block of our LGP programs is the instruction. Each instruction performs a simple operation
on register values and stores the result in a destination register.

In [None]:
class LGPInstruction:
   """
   Represents a single instruction in Linear Genetic Programming
   
   Format: r[dest] = operation(r[src1], r[src2])
   """
   def __init__(self, dest: int, op: str, src1: int, src2: int):
       self.dest = dest  # Destination register
       self.op = op      # Operation
       self.src1 = src1  # Source register 1
       self.src2 = src2  # Source register 2
       
   def __str__(self):
       return f"r[{self.dest}] = r[{self.src1}] {self.op} r[{self.src2}]"
   
   def execute(self, registers: np.ndarray) -> np.ndarray:
       """Execute this instruction on the registers"""
       val1 = registers[self.src1]
       val2 = registers[self.src2]
       
       # Apply the appropriate operation
       if self.op == '+':
           result = val1 + val2
       elif self.op == '-':
           result = val1 - val2
       elif self.op == '*':
           result = val1 * val2
       elif self.op == '/':
           # Protected division to avoid division by zero
           result = val1 / val2 if abs(val2) > 1e-10 else val1
       elif self.op == 'sin':
           result = np.sin(val1)
       elif self.op == 'cos':
           result = np.cos(val1)
       elif self.op == 'exp':
           # Protected exp to avoid overflow
           result = np.exp(val1) if val1 < 10 else np.exp(10)
       elif self.op == 'log':
           # Protected log to handle non-positive inputs
           result = np.log(abs(val1)) if abs(val1) > 1e-10 else 0
       elif self.op == '<':
           result = 1.0 if val1 < val2 else 0.0
       elif self.op == '>':
           result = 1.0 if val1 > val2 else 0.0
       elif self.op == 'min':
           result = val2 if val1 > val2 else val1
       elif self.op == 'max':
           result = val1 if val1 > val2 else val2
       else:
           raise ValueError(f"Unknown operation: {self.op}")
       
       # Store the result in the destination register
       registers[self.dest] = result
       return registers

### 1.2 LGP Program

An LGP program consists of a sequence of instructions that operate on a set of registers.
The program takes inputs through the first n_inputs registers and returns outputs from the last n_outputs registers.

In [None]:
class LGPProgram:
   """
   Represents a Linear Genetic Programming program
   """
   def __init__(self, instructions: List[LGPInstruction], n_registers: int, 
               n_inputs: int, n_outputs: int, constants: List[float] = None):
       self.instructions = instructions
       self.n_registers = n_registers
       self.n_inputs = n_inputs
       self.n_outputs = n_outputs
       self.constants = constants or [0.0, 1.0, -1.0, 0.5, -0.5, 0.1, -0.1, np.pi]
       self.fitness = None
       
   def copy(self):
       """Create a deep copy of this program"""
       return LGPProgram(
           instructions=[copy.deepcopy(i) for i in self.instructions],
           n_registers=self.n_registers,
           n_inputs=self.n_inputs,
           n_outputs=self.n_outputs,
           constants=self.constants.copy()
       )
       
   def execute(self, inputs: np.ndarray) -> np.ndarray:
       """
       Execute this program on the given inputs
       inputs: Array of shape (n_inputs,)
       returns: Array of shape (n_outputs,)
       """
       # Initialize registers
       registers = np.zeros(self.n_registers)
       
       # Set input registers
       registers[:self.n_inputs] = inputs
       
       # Set constant registers
       constants_start = self.n_inputs
       constants_end = constants_start + len(self.constants)
       registers[constants_start:constants_end] = self.constants
       
       # Execute instructions
       for instruction in self.instructions:
           registers = instruction.execute(registers)
           
       # Return output registers
       return registers[-self.n_outputs:]
   
   def __str__(self):
       result = f"LGP Program with {len(self.instructions)} instructions:\n"
       for i, instruction in enumerate(self.instructions):
           result += f"{i}: {instruction}\n"
       return result

## 2. Evolutionary Algorithm Framework

The evolutionary algorithm implements the process of evolving LGP programs through selection,
crossover, and mutation over multiple generations. The key components include:
- Population initialization
- Selection mechanisms (tournament and truncation)
- Genetic operators (crossover and mutation)
- Population evaluation and replacement

In [None]:
class LGPEvolution:
   """
   Implements the evolutionary process for Linear Genetic Programming
   """
   def __init__(self, 
               n_registers: int = 16,
               n_inputs: int = 8, 
               n_outputs: int = 4,
               population_size: int = 100,
               tournament_size: int = 3,
               n_elites: int = 2,
               p_crossover: float = 0.9,
               p_mutation_instruction: float = 0.2,
               p_mutation_dest: float = 0.3,
               p_mutation_op: float = 0.2,
               p_mutation_src: float = 0.2,
               init_min_length: int = 5,
               init_max_length: int = 15,
               max_length: int = 30,
               operations: List[str] = None):
       
       # Configuration parameters
       self.n_registers = n_registers
       self.n_inputs = n_inputs
       self.n_outputs = n_outputs
       self.population_size = population_size
       self.tournament_size = tournament_size
       self.n_elites = n_elites
       self.p_crossover = p_crossover
       self.p_mutation_instruction = p_mutation_instruction
       self.p_mutation_dest = p_mutation_dest
       self.p_mutation_op = p_mutation_op
       self.p_mutation_src = p_mutation_src
       self.init_min_length = init_min_length
       self.init_max_length = init_max_length
       self.max_length = max_length
       self.operations = operations or ['+', '-', '*', '/', 'sin', 'cos', '<', '>', 'min', 'max']
       
       # Initialize population
       self.population = self.initialize_population()
       self.best_individual = None
       self.best_fitness = float("-inf")
       self.fitness_history = []
       
   # ### 2.1 Population Initialization
   #
   # We start by creating a diverse population of random LGP programs with varying lengths.
   
   def initialize_population(self) -> List[LGPProgram]:
       """Initialize a random population of LGP programs"""
       population = []
       
       for _ in range(self.population_size):
           # Determine program length
           length = random.randint(self.init_min_length, self.init_max_length)
           
           # Generate random instructions
           instructions = []
           for _ in range(length):
               # For outputs, we only want to write to output registers
               dest = random.randint(self.n_inputs, self.n_registers - 1)
               op = random.choice(self.operations)
               src1 = random.randint(0, self.n_registers - 1)
               src2 = random.randint(0, self.n_registers - 1)
               
               instructions.append(LGPInstruction(dest, op, src1, src2))
           
           # Create program
           program = LGPProgram(
               instructions=instructions,
               n_registers=self.n_registers,
               n_inputs=self.n_inputs,
               n_outputs=self.n_outputs
           )
           
           population.append(program)
           
       return population
   
   # ### 2.2 Selection Mechanisms
   #
   # We implement two selection methods:
   # - Truncation selection: Selects the top N individuals based on fitness
   # - Tournament selection: Randomly selects k individuals and returns the best one
   
   def truncation_selection(self, population: List[LGPProgram], n: int) -> List[LGPProgram]:
       """Select the n best individuals from the population using truncation selection."""
       # Sort population by fitness (in descending order)
       sorted_population = sorted(
           population, 
           key=lambda ind: ind.fitness if ind.fitness is not None else float("-inf"),
           reverse=True
       )
       
       # Select the top n individuals
       selected = sorted_population[:n]
       
       return selected

   def tournament_selection(self, population: List[LGPProgram]) -> LGPProgram:
       """Select an individual using tournament selection"""
       # Randomly select tournament_size individuals
       tournament = random.sample(population, self.tournament_size)
       
       # Return the best individual in the tournament
       return max(tournament, key=lambda p: p.fitness if p.fitness is not None else float("-inf"))
   
   # ### 2.3 Genetic Operators
   #
   # We implement two main genetic operators:
   # - Crossover: Exchanges instruction segments between two parent programs
   # - Mutation: Modifies instruction components or adds/removes/swaps instructions
   
   def crossover(self, parent1: LGPProgram, parent2: LGPProgram) -> Tuple[LGPProgram, LGPProgram]:
       """Perform crossover between two parent programs"""
       if random.random() > self.p_crossover or len(parent1.instructions) < 2 or len(parent2.instructions) < 2:
           # No crossover
           return parent1.copy(), parent2.copy()
       
       # One-point crossover
       child1 = parent1.copy()
       child2 = parent2.copy()
       
       # Select crossover points
       point1 = random.randint(1, len(parent1.instructions) - 1)
       point2 = random.randint(1, len(parent2.instructions) - 1)
       
       # Swap instructions
       temp1 = child1.instructions[point1:]
       temp2 = child2.instructions[point2:]
       
       child1.instructions = child1.instructions[:point1] + temp2
       child2.instructions = child2.instructions[:point2] + temp1
       
       # Ensure length constraints
       if len(child1.instructions) > self.max_length:
           child1.instructions = child1.instructions[:self.max_length]
       if len(child2.instructions) > self.max_length:
           child2.instructions = child2.instructions[:self.max_length]
           
       return child1, child2
   
   def mutate(self, program: LGPProgram) -> LGPProgram:
       """Mutate a program"""
       mutated = program.copy()
       
       # 1. Mutate instructions
       if random.random() < self.p_mutation_instruction and len(mutated.instructions) > 0:
           # Randomly choose to add, remove, or swap an instruction
           mutation_type = random.choice(['add', 'remove', 'swap'])
           
           if mutation_type == 'add' and len(mutated.instructions) < self.max_length:
               # Add a new random instruction
               dest = random.randint(self.n_inputs, self.n_registers - 1)
               op = random.choice(self.operations)
               src1 = random.randint(0, self.n_registers - 1)
               src2 = random.randint(0, self.n_registers - 1)
               
               new_instruction = LGPInstruction(dest, op, src1, src2)
               insert_pos = random.randint(0, len(mutated.instructions))
               mutated.instructions.insert(insert_pos, new_instruction)
               
           elif mutation_type == 'remove' and len(mutated.instructions) > 1:
               # Remove a random instruction
               remove_pos = random.randint(0, len(mutated.instructions) - 1)
               mutated.instructions.pop(remove_pos)
               
           elif mutation_type == 'swap' and len(mutated.instructions) > 1:
               # Swap two random instructions
               pos1 = random.randint(0, len(mutated.instructions) - 1)
               pos2 = random.randint(0, len(mutated.instructions) - 1)
               mutated.instructions[pos1], mutated.instructions[pos2] = mutated.instructions[pos2], mutated.instructions[pos1]
       
       # 2. Mutate instruction components
       for instruction in mutated.instructions:
           # Mutate destination register
           if random.random() < self.p_mutation_dest:
               instruction.dest = random.randint(self.n_inputs, self.n_registers - 1)
               
           # Mutate operation
           if random.random() < self.p_mutation_op:
               instruction.op = random.choice(self.operations)
               
           # Mutate source registers
           if random.random() < self.p_mutation_src:
               instruction.src1 = random.randint(0, self.n_registers - 1)
           if random.random() < self.p_mutation_src:
               instruction.src2 = random.randint(0, self.n_registers - 1)
               
       return mutated
   
   # ### 2.4 Population Evaluation and Evolution
   #
   # We evaluate the population and evolve it over multiple generations.
   
   def evaluate_population(self, eval_func: Callable[[LGPProgram], float]) -> None:
       """Evaluate all individuals in the population"""
       for individual in self.population:
           individual.fitness = eval_func(individual)
           
           # Update best individual
           if individual.fitness > self.best_fitness:
               self.best_fitness = individual.fitness
               self.best_individual = individual.copy()
   
   def evolve(self, eval_func: Callable[[LGPProgram], float], 
             n_generations: int = 100, verbose: bool = True) -> LGPProgram:
       """Run the evolutionary process"""
       # Initial evaluation
       self.evaluate_population(eval_func)
       self.fitness_history.append(self.best_fitness)
       
       if verbose:
           print(f"Generation 0: Best fitness = {self.best_fitness:.4f}")
       
       for generation in range(1, n_generations + 1):
           # Create new population with elitism (keeping best individuals)
           new_population = self.truncation_selection(self.population, self.n_elites)
           
           # Fill the rest of the population with offspring from crossover and mutation
           while len(new_population) < self.population_size:
               # Select parents
               parent1 = self.tournament_selection(self.population)
               parent2 = self.tournament_selection(self.population)
               
               # Crossover
               child1, child2 = self.crossover(parent1, parent2)
               
               # Mutation
               child1 = self.mutate(child1)
               child2 = self.mutate(child2)
               
               # Add to new population
               new_population.append(child1)
               new_population.append(child2)
           
           # Truncate if necessary
           if len(new_population) > self.population_size:
               new_population = new_population[:self.population_size]
           
           # Replace population
           self.population = new_population
           
           # Evaluate new population
           self.evaluate_population(eval_func)
           self.fitness_history.append(self.best_fitness)
           
           if verbose and generation % 10 == 0:
               print(f"Generation {generation}: Best fitness = {self.best_fitness:.4f}")
       
       return self.best_individual

## 3. Environment Interaction and Evaluation

We'll use OpenAI Gym's LunarLander environment to evaluate our LGP programs.
This environment requires the agent to learn how to land a lunar module safely on a landing pad.

In [None]:
class LunarLanderLGP:
   def __init__(self, episodes_per_eval: int = 5, max_steps: int = 1000):
       self.env = gym.make('LunarLander-v3')
       self.episodes_per_eval = episodes_per_eval
       self.max_steps = max_steps
       
   def evaluate_program(self, program: LGPProgram) -> float:
       """Evaluate an LGP program on the LunarLander environment"""
       total_reward = 0
       
       for _ in range(self.episodes_per_eval):
           observation, info = self.env.reset(seed=random.randint(0, 10000))
           episode_reward = 0
           
           for step in range(self.max_steps):
               # Get action from program
               action_values = program.execute(observation)
               action = np.argmax(action_values)  # Choose action with highest value
               
               # Take action in environment
               observation, reward, terminated, truncated, info = self.env.step(action)
               episode_reward += reward
               
               if terminated or truncated:
                   break
           
           total_reward += episode_reward
       
       avg_reward = total_reward / self.episodes_per_eval
       return avg_reward
   
   def render_best_program(self, program: LGPProgram, n_episodes: int = 3) -> None:
       """Render the best program's performance"""
       for episode in range(n_episodes):
           env_render = gym.make('LunarLander-v3', render_mode='human')
           observation, info = env_render.reset(seed=episode)
           episode_reward = 0
           
           for step in range(self.max_steps):
               # Get action from program
               action_values = program.execute(observation)
               action = np.argmax(action_values)
               
               # Take action in environment
               observation, reward, terminated, truncated, info = env_render.step(action)
               episode_reward += reward
               
               if terminated or truncated:
                   break
           
           print(f"Episode {episode+1} Reward: {episode_reward:.2f}")
           env_render.close()

## 4. Running the Evolution and Visualizing Results

Now we can run the evolution process and visualize the results.

In [None]:
def run_lgp_evolution():
   # Initialize LunarLander environment for LGP
   lunar_lander = LunarLanderLGP(episodes_per_eval=3, max_steps=1000)
   
   # Configure LGP evolution
   lgp_evolution = LGPEvolution(
       n_registers=20,               # Number of registers
       n_inputs=8,                   # LunarLander has 8 inputs
       n_outputs=4,                  # LunarLander has 4 actions
       population_size=30,
       tournament_size=3,
       n_elites=5,                   # Number of elite individuals to keep
       p_crossover=0.6,
       p_mutation_instruction=0.2,
       p_mutation_dest=0.5,
       p_mutation_op=0.5,
       p_mutation_src=0.5,
       init_min_length=8,
       init_max_length=20,
       max_length=30,
       operations=['+', '-', '*', '/', 'sin', 'cos', '<', '>', 'min', 'max']
   )
   
   # Evolution process
   print("Starting LGP Evolution for LunarLander...")
   start_time = time.time()
   
   best_program = lgp_evolution.evolve(
       eval_func=lunar_lander.evaluate_program,
       n_generations=50,
       verbose=True
   )
   
   end_time = time.time()
   elapsed_time = end_time - start_time
   
   print(f"\nEvolution completed in {elapsed_time:.2f} seconds")
   print("\nBest program:")
   print(best_program)
   
   # Plot fitness history
   plt.figure(figsize=(10, 6))
   plt.plot(lgp_evolution.fitness_history)
   plt.title('LGP Evolution Fitness History')
   plt.xlabel('Generation')
   plt.ylabel('Fitness (Average Reward)')
   plt.grid(True)
   plt.savefig('lgp_fitness_history.png')
   plt.show()
   
   # Visualize best program
   print("\nRendering best program performance...")
   lunar_lander.render_best_program(best_program, n_episodes=3)
   
   return best_program, lgp_evolution

In [None]:
best_program, lgp_evolution = run_lgp_evolution()

## 5. Exercise: Extending to Quality-Diversity Optimization

Our current implementation focuses only on finding a single high-performing solution.
However, the search gets stuck at local optima, limiting the result. Quality-Diversity Optimization can help explore new behaviors, leading to better policies in the end.

The pyribs library provides tools for Quality-Diversity Optimization, including the 
MAP-Elites algorithm and its variants.

For an exercise, try extending this implementation to use Quality-Diversity Optimization.
Here's a sketch of how you might start if you use pyribs. Another option is to modify the above code, adding a Novelty fitness term to encourage exploration.


In [None]:
import ribs
from ribs.archives import GridArchive
from ribs.emitters import GeneticAlgorithmEmitter
from ribs.schedulers import Scheduler

# Define behavior descriptors for LunarLander
# e.g., landing speed and fuel consumption

# Create archive
archive = GridArchive(
    solution_dim=...,  # Dimension of solution
    dims=[10, 10],     # Number of bins for each behavior descriptor
    ranges=[[-5, 5], [0, 100]],  # Ranges for each behavior descriptor
)

# Create emitter
emitter = GeneticAlgorithmEmitter(
    archive=archive,
    x0=initial_solution,
    batch_size=population_size,
    ...
)

# Create scheduler
scheduler = Scheduler([emitter])

# Run optimization
for _ in range(iterations):
    solutions = scheduler.ask()
    
    # Evaluate solutions and collect behavior descriptors
    objectives = []
    behaviors = []
    
    for solution in solutions:
        # Convert solution to LGP program
        program = solution_to_program(solution)
        
        # Evaluate program
        objective, behavior = evaluate_program_with_behavior(program)
        
        objectives.append(objective)
        behaviors.append(behavior)
    
    # Tell scheduler results
    scheduler.tell(objectives, behaviors)