In [1]:
# Install gym together with its `atari` and `accept-rom-license` extras
# (shell commands run through the notebook's `!` escape).
!pip install gym[atari]
!pip install gym[accept-rom-license]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import os
import random
from copy import deepcopy

import cv2
import gym
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# Run on the GPU when PyTorch can see one, otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

cuda


In [4]:
class AtariEnv():
    '''
    Wrapper around a Gym environment that keeps a rolling list of the last
    `n_buffer` preprocessed frames and exposes their stack as the agent's state.
    '''
    def __init__(self, name):
        '''
        name: environment id handed to gym.make.
        '''
        self.env = gym.make(name, render_mode="rgb_array")
        self.obs_dim = self.env.observation_space.shape[0]
        self.act_dim = self.env.action_space.n
        self.action_space = self.env.action_space
        self.n_buffer = 4    # number of frames stacked into one state
        self.buffer = None   # list of preprocessed frames; None means "reset needed"

    def pre_process(self, observation):
        '''
        State Preprocessing

        Converts a colour frame to grayscale, resizes it to 110 rows x 84
        columns, crops rows 18..101 (leaving 84x84) and scales to [0, 1].
        Returns an array with a leading axis of length 1: (1, 84, 84).
        '''
        # Gym's Atari frames are RGB, so the RGB (not BGR) conversion gives the
        # correct per-channel luminance weights.
        gray = cv2.cvtColor(observation, cv2.COLOR_RGB2GRAY)
        # cv2.resize takes the target size as (width, height)
        reshaped = cv2.resize(gray, (84,110))
        cropped = reshaped[18:102,:]/255
        return np.expand_dims(cropped, 0)

    def get_state(self):
        '''
        Returns the stacked frame buffer with a leading batch axis of 1.
        Resets the environment first when no buffer exists (fresh instance or
        the previous episode has ended).
        '''
        if self.buffer is None:
            self.reset()
        return np.expand_dims(np.vstack(self.buffer), 0)

    def get_reward(self, observation, reward):
        '''
        Hook for reward shaping; currently passes the environment reward through.
        '''
        return reward

    def reset(self, seed=None):
        '''
        Resets the wrapped environment and fills the frame buffer with
        `n_buffer` references to the first preprocessed frame.

        seed: optional seed forwarded to the environment. Any value other than
              None is forwarded, including 0.
        '''
        if seed is not None:
            # Gym declares `seed` as a keyword argument of reset()
            result = self.env.reset(seed=seed)
        else:
            result = self.env.reset()
        # Newer Gym versions return (observation, info); older ones only the observation
        observation = result[0] if isinstance(result, tuple) else result
        observation = self.pre_process(observation)
        self.buffer = [observation,]*self.n_buffer

    def step(self, action):
        '''
        Applies `action`, pushes the new frame into the buffer and returns
        (next_state, reward, done). When the episode is over the buffer is
        dropped, so the next get_state() call starts a new episode.
        '''
        result = self.env.step(action)
        if len(result) == 5:
            # Newer Gym API: (obs, reward, terminated, truncated, info)
            observation, reward, terminated, truncated, _ = result
            terminated = terminated or truncated
        else:
            # Older Gym API: (obs, reward, done, info)
            observation, reward, terminated, _ = result
        observation = self.pre_process(observation)
        self.buffer.pop(0)
        self.buffer.append(observation)
        # Build the returned state before the buffer is possibly cleared below
        next_state = self.get_state()
        if terminated:
            self.buffer = None
        return next_state, self.get_reward(observation, reward), terminated

    def render(self):
        '''
        Returns the environment's rendered frame divided by 255.
        NOTE(review): assumes env.render() returns a single array - confirm for
        the installed Gym version.
        '''
        return self.env.render()/255

