## Initialize the environment
These are the packages needed for running the env. You may be required to install additional packages to support them.

In [None]:
# Install the packages this notebook relies on into the kernel's environment.
# No versions are pinned here; pin them if you need a reproducible setup.
%pip install PySuperTuxKart

%pip install imageio

%pip install tensorboard

%pip install stable-baselines3[extra]

# NOTE(review): the cells below import `gymnasium`, not `gym` - confirm whether the
# legacy `gym` package is still required (e.g. by the kartEnv module).
%pip install gym

%pip install moviepy

# Replace with the torch / torchvision / torchaudio build that matches your CUDA version.
# %pip install --pre torch torchvision torchaudio --index-url https://download.pytorch.org/whl/nightly/cu124


# Only needed for the optional RecurrentPPO (LSTM) example further down.
# %pip install sb3-contrib


This code checks whether CUDA is available. If it prints `True`, you have successfully installed PyTorch with CUDA support.

In [None]:
import torch

# True means this PyTorch install can use a CUDA GPU; False means it cannot.
cuda_available = torch.cuda.is_available()
print(cuda_available)


Now let's create the environment. The imported Env is a Gym env; it needs to be converted to a vectorized env (venv). In theory, a vector env can hold several environment instances that are trained in parallel. But in this case, our env can only be created once, so we use `DummyVecEnv` to create a vector with just one instance.

This shouldn't require any change. The last line checks if the env is valid. It should run without any error.

In [None]:
# note: you shouldn't run this cell twice as only one environment can be created at a time
# if you want to run this cell again, you need to restart the runtime

import gymnasium as gym

from matplotlib import pyplot as plt
import numpy as np
from stable_baselines3 import PPO, A2C, DQN, SAC, TD3
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy

import os
import sys

from stable_baselines3.common.vec_env import VecNormalize, VecFrameStack, VecVideoRecorder, DummyVecEnv, VecMonitor

# Add the path to the custom environment
sys.path.insert(0, "homework")

import kartEnv
from stable_baselines3.common.env_checker import check_env

from gymnasium.envs.registration import make, register, registry, spec

# This makes sure that the custom environment is registered only once.
# `gym.make` sits inside the same guard, so re-running this cell in a live kernel
# skips the whole branch and keeps using the `env` created on the first run
# instead of constructing a second environment.
if "kartEnv-v0" not in registry:

    # Register the custom environment as kartEnv-v0
    register(
        id="kartEnv-v0",
        entry_point="kartEnv:kartEnv",  # "<module>:<class>", resolved via the sys.path entry added above
        max_episode_steps=2000  # step limit per episode passed to the registration
    )

    # Now you can make the environment
    env = gym.make("kartEnv-v0")
    # Now you have a gym environment

# Check if the environment is valid, if not, fix it
check_env(env)

# DO NOT overwrite the env variable, it will break the environment,
# and we will use env later

# Convert the environment to a vectorized environment, useful for using SB3 functions like VecNormalize.
# The lambda returns the already-created `env`, so the vector holds that single instance.
venv = DummyVecEnv([lambda: env])

# Wrap the environment with a monitor, useful for logging to tensorboard
venv = VecMonitor(venv)


# initialize and normalize the environment
This cell adds normalization to the environment.

The SB3 documentation suggests that normalization can help the performance of the model.
You can uncomment the lines and change the True/False values to see the difference.

## loading the normalized environment
This code can load a previously used venv normalization parameters. I will show you how to save the params later.

Don't load from a saved file unless you have exactly the same environment as the one you saved.
Otherwise, you may experience stability issues.

In [None]:
# Set to True to restore saved VecNormalize statistics from "venv.pkl" (when that file
# exists) in the next cell instead of starting from a fresh VecNormalize wrapper.
load_from_checkpoint = False

In [None]:
# Normalize the environment; this may be useful for training.
# When requested and a saved VecNormalize file exists, restore it around the current
# `venv`; otherwise wrap `venv` in a freshly initialised VecNormalize.
#
# Run this cell once per kernel: it rebinds `venv`, so running it again would wrap
# the already-normalized env a second time.
# NOTE(review): `gamma` here is 0.99 while the PPO model below uses 0.999 - confirm
# whether the two are meant to match.
if load_from_checkpoint and os.path.exists("venv.pkl"):
    venv = VecNormalize.load("venv.pkl", venv)
