In [1]:
# Access parent directories
from sys import path
from os.path import abspath
# Make the three nearest ancestor directories importable (nearest first).
# Directories already on sys.path, or repeated because the working directory
# sits close to the filesystem root, are skipped so that re-running this cell
# does not keep appending duplicate entries.
path += [
    parent_dir
    for parent_dir in dict.fromkeys(abspath("../" * depth) for depth in (1, 2, 3))
    if parent_dir not in path
]

In [2]:
from QLab import Qptimizer, QNetwork, ReplayMemory
import gym
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from boring_gym_stuff import get_screen, select_action
from QLearning.utilities import quickplot


# CartPole environment; `.unwrapped` takes the object from behind whatever
# wrappers gym.make() applied.
env = gym.make('CartPole-v0').unwrapped
# True when the name of matplotlib's active backend contains 'inline'.
is_ipython = 'inline' in matplotlib.get_backend()
# Use CUDA when PyTorch reports it as available, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Put pyplot into interactive mode.
plt.ion()

pygame 2.1.2 (SDL 2.0.16, Python 3.9.12)
Hello from the pygame community. https://www.pygame.org/contribute.html




<matplotlib.pyplot._IonContext at 0x7f5ad727f820>

In [3]:
class DQN(QNetwork):
    """Convolutional Q-network: three conv + batch-norm stages and a linear head.

    Args:
        h: Height of the input image, in pixels.
        w: Width of the input image, in pixels.
        outputs: Number of values produced by the final linear layer.
    """

    def __init__(self, h, w, outputs):
        super().__init__()
        # Three 5x5, stride-2 convolutions, each followed by batch norm.
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(32)

        def shrink(extent, kernel_size=5, stride=2):
            # Length of one axis after a single unpadded convolution.
            return (extent - kernel_size) // stride + 1

        # The width of the linear head depends on how much the three conv
        # stages shrink the image, so derive it from the input size.
        feat_w, feat_h = w, h
        for _stage in range(3):
            feat_w, feat_h = shrink(feat_w), shrink(feat_h)
        self.head = nn.Linear(feat_w * feat_h * 32, outputs)

    def forward(self, x):
        """Run the network on ``x`` and return one row of head outputs per sample.

        Used both with a single element (to pick the next action) and with a
        whole batch during optimisation. The input is moved to the module-level
        ``device`` before any layer is applied.
        """
        x = x.to(device)
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in stages:
            x = F.relu(norm(conv(x)))
        # Flatten everything except the leading (batch) dimension.
        flat = x.view(x.size(0), -1)
        return self.head(flat)

In [4]:
# One stored step of experience; `next_state` is None for a terminal step.
Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state'))


# NOTE(review): this definition shadows the `ReplayMemory` name imported from
# QLab in the imports cell — confirm which implementation is intended.
class ReplayMemory(object):
    """Bounded buffer of ``Transition`` records.

    Backed by a ``deque`` with ``maxlen=capacity``: once the buffer is full,
    appending a new transition drops the oldest one.

    Args:
        capacity: Maximum number of transitions kept.
    """

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, state, action, reward, next_state):
        """Save a transition.

        Only the first element of ``action`` (``action[0]``) is stored.
        ``next_state`` is stored as given, including ``None``.
        """
        self.memory.append(Transition(state, action[0], reward, next_state))

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

In [6]:
# Reset the environment before grabbing a frame; that first frame is only used
# to read off the image dimensions.
env.reset()
init_screen = get_screen(env)
# The unpacking requires a shape with exactly four entries; the last two are
# taken as height and width (presumably a (batch, channels, height, width)
# layout — confirm in get_screen).
_, _, screen_height, screen_width = init_screen.shape
# Size of the environment's action space.
n_actions = env.action_space.n

