This is an implementation of the Gymnasium Lunar Lander ML project using PyTorch.

In this project, the AI (or rather, MI) learns how to land a space shuttle on the moon in a simulated 2D space environment.

link: https://gymnasium.farama.org/environments/box2d/lunar_lander/


In [None]:
"""
Box2D does not support Windows (try Linux).
For some reason, installing it with pip leads to an error (a wheel build error),
but people say it works on Windows with Anaconda: https://github.com/openai/gym/issues/3143

conda install swig
conda install gymnasium[box2d]
"""

In [12]:
# installing dependencies for google colab.
# see requirements.txt, if you do not use colab.

!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install gymnasium[box2d]

Collecting shimmy[atari]<1.0,>=0.1.0 (from gymnasium[accept-rom-license,atari])
  Downloading Shimmy-0.2.1-py3-none-any.whl (25 kB)
Collecting autorom[accept-rom-license]~=0.4.2 (from gymnasium[accept-rom-license,atari])
  Downloading AutoROM-0.4.2-py3-none-any.whl (16 kB)
Collecting AutoROM.accept-rom-license (from autorom[accept-rom-license]~=0.4.2->gymnasium[accept-rom-license,atari])
  Downloading AutoROM.accept-rom-license-0.6.1.tar.gz (434 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 434.7/434.7 kB 11.4 MB/s eta 0:00:00
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
Collecting ale-py~=0.8.1 (from shimmy[atari]<1.0,>=0.1.0->gymnasium[accept-rom-license,atari])
  Downloading ale_py-0.8.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[

# 1. imports

In [4]:
import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from collections import deque

# 2. create network

In [5]:
# NOTE: in this example the input data is not scaled (the features are already mostly on the same scale)
class Network(nn.Module):
    """Fully connected Q-network with two hidden ReLU layers of 64 units.

    Maps a state vector of length ``state_size`` to ``action_size`` outputs,
    one value per action. The input is used as-is (no scaling is applied).
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        hidden_units = 64
        # Layer names and creation order are part of the contract: they
        # determine the state_dict keys and the seeded initial weights.
        self.fcl1 = nn.Linear(state_size, hidden_units)
        self.fcl2 = nn.Linear(hidden_units, hidden_units)
        self.fcl3 = nn.Linear(hidden_units, action_size)

    def forward(self, state):
        """Return the raw (un-activated) output of the last layer for ``state``."""
        hidden = F.relu(self.fcl1(state))
        hidden = F.relu(self.fcl2(hidden))
        return self.fcl3(hidden)


# 3 init hyperparameters

In [6]:
# Replay memory capacity: 10^5 = 100,000 stored experiences.
# Kept as an int (not the float 1e5) because it is an element count.
memory_size = int(1e5)
learning_batch_size = 100  # number of experiences sampled per learning step
alpha = 0.0005  # learning rate
gamma = 0.99  # discount factor
interpolation_parameter = 0.001  # how much information will be incorporated from the local network to the target network

# 4 create experience replay

In [7]:
class ReplayMemory():
    """Fixed-capacity FIFO store of experiences with uniform random sampling.

    Each stored event is indexed positionally: ``event[0]`` state,
    ``event[1]`` action, ``event[2]`` reward, ``event[3]`` next state and
    ``event[4]`` done flag. Once ``capacity`` events are stored, pushing a
    new one discards the oldest.
    """

    def __init__(self, memory_size, learning_batch_size):
        """Create an empty memory.

        Args:
            memory_size: maximum number of stored events. A float such as
                ``1e5`` is accepted and truncated to an int.
            learning_batch_size: default number of events drawn by ``sample``.
        """
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.capacity = int(memory_size)
        self.learning_batch_size = learning_batch_size
        # deque(maxlen=...) drops the oldest entry itself when it is full,
        # replacing the manual "append, then delete index 0" eviction.
        self.memory = deque(maxlen=self.capacity)

    def push(self, event):
        """Store ``event``; the oldest event is evicted when the memory is full."""
        self.memory.append(event)

    @staticmethod
    def _stack(experiences, index):
        """Stack field ``index`` of every experience into one row per experience."""
        return np.vstack([e[index] for e in experiences])

    # sample and transform
    def sample(self, sample_size=None):
        """Draw a random batch and convert it to tensors on ``self.device``.

        Args:
            sample_size: number of events to draw without replacement;
                defaults to ``self.learning_batch_size``. The default cannot
                be written in the signature because ``self`` does not exist
                when the function is defined.

        Returns:
            Tuple ``(states, actions, rewards, next_states, dones)``. Every
            tensor has one row per sampled event; ``actions`` is int64, the
            others are float32 (done flags become 0.0 / 1.0).

        Raises:
            ValueError: if ``sample_size`` exceeds the number of stored events.
        """
        if sample_size is None:
            sample_size = self.learning_batch_size
        drawn = random.sample(self.memory, k=sample_size)
        # Filter once instead of repeating the check for every field.
        batch = [e for e in drawn if e is not None]

        states = torch.from_numpy(self._stack(batch, 0)).float().to(self.device)
        actions = torch.from_numpy(self._stack(batch, 1)).long().to(self.device)
        rewards = torch.from_numpy(self._stack(batch, 2)).float().to(self.device)
        next_states = torch.from_numpy(self._stack(batch, 3)).float().to(self.device)
        # bool -> uint8 -> float so the flags can be used arithmetically.
        dones = torch.from_numpy(self._stack(batch, 4).astype(np.uint8)).float().to(self.device)
        return states, actions, rewards, next_states, dones


# 5. create the agent

In [8]:
class Agent():
    """Deep Q-learning agent with a local and a target Q-network.

    Experiences are stored in a ``ReplayMemory``. On every 4th ``step`` call
    a random batch is used to train the local network, after which the
    target network is moved a small step towards the local one.
    """

    def __init__(self, state_size, action_size, memory_size, learning_batch_size, learning_rate, discount_factor, interpolation_parameter):
        """Build both networks, the optimizer and the replay memory.

        Args:
            state_size: length of a state vector (network input size).
            action_size: number of actions (network output size).
            memory_size: capacity passed to ``ReplayMemory``.
            learning_batch_size: batch size sampled for each learning step.
            learning_rate: learning rate of the Adam optimizer.
            discount_factor: weight of the next-state value in the Q-target.
            interpolation_parameter: share of the local network's weights
                blended into the target network on each soft update.
        """
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.state_size = state_size
        self.action_size = action_size
        self.memory_size = memory_size
        self.learning_batch_size = learning_batch_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.interpolation_parameter = interpolation_parameter
        self.memory = ReplayMemory(memory_size, learning_batch_size)
        self.local_qnetwork = Network(state_size, action_size).to(self.device)
        self.target_qnetwork = Network(state_size, action_size).to(self.device)
        self.optimizer = Adam(self.local_qnetwork.parameters(), lr=learning_rate)
        self.t_step = 0

    def step(self, state, action, reward, next_state, done):
        """Store one experience and, on every 4th call, learn from a batch."""
        self.memory.push((state, action, reward, next_state, done))

        self.t_step = (self.t_step + 1) % 4
        # Learn only once more experiences are stored than one batch needs.
        if self.t_step == 0 and len(self.memory.memory) > self.learning_batch_size:
            experiences = self.memory.sample(self.learning_batch_size)
            self.learn(experiences)

    # epsilon greedy policy
    def act(self, state, epsilon):
        """Choose an action for ``state`` with an epsilon-greedy policy.

        Args:
            state: numpy array holding a single state.
            epsilon: a uniformly random action is returned unless
                ``random.random() > epsilon``.

        Returns:
            The chosen action index as ``np.int64``.
        """
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        # Evaluate in eval mode, then restore whatever mode was active so
        # that acting does not silently leave the network in eval mode.
        was_training = self.local_qnetwork.training
        self.local_qnetwork.eval()
        with torch.no_grad():
            action_values = self.local_qnetwork(state)
        self.local_qnetwork.train(was_training)

        if random.random() > epsilon:
            return np.argmax(action_values.cpu().numpy())
        # Use the agent's own action count, not a module-level global.
        return random.choice(np.arange(self.action_size, dtype=np.int64))

    def learn(self, experiences):
        """Run one optimisation step on a sampled batch, then soft-update the target.

        Args:
            experiences: tuple ``(states, actions, rewards, next_states,
                dones)`` of tensors as returned by ``ReplayMemory.sample``.
        """
        states, actions, rewards, next_states, dones = experiences
        # The target network's values are constants for the loss: detach them.
        expected_q_values = self.target_qnetwork(next_states).detach()
        expected_best_q_value = expected_q_values.max(1)[0].unsqueeze(1)
        # (1 - dones) removes the next-state term for finished episodes.
        q_targets = rewards + self.discount_factor * expected_best_q_value * (1 - dones)
        predicted_q_values = self.local_qnetwork(states).gather(1, actions)
        loss = F.mse_loss(predicted_q_values, q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # Use the agent's own setting, not a module-level global.
        self.soft_update(self.local_qnetwork, self.target_qnetwork, self.interpolation_parameter)

    def soft_update(self, local_model, target_model, interpolation_parameter):
        """Blend ``local_model``'s parameters into ``target_model`` in place.

        Each target parameter becomes
        ``interpolation_parameter * local + (1 - interpolation_parameter) * target``.
        ``parameters()`` yields the models' own tensors, so writing into
        them modifies the target network directly.
        """
        with torch.no_grad():
            for local_params, target_params in zip(local_model.parameters(), target_model.parameters()):
                blended = interpolation_parameter * local_params + (1.0 - interpolation_parameter) * target_params
                target_params.copy_(blended)


# 6. init environment

In [13]:
import gymnasium as gym


# Create the Lunar Lander environment and read the sizes the networks need:
# the length of an observation vector and the number of discrete actions.
env = gym.make("LunarLander-v2")
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
# The recorded cell output for this environment is 8 and 4.
print("state size: ", state_size)
print("action size: ", action_size)

state size:  8
action size:  4


In [14]:
# init agent
# Arguments map to: state_size, action_size, memory_size, learning_batch_size,
# learning_rate (alpha), discount_factor (gamma), interpolation_parameter.

agent = Agent(state_size, action_size, memory_size, learning_batch_size, alpha, gamma, interpolation_parameter)

# 7. training

In [15]:
# training parameters

number_episodes = 1600  # total number of training episodes
max_steps_per_episode = 1000  # upper bound on steps taken in one episode
epsilon_start = 0.99  # initial exploration rate
epsilon_end = 0.01  # lower bound of the exploration rate
epsilon_decay = 0.997  # epsilon is multiplied by this after every episode
epsilon = epsilon_start
# Sliding window of the most recent 100 episode scores (used for the average).
score_on_100_episode = deque(maxlen=100)

In [16]:
# training
# Runs number_episodes episodes; after each one epsilon is decayed and the
# running average of the last 100 scores is printed.

for episode in range(1, number_episodes + 1):
    score = 0
    state, _ = env.reset()
    for _ in range(max_steps_per_episode):
        action = agent.act(state, epsilon)
        # env.step returns: observation (np.array), reward (float),
        # terminated (bool), truncated (bool), info (dict)
        next_state, reward, terminated, truncated, _ = env.step(action)
        # Only a real terminal state is stored as "done": a time-limit
        # truncation must not remove the next-state term from the Q-target.
        agent.step(state, action, reward, next_state, terminated)
        score += reward
        state = next_state
        # The episode is over in both cases, so stop stepping the env.
        if terminated or truncated:
            break

    score_on_100_episode.append(score)
    epsilon = max(epsilon * epsilon_decay, epsilon_end)

    # visualize
    # "\r" makes each print overwrite the previous one on the same line,
    # giving a live progress display (the default line end must be removed).
    # Every 100th episode ends with a newline so that line stays on screen.
    line_end = "\n" if episode % 100 == 0 else ""
    print("\r Episode: {} \t Average_score: {:.2f}".format(episode, np.mean(score_on_100_episode)), end=line_end)

env.close()

 Episode: 100 	 Average_score: -160.20
 Episode: 200 	 Average_score: -113.81
 Episode: 296 	 Average_score: -94.05

KeyboardInterrupt: 

In [None]:
# copy code: video visualize

import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gymnasium.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(agent, env_name, epsilon=0.0):
    """Run one episode with ``agent`` and save the rendered frames to 'video.mp4'.

    Args:
        agent: object whose ``act(state, epsilon)`` returns an action index.
        env_name: Gymnasium environment id passed to ``gym.make``.
        epsilon: exploration rate passed to ``agent.act``. Defaults to 0.0
            (always the greedy action) instead of silently reading the
            training loop's global ``epsilon``.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            frames.append(env.render())
            action = agent.act(state, epsilon)
            state, _, terminated, truncated, _ = env.step(int(action))
            # Stop on time-limit truncation too; otherwise the loop keeps
            # stepping an environment that has already ended the episode.
            done = terminated or truncated
    finally:
        # Release the environment even if acting or rendering fails.
        env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(agent, 'LunarLander-v2')

def show_video():
    """Display the first '*.mp4' file found in the working directory inline.

    The file is embedded as base64 in an HTML <video> element. Prints
    "Could not find video" when no mp4 file exists.
    """
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read-only binary mode inside a context manager: the handle is
        # closed deterministically and the file is never opened for writing.
        with open(mp4, 'rb') as video_file:
            video = video_file.read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video()