else:
    venv = VecNormalize(
        venv,
        training=True,
        norm_obs=True,
        norm_obs_keys=["speed"],  # only the "speed" entry of the Dict observation is normalized
        norm_reward=True,
        clip_obs=5,
        clip_reward=100.0,
        gamma=0.99,
        epsilon=1e-08,
    )


If you see an image of kart on track without any error, it means the venv is set up properly.

In [None]:
# Reset the vectorized env and look at the first observation it returns.
sample_obs = venv.reset()
print(sample_obs)

speed_obs = sample_obs["speed"]
image_obs = sample_obs["image"]
print(speed_obs)  # the speed entry of the observation
print("observation shapes: ", speed_obs.shape, image_obs.shape)

# Drop the size-1 axes, then move the first axis to the end before handing the array to imshow.
plt.imshow(np.moveaxis(image_obs.squeeze(), source=0, destination=-1))

# Defining the model
The code below shows an example of using Stable Baselines 3 PPO. It also shows how to define custom feature extractor.

In [None]:
import gymnasium as gym
from typing import Callable
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision as tv

from stable_baselines3 import PPO
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.vec_env import VecNormalize

from torchvision import models

# Number of input channels given to the first Conv2d of the image feature extractor below.
n_input_channels = 3

This feature extractor is essentially a CNN applied to the image and an MLP applied to the speed.

After extraction, the result is passed to the policy and value MLPs.

There are different options to define the feature extractor. A simple CNN example is shown. You can adjust it as you need.

Check this, https://stable-baselines3.readthedocs.io/en/master/guide/custom_policy.html

In [None]:
class CustomCombinedExtractor(BaseFeaturesExtractor):
    """
    Feature extractor for a Dict observation space with "image" and "speed" entries.

    The image goes through a convolutional stack followed by a linear projection to
    256 features; the speed vector goes through one linear layer with 64 outputs.
    ``forward`` concatenates the per-key features along dim 1, so ``features_dim``
    is 256 + 64 = 320 when both keys are present.
    """

    def __init__(self, observation_space: gym.spaces.Dict):
        """
        Initialize the custom feature extractor.

        :param observation_space: The observation space of the environment.
        :raises NotImplementedError: If the space contains a key other than
            "image" or "speed".
        """

        extractors = {}
        total_concat_size = 0

        for key, subspace in observation_space.items():

            # CNN for image input feature extraction
            if key == "image":

                # Option 1: Use a custom CNN architecture
                conv_layers = [
                    # conv block 1
                    nn.Conv2d(n_input_channels, 64, kernel_size=8, stride=3, padding=3),
                    nn.BatchNorm2d(64),
                    nn.ReLU(),
                    nn.Conv2d(64, 64, kernel_size=4, stride=2, padding=2),
                    nn.BatchNorm2d(64),
                    nn.ReLU(),
                    nn.MaxPool2d(kernel_size=2, stride=1),

                    # conv block 2
                    nn.Conv2d(64, 128, kernel_size=6, stride=2, padding=2),
                    nn.BatchNorm2d(128),
                    nn.ReLU(),
                    nn.Conv2d(128, 128, kernel_size=4, stride=2, padding=2),
                    nn.BatchNorm2d(128),
                    nn.ReLU(),
                    nn.MaxPool2d(kernel_size=2, stride=1),

                    # conv block 3
                    nn.Conv2d(128, 256, kernel_size=5, stride=1, padding=2),
                    nn.BatchNorm2d(256),
                    nn.ReLU(),
                    nn.Conv2d(256, 256, kernel_size=4, stride=1, padding=1),
                    nn.BatchNorm2d(256),
                    nn.ReLU(),
                    nn.MaxPool2d(kernel_size=2, stride=1),
                ]

                # Size of the flattened conv output, needed to create the linear layer.
                # It is measured instead of hard-coded so other image sizes work too;
                # for a (3, 96, 128) image it evaluates to 1536.
                n_flatten = self._conv_output_size(conv_layers, subspace.shape)

                # The layers are unpacked into ONE flat Sequential so the module indices
                # (and therefore the state_dict keys) stay 0..24.
                extractors[key] = nn.Sequential(
                    *conv_layers,
                    nn.Flatten(),
                    nn.Linear(n_flatten, 256),
                    nn.ReLU(),
                    nn.Dropout(p=0.5),
                )

                total_concat_size += 256

            # MLP for speed input feature extraction
            elif key == "speed":
                # Simple MLP for speed input
                extractors[key] = nn.Sequential(
                    nn.Linear(subspace.shape[0], 64),
                    nn.ReLU(),
                )
                total_concat_size += 64
            else:
                raise NotImplementedError(f"Extractor for key '{key}' is not implemented.")

        # Now, initialize the parent class with the correct features_dim.
        # Sub-modules can only be attached after nn.Module.__init__ has run, which is
        # why the extractors are collected in a plain dict first.
        super(CustomCombinedExtractor, self).__init__(observation_space, features_dim=total_concat_size)

        self.extractors = nn.ModuleDict(extractors)

    @staticmethod
    def _conv_output_size(conv_layers, input_shape) -> int:
        """
        Measure how many features the conv stack produces for one input.

        :param conv_layers: List of layers forming the convolutional stack.
        :param input_shape: Shape of a single (unbatched) input, e.g. (C, H, W).
        :return: Number of features per sample after flattening the stack's output.
        """
        probe = nn.Sequential(*conv_layers)
        # eval(): BatchNorm uses its running statistics and does not update them,
        # so the probe leaves the layers' state untouched.
        probe.eval()
        with torch.no_grad():
            n_features = probe(torch.zeros(1, *input_shape)).flatten(start_dim=1).shape[1]
        # Restore training mode, the mode freshly constructed modules start in.
        probe.train()
        return n_features

    def forward(self, observations: dict) -> torch.Tensor:
        """
        Forward pass through the feature extractor.

        :param observations: Dictionary of observations.
        :return: Concatenated feature tensor.
        """

        encoded_tensor_list = []
        for key, extractor in self.extractors.items():
            obs = observations[key]
            if not isinstance(obs, torch.Tensor):
                # Non-tensor input (e.g. NumPy arrays straight from the env): convert to
                # float32 on the same device as this module's weights.
                device = next(self.parameters()).device
                obs = torch.tensor(obs, dtype=torch.float32, device=device)
            encoded_tensor_list.append(extractor(obs))

        return torch.cat(encoded_tensor_list, dim=1)
    
    


