In [2]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

In [3]:
# Spin up the environment just long enough to capture its first frame;
# the cells below use that frame as sample input.
env = gym.make("CarRacing-v0")
obs = np.array(env.reset())
env.close()

Track generation: 1247..1563 -> 316-tiles track


In [4]:
# Turn the sampled frame into a float32 tensor, then reshape it one step at a
# time (printing the shape after each step) into batch-first, channel-first
# order for the convolution layers used below.
obs_tensor = torch.tensor(obs, dtype=torch.float32)
print(obs_tensor.shape)
obs_tensor = torch.unsqueeze(obs_tensor, 0)  # prepend a batch axis of size 1
print(obs_tensor.shape)
obs_tensor = obs_tensor.permute(0, 3, 1, 2)  # move the trailing channel axis to position 1
print(obs_tensor.shape)

torch.Size([96, 96, 3])
torch.Size([1, 96, 96, 3])
torch.Size([1, 3, 96, 96])


In [5]:

# Feed the sample through the layers one at a time, printing the shape after
# every stage, to discover the flattened size the final Linear layer needs.
x = obs_tensor.detach().clone()
print("Input", x.shape)

conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
x = conv1(x)
print("Conv1", x.shape)

conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
x = conv2(x)
print("Conv2", x.shape)

# Features per sample: element count of one item of the batch
# (channels * height * width of the last conv output).
linear_size = x[0].numel()
x = x.view(x.shape[0], -1)
print("Flatten", x.shape)

linear = nn.Linear(linear_size, 3)
x = linear(x)
print("Linear", x.shape)

Input torch.Size([1, 3, 96, 96])
Conv1 torch.Size([1, 16, 46, 46])
Conv2 torch.Size([1, 32, 21, 21])
Flatten torch.Size([1, 14112])
Linear torch.Size([1, 3])


In [6]:
class DQN(nn.Module):
    """Small convolutional Q-network: two strided conv layers and a linear head.

    Args:
        n_actions: Number of outputs of the final linear layer. Defaults to 3.
        input_shape: ``(channels, height, width)`` of a single input frame,
            without the batch dimension. Defaults to ``(3, 96, 96)``.

    With the default arguments the linear layer has 14112 inputs and
    3 outputs.
    """

    def __init__(self, n_actions=3, input_shape=(3, 96, 96)):
        super().__init__()
        self.conv1 = nn.Conv2d(input_shape[0], 16, kernel_size=5, stride=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        # Derive the flattened feature count from a dummy forward pass instead
        # of hard-coding it, so other frame sizes work. torch.zeros draws no
        # random numbers, so parameter initialisation order is unaffected.
        with torch.no_grad():
            n_features = self._features(torch.zeros(1, *input_shape)).shape[1]
        self.linear = nn.Linear(n_features, n_actions)

    def _features(self, x):
        """Apply both conv layers with ReLU and flatten to (batch, features)."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        return x.view(x.size(0), -1)

    def forward(self, x):
        """Map a (batch, channels, height, width) tensor to (batch, n_actions)."""
        return self.linear(self._features(x))

In [7]:
# Smoke-test the network: one forward pass on a copy of the sample frame.
dqn = DQN()
x = obs_tensor.detach().clone()  # independent copy, so obs_tensor itself is left untouched
dqn(x)  # last expression of the cell, so the notebook displays the output tensor

tensor([[-4.0357, -5.4013,  0.1850]], grad_fn=<AddmmBackward>)

In [8]:
def to_tensor(obs):
    """Convert a single gym observation into a batched tensor for the DQN.

    The observation is copied into a float32 tensor, given a leading batch
    axis of size 1, and its last axis is moved to position 1, so an input of
    shape (H, W, C) comes back as (1, C, H, W).
    """
    frame = np.array(obs)
    batched = torch.tensor(frame, dtype=torch.float32)[None]
    return batched.permute(0, 3, 1, 2)

In [44]:
# Prototype the replay-buffer storage: a preallocated channel-first array that
# frames are written into and random batches are read back out of.
size = 10
states = np.zeros((size, 3, 96, 96))
print(states.shape)

obs_np = np.array(obs)
print(obs_np.shape)
obs_np = np.transpose(obs_np, (2, 0, 1))  # last axis (channels) moves to the front
print(obs_np.shape)

states[1] = obs_np
print(np.array_equal(states[0], states[1]))  # slot 0 is still all zeros

# Pick 9 distinct slots out of the 10 and gather them with fancy indexing.
idxs = np.random.choice(size, size - 1, replace=False)
print(idxs)
batch = states[idxs]
print(batch.shape)


(10, 3, 96, 96)
(96, 96, 3)
(3, 96, 96)
False
[4 7 6 2 8 9 1 0 3]
(9, 3, 96, 96)


In [45]:
class ReplayBuffer:
    """Fixed-capacity ring buffer of observations, stored channel-first.

    Observations are kept in a preallocated ``(size, 3, 96, 96)`` array.
    Once ``size`` observations have been pushed, each new one overwrites
    the oldest slot.
    """

    def __init__(self, size):
        """Allocate zero-filled storage for ``size`` observations."""
        self.size = size
        self.states = np.zeros((size, 3, 96, 96))
        self.ctr = 0  # total number of pushes so far (never wraps)

    def push(self, obs):
        """Store one observation given in (height, width, channel) order."""
        obs = np.array(obs).transpose(2, 0, 1)  # (H, W, C) -> (C, H, W)

        # The write position wraps around once the buffer is full.
        idx = self.ctr % self.size
        self.states[idx] = obs

        self.ctr += 1

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored observations, chosen at random.

        Only slots that have been written are eligible.

        Returns:
            Array of shape ``(batch_size, 3, 96, 96)``.

        Raises:
            ValueError: Raised by ``np.random.choice`` when ``batch_size``
                exceeds the number of stored observations.
        """
        max_idx = min(self.size, self.ctr)
        idxs = np.random.choice(max_idx, batch_size, replace=False)
        # Return the sampled observations themselves, not their slot indices.
        return self.states[idxs]

In [46]:
# Push 12 frames into a 10-slot buffer so the write index wraps around
# and the two oldest slots get overwritten.
mem = ReplayBuffer(10)
for i in range(12):
    mem.push(obs)

In [48]:
# Draw a random batch of 3 entries; the notebook displays sample()'s return value.
mem.sample(3)

array([[[[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]],

        [[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]]],


       [[[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0.