In [1]:
import numpy as np
import random
import math
from math import inf
import torch
from torch import nn
import matplotlib.pyplot as plt
from copy import deepcopy

In [2]:
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

In [3]:
import gym
print(gym.__version__)

0.26.2


In [4]:
env =  gym.make("CartPole-v1")

In [5]:
state = env.reset()
print(f'Starting state {state}')

Starting state (array([-0.01303447,  0.02214191, -0.04855942,  0.00686143], dtype=float32), {})


In [6]:
print(env.observation_space)

Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)


In [7]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cpu'

In [8]:

class DQN(nn.Module):
  def __init__(self, state_dims):
    super().__init__()
    self.block = nn.Sequential(
        nn.Linear(in_features = state_dims, out_features= 128),
        nn.ReLU(),
        nn.Linear(in_features = 128, out_features= 64),
        nn.ReLU(),
        nn.Linear(in_features = 64, out_features= 2)
    )
  def forward(self, x):
    return self.block(x)

q_network =  DQN(env.observation_space.shape[0]).to(device)
print(f'q_network is running on {next(q_network.parameters()).device}')
target_q_network = deepcopy(q_network).eval()

q_network is running on cpu


In [9]:
dummy_tensor = torch.rand(4)
print(q_network(dummy_tensor))

tensor([-0.1176, -0.1095], grad_fn=<ViewBackward0>)


In [10]:
class ReplayBuffer:

    def __init__(self, capacity=100000):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def insert(self, transition):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = transition
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        assert self.can_sample(batch_size)

        batch = random.sample(self.memory, batch_size)
        batch = zip(*batch)
        return [torch.cat(items) for items in batch]

    def can_sample(self, batch_size):
        return len(self.memory) >= batch_size * 10

    def __len__(self):
        return len(self.memory)

In [11]:
def exploratory_policy(state, epsilon):
  if random.random() < epsilon:
    return torch.tensor(env.action_space.sample()).view(1,-1)
  else:
    tensor_state = torch.from_numpy(state).unsqueeze(dim = 0)
    action_logits = q_network(tensor_state).detach()
    return torch.argmax(action_logits, dim = 1, keepdim = True)

In [12]:
loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(params= q_network.parameters(),
                             lr = 0.0001)

In [13]:
from tqdm.auto import tqdm

In [14]:
def preprocess(state, reward, done, next_state):
  tensor_state = torch.from_numpy(state).unsqueeze(dim=0)
  tensor_next_state = torch.from_numpy(next_state).unsqueeze(dim = 0)
  tensor_reward = torch.tensor(reward).view(1,-1)
  tensor_done = torch.tensor(done).view(1, -1)
  return tensor_state, tensor_next_state, tensor_reward, tensor_done

In [15]:

from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter()

In [16]:
def deep_q_learning(episodes, batch_size=32, gamma=0.5, initial_epsilon=1, decay_rate = 0.01):
  # stats = {'MSE Loss': [], 'Returns': []}
  memory = ReplayBuffer()

  for episode in tqdm(range(episodes)):
    state = env.reset()
    done = False
    epsilon = initial_epsilon * math.e ** (-decay_rate * episode)
    ep_return = 0
    while not done:
      tensor_action = exploratory_policy(state, epsilon)
      next_state, reward, done, _ = env.step(tensor_action.item())
      tensor_state, tensor_next_state, tensor_reward = preprocess(state, reward, done, next_state)
      memory.insert([tensor_state, tensor_action, tensor_reward, tensor_done, tensor_next_state])

      if memory.can_sample(batch_size):
        state_b, action_b, reward_b, done_b, next_state_b = memory.sample(batch_size)
        qsa_b = q_network(state_b).gather(1, action_b)
        next_qsa_b = torch.max(target_q_network(next_state_b), dim = -1, keepdim = True)[0]

        target_b = reward_b + ~done_b * gamma * next_qsa_b
        loss = loss_fn(qsa_b, target_b)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        writer.add_scalar('MSE Loss', loss.item(), episode)
        # stats['MSE Loss'].append(loss.item())

      state = next_state
      ep_return += reward

    writer.add_scalar('Episode Return', ep_return, episode)
    # stats['Returns'].append(ep_return)

    if episode % 10 == 0:
        target_q_network.load_state_dict(q_network.state_dict())

  writer.close()
  return stats


In [17]:
stats = deep_q_learning(50)

  0%|          | 0/50 [00:00<?, ?it/s]

  if not isinstance(terminated, (bool, np.bool8)):


ValueError: too many values to unpack (expected 4)