Requirements Installation (run once):

In [None]:
!pip install gymnasium stable-baselines3 shimmy vgamepad numpy tensorboard

Start controller and UDP listener(run ONLY ONCE)

In [None]:
import socket
import struct
import threading
import numpy as np
import vgamepad as vg
import time

# 1. Gamepad
try:
    if global_gamepad: print("üéÆ Gamepad exists.")
except NameError:
    global_gamepad = vg.VX360Gamepad()
    print("üéÆ Gamepad Created!")

# 2. Listener (NOW WITH YAW)
class Global_UDP_Listener:
    def __init__(self, ip="127.0.0.1", port=20777):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self.sock.bind((ip, port))
        except OSError:
            print(f"‚ö†Ô∏è Port {port} occupied.")
            
        self.sock.settimeout(0.2)
        
        self.telemetry = np.zeros(10, dtype=np.float32)
        self.surface = np.zeros(4, dtype=np.int8) 
        self.lap_data = {"lapDistance": 0.0, "currentLapInvalid": 0}
        self.position = np.zeros(3, dtype=np.float32)
        self.yaw = 0.0 # <--- NEW: Car Rotation
        
        self.frame_id = 0
        self.running = True
        self.lock = threading.Lock()
        self.thread = threading.Thread(target=self.loop, daemon=True)
        self.thread.start()
        print(f"‚úÖ Global UDP Listener started on {ip}:{port}")

    def loop(self):
        while self.running:
            try:
                data, _ = self.sock.recvfrom(2048)
                self.parse_packet(data)
            except socket.timeout: continue 
            except Exception: pass

    def parse_packet(self, data):
        if len(data) < 24: return
        header = struct.unpack("<HBBBBQfIBB", data[:24])
        packet_id = header[4]
        current_frame = header[7]
        
        with self.lock:
            self.frame_id = current_frame
            player_idx = header[8]

            if packet_id == 0: # Motion
                start = 24 + (player_idx * 60)
                if len(data) >= start + 60:
                    # Pos X, Y, Z (0-12)
                    pos = struct.unpack("<fff", data[start:start+12])
                    self.position[0] = pos[0]
                    self.position[1] = pos[1]
                    self.position[2] = pos[2]
                    
                    # Yaw is at offset 48 (12 pos + 12 vel + 6 fwd + 6 right + 12 gforce)
                    self.yaw = struct.unpack("<f", data[start+48:start+52])[0]

                if len(data) >= 1424:
                    self.telemetry[6:10] = struct.unpack("<ffff", data[1408:1424])
                    
            elif packet_id == 2: # Lap Data
                start = 24 + (player_idx * 43)
                if len(data) >= start + 43:
                    lap_slice = data[start:start+43]
                    self.lap_data["lapDistance"] = struct.unpack("<f", lap_slice[12:16])[0]
                    self.lap_data["currentLapInvalid"] = struct.unpack("<B", lap_slice[29:30])[0]
                    
            elif packet_id == 6: # Telemetry
                start = 24 + (player_idx * 60)
                if len(data) >= start + 60:
                    car_slice = data[start:start+60]
                    vals = struct.unpack("<HfffBbH", car_slice[:18])
                    self.telemetry[0] = float(vals[0]) 
                    self.telemetry[1] = vals[1]        
                    self.telemetry[2] = vals[2]        
                    self.telemetry[3] = vals[3]        
                    self.telemetry[4] = float(vals[5]) 
                    self.telemetry[5] = float(vals[6]) 
                    self.surface = np.frombuffer(car_slice[56:60], dtype=np.uint8)

    def get_state(self):
        with self.lock:
            return self.telemetry.copy(), self.lap_data.copy(), self.surface.copy(), self.position.copy(), self.yaw, self.frame_id

try:
    if global_udp: pass
except NameError:
    global_udp = Global_UDP_Listener()

Connection Test

In [None]:
import time

