In [1]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
import time
import random
from collections import namedtuple
from PIL import Image

import torchvision.transforms as T

In [2]:
env = gym.make('Pendulum-v0')
env.reset()
env._max_episode_steps = 200

In [3]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
dis = 0.98

In [4]:
device

device(type='cuda')

In [5]:
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))

class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, **kargs):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(**kargs)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        batches = random.sample(self.memory, batch_size)
        batch_state = torch.tensor([b.state for b in batches]).float().reshape(batch_size, -1).to(device)
        batch_next_state = torch.tensor([b.next_state for b in batches]).float().reshape(batch_size, -1).to(device)
        batch_action = torch.tensor([b.action for b in batches]).float().reshape(batch_size, -1).to(device)
        batch_reward = torch.tensor([b.reward for b in batches]).float().reshape(batch_size, -1).to(device)
        batch_done = torch.tensor([b.done for b in batches]).float().reshape(batch_size, -1).to(device)
        return batch_state, batch_action, batch_next_state, batch_reward, batch_done

    def __len__(self):
        return len(self.memory)

In [6]:
class Actor(nn.Module):
    def __init__(
        self,
        input_dim=env.observation_space.shape[0],
        output_dim=env.action_space.shape[0],
        hidden_list=[400,300],
        lr=1e-3
    ):
        super(Actor, self).__init__()
        
        layers = []
        for i in range(len(hidden_list)):
            if i == 0:
                layers.append(nn.Linear(input_dim, hidden_list[i]))
            else:
                layers.append(nn.Linear(hidden_list[i - 1], hidden_list[i]))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(hidden_list[-1], output_dim))
        layers.append(nn.Tanh())
        
        self.layers = nn.Sequential(*layers)
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        
    def forward(self, x):
        x = self.layers(x)
        return x
    
    def iteration(self, loss):
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
    
        
class Critic(nn.Module):
    def __init__(
        self, 
        input_dim=env.observation_space.shape[0] + env.action_space.shape[0],
        output_dim=1, 
        hidden_list=[400,300],
        lr=1e-2
    ):
        super(Critic, self).__init__()
        
        layers = []
        for i in range(len(hidden_list)):
            if i == 0:
                layers.append(nn.Linear(input_dim, hidden_list[i]))
            else:
                layers.append(nn.Linear(hidden_list[i - 1], hidden_list[i]))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(hidden_list[-1], output_dim))

        self.layers = nn.Sequential(*layers)
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        
    def forward(self, x):
        x = self.layers(x)
        return x

    def iteration(self, loss):
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        

In [7]:
def init_target_params(model, model_target):
    model_target.load_state_dict(model.state_dict())

In [8]:
def select_action(state, exploration=True, expl_noise=0.05):
    state = torch.tensor(state.reshape(1, -1)).float().to(device)
    action = actor(state).detach().item()
    
    if exploration:
        action = action + torch.normal(0, 2 * expl_noise, (1,)).item()
        
    return action
    

In [9]:
def smooth_target_params(model, model_target, tau=0.01):
    for param, target_param in zip(model.parameters(), model_target.parameters()):
        target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

In [10]:
def train(replay_buffer, batch_size=64):
    state, action, next_state, reward, done = replay_buffer.sample(batch_size)
    
    with torch.no_grad():
        target_Q = critic_target(torch.cat((next_state, actor_target(next_state)), 1))
    target_Q = reward + dis * target_Q
    current_Q = critic(torch.cat((state, action), 1))

    critic_loss = F.mse_loss(current_Q, target_Q)
    critic.iteration(critic_loss)
    
    actor_loss = -critic(torch.cat((state, actor(state)), 1)).mean()
    actor.iteration(actor_loss)
    
    smooth_target_params(critic, critic_target)
    smooth_target_params(actor, actor_target)
    

In [11]:
def train_episode(batch_size=64):
    num_steps = 200
    rAll = 0
    step = 0
    state = env.reset()
    for step in range(num_steps):
        action = select_action(state)
#         env.render()
        next_state, reward, done, _ = env.step([action])
        
        t = {
            'state': state,
            'action': action,
            'next_state': next_state,
            'reward': reward if not done else -100,
            'done': done,
        }
        memory.push(**t)
        
        state = next_state
        rAll += reward
        
        if done:
            break
        
        if len(memory) > batch_size:
            train(memory, batch_size)
            
    return rAll

In [12]:
critic, critic_target = Critic().to(device), Critic().to(device)
actor, actor_target = Actor().to(device), Actor().to(device)

init_target_params(actor, actor_target)
init_target_params(critic, critic_target)

actor.train()
critic.train()

actor_target.eval()
critic_target.eval()

memory = ReplayMemory(1000000)

rAll = 0.0
log_ep = 20
best_reward = -1000
for i in range(200):
    reward = train_episode()
    rAll += reward
    
    if i % log_ep == 0:
        print(f"""ep: {i}, reward: {rAll / log_ep}""")
        rAll = 0
    
    if reward > best_reward:
        best_reward = reward
        best_actor = actor
        best_critic = critic
        
    

ep: 0, reward: -79.81505016489174
ep: 20, reward: -1215.8482168461958
ep: 40, reward: -736.1754322956929
ep: 60, reward: -429.6742439005904
ep: 80, reward: -224.92266194842597
ep: 100, reward: -255.2165775515826
ep: 120, reward: -248.96378934890217
ep: 140, reward: -254.33592350094523
ep: 160, reward: -207.11657070793822
ep: 180, reward: -218.8788838936191


In [13]:
best_actor

Actor(
  (layers): Sequential(
    (0): Linear(in_features=3, out_features=400, bias=True)
    (1): ReLU()
    (2): Linear(in_features=400, out_features=300, bias=True)
    (3): ReLU()
    (4): Linear(in_features=300, out_features=1, bias=True)
    (5): Tanh()
  )
)

In [15]:
env._max_episode_steps = 500

In [16]:
state = env.reset()

for i in range(500):
    env.render()
    action = best_actor(torch.tensor(state).float().to(device)).detach().cpu().item()
    next_state, reward, done, _ = env.step([action])
    state = next_state
    