To begin, we need to transfer the device from CPU memory to the GPU.

In [1]:
import torch
device = torch.device('cuda')

Load the TRT optimized models.

In [2]:
from torch2trt import TRTModule

model_trt = TRTModule()
model_trt.load_state_dict(torch.load('best_steering_model_xy_trt.pth'))

model_trt_collision = TRTModule()
model_trt_collision.load_state_dict(torch.load('new_best_model_trt.pth'))

<All keys matched successfully>

# Create Preprocessing Function
It's the same for both models.

In [3]:
import torchvision.transforms as transforms
import torch.nn.functional as F
import cv2
import PIL.Image
import numpy as np

#normalize = torchvision.transforms.Normalize(mean, std)
mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()

def preprocess(image):
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device).half()
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

# Start & Display the Camera

In [4]:
from IPython.display import display
import ipywidgets
import traitlets
from jetbot import bgr8_to_jpeg
from jetbot.camera.zmq_camera import ZmqCamera as Camera

camera = Camera()
image_widget = ipywidgets.Image()
traitlets.dlink((camera, 'value'), (image_widget, 'value'), transform=bgr8_to_jpeg)

<traitlets.traitlets.directional_link at 0x7ee5232320>

# Create Robot Instance
This is used to drive the motors.

In [5]:
from jetbot import Robot

robot = Robot()

# Define Sliders to Control JetBot

Now, we will define sliders to control JetBot
> Note: We have initialize the slider values for best known configurations, however these might not work for your dataset, therefore please increase or decrease the sliders according to your setup and environment

1. Speed Control (speed_gain_slider): To start your JetBot increase ``speed_gain_slider`` 
2. Steering Gain Control (steering_gain_slider): If you see JetBot is wobbling, you need to reduce ``steering_gain_slider`` till it is smooth
3. Steering Bias control (steering_bias_slider): If you see JetBot is biased towards extreme right or extreme left side of the track, you should control this slider till JetBot start following line or track in the center.  This accounts for motor biases as well as camera offsets

> Note: You should play around above mentioned sliders with lower speed to get smooth JetBot road following behavior.

## Road Following Sliders

In [6]:
speed_gain_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, step=0.01, value=0.2, description='speed gain')
steering_gain_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, step=0.01, value=0.05, description='steering gain')
steering_dgain_slider = ipywidgets.FloatSlider(min=0.0, max=0.5, step=0.001, value=0.0, description='steering kd')
steering_bias_slider = ipywidgets.FloatSlider(min=-0.3, max=0.3, step=0.01, value=-0.02, description='steering bias')

display(speed_gain_slider, steering_gain_slider, steering_dgain_slider, steering_bias_slider)

FloatSlider(value=0.2, description='speed gain', max=1.0, step=0.01)

FloatSlider(value=0.05, description='steering gain', max=1.0, step=0.01)

FloatSlider(value=0.0, description='steering kd', max=0.5, step=0.001)

FloatSlider(value=-0.02, description='steering bias', max=0.3, min=-0.3, step=0.01)

## Collision Avoidance Sliders

In [8]:
blocked_slider = ipywidgets.widgets.FloatSlider(description='blocked', min=0.0, max=1.0, orientation='vertical')
stopduration_slider= ipywidgets.IntSlider(min=1, max=1000, step=1, value=10, description='time for stop') 
blocked_threshold= ipywidgets.FloatSlider(min=0, max=1.0, step=0.01, value=0.5, description='blocked threshold')

display(image_widget)
display(ipywidgets.HBox([blocked_slider, blocked_threshold, stopduration_slider]))

Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x0…

