
The COCO object recognition model is loaded on the JetBot and an array of object indexes is created based on the image captured by the camera. 

In [2]:
from jetbot import ObjectDetector


model = ObjectDetector('../ssd_mobilenet_v2_coco.engine')

Reset the camera and run it.

In [4]:
import os

os.system('systemctl restart nvargus-daemon')

from jetbot import Camera

camera = Camera.instance(width=300, height=300)

detections = model(camera.value)

print(detections)

[[{'label': 65, 'confidence': 0.7483522891998291, 'bbox': [0.026932626962661743, 0.0971139669418335, 0.9828534126281738, 0.977319598197937]}, {'label': 62, 'confidence': 0.480966180562973, 'bbox': [0.005629017949104309, 0.14956727623939514, 0.37860894203186035, 0.5679789781570435]}]]


Load a widget to display detected objects.

In [5]:
from IPython.display import display
import ipywidgets.widgets as widgets

detections_widget = widgets.Textarea()

detections_widget.value = str(detections)

display(detections_widget)

Textarea(value="[[{'label': 65, 'confidence': 0.7483522891998291, 'bbox': [0.026932626962661743, 0.09711396694…

Detections: The “label” value indicates which object has been detected, the “confidence” value is the probability that the recognized object is the actual object, and the “bbox” values are the bounding box parameters of the object in the image.

In [6]:
image_number = 0
object_number = 0

print(detections[image_number][object_number])

{'label': 65, 'confidence': 0.7483522891998291, 'bbox': [0.026932626962661743, 0.0971139669418335, 0.9828534126281738, 0.977319598197937]}


Load the best_path training model for collision avoidance. Preprocess camera values.

In [8]:
import torch
import torchvision
import torch.nn.functional as F
import cv2
import numpy as np

collision_model = torchvision.models.alexnet(pretrained=False)
collision_model.classifier[6] = torch.nn.Linear(collision_model.classifier[6].in_features, 2)
collision_model.load_state_dict(torch.load('best_model_FreeBlocked.pth'))
device = torch.device('cuda')
collision_model = collision_model.to(device)

mean = 255.0 * np.array([0.485, 0.456, 0.406])
stdev = 255.0 * np.array([0.229, 0.224, 0.225])

normalize = torchvision.transforms.Normalize(mean, stdev)

def preprocess(camera_value):
    global device, normalize
    x = camera_value
    x = cv2.resize(x, (224, 224))
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = x.transpose((2, 0, 1))
    x = torch.from_numpy(x).float()
    x = normalize(x)
    x = x.to(device)
    x = x[None, ...]
    return x

UnpicklingError: invalid load key, 'v'.

In [None]:
Load the JetBot controls.

In [None]:
from jetbot import Robot

robot = Robot()

Load the driving instructions based on object detection.

In [None]:
from jetbot import bgr8_to_jpeg

blocked_widget = widgets.FloatSlider(min=0.0, max=1.0, value=0.0, description='blocked')
image_widget = widgets.Image(format='jpeg', width=300, height=300)
label_widget = widgets.IntText(value=1, description='tracked label')

display(widgets.VBox([
    widgets.HBox([image_widget, blocked_widget]),
    label_widget
]))

width = int(image_widget.width)
height = int(image_widget.height)

def detection_center(detection):
    """Computes the center x, y coordinates of the object"""
    bbox = detection['bbox']
    center_x = (bbox[0] + bbox[2]) / 2.0 - 0.5
    center_y = (bbox[1] + bbox[3]) / 2.0 - 0.5
    return (center_x, center_y)
    
def norm(vec):
    """Computes the length of the 2D vector"""
    return np.sqrt(vec[0]**2 + vec[1]**2)

def closest_detection(detections):
    """Finds the detection closest to the image center"""
    closest_detection = None
    for det in detections:
        center = detection_center(det)
        if closest_detection is None:
            closest_detection = det
        elif norm(detection_center(det)) < norm(detection_center(closest_detection)):
            closest_detection = det
    return closest_detection

# Simple PD controller (Kp - proportional term, Kd - derivative term)
Kp = 0.18
Kd = 0.05

min_drive = .5
min_blocked = .6
min_turn = .5
max_turn = .7
frwd_value = 0.6                    # Default value to drive forward (0 = no action, 1 = full motor capacity)
bkwd_value = 0.4
rot_value_when_exploring = 0.5        # Default value to spin to the right when robot is in exploration mode (0 = no action, 1 = full motor capacity)
min_prob_free_to_drive_frwd = .35    # Min probability prob_free for robot to drive forward 
max_n_frames_stuck = 20               # Limit on the number of frames the robot is stuck for. Once this limit is reached, robot goes into exploration mode (makes large right turn)
frame_counter = 0                     # Frame counter 
n_frames_stuck = 0                    # Initialize counter of the number of successive frames the robot is stuck for
exploration = False                   # Initialize binary variable which determines if robot is in exploration mode (when True.) Used to mark the related frames with red background  
data_log = []                         # Initialize the array whcih will store a history of telemetry readings and robot actions (for analysis and tuning)
recent_obstacles = []                 # Initialize the array to store the last frame data

def update(change):
    global robot, frame_counter, n_frames_stuck, exploration
    x = change['new'] 
    x = preprocess(x)
    y = model(x)
    
    # apply the `softmax` function to normalize the output vector so it sums to 1 (which makes it a probability distribution)
    y = F.softmax(y, dim=1)
    
    y = y.flatten()
   
    # update list of recent detections
    while (len(recent_obstacles) >= 2):
        recent_obstacles.pop(0)
    recent_obstacles.append([prob_free,prob_blocked])
    
    # check if robot got stuck and update n_frames_stuck counter
    if prob_free < min_prob_free_to_drive_frwd:
        n_frames_stuck = n_frames_stuck + 1 
    else:
        n_frames_stuck = 0
        
    # calculate errors at times t (current) and t-1 (prev)    
    # error(t) and error(t-1): prob_left-prob_right   
    if len(recent_obstacles) == 2:
        current_probs = recent_obstacles[1]
        prev_probs = recent_obstacles[0]
    else:
        current_probs = [prob_free,prob_blocked]
        prev_probs = current_probs
                
    # error = prob_left-prob_right        
    current_error = current_probs[1] - current_probs[2]
    prev_error    = prev_probs[1] - prev_probs[2]

    # increment frame counter 
    frame_counter = frame_counter + 1
    
    # define functions which determine (and return) robot actions
    def forward(value):
        robot.forward(value)
        return ("FWRD",round(value,2))

    def left(value):
        robot.left(value)
        return ("LEFT",round(value,2))

    def right(value):
        robot.right(value)
        return ("RIGHT",round(value,2))
    
    def reverse(value):
        robot.backward(value)
        return ("BKWRD",round(value,2))
    
    action = ""
  
    # estimate rotational value to turn left (if negative) or right (if positive)
    # 0 = no action, 1 = full motor capacity)
    rot_value = - Kp * current_error - Kd * (current_error - prev_error)
    
    # store propotional and differential controller components for frame-by-frame analysis
    p_component = - Kp * current_error
    d_component = - Kd * (current_error - prev_error)
        
def execute(change):
    image = change['new']
    
    # execute collision model to determine if blocked
    collision_output = collision_model(preprocess(image)).detach().cpu()
    prob_blocked = float(F.softmax(collision_output.flatten(), dim=0)[0])
    prob_free = float(F.softmax(collision_output.flatten(), dim=0)[1])
    blocked_widget.value = prob_blocked
    
    # turn left if blocked
    if n_frames_stuck < max_n_frames_stuck:

        if prob_blocked > 0.6:
            robot.left(max_turn)
            image_widget.value = bgr8_to_jpeg(image)
            return
        
        # compute all detected objects
        detections = model(image)
        
        # draw all detections on image
        for det in detections[0]:
            bbox = det['bbox']
            cv2.rectangle(image, (int(width * bbox[0]), int(height * bbox[1])), (int(width * bbox[2]), int(height * bbox[3])), (255, 0, 0), 2)
    
        # select detections that match selected class label
        matching_detections = [d for d in detections[0] if d['label'] == int(label_widget.value)]
        
        # get detection closest to center of field of view and draw it
        det = closest_detection(matching_detections)
        if det is not None:
            bbox = det['bbox']
            cv2.rectangle(image, (int(width * bbox[0]), int(height * bbox[1])), (int(width * bbox[2]), int(height * bbox[3])), (0, 255, 0), 5)
        
        # otherwise go forward if no target detected
        if det is None:
            robot.forward(float(frwd_value))
        
        # otherwsie steer towards target
        elif:
            # move robot forward and steer proportional target's x-distance from center
            center = detection_center(det)
            robot.set_motors(
                float(frwd_value + min_turn * center[0]),
                float(frwd_value - min_turn * center[0])
            )
    else:
                    exploration = True
        #robot_rotates = True
        action = reverse(bkwd_value)
        time.sleep(0.2)
        n_frames_stuck = 0
    
    time.sleep(0.001)
    
    # reset variables
    exploration = False
    robot_rotates = False
    
    # update image widget
    image_widget.value = bgr8_to_jpeg(image)
    
execute({'new': camera.value})

Connect the execute function to each camera update

In [None]:
camera.unobserve_all()
camera.observe(execute, names='value')

End processing and stop the robot

In [None]:
import time

camera.unobserve_all()
time.sleep(1.0)
robot.stop()

In [None]:
Placeholder