In [1]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

In [2]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F

import numpy as np
from collections import deque, namedtuple
import random


In [3]:
env = gym.make('CartPole-v1')

class QNet(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(QNet, self).__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, a_size)

    # Forward Propagation
    def forward(self, x):        
        x = torch.Tensor(x)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
    
    # Select best action and corresponding Q-value
    def act(self, state):
        pred = self.forward(state)
        result = pred.squeeze().argmax().item()
        return result

net_test = QNet(4,2,64)
state = env.reset()
net_test.forward(env.reset()), net_test.act(state)


(tensor([ 0.0153, -0.4431], grad_fn=<AddBackward0>), 0)

In [4]:
Sequence = namedtuple("Sequence",('state','action','reward','next_state','done'))
class ReplayMemory:
    def __init__(self, capacity):
        self.capacity = capacity        
        self.memory = deque(maxlen=self.capacity)

    def push(self, *args):
        self.memory.append(Sequence(*args))
    
    def sample(self, batch_size):
        experiences = random.sample(self.memory, batch_size)
        return Sequence(*zip(*experiences))

done = False
mem_test = ReplayMemory(500)
state = env.reset()
while not done:
    action = env.action_space.sample()
    next_state,reward,done,_ = env.step(action)
    mem_test.push(state,action,reward,next_state,done)
    state = next_state
    
batch = mem_test.sample(5)
print(batch.state)

(array([ 0.05714364,  0.5655521 , -0.10201085, -0.9005526 ], dtype=float32), array([-0.04364025,  0.17305994,  0.04624137, -0.26197276], dtype=float32), array([ 0.11381183,  0.18757986, -0.20426363, -0.5935359 ], dtype=float32), array([ 0.04975789,  0.36928758, -0.0903945 , -0.58081794], dtype=float32), array([ 0.10621776,  0.37970385, -0.18782851, -0.8217558 ], dtype=float32))


In [5]:
def calc_loss(batch, net, GAMMA):
    s = torch.FloatTensor(batch.state)
    r = torch.Tensor(batch.reward)    
    a = torch.Tensor(batch.action).to(torch.int64)    
    next_s = torch.FloatTensor(batch.next_state)
    m = 1-torch.Tensor(batch.done).to(torch.int64)

    target = r + GAMMA * net.forward(next_s).amax(1) * m
    target = target.view(1,-1)
    pred = net.forward(s).gather(1,a.view(-1,1)).view(1,-1)    
    
    loss = (target-pred).pow(2) ## COMPLETE THIS LINE
    return loss.sum()


In [6]:
def dqn_replay_memory(GAMMA, EPS_START, EPS_END, EPS_DECAY, NUM_EPI, BATCH_SIZE):
    # initialize replay memory
    memory = ReplayMemory(1000) 
    # initialize Q-network
    net = QNet(4,2,32)
    optimizer = torch.optim.Adam(net.parameters())
    # for episodes 1,M do
    avg_rewards = deque(maxlen=50)

    for epi in range(NUM_EPI+1):
        # init sequence
        state = env.reset()
        done = False
        reward_list = []

        # iterate inside the sequence
        # calculate EPS
        EPS = EPS_END + (EPS_START - EPS_END) * np.exp(-1. * epi / EPS_DECAY)
        while not done:
            with torch.no_grad():
            # with prob EPS select random action
                if np.random.random() < EPS:
                    action = env.action_space.sample()
                else: # otherwise select action w.r.t policy
                    action = net(state).squeeze().argmax().item() # Complete this code
                
            # proceed
            next_state, reward, done, _ = env.step(action)
            reward_list.append(reward)
            
            # store transition in memory
            memory.push(state,action,reward,next_state,done) # Complete this code
            
            # sample random minibatch from memory
            if len(memory.memory)<BATCH_SIZE:
                continue
            else:
                batch = memory.sample(BATCH_SIZE) # Complete this code
            # calculate loss
            loss=calc_loss(batch, net, GAMMA)

            # perform gradient descent step
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # update state
            state = next_state
            
        # insert avg
        avg_rewards.append(sum(reward_list))
        if epi%50 == 0:
            print(epi, np.mean(avg_rewards), EPS)

    return net




discount_rate = 0.95
eps_start = 0.99
eps_end = 0.02
eps_decay = 200
num_epi = 1000
batch_size =128
result = dqn_replay_memory(   
    discount_rate, eps_start, eps_end, eps_decay, num_epi, batch_size)

0 12.0 0.99


  s = torch.FloatTensor(batch.state)


50 23.0 0.7754367595792627
100 28.92 0.6083347399212544
150 39.9 0.47819555615878423
200 115.0 0.3768430579362991
250 237.86 0.2979096529543844
300 305.08 0.23643625534397691
350 192.06 0.18856072514693178
400 347.46 0.1512752247395143
450 170.3 0.1222372478250084
500 152.76 0.09962244866518184
550 276.22 0.08201002537050635
600 132.52 0.06829345631682802
650 177.98 0.05761098159677035
700 185.72 0.04929146191964895
750 203.38 0.042812213480328834
800 102.2 0.037766169722072154
850 105.2 0.03383630689172928
900 113.32 0.030775726642095037
950 259.0 0.028392144347027016
1000 136.68 0.026535808589112905


In [9]:
# Test cartpole
total_rewards = 0
state = env.reset()
done = False
while not done:
    action = result.act(state)    
    new_state,reward,done,_ = env.step(action)
    total_rewards += reward
    state = new_state
    env.render()
    
print(total_rewards)
env.close()