HBox(children=(FloatSlider(value=0.0, description='blocked', max=1.0, orientation='vertical'), FloatSlider(val…

# Setup Publisher to Send Messages to Fog

In [9]:
import zmq

# Setup the publisher and socket
pub_context = zmq.Context()
pub_socket = pub_context.socket(zmq.PUB)
pub_socket.bind('tcp://*:2000')
pub_topic = 10001

# Setup Subscriber to Receive Messages from Fog/Cloud

In [10]:
# Setup the subscriber and socket
sub_context = zmq.Context()
sub_socket = sub_context.socket(zmq.SUB)
sub_socket.connect('tcp://192.168.254.161:2001')
sub_socket.setsockopt_string(zmq.SUBSCRIBE, '11000')

In [11]:
# Setup the subscriber and socket
cloud_sub_context = zmq.Context()
cloud_sub_socket = cloud_sub_context.socket(zmq.SUB)
cloud_sub_socket.connect('tcp://192.168.254.161:2002')
cloud_sub_socket.setsockopt_string(zmq.SUBSCRIBE, '11011')

# Function to Run When Camera's Value Changes

In [19]:
import math
import time

angle = 0.0
angle_last = 0.0
count_stops = 0.0
go_on = 0.0

# The number of frames to remain stopped
stop_time = 10
x = 0.0
y = 0.0
speed_value = speed_gain_slider.value

# Set flag for `Blocked` message delivery to fog
sent_msg = False


def execute(change):
    global angle, angle_last, blocked_slider, robot, count_stops, stop_time, go_on, x, y, blocked_threshold
    global speed_value, steer_gain, steer_dgain, steer_bias
    global sent_msg
    
    #print('running function')
    
    steer_gain = steering_gain_slider.value
    steer_dgain = steering_dgain_slider.value
    steer_bias = steering_bias_slider.value
    go_on = 1
    
    # Preprocess the image
    image_preproc = preprocess(change['new']).to(device)
    
    # Collision Avoidance Model
    prob_blocked = float(F.softmax(model_trt_collision(image_preproc), dim=1).flatten()[0])
    blocked_slider.value = prob_blocked
    stop_time = stopduration_slider.value
    if go_on == 1:
        
        # Probability threshold should be above 0.5
        if prob_blocked > blocked_threshold.value:
            
            # Stop the car
            speed_gain_slider.value = 0.0
            speed_value = speed_gain_slider.value
            robot.left_motor.value = 0.0
            robot.right_motor.value = 0.0
            
            # Get light status from fog
            light_msg = ''
            try:
                light_msg = sub_socket.recv()
                print(f'Light Status: {light_msg}')
            except:
                pass
            
            # Send message only once
            if not sent_msg:
                pub_socket.send_string('%d %s' % (pub_topic, 'Object Encountered!'))
                print('Sent message to fog')
                sent_msg = True
                
                # Get new route from Cloud
                try:
                    speed_gain_slider.value = 0.0
                    speed_value = speed_gain_slider.value
                    print('trying to get new route')
                    route = cloud_sub_socket.recv()
                    print(route)
                except Exception as e:
                    print(f'Received error: {e}')
                
            count_stops += 1
            go_on = 2
            print('Finishing object avoidance loop...')
        else:
            # Start of Road Following Model
            sent_msg = False
            go_on = 1
            count_stops = 0
            xy = model_trt(image_preproc).detach().float().cpu().numpy().flatten()
            x = xy[0]
            y = (0.5 - xy[1]) / 2.0
            speed_gain_slider.value = 0.2
            speed_value = speed_gain_slider.value
    else:
        count_stops += 1
        if count_stops < stop_time:
            # Set x, y steering and speed to 0.0
            x = 0.0
            y = 0.0
            speed_value = 0
        else:
            go_on = 1
            count_stops = 0
            
    angle = math.atan2(x, y)
    pid = angle * steer_gain + (angle - angle_last) * steer_dgain
    steer_val = pid + steer_bias
    angle_last = angle
    robot.left_motor.value = max(min(speed_value + steer_val, 1.0), 0.0)
    robot.right_motor.value = max(min(speed_value - steer_val, 1.0), 0.0)
    
execute({'new': camera.value})   
    

# Attach the Function to Camera for Processing

In [20]:
camera.observe(execute, names='value')

Light Status: b'11000 b"10011 b\'Red Light\\\\r\\\\n\'"'
Sent message to fog
trying to get new route
b'11011 {"data": {"message": "Heres Your New Route!"}}'
Finishing object avoidance loop...
Light Status: b'11000 b"10011 b\'Red Light\\\\r\\\\n\'"'
Finishing object avoidance loop...
Light Status: b'11000 b"10011 b\'Red Light\\\\r\\\\n\'"'
Finishing object avoidance loop...
Light Status: b'11000 b"10011 b\'Red Light\\\\r\\\\n\'"'
Finishing object avoidance loop...
Light Status: b'11000 b"10011 b\'Red Light\\\\r\\\\n\'"'
Sent message to fog
trying to get new route
b'11011 {"data": {"message": "Heres Your New Route!"}}'
Finishing object avoidance loop...
Light Status: b'11000 b"10011 b\'Red Light\\\\r\\\\n\'"'
Finishing object avoidance loop...
Light Status: b'11000 b"10011 b\'Red Light\\\\r\\\\n\'"'
Finishing object avoidance loop...
Light Status: b'11000 b"10011 b\'Red Light\\\\r\\\\n\'"'
Finishing object avoidance loop...
Light Status: b'11000 b"10011 b\'Red Light\\\\r\\\\n\'"'
Finishi

# Run Cell Below to Stop

In [21]:
import time

camera.unobserve(execute, names='value')
time.sleep(0.1)  # add a small sleep to make sure frames have finished processing
robot.stop()

In [26]:
robot.stop()