In [1]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.distributions as td

from qdkit.individual import TorchIndividual, Individual
from qdkit.utils import EvaluateOutput
from qdkit.map import GridMap
from qdkit.map_elites import MapElites
from qdkit.mutation import CrossoverMutation, GaussianNoiseMutation, Mutation, MutationSet

## LunarLander policy

In [2]:
class LunarLanderPolicy(TorchIndividual):
    def __init__(self):
        super().__init__()
        self.input_dims = 8
        self.action_dims = 4

        self.network = nn.Sequential(
            nn.Linear(8, 4),
            # nn.Tanh(),
            # nn.Linear(128, 4)
        )

        self.__device_param_dummy__ = nn.Parameter(
            torch.empty(0)
        )  # to keep track of device

    @property
    def device(self):
        return self.__device_param_dummy__.device

    def forward(self, state) -> torch.Tensor:
        state = state.float()
        output = self.network(state)
        return output.squeeze().argmax(-1)

    def act(self, state) -> np.array:
        with torch.no_grad():
            torch_state = torch.from_numpy(state).view(1,-1).to(self.device)
            action = self.forward(torch_state).item()
        return action

In [3]:
policy = LunarLanderPolicy()

## Mutation

In [4]:
def gym_evaluate(individual: Individual, render=False, max_steps=1000):
    env = gym.make("LunarLander-v2")
    states = []
    fitness = 0
    done = False
    state, _ = env.reset(seed=52)
    states = [state]
    step = 0
    while not done:
        if step >= max_steps:
            break
        if render:
            env.render()
        inp = state
        action = env.action_space.sample()
        # action = individual.act(inp)
        next_state, reward, done, truncated, _ = env.step(action)
        fitness += reward
        state = next_state
        states.append(state)
        step += 1
    return EvaluateOutput(states=states, fitness=fitness, individual=None)

In [5]:
eval_output = gym_evaluate(None, render=True)

  gym.logger.warn(
  if not isinstance(terminated, (bool, np.bool8)):


In [6]:
def lunar_lander_bc(individual, eval_output):
    all_y_vels = []
    states = eval_output.states
    impact_x_pos = None
    for obs in states:
        x_pos = obs[0]
        y_vel = obs[3]
        leg0_touch = bool(obs[6])
        leg1_touch = bool(obs[7])
        all_y_vels.append(y_vel)

        # Check if the lunar lander is impacting for the first time.
        if impact_x_pos is None and (leg0_touch or leg1_touch):
            impact_x_pos = x_pos
            impact_y_vel = y_vel

    if impact_x_pos is None:
        impact_x_pos = x_pos
        impact_y_vel = min(all_y_vels)

    return np.array([impact_x_pos, impact_y_vel])

In [7]:
lunar_lander_bc(policy, eval_output)

array([-0.40148544, -1.5157455 ], dtype=float32)

In [8]:
grid_map = GridMap(
    behavior_characteristic_fn = lunar_lander_bc,
    behavior_space = [(-1.0, 1.0), (-3.0, 0.0)],
    n_bins=50,
    niche_size=10,
    sampling_method="sorted"
)

In [9]:
mutations = [
    GaussianNoiseMutation(std=0.2),
    CrossoverMutation(
        parameter_proportion=0.5,
        sampling_method="sorted",
        sample_segment=True
    )
]

In [10]:
map_elites = MapElites(
    initial_pop = [LunarLanderPolicy() for _ in range(100)],
    map=grid_map,
    evaluate_fn=gym_evaluate,
    mutations=mutations
)

In [None]:
map_elites.run(100000)

Max Fitness: 62.59462719169926:  98%|▉| 98008/100000 [10:02<00:12,