
# TuIT_DeepRL - DRL Project
![OST](./resources/ost_logo.png)
## MiniF1RL: My pygame 2d racing environment for DRL
Author: Lars Herrmann    
Date: 12.04.2024  
Repository: [Github - miniF1RL](https://github.com/lherrman/miniF1RL)   
Python Version: 3.11

## Introduction
After a lot of experimentation and consideration of which environment to use (trying, for example, TrackManiaRL, the Gymnasium integrated environments, and Unity environments), I decided to implement my own 2D racing environment. The main reason is that I wanted an environment for which no pre-trained models are available, which motivates me to train my own models. As I started watching Formula One this season, I wanted to go with a racing environment.

## Implementing the environment

### The CarModel
As a starting point, I was able to use a Python class of a simple 2D top-down car model that I implemented for another project about a year ago. The class already had methods to draw a simple 2D box car and its wheels onto a pygame screen. As it was intended as a geometrical model rather than a racing game, I had to implement a new method for updating the car's position and velocity to make it more fun to drive. The original CarModel class can be found in my GitHub repository for the [SlamCar Project](https://github.com/lherrman/slamcar-controller) in the file `base_model.py`.

To turn this into a racing environment, some parts were still missing, so I started implementing them, beginning with a track to drive on.

### The Track
In order to have a track to drive on, I drew a black and white image of a track in Photoshop. I then wrote a Python method, run when the car model is initialized, that uses `cv2.findContours` to find the track boundaries in the image. The track is then represented as two lists of points, one for the inner and one for the outer boundary.

![Track Image](./resources/MakeTrack.png)


Python Code:
```python
    def _get_track_boundaries_from_image(self, image_path) -> dict:
        '''
        Using opencv to read the image and find the contours of the track boundaries.
        The boundaries are represented as list of points. ([x1, y1], [x2, y2], ...)
        '''
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        contours, _ = cv2.findContours(image.astype('uint8'), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        assert len(contours) == 2, "There should be exactly 2 contours in the image"

        # Transform and downsample the contours
        downsample_rate = 10
        inner_boundary = contours[0][:, 0, :][::downsample_rate]
        outer_boundary = contours[1][:, 0, :][::downsample_rate]

        # Append the first point again as the last point to close the loop
        inner_boundary = np.append(inner_boundary, [inner_boundary[0]], axis=0)
        outer_boundary = np.append(outer_boundary, [outer_boundary[0]], axis=0)

        # Scale the contours to make suitable for the simulation
        inner_boundary, outer_boundary = inner_boundary / 60, outer_boundary / 60

        return {
            'inner': inner_boundary,
            'outer': outer_boundary
        }   
```

#### Track progress
To approximate the progress the car has made on the track, I implemented a method `_calculate_track_progress` that finds the point on the inner boundary nearest to the car. The first point on the inner boundary is set as the starting point. The progress is then calculated as the index of the nearest point divided by the total number of points on the inner boundary. This could be improved by instead using the distance along the boundary from the starting point to the nearest point, so that the result does not depend on how evenly the boundary points are spaced.
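
The two variants described above can be sketched as follows. This is a minimal illustration, not the code of `_calculate_track_progress`: the function names `nearest_point_index`, `progress_by_index` and `progress_by_arc_length` are hypothetical, and the boundary is assumed to be a closed list of points whose first point is repeated at the end.

```python
import math


def nearest_point_index(position, boundary):
    """Index of the boundary point closest to the given position."""
    return min(range(len(boundary)),
               key=lambda i: math.dist(position, boundary[i]))


def progress_by_index(position, boundary):
    """Index of the nearest point divided by the number of points."""
    return nearest_point_index(position, boundary) / len(boundary)


def progress_by_arc_length(position, boundary):
    """Path length from the start to the nearest point, divided by the total length."""
    segment_lengths = [math.dist(boundary[i], boundary[i + 1])
                       for i in range(len(boundary) - 1)]
    nearest = nearest_point_index(position, boundary)
    return sum(segment_lengths[:nearest]) / sum(segment_lengths)


# Hypothetical, unevenly spaced boundary (a 4x4 square, closed loop)
inner = [(0, 0), (1, 0), (4, 0), (4, 4), (0, 4), (0, 0)]
car = (4.2, 0.1)

print(progress_by_index(car, inner))       # 2/6, about 0.33
print(progress_by_arc_length(car, inner))  # 0.25
```

With unevenly spaced points the two values differ: the car has covered one of the four sides of the square, which the arc-length variant reflects, while the index-based variant overestimates the progress.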

### The observation space

I wanted to keep the observation space simple, so I decided to use 'lidar sensors' that give the car only a few distance measurements in front of it. I use 3 sensors: one pointing straight ahead and one on each side of the car at a 45 degree angle. The sensors are implemented as a method of the CarModel class that returns the distances to the track boundaries.

I prompted ChatGPT for the implementation of the `_segment_intersection` and `_raycast` methods, and it returned a working implementation after 2-3 iterations. The implementation is pure Python, so its performance isn't optimal, but it is good enough for my purposes. As the raycast method has to iterate over each segment of the boundaries, a more performant implementation would be beneficial.
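
To illustrate the idea, here is a minimal pure-Python sketch of a ray-segment intersection and a three-sensor lidar built on top of it. It is not the code of the actual `_segment_intersection` and `_raycast` methods: the function names, the signatures and the `max_range` parameter are assumptions made for this example.

```python
import math


def segment_intersection(origin, direction, a, b):
    """Distance along the ray origin + t * direction to segment a-b, or None.

    `direction` is assumed to be a unit vector, so t is a distance.
    """
    sx, sy = b[0] - a[0], b[1] - a[1]
    denom = direction[0] * sy - direction[1] * sx  # cross(direction, segment)
    if abs(denom) < 1e-12:
        return None  # ray and segment are parallel
    qx, qy = a[0] - origin[0], a[1] - origin[1]
    t = (qx * sy - qy * sx) / denom                      # position along the ray
    u = (qx * direction[1] - qy * direction[0]) / denom  # position along the segment
    if t >= 0 and 0 <= u <= 1:
        return t
    return None


def raycast(origin, angle, boundary, max_range=10.0):
    """Distance to the closest hit on a polyline, capped at max_range."""
    direction = (math.cos(angle), math.sin(angle))
    best = max_range
    # Iterate over every segment of the boundary, as described above
    for a, b in zip(boundary, boundary[1:]):
        t = segment_intersection(origin, direction, a, b)
        if t is not None and t < best:
            best = t
    return best


def lidar_readings(origin, heading, boundaries,
                   offsets=(-math.pi / 4, 0.0, math.pi / 4)):
    """One distance per sensor: the closest hit over all boundaries."""
    return [min(raycast(origin, heading + offset, boundary)
                for boundary in boundaries)
            for offset in offsets]


# Hypothetical closed rectangular boundary around a car at the origin
outer = [(-1, -1), (3, -1), (3, 1), (-1, 1), (-1, -1)]
print(lidar_readings((0.0, 0.0), 0.0, [outer]))
```

In this example the middle sensor hits the wall 3 units ahead, and the two 45 degree sensors hit the side walls at a distance of about 1.41.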

For easier validation, and because it looks cool, I decided to draw the lidar sensors as lines on the pygame screen.

![Lidar Sensors](./resources/lidars.png)


### The action space

The action space is also very simple. To make the problem even easier for the RL algorithm, I decided to always have the car accelerate, as slowing down isn't necessary in this environment. The car can steer left or right, or go straight. To make it a little more interesting, I added one more action, boost, that allows the car to go faster on the straights.
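
A discrete action space like this could be encoded as sketched below. The `Action` enum, its index values, the `decode_action` helper and the sign convention for steering are all assumptions made for illustration; the actual environment may number its actions differently, and it is also an assumption that boost is applied while going straight.

```python
from enum import IntEnum


class Action(IntEnum):
    """Hypothetical discrete actions; the real environment's indices may differ."""
    STRAIGHT = 0
    LEFT = 1
    RIGHT = 2
    BOOST = 3


def decode_action(action: int) -> dict:
    """Map a discrete action to control inputs. The car always accelerates."""
    action = Action(action)
    # Assumed convention: negative steering = left, positive = right
    steering = {Action.LEFT: -1.0, Action.RIGHT: 1.0}.get(action, 0.0)
    return {
        "throttle": 1.0,  # no braking or coasting in this environment
        "steering": steering,
        "boost": action is Action.BOOST,
    }


for a in Action:
    print(a.name, decode_action(a))
```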

### The reward function

I decided to use the following 4 types of rewards:
- Progress reward: The car gets a reward for making progress on the track
- Speed reward: The car gets a reward for driving fast
- Finish reward: The car gets a reward for finishing the track
- Collision reward: The car gets a negative reward for colliding with the track boundaries

As the rewards are calculated from differently scaled values, the weights of progress and speed can't be compared directly. I decided to use the following weights for the rewards:
- Progress: 1000 (* fraction of the progress made in the last step)
- Speed: 1 (* speed in m/s)
- Finish: 100 (* 1 if the car has finished the track)
- Collision: -100 (* 1 if the car has collided with the track boundaries)
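
As a rough sketch, the weights above could be combined into a single step reward like this. It assumes that the four weighted terms are simply summed; the names `REWARD_WEIGHTS` and `step_reward` are hypothetical and not taken from the environment's code.

```python
# Weights as listed above
REWARD_WEIGHTS = {
    "progress": 1000.0,
    "speed": 1.0,
    "finish": 100.0,
    "collision": -100.0,
}


def step_reward(progress_delta, speed, finished, collided,
                weights=REWARD_WEIGHTS):
    """Weighted sum of the four reward terms for one step (assumed form)."""
    return (weights["progress"] * progress_delta    # fraction of track gained
            + weights["speed"] * speed              # speed in m/s
            + weights["finish"] * float(finished)   # 1 if the track is finished
            + weights["collision"] * float(collided))  # 1 on collision


# Normal step: 0.2 % of the track gained at 5 m/s
print(step_reward(0.002, 5.0, finished=False, collided=False))

# Same speed, no progress, and a collision with the boundary
print(step_reward(0.0, 5.0, finished=False, collided=True))
```

With these numbers, the progress term (2) and the speed term (5) are of similar size in a normal step, which shows why the weights have to be tuned against the scale of the underlying values.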

### Adding Graphics
To enhance the graphics of the environment, I generated a background image based on the drawn track by running a Stable Diffusion model locally and prompting it with the track image and a text description of the track I wanted.

I used the JuggernautXL model through the [Fooocus UI](https://github.com/lllyasviel/Fooocus).  
I got the best results by setting the image prompt mode to CPDS, in order to keep the structure of the track. By decreasing the weight of the image prompt, the track structure is kept, but the model is free to generate a more interesting background.

![Image Gen](./resources/fooocus_image_gen.png)

I also generated a sprite image for the car, which is used to draw the car on the screen.   
Note: I also got some quite funny results:  
![Car Gen](./resources/car_fail.png)

The environment lets you switch between the simple wireframe graphics and the enhanced graphics by pressing the 'g' key.
I also added the ability to zoom in and out using the mouse wheel. When the whole track is visible, the camera centers on the track instead of the car.

## Demo animation
![Demo Animation](./resources/Trained_Env_Demo.gif)

## Helper functions

```python
%load_ext autoreload
%autoreload 2

from datetime import datetime
import threading
import torch
import pygame as pg
from minif1env import MiniF1RLEnv
from stable_baselines3 import A2C, PPO
from stable_baselines3.common.env_util import make_vec_env
from torch.utils.tensorboard import SummaryWriter
from stable_baselines3.common.callbacks import CheckpointCallback
import os
import glob


def get_writer(texts: dict):
    """Create a TensorBoard writer and log the given key/value pairs as text."""
    writer = SummaryWriter()
    for key, value in texts.items():
        writer.add_text(str(key), str(value))
    return writer


# Use a separate thread to render the environment.
# Avoids crashing the pg window during training.
def render(env: MiniF1RLEnv,
           close_event,
           human_control=False,
           fps=60):
    pg.init()
    delay_ms = 1000 // fps
    while True:
        env.render()
        pg.time.wait(delay_ms)

        done = env.handle_pg_events(human_control=human_control)

        if done:
            close_event.set()

        if close_event.is_set():
            pg.quit()
            return


def latest_checkpoint_file_path(model_description: str):
    """Return the most recently created checkpoint for the model, or None."""
    checkpoint_dir = "checkpoints"
    checkpoint_files = glob.glob(os.path.join(checkpoint_dir, f"*{model_description}*/*.zip"))
    if not checkpoint_files:
        return None
    latest_checkpoint = max(checkpoint_files, key=os.path.getctime)
    return latest_checkpoint
```




# Manual Play with the environment


```python
env = MiniF1RLEnv()

last_frame = datetime.now()
while True:
    # Limit the frame rate for manual controls (busy-waits until 10 ms have passed)
    if (datetime.now() - last_frame).total_seconds() < 1 / 100:
        continue
    last_frame = datetime.now()

    should_exit = env.handle_pg_events(human_control=True)

    # Gymnasium-style step: (observation, reward, terminated, truncated, info)
    observation, step_reward, terminated, truncated, info = env.step(None)
    env.render()

    if terminated or truncated:
        env.reset()

    if should_exit:
        break

env.close()
```


# Training

```python
# Create the environment
env = MiniF1RLEnv()

# Create and start the rendering thread
close_event = threading.Event()
render_thread = threading.Thread(target=render, args=(env, close_event, False, 30))
render_thread.start()

# Set the training parameters
total_timesteps = 1_000_000
model_description = "PPO_MlpPolicy_64_64_R7"
load_checkpoint = True
run_name = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + model_description

# Define the policy network and select the device (GPU if available)
policy_kwargs = dict(activation_fn=torch.nn.ReLU, net_arch=[64, 64])
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the latest checkpoint if available, otherwise create a new model.
# policy_kwargs only applies to a new model; a checkpoint keeps its own.
latest_checkpoint = latest_checkpoint_file_path(model_description)
if load_checkpoint and latest_checkpoint:
    print(f"Loading checkpoint: {latest_checkpoint}")
    model = PPO.load(latest_checkpoint, env, device=device, verbose=1)
else:
    model = PPO("MlpPolicy", env, policy_kwargs=policy_kwargs, device=device, verbose=1)
print(f"Using device: {device}")

# Tensorboard writer
writer = get_writer({"run_name": run_name,
                     "model_description": model_description,
                     "total_timesteps": total_timesteps,
                     "env_name": "MiniF1RLEnv",
                     "reward_weights": env.get_reward_weights(),
                     "device": device,
                     "policy_kwargs": policy_kwargs})

# Save a checkpoint every 10,000 steps
checkpoint_callback = CheckpointCallback(
    save_freq=10_000,
    save_path=f"./checkpoints/{run_name}",
    name_prefix=f"{run_name}_checkpoint",
    save_replay_buffer=True,
    save_vecnormalize=True,
)

# Train the model; clean up even if training is interrupted
try:
    model.learn(total_timesteps=total_timesteps,
                tb_log_name=run_name,
                callback=checkpoint_callback)
    model.save(f"models/{run_name}")
finally:
    # Close the rendering thread, the environment and the tensorboard writer
    close_event.set()
    render_thread.join()
    env.close()
    writer.close()
```



```text
Loading checkpoint: checkpoints\20240507_220914_PPO_MlpPolicy_64_64_R7\20240507_220914_PPO_MlpPolicy_64_64_R7_checkpoint_90000_steps.zip
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Using device: cuda

KeyboardInterrupt: 
```

# Inference


```python
import time

model_description = "PPO_MlpPolicy_64_64_R7"

env = MiniF1RLEnv()

# Load the latest checkpoint
latest_checkpoint = latest_checkpoint_file_path(model_description)
if not latest_checkpoint:
    raise ValueError(f"No checkpoint found for model description: {model_description}")
print(f"Loading checkpoint: {latest_checkpoint}")

model = PPO.load(latest_checkpoint)

# Create and start the rendering thread
close_event = threading.Event()
render_thread = threading.Thread(target=render, args=(env, close_event, False, 30))
render_thread.start()

# Run the model for up to 100 episodes, stopping early if the window is closed
for i in range(100):
    if close_event.is_set():
        break
    obs, info = env.reset()
    done = False
    while not done and not close_event.is_set():
        action, _states = model.predict(obs, deterministic=True)
        # Gymnasium-style step: (observation, reward, terminated, truncated, info)
        obs, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated

        # time.sleep(0.003)

# Close the rendering thread and environment
close_event.set()
render_thread.join()
env.close()
```



```text
Loading checkpoint: checkpoints\20240507_220914_PPO_MlpPolicy_64_64_R7\20240507_220914_PPO_MlpPolicy_64_64_R7_checkpoint_90000_steps.zip
```


# Training Log

## Tests 01
- Model:
- Timesteps: 10'000
- Training Time: 1h 10min
- Algorithm: A2C
- Result: The car learned to always drive directly into the right border.
- Explanation: After checking the rewards that were given to the model, I realized that the reward function was not working as intended. The weight for the progress reward was way too low, so the model did not learn to follow the track. The penalty for hitting the track boundaries was probably also too low.

## Tests 02
- Model:
- Timesteps: 10'000
- Training Time: 1h 10min
- Algorithm: PPO
