# MAP-Elites using pyribs

In [None]:
import numpy as np
import requests
import json
import random
from ribs.archives import GridArchive
from ribs.emitters import EmitterBase
from ribs.schedulers import Scheduler
from ribs.visualize import grid_archive_heatmap
import matplotlib.pyplot as plt

In [None]:
# Base URL of the JavaScript server that exposes track evaluation and the
# genetic operators (the /evaluate, /mutate and /crossover endpoints).
BASE_URL = 'http://localhost:4242'
# Number of (x, y) points in each individual's "dataSet".
POINTS_COUNT = 50
# Maximum number of "selectedCells" entries stored in a solution vector.
MAX_SELECTED_CELLS = 10
# Flat vector layout: x,y per point, then x,y per selected cell, then the ID.
SOLUTION_DIM = POINTS_COUNT * 2 + MAX_SELECTED_CELLS * 2 + 1
# Archive measure ranges: (min, max) for track size and track length.
TRACK_SIZE_RANGE = (2, 5)
LENGTH_RANGE = (400, 3000)

# Number of archive cells along each measure axis.
ARCHIVE_DIM = 3
# The emitter returns random individuals for this many initial asks; it is
# also used as the emitter batch size.
INIT_POPULATION = ARCHIVE_DIM * ARCHIVE_DIM

### Helper functions

In [None]:
def generate_id(iteration):
    """Return `iteration` plus a random fraction drawn from `random.random()`."""
    fractional_part = random.random()
    return iteration + fractional_part

def create_random_solution(iteration):
    """Build a random "voronoi" individual for the given iteration.

    The individual gets a fresh ID from `generate_id`, POINTS_COUNT points
    with x and y drawn uniformly from [0, 600], and no selected cells.
    """
    # The ID is drawn first so the sequence of random draws is id, x0, y0, x1, ...
    solution_id = generate_id(iteration)

    points = []
    for _ in range(POINTS_COUNT):
        x_coord = random.uniform(0, 600)
        y_coord = random.uniform(0, 600)
        points.append({"x": x_coord, "y": y_coord})

    return {
        "id": solution_id,
        "mode": "voronoi",
        "dataSet": points,
        "selectedCells": [],
    }

def solution_to_array(solution):
    """Flatten a solution dict into a fixed-length NumPy vector.

    Layout of the returned array (length SOLUTION_DIM):
      * indices [0, POINTS_COUNT*2): interleaved x, y of each "dataSet" point
      * indices [POINTS_COUNT*2, SOLUTION_DIM-1): interleaved x, y of each
        "selectedCells" entry
      * last index: the solution "id"

    Slots that are not filled stay 0. Points beyond POINTS_COUNT and cells
    beyond MAX_SELECTED_CELLS are dropped so that they can never spill into
    the next section of the vector. A missing or None "selectedCells" is
    treated as an empty list.

    Args:
        solution: Dict with "id", "dataSet" and optionally "selectedCells".

    Returns:
        A 1-D float array of length SOLUTION_DIM.
    """
    array = np.zeros(SOLUTION_DIM)
    cells_offset = POINTS_COUNT * 2

    # Fill in dataSet
    for i, point in enumerate(solution["dataSet"]):
        if i >= POINTS_COUNT:
            break
        array[i * 2] = point["x"]
        array[i * 2 + 1] = point["y"]

    # Fill in selectedCells
    for i, cell in enumerate(solution.get("selectedCells") or []):
        if i >= MAX_SELECTED_CELLS:
            break
        array[cells_offset + i * 2] = cell["x"]
        array[cells_offset + i * 2 + 1] = cell["y"]

    # Add ID at the end
    array[-1] = solution["id"]
    return array

def array_to_solution(array):
    """Rebuild a "voronoi" solution dict from a flat solution vector.

    The first POINTS_COUNT*2 entries are read as interleaved x, y points,
    the entries up to SOLUTION_DIM-1 as interleaved x, y selected cells
    (pairs where both values are 0 are treated as unused and skipped), and
    the last entry as the ID.
    """
    cells_start = POINTS_COUNT * 2
    cells_stop = SOLUTION_DIM - 1

    points = []
    for offset in range(0, cells_start, 2):
        points.append({"x": array[offset], "y": array[offset + 1]})

    cells = []
    for offset in range(cells_start, cells_stop, 2):
        # An all-zero pair marks an empty slot.
        if array[offset] == 0 and array[offset + 1] == 0:
            continue
        cells.append({"x": array[offset], "y": array[offset + 1]})

    return {
        "id": array[-1],
        "mode": "voronoi",
        "dataSet": points,
        "selectedCells": cells,
    }

