In [1]:
import cv2
import torch
device = torch.device('cuda')

In [2]:
import torchvision
from torch2trt import TRTModule

model_trt_collision = TRTModule()
model_trt = TRTModule()
model_trt_collision.load_state_dict(torch.load('best_model_trt.pth')) 
model_trt.load_state_dict(torch.load('model_final_opti.pth'))

<All keys matched successfully>

In [3]:
import torchvision.transforms as transforms
import torch.nn.functional as F
import cv2
import PIL.Image
import numpy as np

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()

normalize = torchvision.transforms.Normalize(mean, std)

def preprocess(image):
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device).half()
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

In [4]:
import traitlets
from IPython.display import display
import ipywidgets.widgets as widgets
from jetcam.csi_camera import CSICamera
import cv2

def bgr8_to_jpeg(value, quality=75):
    return bytes(cv2.imencode('.jpeg', value)[1])

camera = CSICamera(width=224, height=224)
image = widgets.Image(format='jpeg', width=224, height=224)
blocked_slider = widgets.FloatSlider(description='blocked', min=0.0, max=1.0, orientation='vertical')

camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)

display(widgets.VBox([widgets.HBox([image, blocked_slider])]))

VBox(children=(HBox(children=(Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x…

In [5]:
from jetracer.nvidia_racecar import NvidiaRacecar

car = NvidiaRacecar()

In [None]:
import csv
import torch.nn.functional as F
from utils import preprocess
import time
from math import sin, cos, tan, pi

# Road Following parameters
STEERING_GAIN = -2.00
STEERING_BIAS = -0.15

# Vehicle specifications
WHEEL_BASE = 2.0  # Distance between front and rear axles (example value)

# Initialize state variables
count_stops = 0
stop_time = 10  # The number of frames to remain stopped
position_x, position_y = 0, 0  # Assuming a starting position of (0, 0)
orientation = 0  # Assuming a starting orientation of 0 radians
last_time = time.time()

# Open (or create) the CSV file
csv_file = open('robot_log.csv', 'a', newline='')  # Append mode
csv_writer = csv.writer(csv_file)

# Optional: Write header if the file is new
if csv_file.tell() == 0:
    csv_writer.writerow(['Time', 'Throttle', 'Steering', 'Position X', 'Position Y', 'Orientation'])

def update_position(steering, speed, time_elapsed):
    global position_x, position_y, orientation
    
    if steering == 0:  # Moving straight
        position_x += speed * time_elapsed * cos(orientation)
        position_y += speed * time_elapsed * sin(orientation)
    else:
        # Calculate turning radius
        radius = WHEEL_BASE / tan(steering)
        
        # Calculate the change in orientation
        delta_orientation = speed * time_elapsed / radius
        
        # Update orientation
        orientation = (orientation + delta_orientation) % (2 * pi)
        
        # Calculate the center of rotation
        cx = position_x - sin(orientation) * radius
        cy = position_y + cos(orientation) * radius
        
        # Update position
        position_x = cx + sin(orientation + delta_orientation) * radius
        position_y = cy - cos(orientation + delta_orientation) * radius

def update():
    global count_stops, STEERING_GAIN, STEERING_BIAS, position_x, position_y, orientation, last_time
    
    # Calculate time elapsed since last update
    current_time = time.time()
    time_elapsed = current_time - last_time
    last_time = current_time

    # Process the image and get predictions for both models
    image = preprocess(camera.read()).half()
    
    # Collision Avoidance model
    prob_blocked = F.softmax(model_trt_collision(image), dim=1).flatten()[0]
    blocked_slider.value = prob_blocked.item()
    
    if prob_blocked >= 0.5:
        count_stops += 1
        if count_stops < stop_time:
            car.steering = 0
            car.throttle = 0
        else:
            count_stops = 0
    else:
        count_stops = 0
        # Road Following model
        x = model_trt(image).detach().cpu().numpy().flatten()[0]
        
        # Compute the steering value
        steer_val = x * STEERING_GAIN + STEERING_BIAS
        
        car.steering = steer_val
        car.throttle = -0.2  # Or you can use an adjustable speed variable
    
    # Update the vehicle position
    update_position(car.steering, car.throttle, time_elapsed)

    # Save throttle, steering, and position to the CSV file
    current_time_str = time.strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp as a string
    csv_writer.writerow([current_time_str, car.throttle, car.steering, position_x, position_y, orientation])
    
    time.sleep(0.001)

# Start the loop to process camera frames
try:
    while True:
        update()
finally:
    csv_file.close()  # Ensure the CSV file is closed when exiting the loop

In [None]:
camera.observe(update)  # Attach the update function to the camera

In [None]:
import time

try:
    camera.unobserve(update)
except ValueError:
    print("The update function was not being observed. Skipping unobserve.")

time.sleep(0.1)  # add a small sleep to make sure frames have finished processing

car.throttle = 0  # Assuming the car has a throttle attribute

In [None]:
camera_link.unlink()  # don't stream to browser (will still run camera)

In [None]:
camera_link.link()  # stream to browser (wont run camera)