In [5]:
class AtariLearner(nn.Module):
    '''
    Small convolutional Q-network: two conv + batch-norm + ReLU stages followed
    by two fully connected layers producing one value per action.
    '''
    def __init__(self, in_channels, act_dim, in_dim=(84, 84)):
        '''
        in_channels: number of input channels (stacked frames).
        act_dim:     number of outputs (one Q-value per action).
        in_dim:      (height, width) of the input frames; defaults to the
                     84x84 frames this notebook produces.
        '''
        super().__init__()
        self.cnn1 = nn.Conv2d(in_channels=in_channels, out_channels=16, kernel_size=8, stride=4)
        h, w = self.calc_conv2d_output_dim(in_dim = in_dim, kernel_size=(8,8), stride=(4,4))
        self.bn1 = nn.BatchNorm2d(16)
        self.cnn2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=4, stride=2)
        h, w = self.calc_conv2d_output_dim(in_dim = (h,w), kernel_size=(4,4), stride=(2,2))
        if h <= 0 or w <= 0:
            raise ValueError('in_dim %r is too small for the convolution stack' % (tuple(in_dim),))
        self.bn2 = nn.BatchNorm2d(32)
        # Flattened feature size after the second convolution (32 channels)
        self.fcc1 = nn.Linear(h*w*32, 256)
        self.fcc2 = nn.Linear(256, act_dim)

    def calc_conv2d_output_dim(self, in_dim, kernel_size, padding=(0,0), dialation=(1,1), stride=(1,1)):
        '''
        Spatial output size (h, w) of a 2-D convolution for the given input
        size, kernel size, padding, dilation and stride (all (h, w) pairs).
        '''
        h = (in_dim[0]+2*padding[0]-dialation[0]*(kernel_size[0]-1)-1)//stride[0]+1
        w = (in_dim[1]+2*padding[1]-dialation[1]*(kernel_size[1]-1)-1)//stride[1]+1
        return h, w

    def forward(self, x):
        '''
        x: batch of inputs shaped (batch, in_channels, height, width) with the
           spatial size given as `in_dim` at construction.
        Returns a (batch, act_dim) tensor.
        '''
        x = F.relu(self.bn1(self.cnn1(x)))
        x = F.relu(self.bn2(self.cnn2(x)))
        # Keep the batch axis, flatten everything else
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fcc1(x))
        return self.fcc2(x)

In [6]:
class BatchReplayMemory():
    '''
    FIFO replay memory of single-frame transitions
    (frame, action, reward, next_frame, terminal). Sampling re-assembles
    `n_buffer` consecutive transitions into one stacked-frame transition.
    '''
    def __init__(self, n_buffer, max_len=10000):
        '''
        n_buffer: number of consecutive transitions stacked per sample.
        max_len:  maximum number of transitions kept; the oldest is dropped first.
        '''
        self.queue = []
        self.max_len = max_len
        self.n_buffer = n_buffer

    def push(self, data):
        '''
        Appends one transition tuple and evicts the oldest one when over capacity.
        '''
        self.queue.append(data)
        if len(self.queue)>self.max_len:
            self.queue.pop(0)

    def sample(self, batch_size):
        '''
        Draws `batch_size` stacked transitions uniformly (with replacement)
        over all windows of `n_buffer` consecutive stored transitions.

        Each returned item is (state, action, reward, next_state, terminal)
        where the two states stack the window's frames behind a leading axis of
        1 and action/reward/terminal come from the window's last transition.

        Raises ValueError when fewer than `n_buffer` transitions are stored.
        Note: windows with a terminal flag before their last transition are
        rejected and redrawn, so the loop only ends if at least one window
        without such a flag exists.
        '''
        stored = len(self.queue)
        if stored < self.n_buffer:
            raise ValueError('need at least %d stored transitions to sample, have %d'
                             % (self.n_buffer, stored))
        batch = []
        while len(batch)<batch_size:
            # Exclusive end of the window; the upper bound `stored` makes the
            # most recent transition reachable as the last element of a window.
            end = random.randint(self.n_buffer, stored)
            s, a, r, n, t = zip(*self.queue[end-self.n_buffer:end])

            # Abrupt transitions should not be sampled: a terminal flag before
            # the last slot means the window spans two episodes.
            if any(t[:-1]):
                continue
            batch.append((np.expand_dims(np.vstack(s),0),a[-1],r[-1],np.expand_dims(np.vstack(n),0),t[-1]))
        return batch

    def length(self):
        '''
        Number of transitions currently stored.
        '''
        return len(self.queue)

In [7]:
# ---- Hyperparameters and run configuration ----
RENDER = False                    # show validation rollouts in an OpenCV window
GAMMA = 0.98                      # discount factor
UPDATE_INTERVAL = 1000            # frames between target-network syncs
LR = 1e-3                         # AdamW learning rate
EPSILON_START = 0.9               # exploration rate at the start of annealing
EPSILON_END = 0.05                # exploration rate once annealing has finished
EXPLORATION_FRAMES = 1_000_000    # number of frames over which epsilon is annealed
BATCH_SIZE = 64                   # transitions drawn from memory per update
MEMORY_BUFFER = 50_000            # replay memory capacity
EPISODES = 1000                   # number of training episodes

environment = 'PongDeterministic-v4'

# Training environment plus an independent copy used only for validation
env = AtariEnv(environment)
renv = deepcopy(env)

# Online (policy) network and target network; both start from the same weights
policy = AtariLearner(env.n_buffer, env.act_dim).double().to(device)
target = AtariLearner(env.n_buffer, env.act_dim).double().to(device)
target.load_state_dict(policy.state_dict())