def evaluate_solution(solution):
    """Evaluate a solution dict through the server's /evaluate endpoint.

    Args:
        solution: Solution dict, sent as the JSON body of the request.

    Returns:
        A tuple ``(objective, measures, selected_cells)`` where ``objective``
        is ``-(deltaX + deltaY)`` of the returned fitness, ``measures`` is
        ``[trackSize, length]`` and ``selected_cells`` is the server's
        "selectedCells" value. If the request fails or the response does not
        have the expected shape, the failure sentinel
        ``(-inf, [TRACK_SIZE_RANGE[0], LENGTH_RANGE[0]], [])`` is returned
        instead of raising.
    """
    try:
        response = requests.post(f'{BASE_URL}/evaluate', json=solution)
        response.raise_for_status()
        result = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on requests versions whose
        # decode error is not a RequestException.
        print(f"Error in evaluate_solution: {e}")
        return -np.inf, [TRACK_SIZE_RANGE[0], LENGTH_RANGE[0]], []

    try:
        fitness = result['fitness']
        objective = -(fitness['deltaX'] + fitness['deltaY'])
        measures = [result['trackSize'], fitness['length']]
        return objective, measures, result['selectedCells']
    except (KeyError, TypeError) as e:
        # A payload with missing keys or the wrong shape is reported the same
        # way as a failed request, so callers only handle one failure path.
        print(f"Error in evaluate_solution: malformed response: {e!r}")
        return -np.inf, [TRACK_SIZE_RANGE[0], LENGTH_RANGE[0]], []

## Genetic operators

