In [2]:
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
import os

In [8]:
environment_name = "CarRacing-v2"
env = gym.make(environment_name,  render_mode="human")

In [6]:
action = env.action_space.sample()
env.reset()

(array([[[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        ...,
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]]], dtype=uint8),
 {})

## Testing Without Training

In [21]:
episodes = 5
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    score = 0 
    
    while not done:
        env.render()
        action = env.action_space.sample()
        n_state, reward, done, truncated, info = env.step(action)
        score+=reward
    print('Episode:{} Score:{}'.format(episode, score))
env.close()

Episode:1 Score:-458.0131782945864
Episode:2 Score:-878.2311827958077
Episode:3 Score:-859.1154121864841
Episode:4 Score:-885.3750000001104
Episode:5 Score:-863.939926740032


In [13]:
import torch
import torch.nn as nn
from torch.distributions import MultivariateNormal, Categorical

# Alot of borrowed code from the internet, I'm not sure how much of this I understand Neural Networks are hard!

class ExperienceBuffer:
    def __init__(self):
        self.action_list = []
        self.state_list = []
        self.log_prob_list = []
        self.reward_list = []
        self.value_list = []
        self.terminal_flags = []
    
    def reset_buffer(self):
        self.action_list.clear()
        self.state_list.clear()
        self.log_prob_list.clear()
        self.reward_list.clear()
        self.value_list.clear()
        self.terminal_flags.clear()

class ActorCriticNetwork(nn.Module):
    def __init__(self, input_dim, output_dim, continuous_action_space, init_std):
        super().__init__()

        self.continuous_action = continuous_action_space

        if self.continuous_action:
            self.action_variance = torch.full((output_dim,), init_std ** 2).to(device)
            self.actor_layers = nn.Sequential(
                            nn.Linear(input_dim, 64),
                            nn.Tanh(),
                            nn.Linear(64, output_dim),
                            nn.Tanh()
                        )
        else:
            self.actor_layers = nn.Sequential(
                            nn.Linear(input_dim, 64),
                            nn.Tanh(),
                            nn.Linear(64, output_dim),
                            nn.Softmax(dim=-1)
                        )

        self.critic_layers = nn.Sequential(
                        nn.Linear(input_dim, 64),
                        nn.Tanh(),
                        nn.Linear(64, 1)
                    )

    def adjust_action_std(self, new_std):
        if self.continuous_action:
            self.action_variance.fill_(new_std ** 2)
        else:
            print("Attempting to adjust action standard deviation in a discrete action space.")

    def forward(self):
        raise NotImplementedError

    def perform_action(self, state):
        if self.continuous_action:
            mean_action = self.actor_layers(state)
            covariance_matrix = torch.diag(self.action_variance).unsqueeze(0)
            distribution = MultivariateNormal(mean_action, covariance_matrix)
        else:
            action_probabilities = self.actor_layers(state)
            distribution = Categorical(action_probabilities)

        sampled_action = distribution.sample()
        log_probability = distribution.log_prob(sampled_action)
        value_estimate = self.critic_layers(state)

        return sampled_action.detach(), log_probability.detach(), value_estimate.detach()

    def evaluate_action(self, state, action):
        if self.continuous_action:
            mean_action = self.actor_layers(state)
            action_var = self.action_variance.expand_as(mean_action)
            covariance_matrix = torch.diag_embed(action_var)
            distribution = MultivariateNormal(mean_action, covariance_matrix)

            if action.dim() == 1:
                action = action.view(-1, self.action_variance.size(0))
        else:
            action_probabilities = self.actor_layers(state)
            distribution = Categorical(action_probabilities)

        log_probs = distribution.log_prob(action)
        entropy = distribution.entropy()
        value_estimate = self.critic_layers(state)

        return log_probs, value_estimate, entropy

