In [5]:
import gymnasium as gym
import numpy as np
import torch
from torch import nn as nn
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torch.distributions import Categorical, Normal
from tqdm import tqdm
from torch.optim import Adam

In [6]:
class network_(nn.Module):
    """Gaussian policy network.

    A shared two-layer ReLU trunk (2 -> 60 -> 30) feeds two separate linear
    heads, each producing a single value: one for the mean and one for the
    (pre-activation) standard deviation of a Normal distribution.
    """

    def __init__(self):
        super().__init__()
        # Layers are created in a fixed order (trunk, then mean head, then
        # std head) so that seeded initialisation stays reproducible.
        trunk_layers = [
            nn.Linear(2, 60),
            nn.ReLU(),
            nn.Linear(60, 30),
            nn.ReLU(),
        ]
        self.base = nn.Sequential(*trunk_layers)

        self.μ = nn.Linear(30, 1)
        self.σ = nn.Linear(30, 1)

    def forward(self, x):
        """Return ``(mean, std)`` for input ``x``.

        The std head goes through ``exp`` and is shifted by one, so the
        returned standard deviation is never below 1.
        """
        features = self.base(x)
        mean = self.μ(features)
        std = 1 + torch.exp(self.σ(features))
        return mean, std

# Critic: maps a 2-dimensional input to a single scalar state-value estimate
# through two hidden ReLU layers of width 30.
state_value = nn.Sequential(
    nn.Linear(2, 30),
    nn.ReLU(),
    nn.Linear(30, 30),
    nn.ReLU(),
    nn.Linear(30, 1),
)

# Actor: the Gaussian policy network. It is built after the critic so the
# order in which parameters are randomly initialised is unchanged.
network = network_()

In [7]:
@torch.no_grad()
def choose_action(state:np.ndarray, network:nn.Module = network)->np.ndarray:
    """Sample one action from the Gaussian policy for ``state``.

    Args:
        state: Observation to feed to the policy; it is converted to a
            float32 tensor before the forward pass.
        network: Module returning ``(mean, std)`` for the given state.
            Defaults to the ``network`` object that existed when this
            function was defined.

    Returns:
        The sampled action as a NumPy array with the same shape as the
        policy's mean output.

    Note:
        The decorator is written as ``torch.no_grad()`` (called form) because
        the bare ``@torch.no_grad`` spelling is rejected by PyTorch releases
        that only support the called form.
    """
    mean, std = network(torch.as_tensor(state, dtype = torch.float32))
    action = Normal(loc = mean, scale = std).sample()
    # Explicit tensor -> ndarray conversion instead of relying on
    # ``np.array(tensor)``; ``.cpu()`` is a no-op for CPU tensors.
    return action.cpu().numpy()

In [8]:
# One-step actor-critic training loop.
#
# Each environment step computes the TD error δ with the critic
# (`state_value`), then takes one gradient step on the critic and one on the
# actor (`network`).

MAX_EPISODE_STEPS = 1900  # time limit used for both the env and the inner loop
env = gym.make("MountainCarContinuous-v0", max_episode_steps = MAX_EPISODE_STEPS)
γ = 0.99  # discount factor

opt1 = Adam(network.parameters(), 0.00001)     # actor optimiser
opt2 = Adam(state_value.parameters(), 0.0001)  # critic optimiser

# Switching to training mode does not depend on the step, so do it once.
network.train()
state_value.train()

num_iters = tqdm(range(500))

try:
    for i in num_iters:
        last_state, info = env.reset()
        I = 1           # running discount γ**t applied to the actor update
        cum_reward = 0  # exponential moving average of the step reward
        cum_delta = 0   # exponential moving average of the TD error
        for c in range(MAX_EPISODE_STEPS):

            action = choose_action(last_state, network)
            current_state, reward, terminated, truncated, info = env.step(action)

            last_state_t = torch.as_tensor(last_state, dtype = torch.float32)
            with torch.no_grad():
                # Only a genuine terminal state has zero future value. A
                # time-limit truncation still bootstraps from the next state,
                # otherwise every truncated episode biases the critic.
                if terminated:
                    δ = reward - state_value(last_state_t)
                else:
                    next_state_t = torch.as_tensor(current_state, dtype = torch.float32)
                    δ = reward + γ*state_value(next_state_t) - state_value(last_state_t)

            # Update each running average exactly once per step.
            cum_reward = (0.9)*cum_reward+(0.1)*(reward)
            cum_delta = (0.9)*cum_delta+(0.1)*(δ)

            opt1.zero_grad()
            opt2.zero_grad()

            # Critic: δ is a constant here (computed under no_grad), so the
            # gradient of z is -δ * ∇V(s), i.e. a semi-gradient TD(0) step.
            z = -δ*state_value(last_state_t)
            z.backward()

            # Actor: -δ * I * log N(action; μ, σ), with the additive constant
            # of the Gaussian log-density dropped (it has no gradient).
            μ, σ = network(last_state_t)
            action_t = torch.as_tensor(action, dtype = torch.float32)
            z_ = -δ*I*(-0.5*((action_t - μ)/σ)**2-torch.log(σ))
            z_.backward()

            opt1.step()
            opt2.step()

            I = γ*I
            if terminated or truncated:
                break
            last_state = current_state

        num_iters.set_description(f"{cum_reward, cum_delta.item()}")
finally:
    # Release the environment even if training is interrupted.
    env.close()

(-0.44669853957615524, 2.7269091606140137):   1%|▍                                   | 6/500 [00:48<1:06:17,  8.05s/it]


KeyboardInterrupt: 

In [9]:
# Roll out the current policy for a single rendered episode and print the
# index of the step on which the episode ended.
env = gym.make("MountainCarContinuous-v0", max_episode_steps = 1900, render_mode = "human")

# Evaluation mode does not depend on the step, so set it once up front.
network.eval()

try:
    state, info = env.reset()
    for c in range(1900):
        action = choose_action(state, network)
        state, reward, terminated, truncated, info = env.step(action)
        if terminated or truncated:
            # No reset here: the environment is closed right after the loop,
            # so starting another episode would be wasted work.
            print(c)
            break
finally:
    # Always close the environment so the render window is released even if
    # the rollout raises or is interrupted.
    env.close()

1899


In [None]:
# Display the most recently sampled action. `action` is assigned inside the
# training and evaluation loops, so one of those cells must have run first.
action