In [3]:
import cv2
import numpy as np
from rocket import Rocket

task = 'hover'  # 'hover' or 'landing'
max_episode = 5
max_steps = 800


import random
import torch
from collections import deque

alpha = 0.001
gamma = 0.99
lmbda         = 0.99
eps_clip      = 0.1
K_epoch       = 4

env = Rocket(task=task, max_steps=max_steps)


class VNetwork(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.fcV1 = torch.nn.Linear(8, 256)
        self.fcV2 = torch.nn.Linear(256, 256)
        self.fcV3 = torch.nn.Linear(256, 1)
        
    def forward(self, x):
        x = self.fcV1(x)
        x = torch.nn.functional.relu(x)
        x = self.fcV2(x)
        x = torch.nn.functional.relu(x)
        x = self.fcV3(x)
        return x
    
class PolicyNetwork(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.fcA1 = torch.nn.Linear(8, 256)
        self.fcA2 = torch.nn.Linear(256, 256)
        self.fcA3_thrust = torch.nn.Linear(256, 3)
        self.fcA3 = torch.nn.Linear(256, 9)
        
    def forward(self, x):
        x = self.fcA1(x)
        x = torch.nn.functional.relu(x)
        x = self.fcA2(x)
        x = torch.nn.functional.relu(x)
        x = self.fcA3(x)  
        x = torch.nn.functional.softmax(x, dim=-1)
        return x
    
# network and optimizer
pi = PolicyNetwork()
pi_optimizer = torch.optim.Adam(pi.parameters(), lr=alpha)
pi_target = PolicyNetwork()

V = VNetwork()
V_optimizer = torch.optim.Adam(V.parameters(), lr=alpha)  


def gen_episode():
    states = []
    actions = []
    rewards = []
    ratios = []
    state = env.reset() 
    terminated = False
    truncated = False
    while True:
        probs_target = pi_target(torch.FloatTensor(state))
        action = torch.multinomial(probs_target, 1).item()
        
        next_state, reward, terminated, _ = env.step(action) 
        #must add:
#         env.render()
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        if terminated:
            break  
        
        state = next_state
    return states, actions, rewards



episode = 0
MAX_EPISODES = 500
reward_history =[]
reward_history_100 = deque(maxlen=100)

while episode < MAX_EPISODES:  # episode loop
    
    pi_target.load_state_dict(pi.state_dict())
    states, actions, rewards = gen_episode()
        
        
    for k in range(1,K_epoch):
        loss1 = 0
        loss2 = 0
        GAE = 0
        G = 0
        for t in range(len(states) - 2, -1, -1):
            S = states[t]
            A = actions[t]
            R = rewards[t]
            S_next = states[t+1]
            
            S=torch.FloatTensor(S)
            A=torch.tensor(A, dtype=torch.int8)
            S_next=torch.FloatTensor(S_next)
            
            with torch.no_grad():
                delta = R + gamma*V(S_next)-V(S)
                GAE = gamma * lmbda * GAE + delta             
                G = gamma * G + R
            
            ratio = pi(S)[A]/pi_target(S)[A]
            surr1 = ratio * (gamma**t)* GAE
            surr2 = torch.clamp(ratio, 1-eps_clip, 1+eps_clip) * (gamma**t)* GAE 
            loss1 = loss1 - torch.min(surr1, surr2)
            loss2 = loss2 + (G - V(S))**2
        loss2 = loss2/len(states)
            
        pi_optimizer.zero_grad()
        loss1.backward()
        pi_optimizer.step()
        
        V_optimizer.zero_grad()
        loss2.backward()
        V_optimizer.step() 
  
    reward_history.append(G)
    reward_history_100.append(G)
    avg = sum(reward_history_100) / len(reward_history_100)
    episode = episode + 1
    if episode % 10 == 0:
        print('episode: {}, Return: {:.1f}, avg: {:.1f}'.format(episode, G, avg))
    
    
    
    
    
    #############################################


# for episode in range(max_episode):
#     state = env.reset()
    
#     score = 0
#     rewards= []
    
#     for step in range(max_steps):
#         action = env.get_random_action()
#         state, reward, done, _ = env.step(action)
#         env.render()
        
#         score += reward
        
#         if done or step == max_steps-1:
#             rewards.append(score)
#             cv2.destroyAllWindows()
#             break

# average = np.mean(np.array(rewards))
# print(f'average rewards = {average}')

episode: 10, Return: 11.8, avg: 9.8
episode: 20, Return: 9.9, avg: 9.8
episode: 30, Return: 9.1, avg: 10.0
episode: 40, Return: 9.7, avg: 10.1
episode: 50, Return: 6.8, avg: 9.8


KeyboardInterrupt: 

In [None]:
from matplotlib import pyplot as plt
import numpy as np

## Plot objective vs. iteration
t = range(MAX_EPISODES)
plt.plot(t, np.array(reward_history), 'b', linewidth = 2, label = 'PPO')
plt.legend(prop={'size':12})
plt.xlabel('Episode')
plt.ylabel('Return')

In [None]:

import gymnasium as gym
import numpy as np
from IPython.display import clear_output

import random
import torch
from collections import deque

alpha = 0.001
gamma = 0.99
lmbda         = 0.95
eps_clip      = 0.1
K_epoch       = 4

class VNetwork(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.fcV1 = torch.nn.Linear(4, 256)
        self.fcV2 = torch.nn.Linear(256, 256)
        self.fcV3 = torch.nn.Linear(256, 1)
        
    def forward(self, x):
        x = self.fcV1(x)
        x = torch.nn.functional.relu(x)
        x = self.fcV2(x)
        x = torch.nn.functional.relu(x)
        x = self.fcV3(x)
        return x
    
class PolicyNetwork(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.fcA1 = torch.nn.Linear(4, 256)
        self.fcA2 = torch.nn.Linear(256, 256)
        self.fcA3 = torch.nn.Linear(256, 2)
        
    def forward(self, x):
        x = self.fcA1(x)
        x = torch.nn.functional.relu(x)
        x = self.fcA2(x)
        x = torch.nn.functional.relu(x)
        x = self.fcA3(x)  
        x = torch.nn.functional.softmax(x, dim=-1)
        return x
    
# network and optimizer
pi = PolicyNetwork()
pi_optimizer = torch.optim.Adam(pi.parameters(), lr=alpha)
pi_target = PolicyNetwork()

V = VNetwork()
V_optimizer = torch.optim.Adam(V.parameters(), lr=alpha)  


def gen_episode():
    states = []
    actions = []
    rewards = []
    ratios = []
    state, _ = env.reset() 
    terminated = False
    truncated = False
    while True:
        probs_target = pi_target(torch.FloatTensor(state))
        action = torch.multinomial(probs_target, 1).item()
        next_state, reward, terminated, truncated, info = env.step(action)  # take a random action
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        if terminated or truncated:
            break  
        
        state = next_state
    return states, actions, rewards



env = gym.make('CartPole-v0')
episode = 0
MAX_EPISODES = 500
reward_history =[]
reward_history_100 = deque(maxlen=100)

while episode < MAX_EPISODES:  # episode loop
    
    pi_target.load_state_dict(pi.state_dict())
    states, actions, rewards = gen_episode()
        
        
    for k in range(1,K_epoch):
        loss1 = 0
        loss2 = 0
        GAE = 0
        G = 0
        for t in range(len(states) - 2, -1, -1):
            S = states[t]
            A = actions[t]
            R = rewards[t]
            S_next = states[t+1]
            
            S=torch.FloatTensor(S)
            A=torch.tensor(A, dtype=torch.int8)
            S_next=torch.FloatTensor(S_next)
            
            with torch.no_grad():
                delta = R + gamma*V(S_next)-V(S)
                GAE = gamma * lmbda * GAE + delta             
                G = gamma * G + R
            
            ratio = pi(S)[A]/pi_target(S)[A]
            surr1 = ratio * (gamma**t)* GAE
            surr2 = torch.clamp(ratio, 1-eps_clip, 1+eps_clip) * (gamma**t)* GAE 
            loss1 = loss1 - torch.min(surr1, surr2)
            loss2 = loss2 + (G - V(S))**2
        loss2 = loss2/len(states)
            
        pi_optimizer.zero_grad()
        loss1.backward()
        pi_optimizer.step()
        
        V_optimizer.zero_grad()
        loss2.backward()
        V_optimizer.step() 
  
    reward_history.append(G)
    reward_history_100.append(G)
    avg = sum(reward_history_100) / len(reward_history_100)
    episode = episode + 1
    if episode % 10 == 0:
        print('episode: {}, Return: {:.1f}, avg: {:.1f}'.format(episode, G, avg))
    
    