class ProximalPolicyOptimization:
    def __init__(self, input_dim, output_dim, actor_lr, critic_lr, discount_factor, update_epochs, clip_epsilon, continuous_action_space, initial_std=0.6):
        self.continuous_action = continuous_action_space
        self.discount_factor = discount_factor
        self.clip_epsilon = clip_epsilon
        self.update_epochs = update_epochs
        self.experience_buffer = ExperienceBuffer()

        self.main_policy = ActorCriticNetwork(input_dim, output_dim, continuous_action_space, initial_std).to(device)
        self.optimizer = torch.optim.Adam([
                        {'params': self.main_policy.actor_layers.parameters(), 'lr': actor_lr},
                        {'params': self.main_policy.critic_layers.parameters(), 'lr': critic_lr}
                    ])

        self.old_policy = ActorCriticNetwork(input_dim, output_dim, continuous_action_space, initial_std).to(device)
        self.old_policy.load_state_dict(self.main_policy.state_dict())

        self.loss_function = nn.MSELoss()

    def update_policy(self):
        discounted_rewards = []
        temp_reward = 0
        for reward, terminal in zip(reversed(self.experience_buffer.reward_list), reversed(self.experience_buffer.terminal_flags)):
            if terminal:
                temp_reward = 0
            temp_reward = reward + (self.discount_factor * temp_reward)
            discounted_rewards.insert(0, temp_reward)

        rewards_normalized = torch.tensor(discounted_rewards, dtype=torch.float32).to(device)
        rewards_normalized = (rewards_normalized - rewards_normalized.mean()) / (rewards_normalized.std() + 1e-7)

        old_states = torch.stack(self.experience_buffer.state_list).detach().to(device)
        old_actions = torch.stack(self.experience_buffer.action_list).detach().to(device)
        old_log_probs = torch.stack(self.experience_buffer.log_prob_list).detach().to(device)
        old_values = torch.stack(self.experience_buffer.value_list).detach().to(device)

        advantages = rewards_normalized - old_values

        for _ in range(self.update_epochs):
            current_log_probs, value_estimates, entropy = self.main_policy.evaluate_action(old_states, old_actions)
            value_estimates = torch.flatten(value_estimates)
            ratios = torch.exp(current_log_probs - old_log_probs.detach())
            surrogate1 = ratios * advantages
            surrogate2 = torch.clamp(ratios, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages

            loss = -torch.min(surrogate1, surrogate2) + 0.5 * self.loss_function(value_estimates, rewards_normalized) - 0.01 * entropy
            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()

        self.old_policy.load_state_dict(self.main_policy.state_dict())
        self.experience_buffer.reset_buffer()

    def save_model(self, filepath):
        torch.save(self.old_policy.state_dict(), filepath)

    def load_model(self, filepath):
        self.old_policy.load_state_dict(torch.load(filepath, map_location=lambda storage, loc: storage))
        self.main_policy.load_state_dict(torch.load(filepath, map_location=lambda storage, loc: storage))

        


## Training and Saving Model

In [7]:
log_path = os.path.join('Training', 'Logs')
model = PPO("CnnPolicy", env, verbose=1, tensorboard_log=log_path)
model.learn(total_timesteps=400000)

ppo_path = os.path.join('Training', 'Saved Models', 'PPO_400K_Driving_model')
model.save(ppo_path)


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
Logging to Training/Logs/PPO_7
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1e+03    |
|    ep_rew_mean     | -60.3    |
| time/              |          |
|    fps             | 199      |
|    iterations      | 1        |
|    time_elapsed    | 10       |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 1e+03        |
|    ep_rew_mean          | -59.5        |
| time/                   |              |
|    fps                  | 91           |
|    iterations           | 2            |
|    time_elapsed         | 44           |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0073701795 |
|    clip_fraction   

## Load Latest Model

In [10]:
del model 
ppo_path = os.path.join('Training', 'Saved Models', 'PPO_400_Driving_model')
model = PPO.load(ppo_path, env=env)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.


In [12]:
evaluate_policy(model, env, n_eval_episodes=10, render=True)


(-33.492388664186, 26.04638754470016)

In [25]:
env.close()
