In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal, Categorical
import numpy as np

In [None]:
#Actor network class created below

class Actor(nn.Module):
    """Gaussian policy network with a fixed (non-learned) standard deviation.

    Two tanh hidden layers of 150 units feed a tanh-squashed mean head, so the
    mean of every action dimension lies in [-1, 1].

    Args:
        in_dim: size of the observation vector.
        out_dim: number of action dimensions.
        action_std: standard deviation shared by all action dimensions.
    """

    def __init__(self, in_dim: int, out_dim: int, action_std: float = 0.5):
        # nn.Module must be initialised before attributes are assigned to it.
        super(Actor, self).__init__()

        self.in_dim = in_dim
        self.out_dim = out_dim
        self.action_std = action_std

        self.hidden_one = nn.Linear(in_dim, 150)
        self.hidden_two = nn.Linear(150, 150)
        self.mu_layer = nn.Linear(150, out_dim)

    def forward(self, state: torch.Tensor) -> tuple:
        """Sample an action for `state`.

        Returns:
            (action, dist): the sampled action and the `Normal` distribution
            it was drawn from (needed later for log-probabilities/entropy).
        """
        x = torch.tanh(self.hidden_one(state))
        x = torch.tanh(self.hidden_two(x))

        mu = torch.tanh(self.mu_layer(x))

        # Build std from `mu` so it shares mu's dtype and device; a standalone
        # float64 CPU tensor would promote log-probs to float64 and break on GPU.
        std = mu.new_full((1, self.out_dim), self.action_std)
        dist = Normal(mu, std)
        action = dist.sample()

        return action, dist
    
#Critic network class created below    

class Critic(nn.Module):
    """State-value network: two ReLU hidden layers (120 units) and a scalar head."""

    def __init__(self, in_dim: int):
        super().__init__()

        width = 120
        self.hidden_one = nn.Linear(in_dim, width)
        self.hidden_two = nn.Linear(width, width)
        self.out = nn.Linear(width, 1)

    def forward(self, state: torch.Tensor) -> torch.Tensor:
        """Return the estimated value of `state`, one scalar per input row."""
        features = state
        for hidden in (self.hidden_one, self.hidden_two):
            features = F.relu(hidden(features))
        return self.out(features)

In [None]:
# Computes return targets via Generalized Advantage Estimation (GAE) from the per-step rewards and state values

def get_gae(rewards: list, values: list, is_terminals: list, gamma: float, lamda: float,):
    """Compute GAE-based return targets for one rollout.

    Args:
        rewards: per-step rewards, length T.
        values: state values, length T + 1; the extra last entry is the
            bootstrap value of the state following the final step.
        is_terminals: per-step multipliers applied to the bootstrap value and
            to the running advantage. Despite the name, an entry must be 0
            where the episode ended at that step and 1 otherwise.
        gamma: discount factor.
        lamda: GAE smoothing factor.

    Returns:
        A list of length T holding `advantage_t + value_t` for every step, in
        chronological order.
    """
    gae = 0
    returns = []

    # Walk backwards so each step can reuse the advantage of its successor.
    for i in reversed(range(len(rewards))):
        mask = is_terminals[i]
        delta = rewards[i] + gamma * values[i + 1] * mask - values[i]
        gae = delta + gamma * lamda * mask * gae
        # Append then reverse once: list.insert(0, ...) is O(T) per call.
        returns.append(gae + values[i])

    returns.reverse()
    return returns


# Draws random mini-batches (states, actions, returns, log-probs, values, advantages) from the collected rollout for training

def trajectories_data_generator(states: torch.Tensor,actions: torch.Tensor,
                                returns: torch.Tensor,log_probs: torch.Tensor,
                                values: torch.Tensor,advantages: torch.Tensor,
                                batch_size, num_epochs,):
    """Yield random mini-batches drawn from one rollout.

    Each epoch produces `states.size(0) // batch_size` batches. Indices are
    drawn with `np.random.choice`, i.e. with replacement, and the same indices
    select the rows of every tensor.

    Yields:
        Tuples `(states, actions, returns, log_probs, values, advantages)`
        restricted to the sampled rows.
    """
    sample_count = states.size(0)

    for _epoch in range(num_epochs):
        batches_left = sample_count // batch_size
        while batches_left > 0:
            picked = np.random.choice(sample_count, batch_size)
            batch = (
                states[picked, :],
                actions[picked],
                returns[picked],
                log_probs[picked],
                values[picked],
                advantages[picked],
            )
            yield batch
            batches_left -= 1