In [7]:
# Hyper-parameters for the training run.
# NOTE(review): BATCH_SIZE is not referenced anywhere else in this notebook;
# presumably Qptimizer picks its own batch size — confirm.
BATCH_SIZE = 128
# Passed to every optimisation call as `gamma`.
GAMMA = 0.999
# The target network is re-synchronised every TARGET_UPDATE episodes.
TARGET_UPDATE = 10
num_episodes = 1000
# Replay buffer holding at most 10000 transitions.
memory = ReplayMemory(10000)
policy_net = DQN(screen_height, screen_width, n_actions).to(device)
# `clone` is not defined on DQN in this notebook; presumably inherited from
# QNetwork and returns a copy of the policy network — confirm.
target_net = policy_net.clone()
# `optimizer` is bound twice: first to the RMSprop instance over the policy
# network's parameters, then to the Qptimizer that receives it. After this
# cell the name refers to the Qptimizer only.
optimizer = optim.RMSprop(policy_net.parameters())
optimizer = Qptimizer(memory, optimizer, policy_net, target_net)


# Number of steps each finished episode lasted, in order.
episode_durations = []
for i_episode in range(num_episodes):
    # Initialize the environment and state. The state is the difference
    # between two consecutively captured screens.
    env.reset()
    last_screen = get_screen(env)
    current_screen = get_screen(env)
    state = current_screen - last_screen
    for t in count():
        # Select and perform an action. The second argument is the total step
        # count of all *finished* episodes; steps of the episode in progress
        # are not included (presumably it drives the exploration schedule —
        # confirm in select_action).
        action = select_action(state, sum(episode_durations), n_actions, policy_net)
        # Only the reward and the done flag of the step result are kept.
        _, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)

        # Observe new state: again the difference of consecutive screens,
        # replaced by None when the episode has ended.
        last_screen = current_screen
        current_screen = get_screen(env)
        next_state = current_screen - last_screen
        if done:
            next_state = None

        # Store the transition in memory
        memory.push(state, action, reward, next_state)

        # Move to the next state. On the final step this sets `state` to None,
        # but the loop breaks below before `state` is read again.
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimizer(gamma = GAMMA)
        if done:
            # `t` is zero-based, so the episode lasted t + 1 steps.
            episode_durations.append(t + 1)
            # Refresh the loss and score plots once per finished episode.
            optimizer.plot_loss()
            optimizer.plot_loss_variance()
            quickplot(episode_durations, "Score", path = "./score")
            break
    # Update the target network, copying all weights and biases in DQN.
    # This happens every TARGET_UPDATE episodes, including episode 0.
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

# Training finished: save the policy network, then shut down the environment
# and leave matplotlib's interactive mode before showing the figures.
print('Complete')
policy_net.save()
env.render()
env.close()
plt.ioff()
plt.show()

torch.Size([128, 3, 40, 90]) torch.Size([128, 1]) torch.Size([128])
torch.Size([128, 1]) torch.Size([128, 1])
torch.Size([128, 3, 40, 90]) torch.Size([128, 1]) torch.Size([128])
torch.Size([128, 1]) torch.Size([128, 1])
torch.Size([128, 3, 40, 90]) torch.Size([128, 1]) torch.Size([128])
torch.Size([128, 1]) torch.Size([128, 1])
torch.Size([128, 3, 40, 90]) torch.Size([128, 1]) torch.Size([128])
torch.Size([128, 1]) torch.Size([128, 1])
torch.Size([128, 3, 40, 90]) torch.Size([128, 1]) torch.Size([128])
torch.Size([128, 1]) torch.Size([128, 1])
torch.Size([128, 3, 40, 90]) torch.Size([128, 1]) torch.Size([128])
torch.Size([128, 1]) torch.Size([128, 1])
torch.Size([128, 3, 40, 90]) torch.Size([128, 1]) torch.Size([128])
torch.Size([128, 1]) torch.Size([128, 1])
torch.Size([128, 3, 40, 90]) torch.Size([128, 1]) torch.Size([128])
torch.Size([128, 1]) torch.Size([128, 1])
torch.Size([128, 3, 40, 90]) torch.Size([128, 1]) torch.Size([128])
torch.Size([128, 1]) torch.Size([128, 1])
torch.Size

<Figure size 432x288 with 0 Axes>