In [5]:
import gymnasium as gym
# import Lorenz_envs
import matplotlib.pyplot as plt
import numpy as np
import time
import torch
import warnings
import copy 
import wandb
import le_envs

from IPython import display
from torch.distributions import MultivariateNormal
from torch.optim import Adam
from torch.nn import Linear, Module, MSELoss, ReLU, Sequential

# Globally suppress every Python warning raised after this point: no category,
# module, or message filter is given, so the "ignore" action applies to all.
# NOTE(review): this also hides deprecation notices from numpy/torch/gymnasium;
# consider narrowing the filter while debugging.
warnings.filterwarnings("ignore")

In [6]:
class PolicyNetwork(Module):
    """Fully connected network with two hidden ReLU layers of 64 units.

    The layers live in ``self.model`` (a ``Sequential``), mapping an input of
    width ``input_dim`` to an output of width ``output_dim``.
    """

    def __init__(self, input_dim, output_dim):
        """Build the layer stack.

        Args:
            input_dim: Size of the last dimension of the observations.
            output_dim: Size of the last dimension of the network output.
        """
        super().__init__()
        hidden_dim = 64
        # Layers are created in network order so parameter initialisation
        # consumes the RNG in the same sequence as the layer indices.
        layers = [
            Linear(input_dim, hidden_dim),
            ReLU(),
            Linear(hidden_dim, hidden_dim),
            ReLU(),
            Linear(hidden_dim, output_dim),
        ]
        self.model = Sequential(*layers)

    def forward(self, observation):
        """Run the network on ``observation``.

        Args:
            observation: Either a tensor, passed to the layers unchanged, or a
                numpy array, which is first copied into a float32 tensor.

        Returns:
            The output tensor of the final linear layer.
        """
        if not isinstance(observation, np.ndarray):
            return self.model(observation)
        # numpy arrays may be float64; the Linear layers hold float32 weights.
        as_float_tensor = torch.tensor(observation, dtype=torch.float)
        return self.model(as_float_tensor)

In [7]:
class RewardStrategy:
    """Base class holding the discount settings shared by advantage estimators.

    Attributes are stored as given; no validation is performed here.
    """

    def __init__(self, gamma, gae_lambda):
        """Store the hyperparameters.

        Args:
            gamma: Discount factor, kept as ``self.gamma``.
            gae_lambda: Lambda parameter, kept as ``self.gae_lambda``.
        """
        self.gamma, self.gae_lambda = gamma, gae_lambda

class RewardsToGo(RewardStrategy):
    """Advantage estimator based on discounted rewards-to-go.

    The return of each step is the discounted sum of the rewards from that
    step to the end of its episode; the advantage is that return minus the
    supplied value estimate.
    """

    def __init__(self, gamma, gae_lambda):
        """Store ``gamma`` and ``gae_lambda`` (the latter is not read by this class)."""
        super().__init__(gamma, gae_lambda)

    def compute_advantages(self, rewards, values, masks):
        """Compute per-step discounted returns and advantages.

        Args:
            rewards: Iterable of episodes, each a reversible sequence of
                per-step rewards. Episodes may differ in length.
            values: Value estimates subtracted from the returns.
            masks: Unused; accepted so every strategy shares one signature.

        Returns:
            Tuple ``(batch_rtgs, adv)``: ``batch_rtgs`` is a 1-D float32
            tensor with the episodes concatenated in their given order, and
            ``adv`` is ``batch_rtgs - values``.
        """
        batch_rtgs = []
        for episode_rewards in rewards:
            # Accumulate from the last step backwards, appending as we go, and
            # flip once per episode. This keeps the work linear in the batch
            # size instead of shifting the whole list on every insert(0, ...).
            episode_rtgs = []
            discounted_reward = 0
            for reward in reversed(episode_rewards):
                discounted_reward = reward + discounted_reward * self.gamma
                episode_rtgs.append(discounted_reward)
            batch_rtgs.extend(reversed(episode_rtgs))

        batch_rtgs = torch.tensor(batch_rtgs, dtype=torch.float)
        # NOTE(review): assumes `values` lines up elementwise with the 1-D
        # returns (e.g. shape (N,)); a (N, 1) input would broadcast to (N, N)
        # - confirm against the caller.
        adv = batch_rtgs - values
        return batch_rtgs, adv