Alternative designs include a residual CNN or networks pre-implemented in PyTorch. Examples are shown below.

In [None]:
# class ResidualBlock(nn.Module):
#     def __init__(self, in_channels, out_channels, stride=1):
#         super(ResidualBlock, self).__init__()
#         self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
#         self.bn1 = nn.BatchNorm2d(out_channels)
#         self.relu = nn.ReLU()
#         self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
#         self.bn2 = nn.BatchNorm2d(out_channels)

#         self.downsample = nn.Sequential()
#         if stride != 1 or in_channels != out_channels:
#             self.downsample = nn.Sequential(
#                 nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
#                 nn.BatchNorm2d(out_channels)
#             )

#     def forward(self, x):
#         identity = self.downsample(x)
#         out = self.conv1(x)
#         out = self.bn1(out)
#         out = self.relu(out)
#         out = self.conv2(out)
#         out = self.bn2(out)
#         out += identity
#         out = self.relu(out)
#         return out

# # # Custom feature extractor for environments with image and speed inputs
# class CustomCombinedExtractor(BaseFeaturesExtractor):
#     def __init__(self, observation_space: gym.spaces.Dict):
#         """
#         Initialize the custom feature extractor.

#         :param observation_space: The observation space of the environment.
#         """

#         extractors = {}
#         total_concat_size = 0

#         for key, subspace in observation_space.items():
#             if key == "image":
                
#                 # Option 2: Use a similar architecture to ResNet, but with less layers

#                 # Define a more complex CNN architecture for better feature extraction
#                 extractors[key] = nn.Sequential(
#                     nn.Conv2d(n_input_channels, 64, kernel_size=7, stride=2, padding=3),
#                     nn.BatchNorm2d(64),
#                     nn.ReLU(),
#                     nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
                    
#                     # Residual blocks, the number of blocks can be increased for better performance, but it will increase the training time
#                     ResidualBlock(64, 64),
#                     ResidualBlock(64, 64),

#                     ResidualBlock(64, 128, stride=2),
#                     ResidualBlock(128, 128),
#                     ResidualBlock(128, 128),

#                     ResidualBlock(128, 256, stride=2),
#                     ResidualBlock(256, 256),
#                     ResidualBlock(256, 256),
#                     ResidualBlock(256, 256),

