### Camera

In [1]:
# Step 1: Clean slate
from jetracer.camera_utils import JetRacerCamera, release_cam
import time

# Step 2: Release any existing cameras
release_cam()
time.sleep(2)

# Step 3: Create camera for your use case
camera = JetRacerCamera('training')  # or 'inference' or 'safe'

# Step 4: Start camera with error handling
if camera.start():
    print("✅ Camera initialized successfully!")
    print(f"Resolution: {camera.width}x{camera.height}")
    print(f"Mode: {camera.mode}")
    
    # Test image capture
    img = camera.read()
    if img is not None:
        print(f"✅ Capturing images: {img.shape}")
        print("🎯 Camera ready for use!")
    else:
        print("❌ Camera not capturing images")
else:
    print("❌ Camera failed to initialize")
    print("🔧 Try restarting Jupyter kernel")

🔄 Releasing all cameras...
Found camera objects: ['CSICamera', 'JetRacerCamera']
✓ Stopped CSICamera.running
✓ Deleted CSICamera
✓ Stopped JetRacerCamera.running
✓ Garbage collection completed
⏳ Waiting for camera hardware to release...
✅ Camera release complete!
GST_ARGUS: Creating output stream
CONSUMER: Waiting until producer is connected...
GST_ARGUS: Available Sensor modes :
GST_ARGUS: 3280 x 2464 FR = 21.000000 fps Duration = 47619048 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 3280 x 1848 FR = 28.000001 fps Duration = 35714284 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1920 x 1080 FR = 29.999999 fps Duration = 33333334 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1640 x 1232 FR = 29.999999 fps Duration = 33333334 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_A



✓ Camera started successfully
✓ Raw image shape: (480, 640, 3)
✓ Processed image shape: (224, 224, 3)
✅ Camera initialized successfully!
Resolution: 224x224
Mode: training
✅ Capturing images: (224, 224, 3)
🎯 Camera ready for use!


In [2]:
# Get the current frame from camera
image = camera.value

# Display image info
print(f"Image shape: {image.shape}")
print(f"Image type: {type(image)}")

Image shape: (224, 224, 3)
Image type: <class 'numpy.ndarray'>


### Task

In [10]:
import torchvision.transforms as transforms
from xy_dataset import XYDataset

TASK = 'road_following'

CATEGORIES = ['apex']

DATASETS = ['A', 'B']