In [None]:
class CustomEmitter(EmitterBase):
    """Emitter that delegates mutation and crossover to the track server.

    For the first INIT_POPULATION calls to `ask` (and whenever the archive is
    still empty) it returns random individuals. Afterwards each call picks
    mutation or crossover with equal probability, sampling parents from the
    archive and posting them to the server's /mutate or /crossover endpoint.

    Args:
        archive: Archive that parents are sampled from.
        solution_dim: Length of a flat solution vector.
        batch_size: Number of random individuals / mutation parents per ask;
            crossover uses ``batch_size // 2`` parent pairs.
        bounds: Per-dimension bounds forwarded to ``EmitterBase``.
    """

    def __init__(self, archive, solution_dim, batch_size=ARCHIVE_DIM, bounds=None):
        super().__init__(archive, solution_dim=solution_dim, bounds=bounds)
        self.batch_size = batch_size
        self.iteration = 0  # Number of ask() calls made so far.

    def ask(self):
        """Return the next batch of solution vectors, one row per solution."""
        self.iteration += 1
        # Sampling parents from an empty archive raises, so keep producing
        # random individuals until at least one elite has been stored.
        if self.iteration <= INIT_POPULATION or self.archive.empty:
            return self.generate_initial_solutions()
        if np.random.random() < 0.5:  # Mutation
            return self.mutate_solutions()
        return self.crossover_solutions()  # Crossover

    def tell(self, solution, objective, measures, add_info, **fields):
        """Ignore evaluation results; this emitter keeps no adaptive state."""
        pass

    def generate_initial_solutions(self):
        """Return `batch_size` random solution vectors."""
        return np.array([
            solution_to_array(create_random_solution(self.iteration - 1))
            for _ in range(self.batch_size)
        ])

    def mutate_solutions(self):
        """Return server-mutated copies of `batch_size` sampled elites.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        print("Mutating")
        mutated = []
        for parent_array in self._sample_parent_arrays(self.batch_size):
            # The parent's ID is already stored in the last slot of its
            # vector, so array_to_solution needs no separate ID argument.
            parent_dict = array_to_solution(parent_array)
            response = requests.post(f'{BASE_URL}/mutate', json={
                "individual": parent_dict,
                "intensityMutation": 10
            })
            response.raise_for_status()
            mutated_dict = response.json()['mutated']
            mutated_dict['id'] = generate_id(self.iteration - 1)  # Update ID for new generation
            mutated.append(solution_to_array(mutated_dict))
        return self._as_batch(mutated)

    def crossover_solutions(self):
        """Return the offspring of `batch_size // 2` sampled parent pairs.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        print("Crossover")
        offspring = []
        for _ in range(self.batch_size // 2):
            first, second = self._sample_parent_arrays(2)
            response = requests.post(f'{BASE_URL}/crossover', json={
                "mode": "voronoi",
                "parent1": array_to_solution(first),
                "parent2": array_to_solution(second)
            })
            response.raise_for_status()
            children = response.json()['offspring']
            for child in children:
                child['id'] = generate_id(self.iteration - 1)  # Update ID for new generation
                offspring.append(solution_to_array(child))
        return self._as_batch(offspring)

    def _sample_parent_arrays(self, count):
        """Return `count` elite solution vectors sampled from the archive."""
        # sample_elites returns a batch keyed by field name, not a list of
        # elite objects; the "solution" field holds one row per sampled elite.
        return self.archive.sample_elites(count)["solution"]

    def _as_batch(self, vectors):
        """Stack solution vectors into a 2-D batch, even when there are none."""
        if not vectors:
            return np.empty((0, self.solution_dim))
        return np.array(vectors)

## Illuminating search spaces by mapping elites


In [None]:
import logging

# Configure the root logger at INFO level so the progress messages logged by
# run_map_elites are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by run_map_elites.
logger = logging.getLogger(__name__)

def run_map_elites(iterations):
    """Run the MAP-Elites loop and return the resulting archive.

    Each iteration asks the emitter for a batch of candidate vectors,
    evaluates every candidate with `evaluate_solution`, and adds the
    successfully evaluated ones to a GridArchive whose two measures are
    track size and track length.

    The emitter and archive are driven directly instead of through a
    Scheduler. A scheduler's tell() expects exactly one result per solution
    handed out by the preceding ask() and stores the vectors it handed out,
    so it can neither skip failed evaluations nor record the selectedCells
    reported by the server.

    Args:
        iterations: Number of ask/evaluate/add rounds to perform.

    Returns:
        The GridArchive holding the elites that were found.

    Raises:
        Exception: Any error raised inside an iteration is logged and then
            re-raised.
    """
    archive = GridArchive(solution_dim=SOLUTION_DIM,
                          dims=[ARCHIVE_DIM, ARCHIVE_DIM],
                          ranges=[TRACK_SIZE_RANGE, LENGTH_RANGE])

    emitter = CustomEmitter(archive,
                            solution_dim=SOLUTION_DIM,
                            batch_size=INIT_POPULATION,
                            bounds=[(0, 600)] * (SOLUTION_DIM - 1) + [(0, float('inf'))])  # Last bound is for ID

    # Slice of the solution vector that holds the selectedCells coordinates.
    cells_start = POINTS_COUNT * 2
    cells_end = cells_start + MAX_SELECTED_CELLS * 2

    for itr in range(iterations):
        try:
            solution_batch = emitter.ask()

            objectives = []
            measures_list = []
            updated_solutions = []

            for solution in solution_batch:
                candidate = solution.copy()
                candidate[-1] = generate_id(itr)  # Set the ID in the array
                solution_dict = array_to_solution(candidate)
                objective, measures, selected_cells = evaluate_solution(solution_dict)

                # Check if the objective is finite
                if not np.isfinite(objective):
                    logger.warning(f"Non-finite objective value: {objective}. Skipping this solution.")
                    continue

                # Replace the selectedCells section with the cells reported by
                # the server; clear it first so no stale entries from a parent
                # solution survive.
                candidate[cells_start:cells_end] = 0
                for i, cell in enumerate(selected_cells):
                    if i >= MAX_SELECTED_CELLS:
                        break
                    candidate[cells_start + i * 2] = cell['x']
                    candidate[cells_start + i * 2 + 1] = cell['y']

                objectives.append(objective)
                measures_list.append(measures)
                updated_solutions.append(candidate)

            # Only update the archive if we have valid solutions
            if objectives:
                evaluated = np.array(updated_solutions)
                add_info = archive.add(evaluated, objectives, measures_list)
                emitter.tell(evaluated, objectives, measures_list, add_info)

            if (itr + 1) % 10 == 0:
                logger.info(f"> {itr + 1} iterations completed")
                logger.info(f"  - Archive size: {len(archive)}")
                logger.info(f"  - Archive coverage: {archive.stats.coverage}")

        except Exception as e:
            logger.error(f"Error in iteration {itr}: {str(e)}")
            raise

    return archive

In [None]:
# Run 10 MAP-Elites iterations; the returned archive is used by the
# visualization and statistics cells below.
# NOTE(review): the emitter returns random individuals for its first
# INIT_POPULATION (9) asks, so with 10 iterations only the last one uses
# mutation or crossover -- confirm that this is intended.
final_archive = run_map_elites(10) 

## Visualize Results

In [None]:
# Plot the final archive as a heatmap on a 10x8 inch figure.
plt.figure(figsize=(10, 8))
# No axes are passed, so the heatmap is presumably drawn on pyplot's current
# axes (the figure created above) -- confirm against the pyribs docs.
grid_archive_heatmap(final_archive)
plt.title("MAP-Elites Archive")
# Labels follow the order of the archive's measure ranges (track size, then
# length). NOTE(review): assumes the first measure is plotted on the x-axis.
plt.xlabel("Track Size")
plt.ylabel("Track Length")
plt.show()

## Archive Statistics

In [None]:
# Summary of the final archive: size, best objective, and the best track.
print(f"Total solutions: {len(final_archive)}")
print(f"Best solution: Objective = {final_archive.stats.obj_max}")

# best_elite is None when nothing was ever added to the archive.
best_solution = final_archive.best_elite
if best_solution is None:
    print("Archive is empty; no best solution to report.")
else:
    print(f"Best solution measures: {best_solution['measures']}")

    # Convert the flat solution vector back into the server's dict format.
    best_track = array_to_solution(best_solution["solution"])
    print("Best track data:", json.dumps(best_track, indent=2))