#                     ResidualBlock(256, 512, stride=2),
#                     ResidualBlock(512, 512),

#                     nn.AdaptiveAvgPool2d((1, 1)),

#                     nn.Flatten(),
#                     # Calculate the output size after the CNN
#                     # Assuming input image size is (3, 96, 128)
#                     # After Conv layers:
#                     # (96, 128) -> (64, 48, 64) -> (64, 24, 32) -> (128, 24, 32) -> (128, 12, 16)
#                     # -> (256, 12, 16) -> (256, 6, 8) -> (512, 6, 8) -> (512, 3, 4) -> (512, 1, 1)  
                    
#                     nn.Linear(512, 256),
#                     nn.ReLU(),
#                     nn.Dropout(p=0.5),
#                 )

#                 total_concat_size += 256


#             elif key == "speed":
#                 # Simple MLP for speed input
#                 extractors[key] = nn.Sequential(
#                     nn.Linear(subspace.shape[0], 32),
#                     nn.ReLU(),
#                 )
#                 total_concat_size += 32
#             else:
#                 raise NotImplementedError(f"Extractor for key '{key}' is not implemented.")

#         # Now, initialize the parent class with the correct features_dim
#         super(CustomCombinedExtractor, self).__init__(observation_space, features_dim=total_concat_size)

#         self.extractors = nn.ModuleDict(extractors)

#     def forward(self, observations: dict) -> torch.Tensor:
#         """
#         Forward pass through the feature extractor.

#         :param observations: Dictionary of observations.
#         :return: Concatenated feature tensor.
#         """
#         encoded_tensor_list = []
#         for key, extractor in self.extractors.items():
#             obs = observations[key]
#             obs = torch.tensor(obs, dtype=torch.float32) if not isinstance(obs, torch.Tensor) else obs

#             if key == "image":
#                 # Ensure the image is of type float and normalized
#                 obs = obs.float() / 255.0
            
#             encoded_tensor_list.append(extractor(obs))
#         return torch.cat(encoded_tensor_list, dim=1)

In [None]:
# class ResidualBlock(nn.Module):
#     def __init__(self, in_channels, out_channels, stride=1):
#         super(ResidualBlock, self).__init__()
#         self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
#         self.bn1 = nn.BatchNorm2d(out_channels)
#         self.relu = nn.ReLU()
#         self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
#         self.bn2 = nn.BatchNorm2d(out_channels)

#         self.downsample = nn.Sequential()
#         if stride != 1 or in_channels != out_channels:
#             self.downsample = nn.Sequential(
#                 nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
#                 nn.BatchNorm2d(out_channels)
#             )

#     def forward(self, x):
#         identity = self.downsample(x)
#         out = self.conv1(x)
#         out = self.bn1(out)
#         out = self.relu(out)
#         out = self.conv2(out)
#         out = self.bn2(out)
#         out += identity
#         out = self.relu(out)
#         return out

# # # Custom feature extractor for environments with image and speed inputs
# class CustomCombinedExtractor(BaseFeaturesExtractor):
#     def __init__(self, observation_space: gym.spaces.Dict):
#         """
#         Initialize the custom feature extractor.

#         :param observation_space: The observation space of the environment.
#         """

#         extractors = {}
#         total_concat_size = 0

#         for key, subspace in observation_space.items():
#             if key == "image":

#                 # Option 3: Use a ResNeXt model
#                 # Load a ResNeXt model

#                 resnext = models.resnext50_32x4d(pretrained=False)

#                 # Remove the fully connected layer and avgpool
#                 modules = list(resnext.children())[:-2]  # Exclude avgpool and fc
#                 resnext = nn.Sequential(*modules)

#                 # Add AdaptiveAvgPool and Flatten
#                 resnext = nn.Sequential(
#                     resnext,
#                     nn.AdaptiveAvgPool2d((1, 1)),
#                     nn.Flatten(),
#                     nn.Linear(2048, 256),  # ResNeXt-50-32x4d has 2048 output channels
#                     nn.ReLU(),
#                     nn.Dropout(p=0.5),
#                 )

#                 # Optionally, freeze ResNeXt layers to prevent training
#                 for param in resnext.parameters():
#                     param.requires_grad = False

#                 extractors[key] = resnext
#                 total_concat_size += 256  # Size after Linear layer