In [None]:
class Environment:
    """Toy 3-D minimisation task.

    The agent moves a point (x, y, z) and is rewarded for decreasing
    f(x, y, z) = (x - 4)^2 + (y - 8)^2 + (z - 9)^2.

    Observations are the coordinates divided by `OBS_SCALE`; an episode is
    flagged done once `MAX_STEPS` steps have been taken.
    """

    TARGET = (4.0, 8.0, 9.0)   # minimiser of f
    OBS_SCALE = 100.0          # coordinates are divided by this in observations
    ACTION_SCALE = 5.0         # raw action components are multiplied by this
    REWARD_SCALE = 150.0       # decrease of f is divided by this
    MAX_STEPS = 150            # episode length

    def __init__(self, x=50.0,y=50.0,z=50.0):
        self.x=x
        self.y=y
        self.z=z
        # Remember the start point so reset() can restore it.
        self.x_reset=x
        self.y_reset=y
        self.z_reset=z
        self.f=self.calculate_f()
        self.counter=0
        self.done=False

    @classmethod
    def _objective(cls, x, y, z):
        """Evaluate f at the given coordinates."""
        tx, ty, tz = cls.TARGET
        return (x-tx)**2.0+(y-ty)**2.0+(z-tz)**2.0

    def _observation(self):
        """Return the current position scaled into observation units."""
        return np.array([self.x/self.OBS_SCALE,
                         self.y/self.OBS_SCALE,
                         self.z/self.OBS_SCALE])

    def reset(self):
        """Restore the start position and counters; return the first observation."""
        self.x=self.x_reset
        self.y=self.y_reset
        self.z=self.z_reset
        self.f=self.calculate_f()
        self.counter=0
        self.done=False
        return self._observation()

    def step (self,action):
        """Apply `action` and advance one time step.

        Args:
            action: anything indexable as `action[0][i]` for i in 0..2
                (tensor, array or nested list); only the first row is used.

        Returns:
            (observation, reward, done) where reward is the decrease of f
            divided by `REWARD_SCALE`, as an `np.float64`.
        """
        # Convert tensors to NumPy first so the internal state stays plain
        # floats instead of silently turning into (possibly GPU) tensors.
        if hasattr(action, "detach"):
            action = action.detach().cpu().numpy()
        first_row = np.asarray(action, dtype=np.float64)[0]

        self.x+=float(first_row[0])*self.ACTION_SCALE
        self.y+=float(first_row[1])*self.ACTION_SCALE
        self.z+=float(first_row[2])*self.ACTION_SCALE

        f_=self.calculate_f()
        reward= np.float64((self.f-f_)/self.REWARD_SCALE)
        self.f=f_
        self.counter +=1
        if self.counter>=self.MAX_STEPS:
            self.done=True

        return self._observation(),reward,self.done

    def calculate_f(self):
        """Return f evaluated at the current position."""
        return self._objective(self.x, self.y, self.z)

    def return_f_value(self,state):
        """Return f for a scaled observation `state` (indexed as state[0][i]) as a 1-element array."""
        x=state[0][0]*100
        y=state[0][1]*100
        z=state[0][2]*100
        return np.array([self._objective(x, y, z)])
        
    

    
        
        
        

In [None]:
class Memory:
    """Rollout buffer made of parallel per-step lists."""

    # Names of the list attributes that make up one rollout.
    _FIELDS = ("states", "actions", "rewards", "is_terminals", "log_probs", "values")

    def __init__(self):
        self._install_empty_lists()

    def clear_memory(self):
        """Drop all stored steps by rebinding every field to a new empty list."""
        self._install_empty_lists()

    def _install_empty_lists(self):
        # Rebind instead of calling list.clear() so references taken earlier
        # keep their contents.
        for field in self._FIELDS:
            setattr(self, field, [])
        
 

