<a href="https://colab.research.google.com/github/nutov/03_BuildEscape/blob/master/TweenDelayedDDPG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [26]:
!pip install pybullet



In [0]:
import os
import time
import random
import numpy as np
import matplotlib.pyplot as plt
import pybullet_envs
import torch
import gym
import torch.nn as nn
import torch.nn.functional as F
from gym import wrappers
from torch.autograd import Variable


In [0]:
# Creating a Buffer 
class ReplayBuffer(object):
  """Fixed-capacity store of transitions with uniform random sampling.

  Every transition is a 5-tuple ``(state, next_state, action, reward, done)``.
  Once ``max_size`` transitions are held, each new one overwrites the oldest
  slot (circular buffer driven by ``ptr``).
  """

  def __init__(self,max_size = 1e6):
    """Create an empty buffer.

    Args:
      max_size: maximum number of transitions kept. Coerced to ``int`` so the
        float default (``1e6``) still produces integer list indices.

    Raises:
      ValueError: if ``max_size`` is smaller than 1.
    """
    max_size = int(max_size)
    if max_size < 1:
      raise ValueError("max_size must be at least 1")
    self.storage = []
    self.max_size = max_size
    self.ptr = 0

  def add(self,transition):
    """Store one transition, overwriting the oldest entry when full.

    Args:
      transition: tuple ``(state, next_state, action, reward, done)``.
    """
    if len(self.storage) >= self.max_size:
      self.storage[self.ptr] = transition
      self.ptr = (self.ptr+1) % self.max_size
    else:
      self.storage.append(transition)

  def sample(self,batch_size):
    """Draw ``batch_size`` transitions uniformly at random, with replacement.

    Args:
      batch_size: number of transitions to draw.

    Returns:
      Tuple of five arrays ``(states, next_states, actions, rewards, dones)``;
      ``rewards`` and ``dones`` are reshaped to column vectors ``(-1, 1)``.

    Raises:
      ValueError: from ``np.random.randint`` when the buffer is empty.
    """
    ind = np.random.randint(0,len(self.storage),size = batch_size)
    batch_states,batch_next_states,batch_actions,batch_rewards,batch_dones = [],[],[],[],[]
    for i in ind:
      state,next_state,action,reward,done = self.storage[i]
      # np.asarray copies only when it has to; np.array(..., copy=False)
      # raises on NumPy >= 2.0 whenever a copy is unavoidable (lists, scalars).
      batch_states.append(np.asarray(state))
      batch_next_states.append(np.asarray(next_state))
      batch_actions.append(np.asarray(action))
      batch_rewards.append(np.asarray(reward))
      batch_dones.append(np.asarray(done))

    return (
      np.array(batch_states),
      np.array(batch_next_states),
      np.array(batch_actions),
      np.array(batch_rewards).reshape(-1,1),
      np.array(batch_dones).reshape(-1,1),
    )



In [0]:
class Actor(nn.Module):
  """Policy network: maps a batch of states to actions in [-max_action, max_action].

  Three fully connected layers (state_dim -> 400 -> 300 -> action_dim) with
  ReLU after the first two and a scaled tanh on the output.
  """

  def __init__(self,state_dim,action_dim,max_action):
    """Build the layers.

    Args:
      state_dim: size of the state vector fed to the first layer.
      action_dim: size of the action vector produced by the last layer.
      max_action: factor the tanh output is multiplied by.
    """
    super().__init__()
    self.max_action = max_action
    hidden_1,hidden_2 = 400,300
    self.layer_1 = nn.Linear(state_dim,hidden_1)
    self.layer_2 = nn.Linear(hidden_1,hidden_2)
    self.layer_3 = nn.Linear(hidden_2,action_dim)

  def forward(self,x):
    """Return the action for each state row in ``x``."""
    hidden = torch.relu(self.layer_1(x))
    hidden = torch.relu(self.layer_2(hidden))
    squashed = torch.tanh(self.layer_3(hidden))
    return self.max_action*squashed


In [0]:
class Critic(nn.Module):
  """Twin Q-network: two independent estimators of Q(state, action).

  Each estimator is a 3-layer MLP (state_dim + action_dim -> 400 -> 300 -> 1).
  The first uses ``layer_1..layer_3`` and the second ``layer_4..layer_6``.
  """

  def __init__(self,state_dim,action_dim,max_action=None):
    """Build both Q-networks.

    Args:
      state_dim: size of the state vector.
      action_dim: size of the action vector.
      max_action: unused; accepted only so existing three-argument calls
        keep working.
    """
    super(Critic,self).__init__()

    # defining the first critic
    self.layer_1 = nn.Linear(state_dim + action_dim,400)
    self.layer_2 = nn.Linear(400,300)
    self.layer_3 = nn.Linear(300,1)
    # defining the second critic (separate layers, so separate weights)
    self.layer_4 = nn.Linear(state_dim + action_dim,400)
    self.layer_5 = nn.Linear(400,300)
    self.layer_6 = nn.Linear(300,1)

  @staticmethod
  def _q_value(xu,first,second,last):
    """Run one Q-network (three layers, ReLU between them) on ``xu``."""
    hidden = F.relu(first(xu))
    hidden = F.relu(second(hidden))
    return last(hidden)

  def forward(self,x,u):
    """Return ``(Q1, Q2)`` for states ``x`` and actions ``u``.

    ``x`` and ``u`` are concatenated along dim 1, so both must be 2-D with
    the same number of rows.
    """
    xu = torch.cat([x,u],dim = 1)
    x_1 = self._q_value(xu,self.layer_1,self.layer_2,self.layer_3)
    x_2 = self._q_value(xu,self.layer_4,self.layer_5,self.layer_6)
    return x_1,x_2

  def Q1(self,x,u):
    """Return only the first network's Q-value for states ``x``, actions ``u``."""
    xu = torch.cat([x,u],dim = 1)
    return self._q_value(xu,self.layer_1,self.layer_2,self.layer_3)