# TD loss and optimiser for the policy network
loss_fn = nn.SmoothL1Loss()
optimizer = optim.AdamW(policy.parameters(), lr=LR, amsgrad=True)

# Experience replay storage and the global frame counter that drives annealing
memory = BatchReplayMemory(env.n_buffer, MEMORY_BUFFER)
glob_frame = 0

  deprecation(
  deprecation(


In [8]:
def get_epsilon():
    '''
    Exploration rate for the current global frame: decays linearly from
    EPSILON_START to EPSILON_END over EXPLORATION_FRAMES frames, then stays at
    EPSILON_END.
    '''
    if glob_frame >= EXPLORATION_FRAMES:
        return EPSILON_END
    frames_left = EXPLORATION_FRAMES-glob_frame
    eps_span = EPSILON_START-EPSILON_END
    return EPSILON_END + frames_left*eps_span/EXPLORATION_FRAMES

def select_action(state, act_dim, eps=None):
    '''
    Epsilon-greedy action selection with the global `policy` network.

    state:   network input for a single state; only the first row of the
             network output is used.
    act_dim: number of discrete actions to choose from when exploring.
    eps:     exploration probability; when None the annealed value from
             get_epsilon() is used.
    Returns the chosen action index as a plain Python int on both branches.
    '''
    if eps is None:
        eps = get_epsilon()
    # Epsilon-greedy exploration
    if np.random.uniform() < eps:
        # np.random.choice yields a NumPy integer; cast so both branches return int
        return int(np.random.choice(act_dim))
    with torch.no_grad():
        # eval mode so batch-norm uses its running statistics for a single input
        policy.eval()
        q_sa = policy(torch.tensor(state, device=device))
    return torch.argmax(q_sa[0]).item()

def optimize_policy(samples):
    '''
    Performs one optimisation step of the global `policy` network on a batch.

    samples: iterable of (state, action, reward, next_state, terminal) tuples
             whose states carry a leading axis of 1 so they can be stacked.

    The TD target is reward + GAMMA * max_a target(next_state)[a], with the
    bootstrap term zeroed for terminal transitions.
    '''
    states, actions, rewards, next_states, terminals = zip(*samples)
    states = torch.tensor(np.vstack(states), device=device)
    # gather() needs int64 indices shaped (batch, 1)
    actions = torch.tensor(np.vstack(actions), dtype=torch.long, device=device)
    next_states = torch.tensor(np.vstack(next_states), device=device)
    policy.train()
    # squeeze only the gathered column so a batch of one stays 1-D
    q_sa = policy(states).gather(1, actions).squeeze(1)
    with torch.no_grad():
        target.eval()
        q_nsa_max = target(next_states).max(1).values
        # Build the target as tensors in the network's dtype: no per-element
        # host/device sync and no float32/float64 mismatch with q_sa.
        reward_t = torch.as_tensor(np.asarray(rewards, dtype=np.float64),
                                   dtype=q_nsa_max.dtype, device=device)
        not_terminal = 1 - torch.as_tensor(np.asarray(terminals, dtype=np.float64),
                                           dtype=q_nsa_max.dtype, device=device)
        q_sa_target = reward_t + GAMMA*q_nsa_max*not_terminal
    # Optimize on the TD loss
    loss = loss_fn(q_sa, q_sa_target)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

def validate_policy():
    '''
    Plays one episode on the validation environment `renv` with the current
    policy (epsilon fixed at EPSILON_END) and returns the summed reward.
    When RENDER is set, every frame is shown in an OpenCV window.
    '''
    renv.reset()
    done = False
    valid_reward = 0
    if RENDER:
        cv2.namedWindow(environment, cv2.WINDOW_NORMAL)
    # cv2.resizeWindow(environment, 300, 300)
    while not done:
        state = renv.get_state()
        if RENDER:
            rgb = renv.render()
            # cv2.imshow reads 3-channel images as BGR, so flip the channel axis
            # of the frame before displaying it.
            # NOTE(review): assumes render() returns one RGB (H, W, 3) array - confirm
            cv2.imshow(environment, np.ascontiguousarray(rgb[..., ::-1]))
            cv2.waitKey(10)
        action = select_action(state, renv.act_dim, EPSILON_END)
        _, reward, done = renv.step(action)
        valid_reward+=reward
    return valid_reward

In [None]:
# Reward bounds assumed for the configured environment, and the step by which
# the "save a checkpoint" threshold is raised.
max_possible_reward = 21
reward_increment = max_possible_reward/10
max_valid_reward = -21
reward_history = []
max_reward_target = max_valid_reward + reward_increment

# Checkpoints go to <path>/<environment>/. `path` is not defined anywhere in
# this notebook, so fall back to the working directory when no earlier cell set
# it, and make sure the directory exists before the first torch.save.
checkpoint_dir = os.path.join(globals().get('path', '.'), environment)
os.makedirs(checkpoint_dir, exist_ok=True)

for episode in range(EPISODES):
    # if max_valid_reward > max_possible_reward*0.98:
    #     RENDER = True
    # Evaluate the current policy before each training episode
    valid_reward = validate_policy()
    print('Episode: ', episode, ' | Validation Reward: ', valid_reward, ' | Epsilon: ', get_epsilon())
    max_valid_reward = max(valid_reward,max_valid_reward)
    reward_history.append(valid_reward)

    # Save model when there is a performance improvement
    if max_valid_reward>max_reward_target:
        # Raise the threshold the next checkpoint has to beat
        max_reward_target = min(max_possible_reward, max(max_reward_target,max_valid_reward)+reward_increment)-1
        print('Performance Improvement!')
        print('Episode: ', episode, ' | Max Validation Reward: ', max_valid_reward, ' | Epsilon: ', get_epsilon())
        torch.save(policy.state_dict(), os.path.join(checkpoint_dir, str(int(max_valid_reward))+'.dqn'))
        if max_valid_reward==max_possible_reward:
            print('Best Model Achieved !!!')
            break

    # Default max episode steps is defined in Gym environments
    done = False
    while not done:
        state = env.get_state()
        action = select_action(state, env.act_dim)
        next_state, reward, done = env.step(action)
        glob_frame+=1

        # Store only the newest frame of each stack; the memory rebuilds the
        # stacks from consecutive entries when sampling.
        memory.push((state[:,env.n_buffer-1,:,:], action, reward, next_state[:,env.n_buffer-1,:,:], done))
        # Warm-up: no learning (and no target sync) until the memory is half full
        if memory.length()<MEMORY_BUFFER*0.5:
            continue
        else:
            optimize_policy(memory.sample(BATCH_SIZE))

        if glob_frame%UPDATE_INTERVAL==0:
            target.load_state_dict(policy.state_dict())

  logger.deprecation(


Episode:  0  | Validation Reward:  -20.0  | Epsilon:  0.9
Episode:  1  | Validation Reward:  -21.0  | Epsilon:  0.8989545
Episode:  2  | Validation Reward:  -21.0  | Epsilon:  0.89792345
Episode:  3  | Validation Reward:  -20.0  | Epsilon:  0.89696635
Episode:  4  | Validation Reward:  -21.0  | Epsilon:  0.8961002
Episode:  5  | Validation Reward:  -21.0  | Epsilon:  0.8951737
Episode:  6  | Validation Reward:  -21.0  | Epsilon:  0.8940142999999999
Episode:  7  | Validation Reward:  -21.0  | Epsilon:  0.89303595
Episode:  8  | Validation Reward:  -21.0  | Epsilon:  0.8920431500000001
Episode:  9  | Validation Reward:  -21.0  | Epsilon:  0.8909254000000001
Episode:  10  | Validation Reward:  -21.0  | Epsilon:  0.89005755
Episode:  11  | Validation Reward:  -20.0  | Epsilon:  0.8888854
Episode:  12  | Validation Reward:  -21.0  | Epsilon:  0.88793425
Episode:  13  | Validation Reward:  -21.0  | Epsilon:  0.88703835
Episode:  14  | Validation Reward:  -21.0  | Epsilon:  0.8857336
Episode:

In [None]:
# RENDER = True
# validate_policy()

# Work on a separate array: `reward_history` stays a list, so this cell can be
# re-run and the training cell can keep appending to it.
reward_curve = np.asarray(reward_history, dtype=float)
if reward_curve.size == 0:
    print('No validation rewards recorded yet - nothing to plot.')
else:
    # Moving average over up to 20 episodes. Dividing by the number of samples
    # actually under the window (instead of a fixed 20) removes the bias that
    # zero-padding in mode='same' introduces at both ends of the curve.
    window = min(20, reward_curve.size)
    kernel = np.ones(window)
    samples_in_window = np.convolve(np.ones_like(reward_curve), kernel, mode='same')
    smooth_reward_history = np.convolve(reward_curve, kernel, mode='same')/samples_in_window

    fig, ax = plt.subplots()
    ax.plot(reward_curve, label='Real')
    ax.plot(smooth_reward_history, label='Smooth')
    ax.set_xlabel('Episode')
    ax.set_ylabel('Reward')
    ax.legend(loc='upper left')
    plt.show()