TRANSFORMS = transforms.Compose([
    transforms.ColorJitter(0.2, 0.2, 0.2, 0.2),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

datasets = {}
for name in DATASETS:
    datasets[name] = XYDataset(TASK + '_' + name, CATEGORIES, TRANSFORMS, random_hflip=True)

In [11]:
datasets[DATASETS[0]]

<xy_dataset.XYDataset at 0xfffeb176f970>

### Data Collection

In [2]:
# Working JetRacer Data Collection Interface
# Run this in your Jupyter notebook

import cv2
import ipywidgets
import threading
import time
import os
import uuid
from IPython.display import display, clear_output
from jetracer.camera_utils import JetRacerCamera, release_cam

print("🎯 JETRACER DATA COLLECTION SETUP")
print("=" * 40)

# Step 1: Initialize camera
release_cam()
time.sleep(2)

camera = JetRacerCamera('training')  # 224x224 for training
if not camera.start():
    print("❌ Camera failed to start")
    exit()

print("✅ Camera initialized for data collection")

# Step 2: Set up data collection parameters
CATEGORIES = ['apex']  # Road following target points
DATASETS = ['A', 'B']  # Different data collection sessions

# Create data directories
base_dir = "/home/checker/Autonomous-Racecar/data/training_images"
for dataset in DATASETS:
    for category in CATEGORIES:
        os.makedirs(f"{base_dir}/road_following_{dataset}/{category}", exist_ok=True)

print(f"✅ Data directories ready: {base_dir}")

# Step 3: Data collection class
class DataCollector:
    def __init__(self):
        self.current_dataset = DATASETS[0]
        self.current_category = CATEGORIES[0]
        self.count = 0
        self.collecting = False
        
    def get_save_path(self):
        return f"{base_dir}/road_following_{self.current_dataset}/{self.current_category}"
    
    def get_count(self):
        path = self.get_save_path()
        if os.path.exists(path):
            return len([f for f in os.listdir(path) if f.endswith('.jpg')])
        return 0
    
    def save_image(self, image, x, y):
        # Save image with click coordinates in filename
        filename = f"{x}_{y}_{str(uuid.uuid4())}.jpg"
        filepath = os.path.join(self.get_save_path(), filename)
        
        # Save the image
        cv2.imwrite(filepath, image)
        
        # Update count
        self.count = self.get_count()
        print(f"💾 Saved: {filename} (Total: {self.count})")
        
        return filepath

# Initialize data collector
collector = DataCollector()

# Step 4: Create interactive widgets
print("🎛️ Creating interactive interface...")

# Image display widget
image_widget = ipywidgets.Image(
    format='jpeg',
    width=400,
    height=400,
)

# Control widgets
dataset_widget = ipywidgets.Dropdown(
    options=DATASETS,
    value=DATASETS[0],
    description='Dataset:'
)

category_widget = ipywidgets.Dropdown(
    options=CATEGORIES,
    value=CATEGORIES[0],
    description='Category:'
)

count_widget = ipywidgets.IntText(
    value=collector.get_count(),
    description='Count:',
    disabled=True
)

# Click coordinates display
click_info = ipywidgets.HTML(value="<b>Click on the image to collect data</b>")

# Collection status
status_widget = ipywidgets.HTML(value="<b>Status: Ready</b>")

# Step 5: Update functions
def update_dataset(change):
    collector.current_dataset = change['new']
    collector.count = collector.get_count()
    count_widget.value = collector.count
    status_widget.value = f"<b>Status: Dataset {collector.current_dataset} selected</b>"

def update_category(change):
    collector.current_category = change['new']
    collector.count = collector.get_count()
    count_widget.value = collector.count
    status_widget.value = f"<b>Status: Category {collector.current_category} selected</b>"

dataset_widget.observe(update_dataset, names='value')
category_widget.observe(update_category, names='value')

# Step 6: Camera feed and click handling
def update_camera_feed():
    """Update camera feed continuously"""
    while collector.collecting:
        try:
            img = camera.read()
            if img is not None:
                # Convert BGR to RGB for display
                img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                
                # Encode as JPEG
                _, buffer = cv2.imencode('.jpg', img_rgb)
                image_widget.value = buffer.tobytes()
                
        except Exception as e:
            print(f"Camera feed error: {e}")
            break
        
        time.sleep(0.1)  # 10 FPS display

# Click handler for data collection
def handle_click(x, y, width, height):
    """Handle click on image for data collection"""
    try:
        # Get current camera image
        img = camera.read()
        if img is not None:
            # Scale click coordinates to image size
            img_x = int((x / width) * camera.width)
            img_y = int((y / height) * camera.height)
            
            # Save the image with click coordinates
            filepath = collector.save_image(img, img_x, img_y)
            
            # Update displays
            count_widget.value = collector.count
            click_info.value = f"<b>Last click: ({img_x}, {img_y}) - Saved!</b>"
            status_widget.value = f"<b>Status: Saved image #{collector.count}</b>"
            
            # Show preview with click point
            preview_img = img.copy()
            cv2.circle(preview_img, (img_x, img_y), 8, (0, 255, 0), 3)
            preview_rgb = cv2.cvtColor(preview_img, cv2.COLOR_BGR2RGB)
            _, buffer = cv2.imencode('.jpg', preview_rgb)
            image_widget.value = buffer.tobytes()
            
        else:
            status_widget.value = "<b>Status: No camera image available</b>"
            
    except Exception as e:
        status_widget.value = f"<b>Status: Error - {e}</b>"

# Step 7: JavaScript for click handling
click_handler_js = """
<script>
function setupClickHandler() {
    const images = document.querySelectorAll('.widget-image img');
    images.forEach(img => {
        img.style.cursor = 'crosshair';
        img.onclick = function(event) {
            const rect = img.getBoundingClientRect();
            const x = event.clientX - rect.left;
            const y = event.clientY - rect.top;
            
            // Send click to Python
            const kernel = Jupyter.notebook.kernel;
            kernel.execute(`handle_click(${x}, ${y}, ${rect.width}, ${rect.height})`);
        };
    });
}

// Setup click handler after a short delay
setTimeout(setupClickHandler, 1000);
</script>
"""

# Step 8: Control buttons
start_button = ipywidgets.Button(description="Start Collection")
stop_button = ipywidgets.Button(description="Stop Collection")

def start_collection(b):
    collector.collecting = True
    start_button.disabled = True
    stop_button.disabled = False
    status_widget.value = "<b>Status: Collecting data - Click on image!</b>"
    
    # Start camera feed thread
    threading.Thread(target=update_camera_feed, daemon=True).start()

def stop_collection(b):
    collector.collecting = False
    start_button.disabled = False
    stop_button.disabled = True
    status_widget.value = "<b>Status: Collection stopped</b>"

start_button.on_click(start_collection)
stop_button.on_click(stop_collection)

# Step 9: Create layout
controls = ipywidgets.VBox([
    dataset_widget,
    category_widget,
    count_widget,
    ipywidgets.HBox([start_button, stop_button]),
    status_widget,
    click_info
])

layout = ipywidgets.HBox([
    image_widget,
    controls
])

# Step 10: Display everything
print("🎯 Data Collection Interface Ready!")
print("\nInstructions:")
print("1. Click 'Start Collection'")
print("2. Click on the camera image where you want the car to go")
print("3. Images will be saved automatically with click coordinates")
print("4. Use different datasets (A, B) for different training sessions")

display(layout)

# Add click handler JavaScript
from IPython.display import HTML
display(HTML(click_handler_js))

print("\n✅ Setup complete! Start collecting data by clicking the button above.")

🎯 JETRACER DATA COLLECTION SETUP
🔄 Releasing all cameras...
Found camera objects: ['JetRacerCamera']
✓ Stopped JetRacerCamera.running
✓ Garbage collection completed
⏳ Waiting for camera hardware to release...
✅ Camera release complete!


Error generated. /dvs/git/dirty/git-master_linux/multimedia/nvgstreamer/gst-nvarguscamera/gstnvarguscamerasrc.cpp, execute:805 Failed to create CaptureSession


RuntimeError: Could not initialize camera.  Please see error trace.

### Model

In [5]:
import torch
import torchvision

device = torch.device('cuda')
output_dim = 2 * len(dataset.categories)  # x, y coordinate for each category

# ALEXNET
# model = torchvision.models.alexnet(pretrained=True)
# model.classifier[-1] = torch.nn.Linear(4096, output_dim)

# SQUEEZENET 
# model = torchvision.models.squeezenet1_1(pretrained=True)
# model.classifier[1] = torch.nn.Conv2d(512, output_dim, kernel_size=1)
# model.num_classes = len(dataset.categories)

# RESNET 18
model = torchvision.models.resnet18(pretrained=True)
model.fc = torch.nn.Linear(512, output_dim)

# RESNET 34
# model = torchvision.models.resnet34(pretrained=True)
# model.fc = torch.nn.Linear(512, output_dim)

# DENSENET 121
# model = torchvision.models.densenet121(pretrained=True)
# model.classifier = torch.nn.Linear(model.num_features, output_dim)

model = model.to(device)

model_save_button = ipywidgets.Button(description='save model')
model_load_button = ipywidgets.Button(description='load model')
model_path_widget = ipywidgets.Text(description='model path', value='road_following_model.pth')

def load_model(c):
    model.load_state_dict(torch.load(model_path_widget.value))
model_load_button.on_click(load_model)
    
def save_model(c):
    torch.save(model.state_dict(), model_path_widget.value)
model_save_button.on_click(save_model)

model_widget = ipywidgets.VBox([
    model_path_widget,
    ipywidgets.HBox([model_load_button, model_save_button])
])


display(model_widget)



VBox(children=(Text(value='road_following_model.pth', description='model path'), HBox(children=(Button(descrip…

### Live Execution

In [6]:
import threading
import time
from utils import preprocess
import torch.nn.functional as F

state_widget = ipywidgets.ToggleButtons(options=['stop', 'live'], description='state', value='stop')
prediction_widget = ipywidgets.Image(format='jpeg', width=camera.width, height=camera.height)

def live(state_widget, model, camera, prediction_widget):
    global dataset
    while state_widget.value == 'live':
        image = camera.value
        preprocessed = preprocess(image)
        output = model(preprocessed).detach().cpu().numpy().flatten()
        category_index = dataset.categories.index(category_widget.value)
        x = output[2 * category_index]
        y = output[2 * category_index + 1]
        
        x = int(camera.width * (x / 2.0 + 0.5))
        y = int(camera.height * (y / 2.0 + 0.5))
        
        prediction = image.copy()
        prediction = cv2.circle(prediction, (x, y), 8, (255, 0, 0), 3)
        prediction_widget.value = bgr8_to_jpeg(prediction)
            
def start_live(change):
    if change['new'] == 'live':
        execute_thread = threading.Thread(target=live, args=(state_widget, model, camera, prediction_widget))
        execute_thread.start()

state_widget.observe(start_live, names='value')

live_execution_widget = ipywidgets.VBox([
    prediction_widget,
    state_widget
])

display(live_execution_widget)

VBox(children=(Image(value=b'', format='jpeg', height='720', width='1280'), ToggleButtons(description='state',…

In [7]:
BATCH_SIZE = 8

optimizer = torch.optim.Adam(model.parameters())
# optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=0.9)

epochs_widget = ipywidgets.IntText(description='epochs', value=1)
eval_button = ipywidgets.Button(description='evaluate')
train_button = ipywidgets.Button(description='train')
loss_widget = ipywidgets.FloatText(description='loss')
progress_widget = ipywidgets.FloatProgress(min=0.0, max=1.0, description='progress')

def train_eval(is_training):
    global BATCH_SIZE, LEARNING_RATE, MOMENTUM, model, dataset, optimizer, eval_button, train_button, accuracy_widget, loss_widget, progress_widget, state_widget
    
    try:
        train_loader = torch.utils.data.DataLoader(
            dataset,
            batch_size=BATCH_SIZE,
            shuffle=True
        )

        state_widget.value = 'stop'
        train_button.disabled = True
        eval_button.disabled = True
        time.sleep(1)

        if is_training:
            model = model.train()
        else:
            model = model.eval()

        while epochs_widget.value > 0:
            i = 0
            sum_loss = 0.0
            error_count = 0.0
            for images, category_idx, xy in iter(train_loader):
                # send data to device
                images = images.to(device)
                xy = xy.to(device)

                if is_training:
                    # zero gradients of parameters
                    optimizer.zero_grad()

                # execute model to get outputs
                outputs = model(images)

                # compute MSE loss over x, y coordinates for associated categories
                loss = 0.0
                for batch_idx, cat_idx in enumerate(list(category_idx.flatten())):
                    loss += torch.mean((outputs[batch_idx][2 * cat_idx:2 * cat_idx+2] - xy[batch_idx])**2)
                loss /= len(category_idx)

                if is_training:
                    # run backpropogation to accumulate gradients
                    loss.backward()

                    # step optimizer to adjust parameters
                    optimizer.step()

                # increment progress
                count = len(category_idx.flatten())
                i += count
                sum_loss += float(loss)
                progress_widget.value = i / len(dataset)
                loss_widget.value = sum_loss / i
                
            if is_training:
                epochs_widget.value = epochs_widget.value - 1
            else:
                break
    except e:
        pass
    model = model.eval()

    train_button.disabled = False
    eval_button.disabled = False
    state_widget.value = 'live'
    
train_button.on_click(lambda c: train_eval(is_training=True))
eval_button.on_click(lambda c: train_eval(is_training=False))
    
train_eval_widget = ipywidgets.VBox([
    epochs_widget,
    progress_widget,
    loss_widget,
    ipywidgets.HBox([train_button, eval_button])
])

display(train_eval_widget)

VBox(children=(IntText(value=1, description='epochs'), FloatProgress(value=0.0, description='progress', max=1.…

### All together!

The following widget can be used to label a multi-class x, y dataset.  It supports labeling only one instance of each class per image (ie: only one dog), but multiple classes (ie: dog, cat, horse) per image are possible.

Click the image on the top left to save an image of ``category`` to ``dataset`` at the clicked location.

| Widget | Description |
|--------|-------------|
| dataset | Selects the active dataset |
| category | Selects the active category |
| epochs | Sets the number of epochs to train for |
| train | Trains on the active dataset for the number of epochs specified |
| evaluate | Evaluates the accuracy on the active dataset over one epoch |
| model path | Sets the active model path |
| load | Loads a model from the active model path |
| save | Saves a model to the active model path |
| stop | Disables the live demo |
| live | Enables the live demo |

In [1]:
# 🏁 AUTONOMOUS DRIVING - Complete CPU Version
import torch
import torchvision
import numpy as np
import time
import cv2
import smbus
import threading
import PIL.Image
import torchvision.transforms as transforms

print("🏁 AUTONOMOUS DRIVING - Complete CPU")
print("=" * 40)

# Force CPU everywhere
torch.cuda.is_available = lambda: False
device = torch.device('cpu')

class AutonomousRacecar:
    def __init__(self):
        self.bus = smbus.SMBus(7)
        self.address = 0x40
        
        # Servo channels
        self.steering_channel = 0
        self.throttle_channel = 1
        
        # Current positions
        self.current_steering = 1500
        self.current_throttle = 1500
        
        # Position limits
        self.min_pulse = 1000
        self.max_pulse = 2000
        self.center_pulse = 1500
        
        # FULL throttle range for speed
        self.max_forward = 1200
        self.throttle_neutral = 1500
        
        print(f"🚗 Using throttle range: {self.max_forward}-{self.throttle_neutral}μs")
        
        # Calibrated steering offset
        self.steering_offset_pulse = int(0.17 * 500)  # 85μs offset
        
        # Signal refresh
        self.keep_refreshing = False
        self.refresh_thread = None
        
        # Initialize
        self.initialize_pca9685()
        self.start_signal_refresh()
        
        print("✅ Autonomous Racecar ready!")
    
    def initialize_pca9685(self):
        try:
            self.bus.write_byte_data(self.address, 0x00, 0x10)
            time.sleep(0.005)
            prescale = int(25000000 / (4096 * 50) - 1)
            self.bus.write_byte_data(self.address, 0xFE, prescale)
            self.bus.write_byte_data(self.address, 0x00, 0x20)
            time.sleep(0.005)
            self.bus.write_byte_data(self.address, 0x01, 0x04)
            self.bus.write_byte_data(self.address, 0xFA, 0x00)
            self.bus.write_byte_data(self.address, 0xFB, 0x00)
            self.bus.write_byte_data(self.address, 0xFC, 0x00)
            self.bus.write_byte_data(self.address, 0xFD, 0x00)
            
            self.set_servo_position(self.steering_channel, self.center_pulse)
            self.set_servo_position(self.throttle_channel, self.throttle_neutral)
            print("✅ PCA9685 initialized")
        except Exception as e:
            print(f"❌ Init failed: {e}")
            raise
    
    def set_servo_position(self, channel, pulse_us):
        pulse_us = max(self.min_pulse, min(self.max_pulse, pulse_us))
        pwm_val = int((pulse_us * 4096) / 20000)
        base_reg = 0x06 + 4 * channel
        
        try:
            self.bus.write_byte_data(self.address, base_reg, 0)
            self.bus.write_byte_data(self.address, base_reg + 1, 0)
            self.bus.write_byte_data(self.address, base_reg + 2, pwm_val & 0xFF)
            self.bus.write_byte_data(self.address, base_reg + 3, (pwm_val >> 8) & 0xFF)
        except Exception as e:
            print(f"❌ Servo error: {e}")
    
    def start_signal_refresh(self):
        self.keep_refreshing = True
        def refresh_loop():
            while self.keep_refreshing:
                self.set_servo_position(self.throttle_channel, self.current_throttle)
                self.set_servo_position(self.steering_channel, self.current_steering)
                time.sleep(0.02)
        
        self.refresh_thread = threading.Thread(target=refresh_loop, daemon=True)
        self.refresh_thread.start()
    
    def stop_signal_refresh(self):
        self.keep_refreshing = False
    
    def set_steering(self, value):
        steering_pulse = self.center_pulse + int(value * 500) + self.steering_offset_pulse
        steering_pulse = max(self.min_pulse, min(self.max_pulse, steering_pulse))
        self.current_steering = steering_pulse
        self.set_servo_position(self.steering_channel, steering_pulse)
    
    def set_throttle(self, value):
        if value > 0:
            new_throttle = self.throttle_neutral - int(value * (self.throttle_neutral - self.max_forward))
            new_throttle = max(self.max_forward, new_throttle)
        else:
            new_throttle = self.throttle_neutral
        
        self.current_throttle = new_throttle
        self.set_servo_position(self.throttle_channel, new_throttle)
    
    def stop(self):
        self.current_steering = self.center_pulse
        self.current_throttle = self.throttle_neutral
        self.set_servo_position(self.steering_channel, self.current_steering)
        self.set_servo_position(self.throttle_channel, self.current_throttle)
        print("🛑 STOPPED")
    
    def cleanup(self):
        self.stop_signal_refresh()
        self.stop()

def initialize_camera():
    print("🎬 Initializing camera...")
    try:
        from jetcam.csi_camera import CSICamera
        camera = CSICamera(width=224, height=224, capture_fps=21)
        camera.running = True
        time.sleep(3)
        
        test_img = camera.value
        if test_img is not None:
            print(f"✅ Camera working! Shape: {test_img.shape}")
            return camera
        else:
            print("❌ Camera not capturing")
            return None
    except Exception as e:
        print(f"❌ Camera failed: {e}")
        return None

def get_camera_image(camera):
    if camera is None:
        return None
    try:
        img = camera.value
        if img is not None and img.shape[:2] != (224, 224):
            img = cv2.resize(img, (224, 224))
        return img
    except Exception as e:
        return None

def cpu_preprocess(image):
    """CPU-only preprocessing - replaces utils.preprocess()"""
    # Convert to PIL Image
    image_pil = PIL.Image.fromarray(image)
    
    # Create CPU-only transform
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    
    # Apply transform and ensure CPU
    tensor = transform(image_pil)
    tensor = tensor.unsqueeze(0)  # Add batch dimension
    
    # Force CPU
    tensor = tensor.to(device)
    
    return tensor

def load_trained_model():
    """Load model with complete CPU forcing"""
    print("🤖 Loading model on CPU...")
    CATEGORIES = ['apex']
    
    # Create model
    model = torchvision.models.resnet18(weights=None)  # Use newer parameter
    model.fc = torch.nn.Linear(512, 2 * len(CATEGORIES))
    
    # FORCE CPU
    model = model.to(device)
    
    try:
        # Load weights with CPU forcing
        state_dict = torch.load('road_following_model_cpu.pth', 
                               map_location=device, weights_only=True)
        model.load_state_dict(state_dict)
        model.eval()
        
        # Double-check everything is on CPU
        for name, param in model.named_parameters():
            param.data = param.data.to(device)
        
        print("✅ Model loaded completely on CPU!")
        print(f"✅ Model device: {next(model.parameters()).device}")
        return model
    except Exception as e:
        print(f"❌ Model failed: {e}")
        return None

class DrivingParams:
    THROTTLE = 0.2
    STEERING_GAIN = 1.5
    STEERING_BIAS = 0.0
    MAX_STEERING = 1.0
    MIN_STEERING = -1.0
    UPDATE_RATE = 0.05

def start_autonomous_driving():
    """Complete CPU autonomous driving"""
    print("\n🏁 STARTING COMPLETE CPU AUTONOMOUS DRIVING!")
    print("=" * 40)
    
    model = load_trained_model()
    if model is None:
        return
    
    print("⚠️  SAFETY REMINDERS:")
    print("   • Keep manual override ready!")
    print("   • Press Ctrl+C to stop")
    print(f"   • Complete CPU processing")
    print(f"   • Base throttle: {DrivingParams.THROTTLE}")
    
    print("\n⏱️  Starting in 5 seconds...")
    for i in range(5, 0, -1):
        print(f"   {i}...")
        time.sleep(1)
    
    print("🚀 GO!")
    
    camera = initialize_camera()
    car = AutonomousRacecar()
    
    if camera is None:
        car.cleanup()
        return
    
    car.set_throttle(DrivingParams.THROTTLE)
    expected_pulse = 1500 - int(DrivingParams.THROTTLE * 300)
    print(f"🚗 Throttle set to {DrivingParams.THROTTLE} (~{expected_pulse}μs)")
    
    frame_count = 0
    start_time = time.time()
    
    try:
        print("🎯 Complete CPU autonomous driving active!")
        
        while True:
            # Get image
            image = get_camera_image(camera)
            if image is None:
                continue
            
            # CPU-only preprocessing (no utils.preprocess)
            processed_image = cpu_preprocess(image)
            
            # CPU-only inference
            with torch.no_grad():
                output = model(processed_image)
                steering_prediction = float(output[0][0].cpu())
            
            # Apply steering
            ai_steering_command = (steering_prediction * DrivingParams.STEERING_GAIN + 
                                 DrivingParams.STEERING_BIAS)
            ai_steering_command = max(DrivingParams.MIN_STEERING, 
                                    min(DrivingParams.MAX_STEERING, ai_steering_command))
            
            car.set_steering(ai_steering_command)
            
            frame_count += 1
            if frame_count % 50 == 0:
                elapsed = time.time() - start_time
                fps = frame_count / elapsed
                print(f"📊 Frame {frame_count} | FPS: {fps:.1f} | "
                      f"Steering: {ai_steering_command:+.3f}")
            
            time.sleep(DrivingParams.UPDATE_RATE)
            
    except KeyboardInterrupt:
        print("\n🛑 STOPPING AUTONOMOUS DRIVING")
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        car.cleanup()
        if camera:
            camera.running = False
        print("🛑 Complete CPU autonomous driving stopped safely")

print("\n🚀 COMPLETE CPU AUTONOMOUS DRIVING READY!")
print("Functions available:")
print("  start_autonomous_driving() - Start complete CPU racing")

print(f"\n⚙️ COMPLETE CPU SETTINGS:")
print(f"   Device: CPU only")
print(f"   Preprocessing: CPU only")
print(f"   Model: CPU only")
print(f"   Base throttle: {DrivingParams.THROTTLE}")

print(f"\n🏁 Ready for complete CPU autonomous driving!")

🏁 AUTONOMOUS DRIVING - Complete CPU

🚀 COMPLETE CPU AUTONOMOUS DRIVING READY!
Functions available:
  start_autonomous_driving() - Start complete CPU racing

⚙️ COMPLETE CPU SETTINGS:
   Device: CPU only
   Preprocessing: CPU only
   Model: CPU only
   Base throttle: 0.2

🏁 Ready for complete CPU autonomous driving!


In [2]:
test_throttle()

NameError: name 'test_throttle' is not defined

In [9]:
# Quick camera test
camera = initialize_camera()

if camera:
    print("📷 Testing camera capture...")
    for i in range(5):
        img = get_camera_image(camera)
        if img is not None:
            print(f"✅ Capture {i+1}: {img.shape}")
        else:
            print(f"❌ Capture {i+1}: Failed")
        time.sleep(0.5)
    
    camera.running = False
    print("✅ Camera test complete!")
else:
    print("❌ Camera failed to initialize")

🎬 Initializing camera...
GST_ARGUS: Creating output stream
CONSUMER: Waiting until producer is connected...
GST_ARGUS: Available Sensor modes :
GST_ARGUS: 3280 x 2464 FR = 21.000000 fps Duration = 47619048 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 3280 x 1848 FR = 28.000001 fps Duration = 35714284 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1920 x 1080 FR = 29.999999 fps Duration = 33333334 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1640 x 1232 FR = 29.999999 fps Duration = 33333334 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1280 x 720 FR = 59.999999 fps Duration = 16666667 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: Running with following settings:
   Camera index = 0 
   Camera mode  = 4 
   Out



✅ Camera working! Shape: (224, 224, 3)
📷 Testing camera capture...
✅ Capture 1: (224, 224, 3)
✅ Capture 2: (224, 224, 3)
✅ Capture 3: (224, 224, 3)
✅ Capture 4: (224, 224, 3)
✅ Capture 5: (224, 224, 3)
✅ Camera test complete!


In [None]:
start_autonomous_driving()


🏁 STARTING COMPLETE CPU AUTONOMOUS DRIVING!
🤖 Loading model on CPU...
✅ Model loaded completely on CPU!
✅ Model device: cpu
⚠️  SAFETY REMINDERS:
   • Keep manual override ready!
   • Press Ctrl+C to stop
   • Complete CPU processing
   • Base throttle: 0.4

⏱️  Starting in 5 seconds...
   5...
   4...
   3...
   2...
   1...
🚀 GO!
🎬 Initializing camera...
GST_ARGUS: Creating output stream
CONSUMER: Waiting until producer is connected...
GST_ARGUS: Available Sensor modes :
GST_ARGUS: 3280 x 2464 FR = 21.000000 fps Duration = 47619048 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 3280 x 1848 FR = 28.000001 fps Duration = 35714284 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1920 x 1080 FR = 29.999999 fps Duration = 33333334 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1640 x 1232 FR = 29.999999 fps Duration = 33333334 



✅ Camera working! Shape: (224, 224, 3)
🚗 Using throttle range: 1200-1500μs
✅ PCA9685 initialized
✅ Autonomous Racecar ready!
🚗 Throttle set to 0.4 (~1380μs)
🎯 Complete CPU autonomous driving active!
📊 Frame 50 | FPS: 5.4 | Steering: +0.002
📊 Frame 100 | FPS: 5.5 | Steering: +0.008
📊 Frame 150 | FPS: 5.5 | Steering: +0.007
📊 Frame 200 | FPS: 5.5 | Steering: +0.001
📊 Frame 250 | FPS: 5.5 | Steering: +0.002
📊 Frame 300 | FPS: 5.5 | Steering: +0.002
📊 Frame 350 | FPS: 5.5 | Steering: +0.002
📊 Frame 400 | FPS: 5.6 | Steering: +0.002
📊 Frame 450 | FPS: 5.6 | Steering: +0.002
📊 Frame 500 | FPS: 5.6 | Steering: +0.002
📊 Frame 550 | FPS: 5.6 | Steering: +0.002
📊 Frame 600 | FPS: 5.6 | Steering: +0.002
📊 Frame 650 | FPS: 5.6 | Steering: +0.002
📊 Frame 700 | FPS: 5.6 | Steering: +0.002
📊 Frame 750 | FPS: 5.6 | Steering: +0.002
📊 Frame 800 | FPS: 5.6 | Steering: +0.002
📊 Frame 850 | FPS: 5.6 | Steering: +0.002
📊 Frame 900 | FPS: 5.6 | Steering: +0.002
📊 Frame 950 | FPS: 5.6 | Steering: +0.002
📊 Fr

In [12]:
# Find your model file
import os

print("🔍 Looking for your model file...")

# Check current directory
current_files = [f for f in os.listdir('.') if f.endswith('.pth')]
print(f"📁 Current directory .pth files: {current_files}")

# Check common locations
common_paths = [
    './road_following_model_cpu.pth',
    './notebooks/road_following_model_cpu.pth', 
    '/home/checker/road_following_model_cpu.pth',
    '/home/checker/Autonomous-Racecar/road_following_model_cpu.pth',
    '/home/checker/notebooks/road_following_model_cpu.pth'
]

print("\n📍 Checking common locations:")
for path in common_paths:
    if os.path.exists(path):
        print(f"✅ FOUND: {path}")
        print(f"   Size: {os.path.getsize(path):,} bytes")
    else:
        print(f"❌ Not found: {path}")

# Search for any .pth files
print("\n🔍 Searching for ALL .pth files...")
for root, dirs, files in os.walk('/home/checker'):
    for file in files:
        if file.endswith('.pth') and 'road_following' in file:
            full_path = os.path.join(root, file)
            size = os.path.getsize(full_path)
            print(f"📄 Found: {full_path} ({size:,} bytes)")

🔍 Looking for your model file...
📁 Current directory .pth files: ['road_following_model_cpu.pth']

📍 Checking common locations:
✅ FOUND: ./road_following_model_cpu.pth
   Size: 44,789,468 bytes
❌ Not found: ./notebooks/road_following_model_cpu.pth
❌ Not found: /home/checker/road_following_model_cpu.pth
❌ Not found: /home/checker/Autonomous-Racecar/road_following_model_cpu.pth
❌ Not found: /home/checker/notebooks/road_following_model_cpu.pth

🔍 Searching for ALL .pth files...
📄 Found: /home/checker/jetracer/notebooks/road_following_model_cpu.pth (44,789,468 bytes)
📄 Found: /home/checker/Autonomous-Racecar/notebooks/road_following_model_cpu.pth (44,789,468 bytes)


In [7]:
# 🔧 CORRECT THROTTLE TEST - Using 1400-1500 Range
import smbus
import time

print("🔧 THROTTLE TEST - Correct Range 1400-1500")
print("=" * 40)

class CorrectThrottleTest:
    def __init__(self):
        self.bus = smbus.SMBus(7)
        self.address = 0x40
        self.throttle_channel = 1
        
        # CORRECT throttle range
        self.center_pulse = 1500
        self.max_forward = 1400   # Forward = lower values
        self.max_reverse = 1500   # Only goes down to center, no reverse?
        
        print(f"🚗 Using CORRECT throttle range:")
        print(f"   Forward: {self.max_forward}μs")
        print(f"   Neutral: {self.center_pulse}μs")
        print(f"   Range: {self.max_forward}-{self.center_pulse}μs")
        
        self.initialize_pca9685()
        print("✅ Correct throttle test ready")
    
    def initialize_pca9685(self):
        try:
            self.bus.write_byte_data(self.address, 0x00, 0x10)
            time.sleep(0.005)
            prescale = int(25000000 / (4096 * 50) - 1)
            self.bus.write_byte_data(self.address, 0xFE, prescale)
            self.bus.write_byte_data(self.address, 0x00, 0x20)
            time.sleep(0.005)
            self.bus.write_byte_data(self.address, 0x01, 0x04)
            self.bus.write_byte_data(self.address, 0xFA, 0x00)
            self.bus.write_byte_data(self.address, 0xFB, 0x00)
            self.bus.write_byte_data(self.address, 0xFC, 0x00)
            self.bus.write_byte_data(self.address, 0xFD, 0x00)
            self.set_servo_position(self.throttle_channel, self.center_pulse)
            print("✅ PCA9685 initialized")
        except Exception as e:
            print(f"❌ Init failed: {e}")
            raise
    
    def set_servo_position(self, channel, pulse_us):
        pulse_us = max(1000, min(2000, pulse_us))
        pwm_val = int((pulse_us * 4096) / 20000)
        base_reg = 0x06 + 4 * channel
        
        try:
            self.bus.write_byte_data(self.address, base_reg, 0)
            self.bus.write_byte_data(self.address, base_reg + 1, 0)
            self.bus.write_byte_data(self.address, base_reg + 2, pwm_val & 0xFF)
            self.bus.write_byte_data(self.address, base_reg + 3, (pwm_val >> 8) & 0xFF)
            print(f"Set throttle to {pulse_us}μs")
        except Exception as e:
            print(f"❌ Servo error: {e}")
    
    def test_correct_range(self):
        """Test the correct 1400-1500 range"""
        print("\n🚗 TESTING CORRECT RANGE: 1400-1500")
        
        # Start at neutral
        print("1. Neutral (1500μs)...")
        self.set_servo_position(self.throttle_channel, 1500)
        time.sleep(2)
        
        # Test different forward speeds in the correct range
        forward_speeds = [1480, 1460, 1440, 1420, 1400]
        
        for speed in forward_speeds:
            print(f"2. Forward speed ({speed}μs)...")
            self.set_servo_position(self.throttle_channel, speed)
            time.sleep(3)
            
            print("   Back to neutral...")
            self.set_servo_position(self.throttle_channel, 1500)
            time.sleep(1)
        
        print("✅ Correct range test complete!")

# Test the correct range
try:
    throttle_test = CorrectThrottleTest()
    throttle_test.test_correct_range()
except Exception as e:
    print(f"❌ Test failed: {e}")

🔧 THROTTLE TEST - Correct Range 1400-1500
🚗 Using CORRECT throttle range:
   Forward: 1400μs
   Neutral: 1500μs
   Range: 1400-1500μs
Set throttle to 1500μs
✅ PCA9685 initialized
✅ Correct throttle test ready

🚗 TESTING CORRECT RANGE: 1400-1500
1. Neutral (1500μs)...
Set throttle to 1500μs
2. Forward speed (1480μs)...
Set throttle to 1480μs
   Back to neutral...
Set throttle to 1500μs
2. Forward speed (1460μs)...
Set throttle to 1460μs
   Back to neutral...
Set throttle to 1500μs
2. Forward speed (1440μs)...
Set throttle to 1440μs
   Back to neutral...
Set throttle to 1500μs
2. Forward speed (1420μs)...
Set throttle to 1420μs
   Back to neutral...
Set throttle to 1500μs
2. Forward speed (1400μs)...
Set throttle to 1400μs
   Back to neutral...
Set throttle to 1500μs
✅ Correct range test complete!
