In [1]:
from pydantic.main import Model

from agent import Agent
from tetris.game import Game
import numpy as np
import nest_asyncio
from tqdm import tqdm
nest_asyncio.apply()

# Evolutionary Algorithm
Fitness:
- Score weights based on average final Tetris score over N random games
Selection:
- Use fitness proportional selection to select top K performing options
Reproduction:
- Parent A contributes 1-4 random genes and Parent B contributes the compliment
Mutation:
- Apply m operations in sequence
  - No-op
  - Swap genes (i, j) 
  - Add/subtract value from gene i
  - Double/half value of gene i   

In [6]:
from pydantic import BaseModel, ConfigDict
from typing import List, Tuple, Any, Awaitable, Callable, Dict

import concurrent.futures
import asyncio

class EvoAlgo(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    selection_rate: int = 0.2
    mutation_rate: float = 0.2
    num_mutations: int = 3
    crossover_rate: float = 0.2
    increment_vals: List[float] = [0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0]
    multiply_vals: List[float] = [0.5, 0.75, 1.33, 2]
    weight_constraint: Tuple[float, float] = (0, 10)
    population: List[Agent] = []
    best_weights: List[np.ndarray] = []
    best_scores: List[float] = []
    history: Dict[int, Dict[float, np.ndarray]] = {}
    seed: int = 87
    rng: np.random.Generator = np.random.default_rng(seed)
    
    def minimize(self, objective: Callable, num_generations: int = 10, population_size: int = 50) -> Tuple[List[np.ndarray], List[float], Dict[float, List[np.ndarray]]]:
        weights = self.rng.random(size=(population_size, 5)) * self.weight_constraint[1]
        agents = [Agent(weights=w) for w in weights]
        self.history = {}
        scores = []
        for g in range(num_generations):
            if g > 0:
                # Start new generation
                keep_agents = self.select(agents, scores)
                child_agents = self.crossover(keep_agents)
                mutated_agents = self.mutate(child_agents)
                agents = mutated_agents
            with concurrent.futures.ProcessPoolExecutor() as executor:
                scores = executor.map(objective, agents)
            scores = list(scores)
            
            for score, agent in zip(scores, agents):
                self.history.setdefault(g, {}).setdefault(score, []).append(agent.weights)

            self.best_scores.append(np.max(scores))
            self.best_weights.append(agents[np.argmax(scores)].weights)
            print(f"Best score was {self.best_scores[-1]} with weights {self.best_weights[-1]}")
            
        return self.best_weights, self.best_scores, self.history
    
    def select(self, agents: List[Agent], scores: List[float]) -> List[Agent]:
        """Stochastic uniform sampling"""
        fps_scores = np.array(scores) / sum(scores)
        order = np.argsort(fps_scores)
        fps_scores = fps_scores[order]
        agents = [agents[i] for i in order]
        
        roulette_bins = np.cumsum(fps_scores)
        
        # Generate evenly spaced pointers
        N = len(agents)
        pointers = np.arange(N) / N
        offset = np.random.rand()  # Spin the roulette wheel!
        pointers = pointers + offset
        pointers[pointers > 1] = pointers[pointers > 1] - 1
        # Use digitize to select items
        keep_inds = np.digitize(pointers, roulette_bins)
        return [agents[i] for i in keep_inds]
    
    def crossover(self, agents: List[Agent]) -> List[Agent]:
        def _crossover_fn(a: Agent, b: Agent) -> Agent:
            genes = self.rng.permutation(5)
            selection = self.rng.choice(np.arange(4) + 1)
            new_weight = np.zeros(5)
            # Genetic selection should be complementary
            a_genes = genes[:selection]
            b_genes = genes[selection:]
            new_weight[a_genes] = a.weights[a_genes]
            new_weight[b_genes] = b.weights[b_genes]
            return Agent(weights=new_weight)
        
        new_agents = []
        for agent in agents:
            if self.rng.random() < self.crossover_rate:
                agent_coparent = self.rng.choice(agents)
                new_agents.append(_crossover_fn(agent, agent_coparent))
            else:
                new_agents.append(agent)
        return new_agents
    
    def _increment_gene(self, agent: Agent) -> Agent:
        i = self.rng.choice(5)
        val = self.rng.choice([-1, 1]) * self.rng.choice(self.increment_vals)
        agent.weights[i] += val
        agent.weights = np.clip(agent.weights, self.weight_constraint[0], self.weight_constraint[1])
        return agent
    
    def _multiply_gene(self, agent: Agent) -> Agent:
        i = self.rng.choice(5)
        val = self.rng.choice(self.multiply_vals)
        agent.weights[i] *= val
        agent.weights = np.clip(agent.weights, self.weight_constraint[0], self.weight_constraint[1])
        return agent
    
    def _swap_gene(self, agent: Agent) -> Agent:
        i, j = self.rng.choice(5, size=2, replace=False)
        agent.weights[[j, i]] = agent.weights[[i, j]]
        return agent
    
    def mutate(self, agents: List[Agent]) -> List[Agent]:
        mutation_fns = [self._swap_gene, self._increment_gene, self._multiply_gene]
        mutation_fn_weights = [0.2, 0.6, 0.2]
        mutated_agents = []
        for agent in agents:
            if self.rng.random() < self.mutation_rate:
                for i in range(self.num_mutations):
                    mutation_fn = self.rng.choice(mutation_fns, p=mutation_fn_weights)
                    agent = mutation_fn(agent)
            mutated_agents.append(agent)

        # Ensure the best agent has at least one candidate
        mutated_agents[-1] = Agent(weights=self.best_weights[-1])
        return mutated_agents

In [4]:
GAME_SEED = 87
async def objective(agent: Agent) -> float:
    game = Game(agent, seed=GAME_SEED)
    results = []
    async for item in game.run():
        results.append(item)
    return game.score

In [64]:
algo = EvoAlgo(mutation_rate=0.8)

In [6]:
def process_agent(agent: Agent) -> float:
    return asyncio.run(objective(agent))


In [None]:
algo.minimize(process_agent, num_generations=10, population_size=100)

In [None]:
GAME_SEEDS = [87, 42, 101]
async def objective(agent: Agent, seed) -> float:
    game = Game(agent, seed=seed)
    results = []
    async for item in game.run():
        results.append(item)
    return game.score

def process_agent(agent: Agent) -> float:
    scores = []
    for seed in GAME_SEEDS:
        scores.append(asyncio.run(objective(agent, seed)))
    return np.mean(scores)


In [None]:
algo = EvoAlgo(mutation_rate=0.8, crossover_rate=0.4)

In [None]:
algo.minimize(process_agent, num_generations=10, population_size=20)

In [None]:
import concurrent.futures
import asyncio

num_trials = 10
game_seed = 42

async def objective(agent: Agent) -> float:
    game = Game(agent, seed=game_seed)
    async for _ in game.run():
        continue
    return game.score

def process_agent(agent: Agent) -> float:
    return asyncio.run(objective(agent))


agents = [Agent(np.random.randn(5)) for _ in range(num_trials)]
with concurrent.futures.ProcessPoolExecutor() as executor:
    scores = executor.map(process_agent, agents)
scores = list(scores)

# GA with 5 seeds

In [3]:
GAME_SEEDS = [73, 42, 101, 69, 987]
async def objective(agent: Agent, seed) -> float:
    game = Game(agent, seed=seed)
    results = []
    async for item in game.run():
        results.append(item)
    return game.score

def process_agent(agent: Agent) -> float:
    scores = []
    for seed in GAME_SEEDS:
        scores.append(asyncio.run(objective(agent, seed)))
    return np.mean(scores)


In [4]:
algo = EvoAlgo(mutation_rate=0.8, crossover_rate=0.4)

In [5]:
best_scores, best_weights, history = algo.minimize(process_agent, num_generations=10, population_size=20)

Best score was 40000.0 with weights [0.34622345 2.95584655 4.18676425 5.13021089 1.2361363 ]
Best score was 40000.0 with weights [0.34622345 4.4419328  4.28676425 5.13021089 1.47792327]
Best score was 40000.0 with weights [8.05534227 8.05337716 7.5986661  0.90625126 2.92657677]
Best score was 29840.0 with weights [0.44622345 5.90777063 4.28676425 6.82318048 1.47792327]
Best score was 40000.0 with weights [1.47792327 4.4419328  3.88676425 9.07483004 1.46792327]
Best score was 40000.0 with weights [1.96563795 4.6419328  3.88676425 9.02483004 1.46792327]
Best score was 40000.0 with weights [3.98676425 0.23784989 1.52792327 1.2061363  0.34622345]
Best score was 40000.0 with weights [1.96563795 4.7219328  3.39676425 8.02483004 1.46792327]
Best score was 40000.0 with weights [1.96563795 4.4419328  3.39676425 8.02483004 1.46792327]
Best score was 40000.0 with weights [2.61429848 4.1819328  4.32769645 8.01483004 1.43792327]


NameError: name 'history' is not defined

In [6]:
algo.history

{0: {9140.0: array([7.0010428 , 9.82102919, 3.7596524 , 7.93533889, 5.64248419]),
  15680.0: array([1.88571401, 5.39103801, 5.02116492, 7.30876796, 4.00647543]),
  610.0: array([9.25624864, 0.03412066, 0.47758348, 0.85098027, 8.83780385]),
  3210.0: array([5.29752653, 6.71983133, 3.08170028, 6.83454646, 6.24244872]),
  16260.0: array([ 6.92553618, 10.06527898,  3.31903872,  0.90625126,  0.05047459]),
  3810.0: array([7.12314253, 6.26352079, 2.55857868, 7.6906871 , 8.39120924]),
  8930.0: array([9.79994568, 7.68813925, 7.99821582, 0.04348765, 5.62675341]),
  40000.0: array([0.34622345, 2.95584655, 4.18676425, 5.03021089, 1.2361363 ]),
  3910.0: array([8.03351176, 7.60061054, 2.33041888, 0.88570469, 6.90067299]),
  16710.0: array([3.08090581, 8.05614784, 7.5986661 , 7.05857145, 1.86635076]),
  1530.0: array([7.12740311, 1.86370782, 3.96423882, 3.0651178 , 9.28019871]),
  9630.0: array([9.05925209, 9.00479474, 5.00792546, 9.80121916, 5.69249244]),
  1590.0: array([7.19658983, 2.30007769, 

# GA with 5 seeds, 100 agents

In [7]:
GAME_SEEDS = [32, 410, 99, 37, 8]
async def objective(agent: Agent, seed) -> float:
    game = Game(agent, seed=seed)
    results = []
    async for item in game.run():
        results.append(item)
    return game.score

def process_agent(agent: Agent) -> float:
    scores = []
    for seed in GAME_SEEDS:
        scores.append(asyncio.run(objective(agent, seed)))
    return np.mean(scores)


In [8]:
algo = EvoAlgo(mutation_rate=0.8, crossover_rate=0.4)

In [9]:
best_scores, best_weights, history = algo.minimize(process_agent, num_generations=10, population_size=100)

Best score was 30530.0 with weights [0.45980569 7.85032218 7.02579063 6.08156309 2.99888919]
Best score was 30530.0 with weights [6.95500591 7.85032218 7.02579063 6.08156309 2.99888919]
Best score was 30530.0 with weights [5.84437859 2.84828686 8.73765569 1.08301312 2.52757954]
Best score was 31000.0 with weights [ 7.61424701  3.38785169 10.          0.5129242   3.3403593 ]
Best score was 31000.0 with weights [ 7.66424701  3.38785169 10.          0.5129242   3.2903593 ]
Best score was 31000.0 with weights [ 7.66424701  3.38785169 10.          0.5129242   3.2903593 ]
Best score was 31000.0 with weights [ 7.66424701  3.38785169 10.          0.5129242   3.2403593 ]
Best score was 31000.0 with weights [ 7.76424701  3.40785169 10.          0.38469315  3.2403593 ]
Best score was 31000.0 with weights [ 7.76424701  3.40785169 10.          0.51164189  3.2403593 ]
Best score was 30530.0 with weights [1.39443746 6.92579063 7.5        6.19727297 3.49270794]