try:
    listener = global_udp 
    print("‚úÖ Found running Global Listener.")
    
    print("Testing connection... Drive the car in game!")
    print("(Press Stop button to cancel if needed)")
    
    for i in range(20): # Print for 10 seconds
        
        # Now unpacks 3 items!
        telemetry, lap_info, surfaces = listener.get_state()
        
        speed = telemetry[0]
        throttle = telemetry[1]
        dist = lap_info['lapDistance']
        
        # Check surface types (0=Tarmac, 7=Grass, etc.)
        # Just check Rear Left (Index 0) for example
        rl_surface = surfaces[0] 
        
        status = "üü¢ ROAD" if rl_surface == 0 else "üü§ OFF-ROAD"
        
        if speed > 0:
            print(f"üöÄ Speed: {speed:.0f} km/h | Dist: {dist:.1f} | Surface: {status} ({rl_surface})")
        else:
            print(f"‚ö†Ô∏è Speed: 0.0 (Is game paused? Check settings)", end="\r")
            
        time.sleep(0.5)
        
    print("\nTest finished. Listener is ready.")

except NameError:
    print("‚ùå 'global_udp' is not defined. You must run the 'Global Resources' cell (Step 1) first!")

Gym Env

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import time
import pickle
import os

XUSB_BUTTON_START = 0x0010
XUSB_BUTTON_A = 0x1000