In [None]:
class PPOAgent(object):
    """PPO agent (clipped surrogate objective) acting on `Environment`.

    Args:
        obs_dim: observation size.
        act_dim: number of action dimensions.
        gamma: discount factor.
        lamda: GAE smoothing factor.
        entropy_coef: weight of the entropy bonus in the actor loss.
        epsilon: clipping range of the probability ratio.
        value_range: stored but not used by the current (unclipped) value loss.
        rollout_len: number of environment steps collected per update.
        total_rollouts: number of rollouts to run.
        num_epochs: optimisation epochs per rollout.
        batch_size: mini-batch size.
        is_evaluate: when True, nothing is recorded into the rollout memory.
        solved_reward: optional score threshold read by the training loop.
        actor_lr: Adam learning rate of the actor.
        critic_lr: Adam learning rate of the critic.
    """

    def __init__(self, obs_dim=3, act_dim=3, gamma=0.99, lamda=0.9,
                 entropy_coef=0.01, epsilon=0.2, value_range=0.2,
                 rollout_len=150, total_rollouts=300, num_epochs=10,
                 batch_size=30, is_evaluate=False, solved_reward=None,
                 actor_lr=0.001, critic_lr=0.001):

        self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
        self.env = Environment()

        self.gamma = gamma
        self.lamda = lamda
        self.entropy_coef = entropy_coef
        self.epsilon = epsilon
        self.value_range = value_range

        self.rollout_len = rollout_len
        self.total_rollouts = total_rollouts
        self.num_epochs = num_epochs
        self.batch_size = batch_size

        self.obs_dim = obs_dim
        self.act_dim = act_dim
        self.actor_lr = actor_lr
        self.critic_lr = critic_lr

        # Inputs are moved to self.device, so the networks must live there too.
        self.actor = Actor(self.obs_dim, self.act_dim).to(self.device)
        self.critic = Critic(self.obs_dim).to(self.device)
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=self.actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=self.critic_lr)

        # Memory for the trajectory
        self.memory = Memory()

        # Memory of the train history to see how agent is improving
        # (one mean loss per call to _update_weights).
        self.actor_loss_history = []
        self.critic_loss_history = []
        self.scores = []

        self.is_evaluate = is_evaluate
        self.solved_reward = solved_reward

    def _get_action(self, state):
        """Sample an action for `state` and, when training, record the step.

        Args:
            state: array-like observation accepted by `torch.FloatTensor`.

        Returns:
            The sampled action tensor.
        """
        state = torch.FloatTensor(state).to(self.device)

        # Rollout collection needs no gradients; the update re-evaluates the
        # networks on the stored states.
        with torch.no_grad():
            action, dist = self.actor(state)

            # The critic is only needed for training data; evaluating it
            # unconditionally is what made evaluation mode fail before.
            if not self.is_evaluate:
                value = self.critic(state)
                log_prob = dist.log_prob(action)

        if not self.is_evaluate:
            # Store trajectory in memory class
            self.memory.states.append(state)
            self.memory.actions.append(action)
            self.memory.log_probs.append(log_prob)
            self.memory.values.append(value)

        return action

    def _step(self, action):
        """Advance the environment by one step and, when training, record reward and mask.

        Returns:
            (next_state, reward, done) as NumPy arrays with a leading batch
            dimension of 1.
        """
        next_state, reward, done = self.env.step(action)

        # add fake dim to match dimension with batch size
        next_state = np.reshape(next_state, (1, -1)).astype(np.float64)
        reward = np.reshape(reward, (1, -1)).astype(np.float64)
        done = np.reshape(done, (1, -1))

        if not self.is_evaluate:
            self.memory.rewards.append(torch.FloatTensor(reward).to(self.device))
            # Stored as (1 - done): 0 on the step that ended an episode.
            self.memory.is_terminals.append(torch.FloatTensor(1 - done).to(self.device))

        return next_state, reward, done

    def _update_weights(self):
        """Run the PPO update on the stored rollout, then clear the memory.

        Expects `memory.values` to hold one more entry than `memory.rewards`
        (the bootstrap value of the state after the last step).
        """
        if not self.memory.rewards:
            return

        # Detach and co-locate the values up front so GAE neither builds an
        # autograd graph nor mixes devices with the bootstrap value.
        value_list = [v.detach().to(self.device) for v in self.memory.values]
        returns = get_gae(self.memory.rewards, value_list, self.memory.is_terminals,
                          self.gamma, self.lamda,)

        # flattening a list of torch.tensors into vectors
        states = torch.cat(self.memory.states).view(-1, self.obs_dim)
        actions = torch.cat(self.memory.actions)
        returns = torch.cat(returns).detach()
        log_probs = torch.cat(self.memory.log_probs).detach()
        values = torch.cat(value_list)
        advantages = returns - values[:-1]

        actor_losses = []
        critic_losses = []

        # The old value is yielded for an optional clipped value loss (see
        # self.value_range); the plain squared error below does not use it.
        for state, action, return_, old_log_prob, _old_value, advantage in trajectories_data_generator(
            states=states, actions=actions, returns=returns, log_probs=log_probs,
            values=values, advantages=advantages,
            batch_size=self.batch_size, num_epochs=self.num_epochs,):

            # compute ratio (pi_theta / pi_theta__old)
            # NOTE(review): the ratio is kept per action dimension (log-probs
            # are not summed over dimensions) - confirm this is intended.
            _, dist = self.actor(state)
            cur_log_prob = dist.log_prob(action)
            ratio = torch.exp(cur_log_prob - old_log_prob)

            # compute entropy
            entropy = dist.entropy().mean()

            # compute actor loss (clipped surrogate minus entropy bonus)
            surrogate = advantage * ratio
            clipped_surrogate = (torch.clamp(ratio, 1. - self.epsilon, 1. + self.epsilon)
                                 * advantage)
            actor_loss = (-torch.mean(torch.min(surrogate, clipped_surrogate))
                          - entropy * self.entropy_coef)

            # critic loss: mean squared error against the GAE returns
            cur_value = self.critic(state)
            critic_loss = (return_ - cur_value).pow(2).mean()

            # actor optimizer step
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            # critic optimizer step
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()

            actor_losses.append(actor_loss.item())
            critic_losses.append(critic_loss.item())

        # Clear once, after all mini-batches, so the buffer is emptied even
        # when the rollout was too short to yield a single batch.
        self.memory.clear_memory()

        if actor_losses:
            self.actor_loss_history.append(sum(actor_losses) / len(actor_losses))
            self.critic_loss_history.append(sum(critic_losses) / len(critic_losses))

