In [None]:
print("Importing packages...")
import cv2
import numpy as np
import torch
import time
from IPython.display import display, clear_output
from PIL import Image
import io
import gym
import gym_donkeycar
from jetracer.nvidia_racecar import NvidiaRacecar
from jetcam.csi_camera import CSICamera
from stable_baselines3 import PPO
print("Packages imported.")

In [None]:
print("Loading model...")
try:
    model = PPO.load("model_edecay_98305_robert.zip")
except KeyError:
    raise Exception("ModelError: Model trained in Stable-Baselines2, but trying to load using Stable-Baselines3.")
print("Model loaded!")

print("Initializing car...")
car = NvidiaRacecar()
print("Car initialized!")

print("Initializing video capture...")
camera = CSICamera(width=224, height=224, capture_fps=65)
print("Camera ready.")

In [None]:
def detect_orange(image: np.ndarray, lower_bound, upper_bound) -> np.ndarray:
    """
    Detect orange areas in an image and return a binary mask highlighting those areas.
    Args:
        image: RGB image array of shape (H, W, 3)
    Returns:
        Binary mask of shape (H, W) where orange areas are white (255) and others are black (0)
    """
    # Convert image to HSV
    hsv_image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)
    
    # Apply the color range mask for orange
    orange_mask = cv2.inRange(hsv_image, lower_bound, upper_bound)
            
    # Optionally apply some morphological operations to clean up the mask
    kernel = np.ones((3, 3), np.uint8)
    orange_mask = cv2.morphologyEx(orange_mask, cv2.MORPH_OPEN, kernel)
    orange_mask = cv2.morphologyEx(orange_mask, cv2.MORPH_CLOSE, kernel)

    return orange_mask


def preprocess_image(image, lower_bound, upper_bound, crop_height=135):
    # Crop top portion
    cropped = image[-crop_height:, :, :]

    # Detect orange areas in the cropped image
    orange_mask = detect_orange(cropped, lower_bound, upper_bound)
    
    orange_mask = orange_mask.astype(np.float32) / 255.0
    
    return orange_mask[..., np.newaxis], orange_mask  # Add a channel dimension



In [None]:
print("Starting program...")
ENGAGE_MOTOR = False
ENABLE_BACKING_UP = False
STEERING_GAIN = 0.5
print(f"ENGAGE_MOTOR: {ENGAGE_MOTOR} | ENABLE_BACKING_UP: {ENABLE_BACKING_UP} | STEERING_GAIN {STEERING_GAIN}")

print("Warming up camera...")


try:
    while True:
        image = camera.read()
        
        orange_lower = (100, 50, 50)  # Lower bound for hue, saturation, value
        orange_upper = (125, 255, 255)  # Upper bound for hue, saturation, value
        
        # Preprocess frame and predict action
        input_tensor, orange_mask_raw = preprocess_image(image, lower_bound=orange_lower, upper_bound=orange_upper)
        input_tensor = input_tensor.astype(np.float32)
        input_tensor = np.expand_dims(input_tensor, axis=0)  # Add batch dimension
        
        orange_detected = np.count_nonzero(orange_mask_raw) >= 35

        if orange_detected or not ENABLE_BACKING_UP:
            # Run model inference
            with torch.no_grad():
                action, _states = model.predict(input_tensor)
            print(action)
            # Convert action to steering and throttle values
            steering = float(action[0][0])
            throttle = float(action[0][1])
        if not orange_detected and ENABLE_BACKING_UP:
            print("No orange!")
            steering = 0.0
            throttle = -0.3
        print(f"Steering: {steering}, Throttle: {-throttle}")
        
        # Apply controls to the car
        if ENGAGE_MOTOR:
            car.steering = -steering  # Steering controls might be reversed?
            if throttle >= 0:
                throttle = min(throttle, 0.3)  # Limit speed
                throttle += 0.2
            car.throttle = -throttle  # Throttle controls are inverted
        
        # Display the camera feed and preprocessed feed inline in the notebook
        rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        pil_img = Image.fromarray(rgb_frame).resize((224, 224))
        
        # Prepare the preprocessed image for display
        orange_mask = input_tensor[0, :, :, 0]  # Remove batch and channel dimensions
        # Convert mask to a displayable image
        mask_img = Image.fromarray((orange_mask * 255).astype(np.uint8)).resize((224, 224))

        # Combine the original image and the mask image side by side
        combined_width = pil_img.width + mask_img.width
        combined_img = Image.new('RGB', (combined_width, pil_img.height))
        combined_img.paste(pil_img, (0, 0))
        combined_img.paste(mask_img.convert('RGB'), (pil_img.width, 0))

        buf_combined = io.BytesIO()
        combined_img.save(buf_combined, format='PNG')
        buf_combined.seek(0)

        clear_output(wait=True)
        display(Image.open(buf_combined))
                
except KeyboardInterrupt:
    print("Exiting...")
finally:
    # Release resources
    car.throttle = 0
    car.steering = 0
    print("Resources released and program exited.")
    
    