In [1]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
import time
import numpy as np

In [2]:
# Hyperparameters (module-level; read by the PPO class and the rollout loop)
learning_rate = 0.0005  # step size passed to the Adam optimizer
gamma = 0.98            # discount applied to the bootstrapped next-state value
lmbda = 0.95            # multiplied with gamma in the backward advantage recursion
eps_clip = 0.1          # probability ratio is clamped to [1 - eps_clip, 1 + eps_clip]
K_epoch = 2             # optimisation passes over each collected batch
T_horizon = 20          # max environment steps gathered before each update

In [5]:
class PPO(nn.Module):
    """Recurrent actor-critic trained with the PPO clipped surrogate objective.

    Network: Linear(input_dim, 64) -> ReLU -> LSTM(64, 32) feeding two linear
    heads, ``actor`` (action logits) and ``critic`` (scalar state value).

    Transitions are buffered with :meth:`put_data` and consumed by
    :meth:`train` (no arguments), which runs the PPO update and clears the
    buffer.

    Args:
        input_dim: size of one observation vector.
        output_dim: number of discrete actions.
        lr, discount, gae_lambda, clip_eps, epochs: optional per-instance
            overrides. When left as ``None`` the module-level
            ``learning_rate``, ``gamma``, ``lmbda``, ``eps_clip`` and
            ``K_epoch`` are used instead (``learning_rate`` is read at
            construction, the others each time an update runs).
    """

    def __init__(self, input_dim, output_dim, lr=None, discount=None,
                 gae_lambda=None, clip_eps=None, epochs=None):
        super(PPO, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.data = []  # rollout buffer, filled by put_data()

        # None means "look up the module-level hyperparameter when needed".
        self._discount = discount
        self._gae_lambda = gae_lambda
        self._clip_eps = clip_eps
        self._epochs = epochs

        self.layer_1 = nn.Linear(input_dim, 64)
        self.lstm_layer = nn.LSTM(64, 32)
        self.actor = nn.Linear(32, output_dim)
        self.critic = nn.Linear(32, 1)

        self.optimizer = optim.Adam(
            self.parameters(), lr=learning_rate if lr is None else lr)

    def _features(self, x, hidden):
        """Shared trunk: returns ``(lstm_output, lstm_hidden)`` for ``x``.

        ``x`` is reshaped to (-1, 1, 64) before the LSTM, i.e. rows of ``x``
        are treated as consecutive time steps of a single sequence.
        """
        x = F.relu(self.layer_1(x))
        x = x.view(-1, 1, 64)
        return self.lstm_layer(x, hidden)

    def get_action(self, x, hidden):
        """Return ``(prob, lstm_hidden)``.

        ``prob`` is the softmax over the last dimension of the actor output
        (shape (seq, 1, output_dim)); ``lstm_hidden`` is the LSTM state after
        consuming ``x``.
        """
        x, lstm_hidden = self._features(x, hidden)
        prob = F.softmax(self.actor(x), dim=2)
        return prob, lstm_hidden

    def get_value(self, x, hidden):
        """Return the critic output for ``x`` (shape (seq, 1, 1))."""
        x, _ = self._features(x, hidden)
        return self.critic(x)

    def put_data(self, x):
        """Append one transition tuple
        ``(state, action, reward, next_state, prob, hidden_in, hidden_out, done)``
        to the rollout buffer."""
        self.data.append(x)

    def make_batch(self):
        """Convert the buffered transitions to tensors and clear the buffer.

        Returns:
            ``(state, action, reward, next_state, prob, hidden_in, hidden_out,
            done_mask)``. ``hidden_in`` / ``hidden_out`` are the LSTM states
            recorded for the FIRST buffered transition only; they seed the
            re-evaluation of the ``state`` and ``next_state`` sequences.
            ``done_mask`` is 0.0 where the transition was terminal, else 1.0.

        Raises:
            IndexError: if the buffer is empty.
        """
        state_lst, action_lst, reward_lst, next_state_lst = [], [], [], []
        prob_lst, hidden_in_lst, hidden_out_lst, done_lst = [], [], [], []
        for transition in self.data:
            state, action, reward, next_state, prob, hidden_in, hidden_out, done = transition

            state_lst.append(state)
            action_lst.append([action])
            reward_lst.append([reward])
            next_state_lst.append(next_state)
            prob_lst.append([prob])
            hidden_in_lst.append(hidden_in)
            hidden_out_lst.append(hidden_out)
            done_lst.append([0 if done else 1])

        first_hidden_in, first_hidden_out = hidden_in_lst[0], hidden_out_lst[0]

        # Stack observations into one ndarray first: building a tensor straight
        # from a list of ndarrays is slow and emits a PyTorch warning.
        state = torch.tensor(np.asarray(state_lst), dtype=torch.float)
        action = torch.tensor(action_lst)
        reward = torch.tensor(reward_lst)
        next_state = torch.tensor(np.asarray(next_state_lst), dtype=torch.float)
        prob = torch.tensor(prob_lst, dtype=torch.float)
        done_mask = torch.tensor(done_lst, dtype=torch.float)
        self.data = []
        return (state, action, reward, next_state, prob,
                first_hidden_in, first_hidden_out, done_mask)

    def train(self, mode=None):
        """Run one PPO update on the buffered rollout (``model.train()``).

        This method shadows ``nn.Module.train(mode)``, which ``nn.Module.eval()``
        calls as ``self.train(False)``. To keep ``eval()`` working, an explicit
        boolean ``mode`` is forwarded to ``nn.Module.train`` and no update is
        performed; with no argument the PPO update runs and ``None`` is returned.
        """
        if mode is not None:
            return super(PPO, self).train(mode)
        self.train_net()
        return None

    def train_net(self):
        """Optimise actor and critic on the buffered rollout, then clear it.

        Does nothing when the buffer is empty.
        """
        if not self.data:
            return

        discount = gamma if self._discount is None else self._discount
        gae_lambda = lmbda if self._gae_lambda is None else self._gae_lambda
        clip = eps_clip if self._clip_eps is None else self._clip_eps
        n_epochs = K_epoch if self._epochs is None else self._epochs

        state, action, reward, next_state, prob, (hidden_in_1, hidden_in_2), \
            (hidden_out_1, hidden_out_2), done_mask = self.make_batch()

        # Recorded hidden states are inputs only; never backprop into the rollout.
        first_hidden = (hidden_in_1.detach(), hidden_in_2.detach())
        second_hidden = (hidden_out_1.detach(), hidden_out_2.detach())

        for _ in range(n_epochs):
            next_value = self.get_value(next_state, second_hidden).squeeze(1)
            td_target = reward + discount * next_value * done_mask
            td_value = self.get_value(state, first_hidden).squeeze(1)
            delta = (td_target - td_value).detach()

            # Backward recursion: A_t = delta_t + discount * lambda * A_{t+1}
            running = 0.0
            advantages = []
            for step_delta in reversed(delta.reshape(-1).tolist()):
                running = discount * gae_lambda * running + step_delta
                advantages.append([running])
            advantages.reverse()
            advantage = torch.tensor(advantages, dtype=torch.float)

            action_prob, _ = self.get_action(state, first_hidden)
            action_selected = action_prob.squeeze(1).gather(1, action)
            ratio = torch.exp(torch.log(action_selected) - torch.log(prob))

            surr_1 = ratio * advantage
            # The clipped ratio must be weighted by the advantage as well;
            # otherwise min() compares unrelated quantities and the clip
            # never limits the policy step.
            surr_2 = torch.clamp(ratio, 1 - clip, 1 + clip) * advantage
            loss = -torch.min(surr_1, surr_2) + F.smooth_l1_loss(td_value, td_target.detach())

            self.optimizer.zero_grad()
            # The graph is rebuilt every epoch, so it need not be retained.
            loss.mean().backward()
            self.optimizer.step()

In [9]:

# Roll out the recurrent policy on CartPole and update it every T_horizon steps.
env = gym.make('CartPole-v1')
model = PPO(4,2)
score = 0.0
print_interval = 20

try:
    for n_epi in range(10000):
        # Fresh zero LSTM state (h, c) at the start of every episode.
        h_out = (torch.zeros([1, 1, 32], dtype=torch.float), torch.zeros([1, 1, 32], dtype=torch.float))
        # gym >= 0.26 returns (obs, info) from reset(); older versions return obs only.
        reset_out = env.reset()
        state = reset_out[0] if isinstance(reset_out, tuple) else reset_out
        done = False

        while not done:
            for _ in range(T_horizon):
                h_in = h_out
                # Acting needs no autograd graph: train() detaches the stored hidden
                # states anyway, and without no_grad the graph would keep growing
                # through h_out for the whole episode.
                with torch.no_grad():
                    prob, h_out = model.get_action(torch.from_numpy(state).float(), h_in)
                prob = prob.view(-1)
                action = Categorical(prob).sample().item()

                # gym >= 0.26 returns (obs, reward, terminated, truncated, info).
                step_out = env.step(action)
                if len(step_out) == 5:
                    next_state, reward, terminated, truncated, info = step_out
                    done = terminated or truncated
                else:
                    next_state, reward, done, info = step_out

                # Reward is scaled down by 100 for training; the raw reward feeds the score.
                model.put_data((state, action, reward/100.0, next_state, prob[action].item(), h_in, h_out, done))
                state = next_state

                score += reward
                if done:
                    break

            model.train()

        # Report after every `print_interval` completed episodes so each average
        # covers exactly that many episodes (the first window used to hold 21).
        if (n_epi + 1) % print_interval == 0:
            print("# of episode :{}, avg score : {:.1f}".format(n_epi + 1, score/print_interval))
            score = 0.0
finally:
    # Release the environment even when the run is interrupted.
    env.close()



# of episode :20, avg score : 21.1
# of episode :40, avg score : 18.9
# of episode :60, avg score : 19.6
# of episode :80, avg score : 19.6
# of episode :100, avg score : 27.1


KeyboardInterrupt: 