#             elif key == "speed":
#                 # Simple MLP for speed input
#                 extractors[key] = nn.Sequential(
#                     nn.Linear(subspace.shape[0], 32),
#                     nn.ReLU(),
#                 )
#                 total_concat_size += 32
#             else:
#                 raise NotImplementedError(f"Extractor for key '{key}' is not implemented.")

#         # Now, initialize the parent class with the correct features_dim
#         super(CustomCombinedExtractor, self).__init__(observation_space, features_dim=total_concat_size)

#         self.extractors = nn.ModuleDict(extractors)

#     def forward(self, observations: dict) -> torch.Tensor:
#         """
#         Forward pass through the feature extractor.

#         :param observations: Dictionary of observations.
#         :return: Concatenated feature tensor.
#         """
#         encoded_tensor_list = []
#         for key, extractor in self.extractors.items():
#             obs = observations[key]
#             obs = torch.tensor(obs, dtype=torch.float32) if not isinstance(obs, torch.Tensor) else obs

#             if key == "image":
#                 # Ensure the image is of type float and normalized
#                 obs = obs.float() / 255.0
            
#             encoded_tensor_list.append(extractor(obs))
#         return torch.cat(encoded_tensor_list, dim=1)

This cell below should run without error. It checks the dimension.

In [None]:
import numpy as np
import torch

# Take one observation from the vectorized env to push through the extractor.
sample_observation = venv.reset()

print("Sample observation:")
print(sample_observation)
# Report the shapes of the observation fetched in THIS cell (not a variable left over
# from an earlier cell).
print("observation shape: ", sample_observation["speed"].shape, sample_observation["image"].shape)

# Forward pass
custom_extractor = CustomCombinedExtractor(env.observation_space)
feature_results = custom_extractor(sample_observation)

print("\nFeatures shape:", feature_results.shape)

print("feature dim: ", custom_extractor.features_dim)

# The extractor must emit exactly the feature width it advertises to the policy.
assert feature_results.shape[1] == custom_extractor.features_dim, "extractor output does not match features_dim"


To further define the network used, you can create a custom network. But I don't think it is necessary. 

In [None]:

from typing import Callable, Dict, List, Optional, Tuple, Type, Union

from gymnasium import spaces
from stable_baselines3.common.policies import ActorCriticPolicy

# Policy keyword arguments with optimized architecture
# Keyword arguments handed to the policy.
policy_kwargs = {
    "ortho_init": False,  # orthogonal initialization is switched off
    "activation_fn": nn.ReLU,  # activation used by the policy / value MLPs
    "net_arch": {
        "pi": [256, 256],  # hidden layer sizes of the policy network
        "vf": [256, 256],  # hidden layer sizes of the value network
    },
    "features_extractor_class": CustomCombinedExtractor,  # the custom feature extractor
    "features_extractor_kwargs": {},  # the extractor takes no extra arguments
    # "share_features_extractor": False,
}


# PPO model. Tune the hyper-parameters as needed, see
# https://stable-baselines3.readthedocs.io/en/master/modules/ppo.html
model = PPO(
    policy="MultiInputPolicy",  # alternative: CustomActorCriticPolicy
    env=venv,
    policy_kwargs=policy_kwargs,

    # rollout / optimisation schedule
    n_steps=4000,
    batch_size=1000,
    n_epochs=6,
    learning_rate=1e-3,  # alternative: linear_schedule(3e-4)

    # return / advantage estimation
    gamma=0.999,
    gae_lambda=0.95,

    # loss terms
    clip_range=0.2,
    ent_coef=0.01,
    vf_coef=0.5,
    max_grad_norm=100.0,
    # target_kl=0.015,

    # logging and hardware
    verbose=1,
    tensorboard_log="./logs",
    device="auto",  # use the GPU if available
)


Another possible approach is to use PPO with an LSTM. An LSTM should be useful for this application, but it will require more VRAM.

In [None]:
# from sb3_contrib import RecurrentPPO


# policy_kwargs = {
#     'ortho_init': False,  # Orthogonal initialization can be beneficial, consider enabling
#     'activation_fn': nn.ReLU,  # Explicitly define activation function
#     'net_arch': {
#         # 'shared_extractor': None,  # Using custom feature extractor
#         'pi': [256, 256],  # Increased layer sizes for policy network
#         'vf': [256, 256],  # Increased layer sizes for value function
#     },
#     'features_extractor_class': CustomCombinedExtractor,
#     'features_extractor_kwargs': {},  # Add if any additional args needed
#     # 'share_features_extractor': False,  # Ensure features extractor is not shared
#     'n_lstm_layers': 15,
    
