In [90]:
import gymnasium as gym
import torch
from torch import nn
import numpy as np
import random
import time

env = gym.make("CartPole-v1",render_mode="human")
states_dim = env.observation_space.shape[0]
actions_dim = env.action_space.n

class PolicyNetwork(nn.Module):
    def __init__(self, states_dim=states_dim):
        
        super().__init__()
        self.fc1 = nn.Linear(states_dim, 5)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(5, 1)
        self.sig1 = nn.Sigmoid()

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.fc2(x)
        x = self.sig1(x)
        return x

def discount(rewards, gamma):
    discounted = rewards
    for i in range(discounted.shape[0]-2, 0, -1):
        discounted[i] = discounted[i] + gamma*discounted[i+1]
    return discounted

def play_one_step(env, obs, model, loss_fn):
    left = model(torch.tensor(obs))
    action = np.array([int(np.random.uniform(0,1,(1,1)) > left.detach().numpy())])
    y_target = torch.tensor((np.array([1]) - action), dtype=torch.float32)
    loss = loss_fn(left, y_target)
    loss.backward()
    grads = []
    for name, param in model.named_parameters():
        grads.append(param.grad)
    obs, reward, done, truncated, info = env.step(int(action))
    return obs, reward, done, truncated, np.array(grads)

def play_multiple_episodes(env, model, loss_fn, episodes, n_max_steps):
    all_rewards = []
    all_gradients = []
    for episode in range(episodes):
        current_rewards = []
        current_gradients = []
        obs, info = env.reset()
        for step in range(n_max_steps):
            obs, reward, done, truncated, grads = play_one_step(env, obs, model, loss_fn)
            current_rewards.append(reward)
            current_gradients.append(grads)
            if done or truncated:
                break        
        all_rewards.append(np.array(current_rewards))
        all_gradients.append(np.array(current_gradients))
    return all_rewards, np.array(all_gradients)

def discount_and_normalize_rewards(all_rewards, gamma):
    all_discounted_rewards = [discount(rewards, gamma) for rewards in all_rewards]
    
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards-reward_mean)/reward_std for discounted_rewards in all_discounted_rewards]

model = PolicyNetwork()

iterations = 150
episodes = 10
n_max_steps = 500
gamma = 0.95
lr = 0.01

optimizer = torch.optim.NAdam(model.parameters(), lr=lr)
loss_fn = torch.nn.BCELoss()

for iter in range(iterations):
    all_rewards, all_gradients = play_multiple_episodes(env, model, loss_fn, episodes, n_max_steps)
    advantage = discount_and_normalize_rewards(all_rewards, gamma)
    mean_gradients = sum([all_gradients[episode][step]*advantage[episode][step] for episode in range(episodes) for step in range(len(advantage[episode]))])
    with torch.no_grad():
        for param, grad in zip(mean_gradients, model.parameters()):
            param -= lr * grad

  return obs, reward, done, truncated, np.array(grads)
  return obs, reward, done, truncated, np.array(grads)
  return all_rewards, np.array(all_gradients)


In [40]:
advantage.shape => (episodes,steps)
all_gradients.shape => (episodes,steps,grad_tensor_dim)

0.0302