class F1_22_Env(gym.Env):
    def __init__(self, gamepad, udp_listener):
        super(F1_22_Env, self).__init__()
        
        self.action_space = spaces.Box(
            low=np.array([-1.0, 0.0, 0.0], dtype=np.float32), 
            high=np.array([1.0, 1.0, 1.0], dtype=np.float32), 
            dtype=np.float32
        )
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(20,), dtype=np.float32)

        self.udp = udp_listener
        self.gamepad = gamepad
        
        self.center_line = {}
        self.track_length = 4318.0
        map_file = "track_data/bahrain_center_line.pkl" 
        if os.path.exists(map_file):
            with open(map_file, 'rb') as f:
                self.center_line = pickle.load(f)
                if len(self.center_line) > 0:
                    self.track_length = max(self.center_line.keys())

        self.prev_distance = 0.0
        self.stationary_counter = 0
        self.steps_alive = 0 
        self.first_run = True
        self.prev_steer = 0.0
        self.last_frame_id = 0

    def step(self, action):
        # 1. Action (DEFINED CORRECTLY NOW)
        steer = float(action[0])
        throttle = float(action[1])
        brake = float(action[2])
        
        self.gamepad.left_joystick_float(x_value_float=steer, y_value_float=0.0)
        self.gamepad.right_trigger_float(value_float=throttle)
        self.gamepad.left_trigger_float(value_float=brake)
        self.gamepad.update()
        
        self._wait_for_new_frame()
        self.steps_alive += 1

        # 2. Get Data
        obs, lap_info, surfaces, pos, yaw, frame_id = self.udp.get_state()
        self.last_frame_id = frame_id
        
        speed = obs[0]
        dist = lap_info['lapDistance']
        invalid = lap_info['currentLapInvalid']
        
        if dist < 0: mapped_dist = self.track_length + dist
        else: mapped_dist = dist
        
        # Deviation
        deviation = self._get_deviation(mapped_dist, pos[0], pos[2])
        
        # 12 METER LIMIT
        is_off_track = deviation > 12.0 
        
        # Lookaheads
        lookaheads = [20, 50, 100, 150]
        local_points = []
        
        for look_dist in lookaheads:
            target_dist = mapped_dist + look_dist
            if target_dist > self.track_length: target_dist -= self.track_length
            
            t_idx = int(target_dist)
            tx, tz = 0, 0
            for i in range(-5, 6):
                check = t_idx + i
                if check in self.center_line:
                    tx, tz = self.center_line[check]
                    break
            
            dx = tx - pos[0]
            dz = tz - pos[2]
            local_x = dx * np.cos(-yaw) - dz * np.sin(-yaw)
            local_z = dx * np.sin(-yaw) + dz * np.cos(-yaw)
            local_points.extend([local_x / 100.0, local_z / 100.0])

        norm_dist = mapped_dist / self.track_length
        extra_obs = np.array([deviation] + [norm_dist] + local_points, dtype=np.float32)
        final_obs = np.concatenate((obs, extra_obs))

        # --- REWARD ---
        reward = 0.0
        
        # Magnet
        reward += (1.0 - (deviation / 12.0)) * 2.0 
        
        # Speed
        if speed > 0: reward += (speed / 100.0) * 0.5
        
        # Progress
        delta_dist = dist - self.prev_distance
        if delta_dist < -1000: reward += 50.0 
        elif abs(delta_dist) < 100 and delta_dist > 0: reward += delta_dist * 1.0
            
        # Brake Hint Logic (Fixed)
        far_point_x = abs(local_points[4]) * 100.0 
        if far_point_x > 30.0 and speed > 200.0 and brake < 0.1:
            reward -= 0.5 

        # Termination
        terminated = False
        truncated = False
        
        if self.steps_alive > 80:
            if is_off_track:
                reward -= 50.0; terminated = True
            elif invalid == 1:
                reward -= 20.0; terminated = True
            if speed < 10: 
                self.stationary_counter += 1
                if self.stationary_counter > 100: 
                    reward -= 50.0; terminated = True
            else:
                self.stationary_counter = 0

        if self.steps_alive % 100 == 0:
            status = "üü¢" if not is_off_track else "‚ùå"
            print(f"\rüèéÔ∏è Spd: {speed:.0f} | Dev: {deviation:.1f}m {status} | Turn: {local_points[4]*100:.1f}    ", end="")

        self.prev_distance = dist
        self.prev_steer = steer
        return final_obs, reward, terminated, truncated, {}

    def _get_deviation(self, dist_idx, cx, cz):
        idx = int(dist_idx)
        for i in range(-5, 6):
            check = idx + i
            if check in self.center_line:
                tx, tz = self.center_line[check]
                return np.sqrt((cx - tx)**2 + (cz - tz)**2)
        return 0.0

    def _wait_for_new_frame(self):
        timeout = time.time() + 0.5
        while True:
            _, _, _, _, _, current_id = self.udp.get_state()
            if current_id != self.last_frame_id: break
            if time.time() > timeout: break

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        if self.first_run:
            print("\n\n‚è≥ INITIAL STARTUP: 5s...")
            time.sleep(5.0)
            self.first_run = False
        else:
            self._restart_lap_macro()
            time.sleep(2.0)
        
        obs, lap_info, _, _, _, _ = self.udp.get_state()
        new_data = np.zeros(10, dtype=np.float32)
        self.prev_distance = lap_info['lapDistance']
        self.stationary_counter = 0
        self.steps_alive = 0
        return np.concatenate((obs, new_data)), {}

    def _restart_lap_macro(self):
        self.gamepad.press_button(button=XUSB_BUTTON_START)
        self.gamepad.update(); time.sleep(0.1)
        self.gamepad.release_button(button=XUSB_BUTTON_START)
        self.gamepad.update(); time.sleep(0.3) 
        self.gamepad.press_button(button=XUSB_BUTTON_A)
        self.gamepad.update(); time.sleep(0.1)
        self.gamepad.release_button(button=XUSB_BUTTON_A)
        self.gamepad.update(); time.sleep(0.2) 
        self.gamepad.press_button(button=XUSB_BUTTON_A)
        self.gamepad.update(); time.sleep(0.1)
        self.gamepad.release_button(button=XUSB_BUTTON_A)
        self.gamepad.update(); time.sleep(0.2) 
        self.gamepad.reset(); self.gamepad.update()
    def close(self): pass

Training Loop

In [None]:
from stable_baselines3 import PPO
import os
import torch

# ==========================================
# üëá CONTROL PANEL üëá
# ==========================================

# To RESUME a training run, paste the path inside the quotes:
#LOAD_MODEL = "models/F1_PPO_1/PPO_40000.zip"
LOAD_MODEL = None 

# How often to save? (20,000 steps @ 60Hz is approx 5-6 minutes)
SAVE_INTERVAL = 20000 

# ==========================================
# ‚öôÔ∏è SETUP
# ==========================================