# }

# model = RecurrentPPO(
#     "MlpLstmPolicy",
#     venv,
#     verbose=1,
#     tensorboard_log="./logs",

#     policy_kwargs=policy_kwargs,
    
#     learning_rate=3e-4,
#     n_steps=3000,
#     batch_size=500,
#     n_epochs=4,
#     gamma=0.9992,
#     gae_lambda=0.95,
#     clip_range=0.2,
#     ent_coef=0.01,
#     vf_coef=0.5,
#     max_grad_norm= 100, 

#     device="auto"
# )


# Training
The code below shows how training is done using PPO. It also shows how to load and save models and venv.
To train more, simply increase total_timesteps. You can change the tb_log_name to log different trainings.
## Warning:
- Do not load a saved checkpoint unless it comes from exactly the same model.

In [None]:
# Set to True to resume from the saved model "ppo_kart.zip" (when that file exists) in the next cell.
load_from_checkpoint = True

In [None]:
# Track names that can be passed to changeTrack.
tracks = ['lighthouse', 'zengarden', 'hacienda', 'snowtuxpeak', 'cornfield_crossing', 'scotland', 'cocoa_temple']

# Use this code to train on different tracks.
# `env` is the wrapped object returned by gym.make; custom methods such as changeTrack
# live on the underlying environment. Going through `env.unwrapped` works on every
# Gymnasium version (attribute forwarding through wrappers is deprecated in 0.29 and
# removed in 1.0).
env.unwrapped.changeTrack("lighthouse")


# Resume from the saved PPO checkpoint when requested and available.
# NOTE(review): this restores the model only. The VecNormalize statistics in "venv.pkl"
# are restored by the normalization cell, and only if load_from_checkpoint was True
# when that cell ran - confirm the two are loaded together.
if load_from_checkpoint and os.path.exists("ppo_kart.zip"):
    model = PPO.load("ppo_kart", env=venv)
    # model = RecurrentPPO.load("ppo_kart", env=venv)
    print("Model loaded from checkpoint")
    

In [None]:

# Train the agent; metrics are logged to TensorBoard under the run name given below.
# Increase total_timesteps significantly for better performance.
model.learn(
    total_timesteps=2000,
    tb_log_name="ppo_test1",
    # keep counting timesteps from earlier calls instead of restarting at zero
    # (this only matters for logging)
    reset_num_timesteps=False,
    log_interval=1,
    # the progress bar may not work well in some cases
    progress_bar=False,
)


Save the trained model and params to the folder.

In [None]:
# Save the trained model; the checkpoint-loading cell looks for "ppo_kart.zip".
model.save("ppo_kart")
# Save the `venv` wrapper to the file that the normalization cell reads back
# with VecNormalize.load.
venv.save("venv.pkl")

# Playing the video
The code below shows how to render and play video using the env.
Note: our env uses a different rendering logic than usual Gym envs.

In [None]:
import numpy as np

# Reset the environment and get the initial observation
obs, info = env.reset()

# The policy was trained on observations that went through the model's VecNormalize
# wrapper, whereas `env` returns raw observations. Fetch that wrapper (None when the
# model's env is not normalized) so the same statistics can be applied before predicting.
vec_normalize = model.get_vec_normalize_env()

lap_completed = False
truncated = False

# Generate a video until the kart finishes the track or the episode is truncated
while not (lap_completed or truncated):
    # Custom attributes are read from the underlying env: attribute forwarding through
    # gym wrappers is deprecated in Gymnasium 0.29 and removed in 1.0.
    print(
        f"Processing frame {env.unwrapped.currentFrame},lap_completed? {lap_completed or truncated}",
        end="\r"
    )

    # normalize_obs applies the stored statistics to a copy and does not update them
    policy_obs = obs if vec_normalize is None else vec_normalize.normalize_obs(obs)

    # Get the action from the trained model based on the current observation
    action, _states = model.predict(policy_obs)

    # Perform the action in the environment
    obs, rewards, lap_completed, truncated, info = env.step(action)

    # Render the current frame (collect frames for the video)
    env.render()

# Play the generated video
env.unwrapped.playVideo()

# If you see "finished at frame: 2000", it means it timed out, not actually finished
