In [1]:
from datetime import datetime
from collections import deque
import os
import random
import gym
import torch
import copy
from torch.distributions import Categorical
import numpy as np



learning_rate_actor = 0.0005
learning_rate_critic = 0.0005
replay_memory_size = 500
minibatch_size = 35
discount = 0.99  # discount factor gamma
no_episodes = 200

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class QNetwork(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.fcQ1 = torch.nn.Linear(4, 256)
        self.fcQ2 = torch.nn.Linear(256, 256)
        self.fcQ3 = torch.nn.Linear(256, 2)
        
    def forward(self, x):
        x = self.fcQ1(x)
        x = torch.nn.functional.relu(x)
        x = self.fcQ2(x)
        x = torch.nn.functional.relu(x)
        x = self.fcQ3(x)
        
        return x

    
    
class PolicyNetwork(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.fcA1 = torch.nn.Linear(4, 256)
        self.fcA2 = torch.nn.Linear(256, 256)
        self.fcA3 = torch.nn.Linear(256, 2)
        
    def forward(self, x):
        x = self.fcA1(x)
        x = torch.nn.functional.relu(x)
        x = self.fcA2(x)
        x = torch.nn.functional.relu(x)
        x = self.fcA3(x)  
        x = torch.nn.functional.softmax(x, dim=-1)
        return x
    
# network and optimizer
pi = PolicyNetwork().to(device)
pi_target = copy.deepcopy(pi)
pi_optimizer = torch.optim.Adam(pi.parameters(), lr=learning_rate_actor)


Q = QNetwork().to(device)
Q_target = copy.deepcopy(Q)
Q_optimizer = torch.optim.Adam(Q.parameters(), lr=learning_rate_critic)


history = deque(maxlen=replay_memory_size)  # replay buffer



def update_Q():
    loss = 0

    for state, action, next_state, reward, done in random.sample(history, min(minibatch_size, len(history))):
        state=torch.FloatTensor(state).to(device)
        action=torch.tensor(action, dtype=torch.int8).to(device)
        next_state=torch.FloatTensor(next_state).to(device)
        reward = torch.FloatTensor([reward]).to(device)
        with torch.no_grad():
            if done:
                target = reward
            else:
                target = reward + discount * torch.dot(pi_target(next_state),Q_target(next_state))

        loss = loss + (target - Q(state)[action])**2

    loss = loss/min(minibatch_size, len(history))
    Q_optimizer.zero_grad()
    loss.backward()
    Q_optimizer.step()



def update_pi():
    loss = 0
    for state, action, state_next, reward, done in random.sample(history, min(minibatch_size, len(history))):
        state=torch.FloatTensor(state).to(device)
        action=torch.tensor(action, dtype=torch.int8).to(device)
        with torch.no_grad():
            A = Q(state)[action]
        loss = loss - A * pi(state)[action].log()
    pi_optimizer.zero_grad()
    loss.backward()
    pi_optimizer.step()    
    

# gym environment
env = gym.make("CartPole-v0")

torch.manual_seed(0)
np.random.seed(0)
    
scores = []
# training
for episode in range(no_episodes):
    # sum of accumulated rewards
    score = 0

    # get initial observation
    state = env.reset()
    
    done = False
    # loop until an episode ends
    while not done:

        probs = pi(torch.FloatTensor(state).to(device))
        action = torch.multinomial(probs, 1).item()
        
        next_state, reward, done, info = env.step(action)
        if done:
            reward = -10
            

        # collect reward
        score = score + reward

        # collect a transition
        history.append([state, action, next_state, reward, done])

        update_Q()
        update_pi()
        
        # Soft update
        for target_param, param in zip(Q_target.parameters(), Q.parameters()):
            target_param.data.copy_(param.data * 0.01 + target_param.data * (1.0 - 0.01))
        for target_param, param in zip(pi_target.parameters(), pi.parameters()):
            target_param.data.copy_(param.data * 0.01 + target_param.data * (1.0 - 0.01))

        if done:
            env.close()
            break

        # pass observation to the next step
        state = next_state
        
    scores.append(score)
    # compute average reward
    print('episode: {}, reward: {:.1f}'.format(episode, score))

env.close()



episode: 0, reward: 10.0
episode: 1, reward: -1.0
episode: 2, reward: 2.0
episode: 3, reward: 9.0
episode: 4, reward: 6.0
episode: 5, reward: 15.0
episode: 6, reward: 15.0
episode: 7, reward: 21.0


RuntimeError: CUDA error: device-side assert triggered

In [None]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
plt.plot(range(len(scores)), np.array(scores), 'b', linewidth = 2, label = 'REINFORCE with baseline')
plt.legend(prop={'size':12})
plt.xlabel('Episode')
plt.ylabel('Total rewards')
plt.xlim(0, no_episodes)
plt.grid(True)

In [None]:

# TEST     
episode = 0
state = env.reset()     
while episode < 5:  # episode loop
    env.render()
    state = torch.tensor(state, dtype=torch.float32).to(device)
    probs = pi(state)
    action = torch.multinomial(probs, 1).item()
    next_state, reward, done, info = env.step(action)  # take a random action
    state = next_state

    if done:
        episode = episode + 1
        state = env.reset()
env.close()     


