In [1]:
import gym
import math
import collections
import random
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [2]:
class ReplayBuffer():
    def __init__(self):
        self.buffer = collections.deque(maxlen = 5000)
        self.minibatch_size = 32

    def append(self, state, action, reward, next_state, terminal):
        self.buffer.append([state, action, reward, next_state, terminal])

    def sample(self):
        mini_batch = random.sample(self.buffer, self.minibatch_size)
        s_lst, action, r_lst, s_prime_lst, done_mask_lst = map(list, zip(*mini_batch))
        return torch.FloatTensor(s_lst).to(device), torch.FloatTensor(action).to(device), torch.FloatTensor(r_lst).to(device), \
                torch.FloatTensor(s_prime_lst).to(device), torch.FloatTensor(done_mask_lst).to(device)
    
    def size(self):
        return len(self.buffer)

In [9]:
class Actor(nn.Module):
    def __init__(self,action_space, observation_space, max_action):
        super(Actor, self).__init__()
        self.actionNetwork = nn.Sequential(
            nn.Linear(observation_space[0], 256),
            nn.ReLU(),
            nn.Linear(256, 1024),
            nn.ReLU(),
            nn.Linear(1024, 128),
            nn.ReLU(),
            nn.Linear(128,action_space[0]),
            nn.Tanh()
        ).to(device)
        self.action_range = max_action[0]
        
    def forward(self, state):
        return self.actionNetwork(state) * self.action_range

    def select_action(self, state):
        action = self.forward(state)
        policy = torch.normal(action.detach(), 0.15)
        policy = torch.clamp(policy, max=self.action_range, min=self.action_range*(-1))
        return policy

    

class Critic(nn.Module):
    def __init__(self, observation_space):
        super(Critic, self).__init__()
        self.valueNetwork = nn.Sequential(
            nn.Linear(observation_space[0], 256),
            nn.ReLU(),
            nn.Linear(256, 1024),
            nn.ReLU(),
            nn.Linear(1024, 128),
            nn.ReLU(),
            nn.Linear(128,1)
        ).to(device)
        
    def forward(self, state):
        return self.valueNetwork(state)



class CARCLA():
    def __init__(self):
        super(CARCLA, self).__init__()
        self.env = gym.make('Pendulum-v1')
        self.actor = Actor(self.env.action_space.shape, self.env.observation_space.shape, self.env.action_space.high)
        self.actor_target = Actor(self.env.action_space.shape, self.env.observation_space.shape, self.env.action_space.high)
        self.critic = Critic(self.env.observation_space.shape)
        self.critic_target = Critic(self.env.observation_space.shape)
        self.actionOptimizer = optim.RMSprop(self.actor.parameters(), lr = 0.001)
        self.valueOptimizer = optim.RMSprop(self.critic.parameters(), lr = 0.001)
        self.criticLoss = nn.MSELoss()
        self.replay_buffer = ReplayBuffer()
        self.tau = 0.001
        self.discount = 0.9
        self.num_replay = 15
        self.last_state = None
        self.last_action = None
        self.bestAvgReward = 0
        print("action space : ",self.env.action_space.shape)
        print("action range : ",self.env.action_space.low, self.env.action_space.high)
        
    def train(self, epi):
        self.last_state = self.env.reset()
        totalReward = 0
        count = 0
        
        
        while True:
            # self.env.render()
            action = self.actor.select_action(torch.FloatTensor(self.last_state).to(device))
            state, reward, done, _= self.env.step(action.detach().cpu().numpy())
            count += 1
            totalReward += reward
            
            # self.optimize_test(torch.FloatTensor(state).to(device), torch.FloatTensor(self.last_state).to(device), action.detach(), reward, done)
            

            self.replay_buffer.append(self.last_state, action.detach().cpu().numpy(), reward, state, done)
            if self.replay_buffer.size()>self.replay_buffer.minibatch_size:
                for _ in range(self.num_replay):
                    self.optimize_network()

            if done:
                break
            self.last_state = state
        if((epi+1)%10 == 0):
            print(f'Epi : {epi}   Avg reward : {totalReward/count}')
            
        # if(self.bestAvgReward < totalReward/count):
        #     torch.save(model.actor.state_dict(), './checkpoint/CARCLA_best.pt')
        #     print("Save Best")

        torch.cuda.empty_cache()
        
            
    def soft_update(self, local_model, target_model, tau):
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)
            
    def optimize_network(self):
        states, actions, rewards, next_states, terminals = self.replay_buffer.sample()
        q_next_mat = self.critic_target(next_states).view(-1)
        targetQ = rewards + q_next_mat*(1-terminals)*self.discount
    
        self.valueOptimizer.zero_grad()
        q_mat = self.critic(states).view(-1)
        valueLoss = self.criticLoss(q_mat,targetQ)
        valueLoss.backward()
        self.valueOptimizer.step()
        
        
        policyUpdateIdx = targetQ - q_mat > 0
        policy_evaluation = self.criticLoss(actions[policyUpdateIdx] ,self.actor.forward(states[policyUpdateIdx]))
        self.actionOptimizer.zero_grad()
        policy_evaluation.backward()
        self.actionOptimizer.step()
        
        self.soft_update(self.critic, self.critic_target, self.tau)
        self.soft_update(self.actor, self.actor_target, self.tau)

    def optimize_test(self, state, last_state, action, reward, terminal):
        targetQ = reward + self.critic(last_state)*(1-terminal)*self.discount
    
        self.valueOptimizer.zero_grad()
        q_mat = self.critic(state)

        valueLoss = self.criticLoss(q_mat,targetQ.detach())
        valueLoss.backward()
        self.valueOptimizer.step()
        
        if(targetQ - q_mat > 0):
            policy_evaluation = self.criticLoss(action,self.actor.forward(last_state))
            self.actionOptimizer.zero_grad()
            policy_evaluation.backward()
            self.actionOptimizer.step()


        
        

In [10]:
model = CARCLA()

for epi in range(1000):
    model.train(epi)


action space :  (1,)
action range :  [-2.] [2.]
Epi : 9   Avg reward : -6.662401054452123
Epi : 19   Avg reward : -7.330152156189403
Epi : 29   Avg reward : -6.809343320367472
Epi : 39   Avg reward : -7.334382747124969
Epi : 49   Avg reward : -7.660936847221068
Epi : 59   Avg reward : -8.398960498438935
Epi : 69   Avg reward : -7.752805245936583
Epi : 79   Avg reward : -7.718644521221913
Epi : 89   Avg reward : -6.8839596818333835
Epi : 99   Avg reward : -6.3218540782422075


In [43]:
env = gym.make('Pendulum-v1') 
state = env.reset()
i = 0
model.eval()

while True:
    env.render()
    action = model.actor.select_action(torch.FloatTensor(state).to(device))
    state, reward, done, _= env.step(action.detach().cpu().numpy())
    i+=1

  "We recommend you to use a symmetric and normalized Box action space (range=[-1, 1]) "


KeyboardInterrupt: 

In [51]:
print(1)

1


In [53]:
torch.save(model.actor.state_dict(), './checkpoint/CARCLA_best.pt')

In [59]:
newmodel = CARCLA()
newmodel.actor.load_state_dict(torch.load('./checkpoint/CARCLA_best.pt'))
env = gym.make('Pendulum-v1') 
state = env.reset()
i = 0

while True:
    env.render()
    action = model.actor.select_action(torch.FloatTensor(state).to(device))
    state, reward, done, _= env.step(action.detach().cpu().numpy())
    i+=1

action space :  (1,)


KeyboardInterrupt: 