class GeneralizedAdvantage(RewardStrategy):
    """Advantage estimator using Generalized Advantage Estimation (GAE).

    Over the flattened batch, advantages are accumulated backwards as
    ``A[i] = delta[i] + masks[i] * gamma * gae_lambda * A[i + 1]`` with
    ``delta[i] = r[i] + masks[i] * gamma * V[i + 1] - V[i]``.
    """

    def __init__(self, gamma, gae_lambda):
        """Store the hyperparameters and precompute ``gamma * gae_lambda``."""
        super().__init__(gamma, gae_lambda)
        # Decay applied to the running advantage at every step.
        self.coeff = gamma * gae_lambda

    def compute_advantages(self, rewards, values, masks):
        """Compute GAE advantages and the matching return targets.

        Args:
            rewards: Tensor (or rectangular array-like) of rewards; it is
                flattened, and the flattened length is the batch size.
            values: Value estimates with one entry per flattened reward.
                Singleton dimensions such as ``(N, 1)`` are accepted. Only the
                numbers are used; no gradient flows through the results.
            masks: Indexable sequence; ``masks[i]`` scales both the bootstrap
                term and the carried-over advantage of step ``i``.
                Presumably 0 where step ``i`` ends an episode and 1
                otherwise - confirm against the caller. Only indices
                ``0 .. batch_size - 2`` are read.

        Returns:
            Tuple ``(batch_returns, advantage)`` of 1-D float32 tensors of
            length ``batch_size``, where ``batch_returns = advantage + values``.
            Both are empty tensors when the batch is empty.

        Raises:
            ValueError: If ``values`` does not hold exactly one entry per
                flattened reward.
        """
        # Work on plain Python floats so the recursion does not depend on
        # writing tensors into a numpy buffer (which relies on implicit
        # array-to-scalar conversion when values is shaped (N, 1)).
        reward_seq = torch.as_tensor(rewards, dtype=torch.float).detach().flatten().tolist()
        value_seq = torch.as_tensor(values, dtype=torch.float).detach().flatten().tolist()
        batch_size = len(reward_seq)
        if len(value_seq) != batch_size:
            raise ValueError(
                f"values has {len(value_seq)} entries but rewards has {batch_size}"
            )

        advantages = [0.0] * batch_size
        running_advantage = 0.0
        last_index = batch_size - 1
        for i in reversed(range(batch_size)):
            if i == last_index:
                # The final step of the batch is never bootstrapped: its
                # advantage is simply r - V and masks[last_index] is not read.
                step_mask, next_value = 0.0, 0.0
            else:
                step_mask, next_value = float(masks[i]), value_seq[i + 1]
            delta = reward_seq[i] + step_mask * self.gamma * next_value - value_seq[i]
            running_advantage = delta + step_mask * self.coeff * running_advantage
            advantages[i] = running_advantage

        # Both outputs share float32 so downstream losses are not silently
        # promoted to float64.
        advantage = torch.tensor(advantages, dtype=torch.float)
        batch_returns = advantage + torch.tensor(value_seq, dtype=torch.float)
        return batch_returns, advantage

In [8]:
class Objective:
    """Base class for policy objectives; holds the ``epsilon`` setting."""

    def __init__(self, epsilon):
        """Keep ``epsilon`` on the instance as ``self.epsilon``, unvalidated."""
        self.epsilon = epsilon

class SurrogateObjectiveClip(Objective):
    """Clipped surrogate objective; ``epsilon`` is the clip half-width around 1."""

    def __init__(self, epsilon):
        """Store ``epsilon`` via the base class."""
        super().__init__(epsilon)

    def get_loss(self, ratios, advantage):
        """Return the clipped surrogate loss as a scalar tensor.

        Args:
            ratios: Tensor of probability ratios.
            advantage: Advantage estimates multiplied elementwise with
                ``ratios``.

        Returns:
            Mean over all elements of the negated elementwise minimum of the
            unclipped and clipped surrogate terms.
        """
        lower_bound = 1 - self.epsilon
        upper_bound = 1 + self.epsilon

        unclipped_term = ratios * advantage
        clipped_term = torch.clamp(ratios, lower_bound, upper_bound) * advantage

        # Negated so that minimising the loss maximises the surrogate.
        pessimistic_term = torch.min(unclipped_term, clipped_term)
        return (-pessimistic_term).mean()

class SurrogateObjectiveNoClip(Objective):
    """Unclipped surrogate objective.

    ``epsilon`` is stored for signature parity with the other objectives but
    is not read by ``get_loss``.
    """

    def __init__(self, epsilon):
        """Store ``epsilon`` via the base class."""
        super().__init__(epsilon)

    def get_loss(self, ratios, advantage):
        """Return the unclipped surrogate loss as a scalar tensor.

        Args:
            ratios: Tensor of probability ratios.
            advantage: Advantage estimates multiplied elementwise with
                ``ratios``.

        Returns:
            ``-(ratios * advantage).mean()``: the negated mean over every
            sample, so that minimising the loss maximises the surrogate.
        """
        surrogate_loss = ratios * advantage

        # Average over all samples. A single-argument torch.min would collapse
        # the batch to its one smallest element and discard every other
        # sample from the objective.
        return (-surrogate_loss).mean()