base_dir = "models"
log_dir = "logs"
os.makedirs(base_dir, exist_ok=True)
os.makedirs(log_dir, exist_ok=True)

# 1. Initialize Environment with Globals
# (Assumes you ran the Global Resources cell previously)
env = F1_22_Env(global_gamepad, global_udp)

# 2. Setup or Load Model
if LOAD_MODEL is not None:
    print(f"üîÑ RESUMING: {LOAD_MODEL}")
    
    # Load previous brain
    model = PPO.load(LOAD_MODEL, env=env, device="auto")
    
    # Keep saving in the same folder
    save_dir = os.path.dirname(LOAD_MODEL)
    run_name = os.path.basename(save_dir)
    
    # Guess timestep from filename
    try:
        current_timesteps = int(LOAD_MODEL.split("_")[-1].replace(".zip", ""))
    except:
        current_timesteps = 0
        
else:
    # Find next available folder (F1_PPO_1, F1_PPO_2...)
    i = 1
    while os.path.exists(f"{base_dir}/F1_PPO_{i}"):
        i += 1
    
    run_name = f"F1_PPO_{i}"
    save_dir = f"{base_dir}/{run_name}"
    os.makedirs(save_dir, exist_ok=True)
    current_timesteps = 0
    
    print(f"üÜï NEW RUN: {run_name}")
    print(f"üìÇ Saving to: {save_dir}")

    # --- ‚ö° SPEED OPTIMIZED HYPERPARAMETERS ---
    model = PPO(
        "MlpPolicy", 
        env, 
        verbose=1, 
        tensorboard_log=log_dir, 
        device="auto",
        
        # Aggressive learning for racing
        learning_rate=0.0005, 
        n_steps=4096,         # Collect more data before updating
        batch_size=256,       # Stable updates
        gamma=0.995,          # Focus on long-term lap completion
        gae_lambda=0.95,
        clip_range=0.2,
        ent_coef=0.01         # Encourage exploration early on
    )

print("\nüèéÔ∏è TRAINING STARTED")
print(f"üíæ Saving every {SAVE_INTERVAL} steps.")
print("--------------------------------------------------")

# ==========================================
# üîÑ INFINITE LOOP
# ==========================================
while True:
    # Train
    model.learn(
        total_timesteps=SAVE_INTERVAL, 
        reset_num_timesteps=False, 
        tb_log_name=run_name
    )
    
    # Update Counter
    current_timesteps += SAVE_INTERVAL
    
    # Save Checkpoint
    save_path = f"{save_dir}/PPO_{current_timesteps}"
    model.save(save_path)
    print(f"üíæ Saved: {save_path}.zip")

Track recording block

In [None]:
import numpy as np
import time
import os
import pickle

os.makedirs("track_data", exist_ok=True)
filename = "track_data/bahrain_center_line.pkl"

try:
    listener = global_udp
    print("üî¥ RECORDING TRACK CENTER LINE...")
    print("1. Drive 1 Lap.")
    print("2. Keep car in CENTER.")
    print("3. Stop this cell when you cross the finish line.")
    
    # Map: Integer Distance -> (X, Z)
    # e.g. Meter 100 -> (X=500, Z=200)
    center_line_map = {}
    
    while True:
        _, lap_info, _, pos = listener.get_state()
        
        dist = lap_info['lapDistance']
        x = pos[0]
        z = pos[2] # We use X and Z for 2D map (Y is height)
        
        # Round distance to nearest meter to use as index
        dist_idx = int(dist)
        
        if dist > 0:
            center_line_map[dist_idx] = (x, z)
            print(f"\rRecorded Meter: {dist_idx} | Pos: {x:.1f}, {z:.1f}", end="")
            
        time.sleep(0.05) # 20Hz recording

except KeyboardInterrupt:
    print(f"\n\n‚úÖ Recording Stopped.")
    
    # Save to file
    with open(filename, 'wb') as f:
        pickle.dump(center_line_map, f)
    
    print(f"üíæ Map saved to {filename}")
    print(f"Total Points: {len(center_line_map)}")