In [1]:
# basic DQN with gym cartpole model
# referenced from : https://github.com/seungeunrho/RLfrombasics/blob/master/ch8_DQN.py

In [2]:
# Memory checking required
import torch

In [3]:
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [4]:
import gymnasium as gym
import renderlab as rl
import numpy as np
import collections
import random

## Env Rendering Test

In [5]:
env = gym.make("CartPole-v1", render_mode = "rgb_array")
env = rl.RenderFrame(env, "./output")

observation, info = env.reset()
score = 0

while True:
    action = env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)
    score += reward
    
    if terminated:
        print("Score : ", score)
        break

# Below playing requires high memory usage, which can result in torch import error
env.play()

OpenCV: FFMPEG: tag 0x5634504d/'MP4V' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


Score :  14.0
Moviepy - Building video temp-{start}.mp4.
Moviepy - Writing video temp-{start}.mp4



                                                                                                      

Moviepy - Done !
Moviepy - video ready temp-{start}.mp4




## Assign Hyper Parameters

In [19]:
learning_rate = 0.0005
gamma = 0.98
buffer_limit = 50000
batch_size = 32
target_update_interval = 20
print_interval = 1000

## Define ReplayBuffer Class

In [7]:
class ReplayBuffer():
    
    def __init__(self, buffer_limit=50000):
        self._buffer = collections.deque(maxlen=buffer_limit)

    def put(self, transition):
        self._buffer.append(transition)

    def sample_test(self, n):
        batch = random.sample(self._buffer, n)
        s_list, a_list, r_list, sp_list, done_list = [], [], [], [], []

        for transition in batch:
            s, a, r, sp, done = transition
            print(transition)
            break

        return None
    
    def sample(self, n):
        batch = random.sample(self._buffer, n)
        s_list, a_list, r_list, sp_list, done_list = [], [], [], [], []

        for transition in batch:
            s, a, r, sp, done = transition
            s_list.append(s)
            a_list.append([a])
            r_list.append([r])
            sp_list.append(sp)
            done_list.append(done)

        return torch.tensor(s_list, dtype=torch.float), \
                torch.tensor(a_list), torch.tensor(r_list), \
                torch.tensor(sp_list, dtype=torch.float), \
                torch.tensor(done_list)

    def size(self):
        return len(self._buffer)

## Qnet Class

In [8]:
class Qnet(nn.Module):

    def __init__(self):
        super(Qnet, self).__init__()
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 2)

    def forward(self, x):
        x1 = F.relu(self.fc1(x))
        x2 = F.relu(self.fc2(x1))
        return self.fc3(x2)

    def sample_action(self, obs, eps):
        coin = random.random()
        if coin < eps:
            return random.randint(0, 1)
        else:
            out = self.forward(obs)
            return out.argmax().item()

## Train Function

In [9]:
def train(train_iter, q, q_target, buffer, optimizer):
    for i in range(train_iter):
        s_list, a_list, r_list, sp_list, done_list = buffer.sample(batch_size)

        target_value = r_list + gamma * q_target(sp_list).max(1)[0].unsqueeze(1) * done_list
        action_value = q(s_list).gather(1, a_list)

        loss = F.smooth_l1_loss(action_value, target_value)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

- PyTorch Gather Example 

In [10]:
cube = torch.tensor(np.array([1,2,3,4,5,6]))
selection = torch.tensor(random.randint(0, 5))
out = cube.gather(0, selection)
out

tensor(5)

# Main Setup

In [11]:
env = gym.make("CartPole-v1")

q = Qnet()
q_target = Qnet()
q_target.load_state_dict(q.state_dict())

memory = ReplayBuffer()

optimizer = optim.Adam(q.parameters(), lr=learning_rate)

## Main Loop for Train 

In [None]:
for n_epi in range(10000):
    epsilon = max(0.01, 0.08 - 0.01*(n_epi/200)) #Linear annealing from 8% to 1%
    s, _ = env.reset()
    done = False
    score = 0.0

    while not done:
        a = q.sample_action(torch.from_numpy(s).float(), epsilon)
        s_prime, r, done, truncated, info = env.step(a)
        done_mask = 0.0 if done else 1.0
        
        memory.put((s, a, r/100.0, s_prime, done_mask))
        # action and reward are plain scalars
        # memory.sample_test(1)
        # break
        
        s = s_prime
        score += r
        if done or truncated:
            break

    if memory.size() > 2000:
        train(10, q, q_target, memory, optimizer)

    if n_epi % target_update_interval == 0:
        q_target.load_state_dict(q.state_dict())

    if n_epi % print_interval == 0 and n_epi != 0:
        print("n_episode :{}, score : {:.1f}, n_buffer : {}, eps : {:.1f}%".format(n_epi, score, memory.size(), epsilon*100))

env.close()

n_episode :1000, score : 8.0, n_buffer : 50000, eps : 3.0%
n_episode :2000, score : 10.0, n_buffer : 50000, eps : 1.0%
n_episode :3000, score : 9.0, n_buffer : 50000, eps : 1.0%
n_episode :4000, score : 9.0, n_buffer : 50000, eps : 1.0%
n_episode :5000, score : 11.0, n_buffer : 50000, eps : 1.0%
n_episode :6000, score : 9.0, n_buffer : 50000, eps : 1.0%
n_episode :7000, score : 10.0, n_buffer : 50000, eps : 1.0%
