In [1]:
import torch
import torch.nn as nn

import random
import math

import gymnasium as gym

import time

In [2]:
# Smoke-test the environment: take a single random step and keep the values
# returned by env.step so they can be inspected in the following cell.
env = gym.make('CartPole-v1', render_mode='human')
try:
    env.reset()
    action = env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)
finally:
    # Close the environment even if reset/step raises, so it is never left open.
    env.close()

In [3]:
# Show each value returned by the single env.step call, one per line.
for step_output in (observation, reward, terminated, truncated, info):
    print(step_output)

[ 0.04926112  0.1610219  -0.01655035 -0.3467851 ]
1.0
False
False
{}


In [4]:
# Draw one random action from the environment's action space; as the last
# expression of the cell, the sampled value is displayed as the cell output.
env.action_space.sample()

0

In [5]:
# Input and output widths of the network defined below.
n_observations = 4  # same length as the observation vector printed earlier
n_actions = 2  # NOTE(review): presumably the size of CartPole's action space - confirm

# Device the model is moved to.
device = 'cpu'

# Epsilon-greedy schedule: epsilon starts at EPS_START and decays toward
# EPS_END as steps_done grows; EPS_DECAY controls how quickly it decays.
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000

# Running count of actions selected so far; incremented by select_action.
steps_done = 0

In [10]:
class DQN(nn.Module):
    """Fully connected network mapping an observation to one value per action.

    The input width is the module-level `n_observations` and the output width
    is the module-level `n_actions`; two hidden layers of 128 units with ReLU
    activations sit in between.
    """

    def __init__(self):
        super().__init__()
        hidden_units = 128
        # Layers are created in input-to-output order and wrapped in a single
        # Sequential stored under `network`.
        layers = [
            nn.Linear(n_observations, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, n_actions),
        ]
        self.network = nn.Sequential(*layers)

    def forward(self, observation):
        """Return the raw network outputs (one per action) for `observation`."""
        return self.network(observation)
    
# Instantiate the network and move it to the configured device.
model = DQN().to(device)

We'll implement an epsilon-greedy policy, with which we select a random action with probability epsilon and the action that maximizes Q otherwise. Here epsilon decays exponentially to a set minimum as the total number of steps increases.

How do we perform optimization on batches in which a random action was chosen? If the random action was not the one the network predicted would yield the most reward, it seems we'd be penalizing our model for predicting the correct action. What should I do then?

Maybe it doesn't matter. Since all we're predicting is the total reward given a certain action, it doesn't matter whether we chose a bad or good action. All that matters is how good the network's prediction was.

Yes, it doesn't matter. We are indeed picking the action with the highest expected reward. Nevertheless, we're training the model to predict the expected reward given an action, so it doesn't matter which one we picked.

Also, we input only one action at a time, not batches of actions. Batches come into play when we're computing the loss and optimizing the model.

In [7]:
def select_action(model, observation):
    """Choose an action for `observation` with an epsilon-greedy policy.

    With probability epsilon a random action is sampled from the global
    `env`'s action space; otherwise the action with the largest model output
    is chosen. Epsilon starts at EPS_START and decays exponentially toward
    EPS_END as the global `steps_done` grows, following
    ``EPS_END + (EPS_START - EPS_END) * exp(-steps_done / EPS_DECAY)``.

    Args:
        model: callable mapping an observation tensor to one value per action.
        observation: NumPy array (converted with ``torch.from_numpy``).

    Returns:
        A pair ``(action, logits)``: the selected action and the raw model
        outputs for `observation`.

    Side effects:
        Increments the global `steps_done` by one on every call.
    """
    global steps_done
    # gym's observations are ndarrays; the model expects a tensor
    tensor_observation = torch.from_numpy(observation)

    # Compute epsilon. Dividing by EPS_DECAY (rather than multiplying) makes
    # the decay gradual; multiplying would drop epsilon to EPS_END after a
    # single step.
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-steps_done / EPS_DECAY)

    # The forward pass stays outside no_grad because the outputs are returned
    # to the caller.
    logits = model(tensor_observation)
    if sample > eps_threshold:
        # Exploit: pick the action with the biggest predicted value
        with torch.no_grad():
            action = logits.argmax().item()
    else:
        # Explore: pick an action at random
        action = env.action_space.sample()
    steps_done += 1
    return action, logits

In [16]:
# Run the (untrained) policy for 100 steps with on-screen rendering.
env = gym.make('CartPole-v1', render_mode='human')
try:
    observation, info = env.reset()

    # Start timestamp of the rollout (not read anywhere in this cell).
    t_start = time.perf_counter()
    for _step in range(100):
        # Epsilon-greedy policy based on the current observation only
        action, _ = select_action(model, observation)
        observation, reward, terminated, truncated, info = env.step(action)

        # Start a new episode as soon as the current one ends
        if terminated or truncated:
            observation, info = env.reset()
finally:
    # Close the environment even if the policy or env.step raises.
    env.close()

We store the most recent state transitions, up to CAPACITY of them, and randomly sample from this buffer to optimize our model.

In [50]:
from collections import namedtuple, deque

# One recorded environment step: the observation before acting, the action
# taken, the reward received and the observation that followed.
Transition = namedtuple(
    'Transition',
    ('observation', 'action', 'reward', 'next_observation'),
)


class ReplayMemory:
    """Bounded buffer of `Transition` records with random sampling.

    Records live in a deque created with ``maxlen=capacity``, so once the
    buffer is full each new record pushes out the oldest one.
    """

    def __init__(self, capacity):
        """Create an empty buffer holding at most `capacity` transitions."""
        self.replay_memory = deque(maxlen=capacity)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.replay_memory)

    def store(self, observation, action, reward, next_observation):
        """Append one transition built from the given fields."""
        record = Transition(
            observation=observation,
            action=action,
            reward=reward,
            next_observation=next_observation,
        )
        self.replay_memory.append(record)

    def sample(self, batch_size):
        """Return a list of up to `batch_size` distinct stored transitions.

        When fewer than `batch_size` transitions are stored, all of them are
        returned (in random order).
        """
        stored = len(self)
        # Clamp the request to what is available
        how_many = batch_size if batch_size < stored else stored
        return random.sample(self.replay_memory, how_many)

In [60]:
# Small buffer (capacity 10) used to try out store/sample in the next cell.
memory = ReplayMemory(10)

In [68]:
# Each run of this cell stores one more transition in `memory`, so the sizes
# printed here depend on how many times the cell has been executed since
# `memory` was created.
print(len(memory))

env = gym.make('CartPole-v1')
try:
    observation, _ = env.reset()
    action = env.action_space.sample()
    next_observation, reward, _, _, _ = env.step(action)
finally:
    # The environment is only needed for this single step; always close it.
    env.close()

memory.store(observation, action, reward, next_observation)
# sample() returns at most as many transitions as are currently stored.
transition = memory.sample(5)
print(len(transition))

7
5


The visualization will continuously update the graph of the series of episode durations.

In [None]:
def plot_durations(episode_lenghts):
    """Placeholder for plotting the series of episode durations.

    TODO: not implemented yet. The argument `episode_lenghts` (spelling kept
    as-is because it is part of the signature) is currently ignored and the
    function returns None.
    """
    return None