DDPG for the Unity "Drone" environment (adapted from a Pendulum-v0 DDPG example)

- No batch normalization
- Exploration via random Gaussian parameter-space noise (no action noise)

In [None]:
from unity_wrappers import unity_env_generator
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
import time
import random
import logging

# Silence the "mlagents.envs" logger (presumably to keep Unity ML-Agents
# messages out of the notebook output).
logger = logging.getLogger("mlagents.envs")
logger.disabled = True

In [None]:
class DDPG_Mu(nn.Module):
    """Deterministic actor network: maps a 9-dim input to a 3-dim action.

    The output is ``tanh``-squashed and multiplied by 2, so every action
    component lies in ``[-2, 2]``.  The module owns its own Adam optimizer
    (learning rate 1e-4).
    """

    def __init__(self):
        super(DDPG_Mu, self).__init__()
        self.fc1 = nn.Linear(9, 512)
        self.fc_mu = nn.Linear(512, 3)
        self.optimizer = optim.Adam(self.parameters(), lr=0.0001)

    def forward(self, x):
        """Return the action for input ``x`` (last dimension must be 9)."""
        x = F.relu(self.fc1(x))
        mu = torch.tanh(self.fc_mu(x))*2
        return mu

    def train(self, loss=True):
        """Take one optimizer step on ``loss``, or switch train/eval mode.

        This method shadows ``nn.Module.train(mode)``, which ``eval()`` and
        other PyTorch utilities call with a bool.  To keep those working, a
        bool argument is forwarded to the base class; anything else is
        treated as a loss tensor to minimise.

        Args:
            loss: a scalar loss tensor, or a bool training-mode flag.

        Returns:
            ``self`` when called with a bool (as ``nn.Module.train`` does),
            otherwise ``None``.
        """
        if isinstance(loss, bool):
            return super(DDPG_Mu, self).train(loss)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
class DDPG_Q(nn.Module):
    """Critic network: estimates Q(state, action) as a single scalar.

    A 9-dim state and a 3-dim action are embedded separately (128 units
    each), concatenated along dimension 1, passed through a 256-unit hidden
    layer and reduced to one value.  The module owns its own Adam optimizer
    (learning rate 1e-3).
    """

    def __init__(self):
        super(DDPG_Q, self).__init__()
        self.fc_a = nn.Linear(3, 128)
        self.fc_s = nn.Linear(9, 128)
        self.fc_1 = nn.Linear(256, 256)
        self.fc_q = nn.Linear(256, 1)
        self.optimizer = optim.Adam(self.parameters(), lr=0.001)

    def forward(self, x, a):
        """Return Q-values for batched states ``x`` and actions ``a``.

        Both inputs must be 2-D (batch first) because the two embeddings
        are concatenated along ``dim=1``.
        """
        x1 = F.relu(self.fc_a(a))
        x2 = F.relu(self.fc_s(x))
        x = torch.cat([x1, x2], dim=1)
        x = F.relu(self.fc_1(x))
        q = self.fc_q(x)
        return q

    def train(self, loss=True):
        """Take one optimizer step on ``loss``, or switch train/eval mode.

        This method shadows ``nn.Module.train(mode)``, which ``eval()`` and
        other PyTorch utilities call with a bool.  To keep those working, a
        bool argument is forwarded to the base class; anything else is
        treated as a loss tensor to minimise.

        Args:
            loss: a scalar loss tensor, or a bool training-mode flag.

        Returns:
            ``self`` when called with a bool (as ``nn.Module.train`` does),
            otherwise ``None``.
        """
        if isinstance(loss, bool):
            return super(DDPG_Q, self).train(loss)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

In [None]:
# Environment handle used by the training and test loops.
env = unity_env_generator("Drone")

# Online critic / actor and their target copies (the *_p networks).
Q = DDPG_Q()
Q_p = DDPG_Q()
Mu = DDPG_Mu()
Mu_p = DDPG_Mu()

# Replay memory: a plain list of transitions, capped at BUFFER_SIZE entries
# by store_transition().
BUFFER_SIZE = 30000
replay_buffer = []

# Hyperparameters.
GAMMA = 0.99                   # discount factor in the critic target
TAU = 0.01                     # interpolation weight in soft_target_update()
BATCH_SIZE = 32                # transitions sampled per minibatch
EPOCH = 10                     # gradient updates per training() call
PARAMETER_NOISE_COEF = 0.0005  # scale of the noise in parameter_noise()

In [None]:
def training():
    """Run EPOCH DDPG updates on minibatches drawn from the replay buffer.

    Each iteration updates the critic Q towards the one-step target built
    from the target networks, updates the actor Mu to maximise Q, and then
    moves both target networks a step towards the online networks.
    """
    for _ in range(EPOCH):
        states, actions, rewards, next_states, not_done = make_minibatch()

        # Critic: the target is a constant, so build it without autograd.
        with torch.no_grad():
            next_q = Q_p(next_states, Mu_p(next_states))
            td_target = rewards + GAMMA*next_q*not_done
        Q.train(F.smooth_l1_loss(Q(states, actions), td_target))

        # Actor: ascend the critic's value of the actor's own actions.
        Mu.train(-Q(states, Mu(states)).mean())

        soft_target_update(Mu, Mu_p)
        soft_target_update(Q, Q_p)
    