In [None]:
# Problem dimensions
obs_dim = 3
act_dim = 3

# Return / advantage estimation
gamma = 0.95
lamda = 0.95

# PPO objective
entropy_coef = 0.005
epsilon = 0.2
value_range = 0.2

# Rollout and optimisation schedule
rollout_len = 150
total_rollouts = 500
num_epochs = 15
batch_size = 50

# Run mode
is_evaluate = False
solved_reward = None

# Learning rates
actor_lr = 1e-3
critic_lr = 1e-3

# Pass everything by keyword so a reordered signature cannot silently
# swap hyper-parameters.
ppo_agent = PPOAgent(
    obs_dim=obs_dim,
    act_dim=act_dim,
    gamma=gamma,
    lamda=lamda,
    entropy_coef=entropy_coef,
    epsilon=epsilon,
    value_range=value_range,
    rollout_len=rollout_len,
    total_rollouts=total_rollouts,
    num_epochs=num_epochs,
    batch_size=batch_size,
    is_evaluate=is_evaluate,
    solved_reward=solved_reward,
    actor_lr=actor_lr,
    critic_lr=critic_lr,
)

In [None]:
# Training Loop

# Collect `rollout_len` steps per rollout, then run one PPO update on them.
score = 0
state = ppo_agent.env.reset()
state = np.reshape(state, (1, -1))
ending_states=[]

for rollout in range(ppo_agent.total_rollouts):
    for step in range(ppo_agent.rollout_len):
        action = ppo_agent._get_action(state)
        next_state, reward, done = ppo_agent._step(action)
        state = next_state
        score += reward[0][0]

        if done[0][0]:
            # Episode finished: log the final position and objective, then restart.
            ppo_agent.scores.append(score)
            ending_state=ppo_agent.env.return_f_value(state)
            print('Rollout Number: ', rollout, ' Total Rewards: ', np.round(score,1), ' x:',
                  np.round(state[0][0]*100,1) ,' y: ', np.round(state[0][1]*100,1),' z: ',
                  np.round(state[0][2]*100,1) , ' f: ', np.round(ending_state,1))
            ending_states.append(ending_state)
            score = 0
            state = ppo_agent.env.reset()
            state = np.reshape(state, (1, -1))

    # Optional early stop on the mean of the last 10 episode scores.
    # (`self` does not exist at notebook scope; the threshold lives on the agent.)
    if ppo_agent.solved_reward is not None and ppo_agent.scores:
        if np.mean(ppo_agent.scores[-10:]) > ppo_agent.solved_reward:
            print("Congratulations, it's solved!")
            break

    # Bootstrap value for the state the next rollout starts from. If the last
    # step ended an episode its mask is 0, so this value is ignored by GAE.
    with torch.no_grad():
        value = ppo_agent.critic(torch.FloatTensor(state).to(ppo_agent.device))
    ppo_agent.memory.values.append(value)
    ppo_agent._update_weights()

#print(ppo_agent.scores)
#print(ending_states)