In [41]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym
import random
import matplotlib.pyplot as plt
from collections import deque, namedtuple
from torch.distributions import Categorical
import matplotlib.animation as animation
import time

In [42]:
class Actor(nn.Module):
    """ Policy network: given a state, return the probability of each action. """

    def __init__(self, state_dim=4, action_dim=2, hidden_dim=16):
        """ Initialize an Actor object.

        Params
        ======
            state_dim (int): vector dimension of state vector.
            action_dim (int): number of actions.
            hidden_dim (int): number of units in the single hidden layer.
        """

        super().__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        """ Given a state, return probability of each action.

        Params
        ======
            x (torch.Tensor): state tensor of shape [batch_size, state_dim],
                or a single unbatched state of shape [state_dim].

        Returns
        ======
            torch.Tensor with the same leading shape as `x` and a last
            dimension of size action_dim; entries along the last dimension
            sum to 1.
        """

        hidden = F.relu(self.fc1(x))  # [..., hidden_dim]
        # Normalise over the action axis (the last one). Using dim=-1 instead
        # of a hard-coded dim=1 gives identical results for batched input and
        # additionally supports an unbatched 1-D state.
        return F.softmax(self.fc2(hidden), dim=-1)  # [..., action_dim]

In [43]:
class SPG:
    """ Sampling-based policy gradient agent with a CVaR objective.

    Each training iteration samples `n_critic` trajectories from the current
    policy, treats the negated discounted reward of each trajectory as its
    cost, and takes one optimizer step on a surrogate loss built from the
    trajectories whose cost lies above the empirical `alpha`-quantile.
    """

    def __init__(self, state_dim=4, action_dim=2, hidden_dim=8, gamma=0.99,
                 actor_lr=2e-2, n_actor=500, n_critic=500, max_t=300, alpha=0.9,
                 print_every=1, seed=0):
        """ Initialize an SPG agent.

        Params
        ======
            state_dim (int): dimension of the state vector.
            action_dim (int): number of discrete actions.
            hidden_dim (int): hidden width of the actor network.
            gamma (float): discount factor applied per time step.
            actor_lr (float): learning rate of the Adam optimizer.
            n_actor (int): number of actor network updates.
            n_critic (int): number of trajectories sampled per update.
            max_t (int): maximum length of each trajectory.
            alpha (float): quantile level used for the CVaR tail.
            print_every (int): print progress every this many updates.
            seed (int): seed passed to `env.reset`, which fixes the initial
                state of every rollout.
        """

        # Input & Output
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim

        # Training
        self.n_actor = n_actor
        self.n_critic = n_critic
        self.max_t = max_t
        self.gamma = gamma
        self.alpha = alpha
        self.print_every = print_every
        # self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.device = torch.device('cpu')
        # Honour the constructor argument (it used to be overwritten with 0).
        self.seed = seed

        # Networks and Optimizers
        self.actor_network = Actor(state_dim, action_dim, hidden_dim)
        self.actor_optimizer = optim.Adam(self.actor_network.parameters(), lr=actor_lr)

        # Loss functions. Not used by any method of this class; kept so that
        # external code reading these attributes keeps working.
        self.mse = nn.MSELoss()
        self.kl_div = nn.KLDivLoss(reduction='batchmean')

    def act(self, state):
        """ Sample an action from the current policy for a single state.

        Params
        ======
            state: a numpy array, or a scalar observation (wrapped into a
                length-1 vector).

        Returns
        ======
            (action, log_prob): the sampled action as a Python int and its
            log-probability as a tensor that keeps its autograd history.
        """

        # atleast_1d lets scalar observations pass through the same path as
        # vector observations; unsqueeze adds the batch dimension.
        state = torch.as_tensor(np.atleast_1d(state), dtype=torch.float32).unsqueeze(0)
        probs = self.actor_network(state).squeeze(0)
        dist = Categorical(probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action)

    def _rollout(self, env):
        """ Sample one trajectory; return (discounted cost, summed log-prob tensor). """
        state, _ = env.reset(seed=self.seed)  # fixed seed -> fixed initial state
        cost = 0.0
        log_prob_sum = torch.zeros(())

        for t in range(self.max_t):
            action, log_prob = self.act(state)  # action: no gradient, log_prob: with gradient
            state, reward, terminated, truncated, _ = env.step(action)

            cost += self.gamma ** t * (-reward)
            log_prob_sum = log_prob_sum + log_prob

            # The episode is over on either flag; stepping past a truncation
            # is not valid, so both must end the rollout.
            if terminated or truncated:
                break

        return cost, log_prob_sum

    def CVaR_loss(self, env):
        """ Build the CVaR surrogate loss from `n_critic` fresh trajectories.

        Params
        ======
            env: game environment exposing `reset(seed=...)` and `step(action)`.

        Returns
        ======
            Scalar tensor whose gradient is the sampled CVaR policy gradient.
            It is always connected to the actor's graph, so `.backward()`
            works even when no trajectory lies strictly above the quantile
            (the gradient is then zero).

        Raises
        ======
            ValueError: if `n_critic` is smaller than 1.
        """
        if self.n_critic < 1:
            raise ValueError('n_critic must be at least 1')

        rollouts = [self._rollout(env) for _ in range(self.n_critic)]
        values = np.array([cost for cost, _ in rollouts], dtype=np.float64)
        log_probs = torch.stack([log_prob for _, log_prob in rollouts])

        # Empirical alpha-quantile of the trajectory costs.
        q_alpha = np.quantile(values, self.alpha, method='inverted_cdf')

        # Only trajectories strictly above the quantile contribute.
        # NOTE(review): the tail above the alpha-quantile has mass (1 - alpha);
        # the usual estimator normalises by that rather than by alpha. The
        # original scaling is kept here -- confirm which one is intended.
        tail_weights = np.where(values > q_alpha, (values - q_alpha) / self.alpha, 0.0)
        tail_weights = torch.as_tensor(tail_weights, dtype=log_probs.dtype)

        return (log_probs * tail_weights).sum() / self.n_critic

    def test(self, env):
        """ Run one evaluation episode (sampling actions) and return its undiscounted score. """
        state, _ = env.reset(seed=self.seed)

        score = 0
        # No parameter update happens here, so skip building autograd graphs.
        with torch.no_grad():
            for _step in range(self.max_t):
                action, _ = self.act(state)
                state, reward, terminated, truncated, _ = env.step(action)
                score += reward

                if terminated or truncated:
                    break

        return score

    def train(self, env):
        """ Run up to `n_actor` policy updates, evaluating one episode after each.

        Params
        ======
            env: game environment used for both rollouts and evaluation.

        Returns
        ======
            (scores, time_count, traj_count) as numpy arrays. Entry 0 of each
            is the state before training (initial test score, 0 seconds,
            0 trajectories); each later entry belongs to one update.
        """
        init_score = self.test(env)
        scores_deque = deque(maxlen=5)
        scores_deque.append(init_score)
        scores = [init_score]
        time_count = [0.]
        traj_count = [0]

        for i_actor in range(self.n_actor):
            if i_actor % self.print_every == 0:
                print(f'Episode {i_actor}\tAverage Score: {np.mean(scores_deque):.2f}')

            start_time = time.time()
            loss = self.CVaR_loss(env)
            self.actor_optimizer.zero_grad()
            loss.backward()
            self.actor_optimizer.step()
            end_time = time.time()

            time_count.append(end_time - start_time)
            traj_count.append(self.n_critic)

            # Test One Episode
            score = self.test(env)
            scores.append(score)
            scores_deque.append(score)

            # NOTE(review): the threshold is hard-coded and the message reports
            # i_actor-5 -- presumably tied to the 5-episode window; confirm.
            if np.mean(scores_deque) >= 1000.0:
                print('Environment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(i_actor-5, np.mean(scores_deque)))
                break

        return np.array(scores), np.array(time_count), np.array(traj_count)

In [44]:
# CartPole-v1 environment; passed to SPG.train() in the training cell below.
env = gym.make('CartPole-v1')

In [45]:
# Seed PyTorch's global RNG before the network is built so that the initial
# weights, and the action sampling that follows, are reproducible when the
# notebook is re-run from a fresh kernel.
SEED = 0
torch.manual_seed(SEED)

# Baseline agent; its initial actor weights are copied into the agent that is
# actually trained in the next cell.
SPG_Base = SPG(state_dim=4, action_dim=2, hidden_dim=8, gamma=0.99,
               actor_lr=1e-2, n_actor=200, n_critic=200, max_t=300, alpha=0.9,
               print_every=1, seed=SEED)

In [None]:
# Agent that is actually trained: 100 updates of 200 trajectories each.
spg = SPG(
    state_dim=4,
    action_dim=2,
    hidden_dim=8,
    gamma=0.99,
    actor_lr=1e-2,
    n_actor=100,
    n_critic=200,
    max_t=300,
    alpha=0.9,
    print_every=1,
)

# Start from the same initial weights as the baseline agent.
spg.actor_network.load_state_dict(SPG_Base.actor_network.state_dict())

# Per-update test scores, wall-clock seconds and trajectory counts.
scores_1, time_count_1, traj_count_1 = spg.train(env)