def soft_target_update(model, model_p, tau=None):
    """Blend ``model``'s parameters into ``model_p`` in place.

    Every target parameter becomes ``(1 - tau) * target + tau * source``.

    Args:
        model: the online network (read only).
        model_p: the target network (modified in place).
        tau: interpolation weight; defaults to the module-level ``TAU``.
    """
    if tau is None:
        tau = TAU
    # no_grad() lets the parameters be overwritten in place without going
    # through the discouraged ``.data`` attribute.
    with torch.no_grad():
        for param_target, param in zip(model_p.parameters(), model.parameters()):
            param_target.copy_(param_target*(1.0 - tau) + param*tau)
        
def init_target_param(model, model_p):
    """Overwrite every parameter of ``model_p`` with the matching one of ``model``.

    Parameters are paired in ``parameters()`` order and copied in place, so
    the two networks end up with equal but independent weights.
    """
    sources = model.parameters()
    targets = model_p.parameters()
    with torch.no_grad():
        for dst, src in zip(targets, sources):
            dst.copy_(src)
        
def parameter_noise(model, coef=None):
    """Add zero-mean Gaussian noise to every parameter of ``model`` in place.

    Args:
        model: the network to perturb.
        coef: scale applied to the standard-normal noise; defaults to the
            module-level ``PARAMETER_NOISE_COEF``.
    """
    if coef is None:
        coef = PARAMETER_NOISE_COEF
    with torch.no_grad():
        for param in model.parameters():
            # randn_like draws the noise with the parameter's own dtype and
            # device, so this also works for non-CPU / non-float32 models.
            param.add_(torch.randn_like(param) * coef)
            
def store_transition(s, a, r, s_prime, done):
    """Append one transition to ``replay_buffer``, evicting the oldest if full.

    ``s``, ``a`` and ``s_prime`` are tensors and get a leading batch
    dimension; ``r`` is wrapped into a float tensor with two leading
    dimensions added; ``done`` is stored unchanged.
    """
    if len(replay_buffer) == BUFFER_SIZE:
        replay_buffer.pop(0)  # drop the oldest transition
    transition = (
        s.unsqueeze(0),
        a.unsqueeze(0),
        torch.tensor([r], dtype=torch.float).unsqueeze(0),
        s_prime.unsqueeze(0),
        done,
    )
    replay_buffer.append(transition)
    
def make_minibatch():
    """Sample BATCH_SIZE transitions and stack them into batched tensors.

    Returns:
        A tuple ``(s, a, r, s_p, done_mask)``.  The first four are the
        stored tensors concatenated along dimension 0; ``done_mask`` is a
        float column holding 0 for terminal transitions and 1 otherwise.
    """
    batch = random.sample(replay_buffer, BATCH_SIZE)
    states = torch.cat([item[0] for item in batch], dim=0)
    actions = torch.cat([item[1] for item in batch], dim=0)
    rewards = torch.cat([item[2] for item in batch], dim=0)
    next_states = torch.cat([item[3] for item in batch], dim=0)
    not_done = [[0] if item[4] else [1] for item in batch]
    done_mask = torch.tensor(not_done, dtype=torch.float).reshape(-1, 1)
    return states, actions, rewards, next_states, done_mask

In [None]:
# Training driver: collect one episode at a time with a noise-perturbed
# actor, train once per episode when the buffer holds at least 500
# transitions, and record the mean episode reward every 20 episodes.
reward_list = []
reward_sum = 0.0

# Start the target networks as exact copies of the online networks.
init_target_param(Mu, Mu_p)
init_target_param(Q, Q_p)

for ep in range(20000):
    observation = env.reset()
    done = False
    while not done:
        state = torch.tensor(observation, dtype=torch.float)
        # Exploration: perturb the actor's weights before acting.
        # NOTE(review): the noise is added to Mu itself on every step and is
        # never removed, so it accumulates in the trained actor -- confirm
        # this is intended.
        parameter_noise(Mu)
        # The action must be detached from the graph before it is handed to
        # numpy and stored in the replay buffer.
        action = Mu(state).detach()
        observation, reward, done, _ = env.step(action.numpy())
        reward_sum += reward
        next_state = torch.tensor(observation, dtype=torch.float)
        store_transition(state, action, reward, next_state, done)

    if len(replay_buffer) >= 500:
        training()

    # Report and reset the running reward after every 20th episode.
    if (ep + 1) % 20 == 0:
        print('Episode %d'%ep,', Reward mean : %f'%(reward_sum/20.0))
        reward_list.append(reward_sum/20.0)
        reward_sum = 0.0

env.close()

Learning curve

Mean episode reward, averaged over each block of 20 episodes.

In [None]:
# Plot the 20-episode reward means collected in reward_list during training.
plt.plot(reward_list)
plt.show()

Test rendering

In [None]:
# Roll out the trained actor for 10 episodes with no exploration noise.
# NOTE(review): the training cell ends with env.close(); if it ran to
# completion the environment presumably has to be re-created (as in
# `env = unity_env_generator("Drone")`) before this loop can run -- confirm.
for ep in range(10):
    observation = env.reset()
    while True:
        state = torch.tensor(observation, dtype=torch.float)
        # Inference only: without no_grad() the actor output requires grad
        # and Tensor.numpy() raises a RuntimeError.
        with torch.no_grad():
            action = Mu(state)
        observation, reward, done, _ = env.step(action.numpy())
        if done: break
env.close()

Reference

- https://github.com/seungeunrho/minimalRL/blob/master/ddpg.py
- https://github.com/l5shi/Multi-DDPG-with-parameter-noise/blob/master/Multi_DDPG_with_parameter_noise.ipynb
- https://arxiv.org/abs/1706.01905
- https://openai.com/blog/better-exploration-with-parameter-noise/