In [1]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from typing import Sequence
from collections import namedtuple, deque
import itertools
import random
import warnings
warnings.filterwarnings("ignore")

In [2]:
GAMMA = 0.99
BATCH_SIZE = 64
BUFFER_SIZE = 100000
MIN_REPLAY_SIZE = 1000
EPS_START = 1.0
EPS_END = 0.01
EPS_DECAY = 0.995
TARGET_UPDATE_FREQ = 10

In [3]:
env = gym.make("LunarLander-v2")
obs = env.reset()
episode_reward = 0.0

In [4]:
Transition = namedtuple('Transition', ('states', 'actions', 'rewards', 'dones', 'next_states'))

class replay_memory():
    def __init__(self, env, fullsize, minsize, batchsize):
        self.env = env
        self.memory = deque(maxlen=fullsize)
        self.rewards = deque(maxlen=50)
        self.batchsize = batchsize
        self.minsize = minsize

    def append(self, transition):
        self.memory.append(transition)

    def sample_batch(self):
        batch = random.sample(self.memory, self.batchsize)
        batch = Transition(*zip(*batch))
        states = torch.from_numpy(np.array(batch.states, dtype=np.float32))
        actions = torch.from_numpy(np.array(batch.actions, dtype=np.int64)).unsqueeze(1)
        rewards = torch.from_numpy(np.array(batch.rewards, dtype=np.float32)).unsqueeze(1)
        dones = torch.from_numpy(np.array(batch.dones, dtype=np.bool8)).unsqueeze(1)
        next_states = torch.from_numpy(np.array(batch.next_states, dtype=np.float32))
        return states, actions, rewards, dones, next_states

    def initialize(self):
        obs, info = env.reset()
        for _ in range(self.minsize):
            action = self.env.action_space.sample()
            new_obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            transition = Transition(obs, action, reward, done, new_obs)
            self.append(transition)
            obs = new_obs
            if done:
                self.env.reset()
        return self
    
replay_buffer = replay_memory(env, BUFFER_SIZE, MIN_REPLAY_SIZE, BATCH_SIZE)
replay_buffer.initialize()

<__main__.replay_memory at 0x1dcfeee6eb0>

In [5]:
class DQN(nn.Module):
    def __init__(self, num_inputs, num_actions):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(num_inputs, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, num_actions)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

dqn_policy = DQN(env.observation_space.shape[0], env.action_space.n).float()
dqn_target = DQN(env.observation_space.shape[0], env.action_space.n).float()
dqn_target.load_state_dict(dqn_policy.state_dict())

optimizer = torch.optim.Adam(dqn_policy.parameters())
loss_fn = nn.MSELoss()

def epsilon_greedy_policy(epsilon, obs):
    rnd_sample = random.random()
    if rnd_sample <= epsilon:
        action = env.action_space.sample()
    else:
        with torch.no_grad():
            action = int(torch.argmax(dqn_policy(torch.Tensor(obs))))
    return action

In [6]:
eps_threshold = EPS_START
episode = 1
avg_rewards = deque(maxlen=50)  # To track the average reward over the last 50 episodes

for step in itertools.count():
    action = epsilon_greedy_policy(eps_threshold, obs)
    new_obs, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    replay_buffer.append(Transition(obs, action, reward, done, new_obs))
    episode_reward += reward
    obs = new_obs

    if done:
        episode += 1
        eps_threshold = max(eps_threshold * EPS_DECAY, EPS_END)
        avg_rewards.append(episode_reward)
        obs = env.reset()

        if episode % 100 == 0:
            avg_res = np.mean(avg_rewards)
            print(f'Episode: {episode} Avg Results: {avg_res} Epsilon: {eps_threshold}')

        if np.mean(avg_rewards) >= 195:
            print(f'Solved at episode: {episode} Avg Results: {np.mean(avg_rewards)}')
            break
        episode_reward = 0

    if len(replay_buffer.memory) > MIN_REPLAY_SIZE:
        b_states, b_actions, b_rewards, b_dones, b_next_states = replay_buffer.sample_batch()

        qvalues = dqn_policy(b_states).gather(1, b_actions)

        with torch.no_grad():
            target_qvalues = dqn_target(b_next_states)
            max_target_qvalues = torch.max(target_qvalues, axis=1).values.unsqueeze(1)
            expected_qvalues = b_rewards + GAMMA * (1 - b_dones.type(torch.int64)) * max_target_qvalues

        loss = loss_fn(qvalues, expected_qvalues)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if episode % TARGET_UPDATE_FREQ == 0:
        dqn_target.load_state_dict(dqn_policy.state_dict())

ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (64,) + inhomogeneous part.

In [None]:
import renderlab as rl
env = rl.RenderFrame(env, "./video")

obs = env.reset()

while True:
    with torch.no_grad():
        action = int(torch.argmax(dqn_policy(torch.from_numpy(obs).float().unsqueeze(0))))
    obs, reward, terminated, truncated, _ = env.step(action)

    if terminated or truncated:
        break

env.play()