In [1]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.distributions as td

from mapleetz.individual import TorchModuleIndividual, Individual
from mapleetz.evaluate_fn import GymEvaluateFunction, EvaluateOutput
from mapleetz.map import GridMap
from mapleetz.map_elites import MapElites
from mapleetz.mutation import CrossoverMutation, GaussianNoiseMutation, Mutation, MutationSet

## LunarLander policy

In [2]:
class LunarLanderPolicy(nn.Module):
    def __init__(self):
        super().__init__()
        self.input_dims = 8
        self.action_dims = 4

        self.network = nn.Sequential(
            nn.Linear(8, 32),
            nn.Tanh(),
            nn.Linear(32, 4)
        )

    def forward(self, state) -> torch.Tensor:
        state = state.float()
        output = self.network(state)
        dist = td.Categorical(logits=output)
        return dist.sample()

In [3]:
policy = LunarLanderPolicy()

## Mutation

In [None]:
def simulate(model, seed=None, video_env=None):
    """Simulates the lunar lander model.

    Args:
        model (np.ndarray): The array of weights for the linear policy.
        seed (int): The seed for the environment.
        video_env (gym.Env): If passed in, this will be used instead of creating
            a new env. This is used primarily for recording video during
            evaluation.
    Returns:
        total_reward (float): The reward accrued by the lander throughout its
            trajectory.
        impact_x_pos (float): The x position of the lander when it touches the
            ground for the first time.
        impact_y_vel (float): The y velocity of the lander when it touches the
            ground for the first time.
    """
    if video_env is None:
        # Since we are using multiple processes, it is simpler if each worker
        # just creates their own copy of the environment instead of trying to
        # share the environment. This also makes the function "pure." However,
        # we should use the video_env if it is passed in.
        env = gym.make("LunarLander-v2")
    else:
        env = video_env

    action_dim = env.action_space.n
    obs_dim = env.observation_space.shape[0]
    model = model.reshape((action_dim, obs_dim))

    total_reward = 0.0
    impact_x_pos = None
    impact_y_vel = None
    all_y_vels = []
    obs, _ = env.reset(seed=seed)
    done = False

    states = [obs]
    while not done:
        action = np.argmax(model @ obs)  # Linear policy.
        obs, reward, terminated, truncated, _ = env.step(action)
        states.append(obs)
        done = terminated or truncated
        total_reward += reward

        # Refer to the definition of state here:
        # https://gymnasium.farama.org/environments/box2d/lunar_lander/
        x_pos = obs[0]
        y_vel = obs[3]
        leg0_touch = bool(obs[6])
        leg1_touch = bool(obs[7])
        all_y_vels.append(y_vel)

        # Check if the lunar lander is impacting for the first time.
        if impact_x_pos is None and (leg0_touch or leg1_touch):
            impact_x_pos = x_pos
            impact_y_vel = y_vel

    # If the lunar lander did not land, set the x-pos to the one from the final
    # timestep, and set the y-vel to the max y-vel (we use min since the lander
    # goes down).
    if impact_x_pos is None:
        impact_x_pos = x_pos
        impact_y_vel = min(all_y_vels)

    # Only close the env if it was not a video env.
    if video_env is None:
        env.close()

    return EvaluateOutput(fitness=total_reward, np.array([impact_x_pos, impact_y_vel]), states=states)