In [0]:
# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = torch.device("cuda:0")
else:
  device = torch.device("cpu")

# Building the training process into a class
class TD3(object):
  """Actor-critic agent with twin critics, target networks and delayed policy updates.

  Holds an actor, a twin critic, a target copy of each, and one Adam
  optimizer for the actor and one for the critic.
  """

  def __init__(self,state_dim,action_dim,max_action,device = 'cpu'):
    """Build the networks, their target copies and the optimizers.

    Args:
      state_dim: size of the state vector.
      action_dim: size of the action vector.
      max_action: bound used by the actor and for clamping target actions.
      device: torch device (or device string) all networks are moved to.
    """
    self.device = device
    self.actor = Actor(state_dim,action_dim,max_action).to(device)
    self.actor_target = Actor(state_dim,action_dim,max_action).to(device)
    self.actor_target.load_state_dict(self.actor.state_dict())
    self.actor_optimizer = torch.optim.Adam(self.actor.parameters())

    # max_action is passed through so the call matches Critic's constructor.
    self.critic = Critic(state_dim,action_dim,max_action).to(device)
    self.critic_target = Critic(state_dim,action_dim,max_action).to(device)
    self.critic_target.load_state_dict(self.critic.state_dict())
    # The critic needs its own optimizer; it must not replace the actor's.
    self.critic_optimizer = torch.optim.Adam(self.critic.parameters())

    self.max_action = max_action

  def select_action(self,states):
    """Return the actor's action for a single state as a flat NumPy array.

    Args:
      states: one state (array-like); it is reshaped to a single row.
    """
    # Cast to float32 so float64 observations match the network's weights.
    state = torch.as_tensor(states,dtype=torch.float32).reshape(1,-1).to(self.device)
    with torch.no_grad():
      return self.actor(state).cpu().numpy().flatten()

  def train(self,replay_buffer,iteration,batch_size=100,discount=0.99,tau=0.005,policy_noise = 0.2,noise_clip=0.5,policy_freq = 2):
    """Run ``iteration`` gradient steps on batches drawn from ``replay_buffer``.

    Args:
      replay_buffer: object whose ``sample(batch_size)`` returns
        ``(states, next_states, actions, rewards, dones)``.
      iteration: number of critic updates to perform.
      batch_size: transitions per update.
      discount: factor applied to the bootstrapped target value.
      tau: interpolation weight for the soft target-network update.
      policy_noise: std-dev of the Gaussian noise added to target actions.
      noise_clip: absolute bound the noise is clamped to.
      policy_freq: the actor and the targets are updated every
        ``policy_freq``-th iteration.
    """
    for it in range(iteration):

      batch = replay_buffer.sample(batch_size)
      state,next_state,action,reward,done = (
        torch.as_tensor(part,dtype=torch.float32).to(self.device) for part in batch
      )

      # The target is a constant for the critic loss: build it without autograd.
      with torch.no_grad():
        # Clipped Gaussian noise on the target action, then clamp to the action range.
        noise = (torch.randn_like(action)*policy_noise).clamp(-noise_clip,noise_clip)
        next_action = (self.actor_target(next_state) + noise).clamp(-self.max_action,self.max_action)

        # Use the smaller of the two target Q-values.
        target_Q1,target_Q2 = self.critic_target(next_state,next_action)
        target_Q = reward + discount*(1-done)*torch.min(target_Q1,target_Q2)

      current_Q1,current_Q2 = self.critic(state,action)
      critic_loss = F.mse_loss(current_Q1,target_Q)+F.mse_loss(current_Q2,target_Q)

      self.critic_optimizer.zero_grad()
      critic_loss.backward()
      self.critic_optimizer.step()

      # Delayed update: actor and target networks move only every policy_freq steps.
      if it % policy_freq == 0:
        actor_loss = -self.critic.Q1(state,self.actor(state)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Soft update: target <- tau*online + (1-tau)*target.
        for param,target_param in zip(self.actor.parameters(),self.actor_target.parameters()):
          target_param.data.copy_(tau*param.data + (1-tau)*target_param.data)

        for param,target_param in zip(self.critic.parameters(),self.critic_target.parameters()):
          target_param.data.copy_(tau*param.data + (1-tau)*target_param.data)
