In [1]:
from jetbot import ObjectDetector

model = ObjectDetector('ssd_mobilenet_v2_coco.engine')    

In [2]:
from jetbot import Camera

#camera = Camera.instance(width=224, height=224, fps=10)
camera = Camera.instance(width=300, height=300)

In [3]:
detections = model(camera.value)

print(detections)

[[{'label': 62, 'confidence': 0.9580695033073425, 'bbox': [0.005749940872192383, 0.38120654225349426, 0.2992469370365143, 0.8833380937576294]}, {'label': 72, 'confidence': 0.672331690788269, 'bbox': [0.07962486147880554, 0.1782410740852356, 0.35596951842308044, 0.40081506967544556]}, {'label': 72, 'confidence': 0.39140594005584717, 'bbox': [0.4887627065181732, 0.19235262274742126, 0.7384192943572998, 0.40062639117240906]}, {'label': 73, 'confidence': 0.358465313911438, 'bbox': [0.07962486147880554, 0.1782410740852356, 0.35596951842308044, 0.40081506967544556]}, {'label': 67, 'confidence': 0.35488012433052063, 'bbox': [0.16370928287506104, 0.39940571784973145, 0.5380027294158936, 0.875756025314331]}]]


In [4]:
from IPython.display import display
import ipywidgets.widgets as widgets

detections_widget = widgets.Textarea()
detections_widget.value = str(detections)
display(detections_widget)

Textarea(value="[[{'label': 62, 'confidence': 0.9580695033073425, 'bbox': [0.005749940872192383, 0.38120654225…

In [5]:
image_number = 0
object_number = 0

print(detections[image_number][object_number])

{'label': 62, 'confidence': 0.9580695033073425, 'bbox': [0.005749940872192383, 0.38120654225349426, 0.2992469370365143, 0.8833380937576294]}


In [6]:
import torch
import torchvision
import torch.nn.functional as F
import cv2
import numpy as np

# collision_model = torchvision.models.alexnet(pretrained=False)
# collision_model.classifier[6] = torch.nn.Linear(collision_model.classifier[6].in_features, 2)
# collision_model.load_state_dict(torch.load('best_model.pth'))
# device = torch.device('cuda')
# collision_model = collision_model.to(device)

mean = 255.0 * np.array([0.485, 0.456, 0.406])
stdev = 255.0 * np.array([0.229, 0.224, 0.225])

normalize = torchvision.transforms.Normalize(mean, stdev)

def preprocess(camera_value):
    global device, normalize
    x = camera_value
    # Image zoomed to 224,224 versus 224,244 obstacle avoidance model
    x = cv2.resize(x, (224, 224))
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = x.transpose((2, 0, 1))
    x = torch.from_numpy(x).float()
    x = normalize(x)
    x = x.to(device)
    x = x[None, ...]
    return x

In [7]:
from jetbot import Robot

robot = Robot()

In [8]:
import PID

global follow_speed
follow_speed = 0.5
global turn_gain
turn_gain = 1.7
global follow_speed_pid, follow_speed_pid_model
follow_speed_pid_model = 1
# follow_speed_pid = PID.PositionalPID(3, 0, 0)
follow_speed_pid = PID.PositionalPID(1.5, 0, 0.05)
global turn_gain_pid
turn_gain_pid = PID.PositionalPID(0.15, 0, 0.05)
global object_yservo_pid
object_yservo_pid = PID.PositionalPID(3, 0.5, 0)

In [9]:
from jetbot import bgr8_to_jpeg

image_widget = widgets.Image(format='jpeg', width=300, height=300)
label_widget = widgets.IntText(value=47, description='tracked label')

display(widgets.VBox([
    widgets.HBox([image_widget]),
    label_widget,
]))

width = int(image_widget.width)
height = int(image_widget.height)

def detection_center(detection):
    """Calculate the center x, y coordinates of the object"""
    bbox = detection['bbox']
    center_x = (bbox[0] + bbox[2]) / 2.0 - 0.5
    center_y = (bbox[1] + bbox[3]) / 2.0 - 0.5
    return (center_x, center_y)
    
def norm(vec):
    """Calculate the length of a two-dimensional vector"""
    return np.sqrt(vec[0]**2 + vec[1]**2)

def closest_detection(detections):
    """Find the detection closest to the center of the image"""
    closest_detection = None
    for det in detections:
        center = detection_center(det)
        if closest_detection is None:
            closest_detection = det
        elif norm(detection_center(det)) < norm(detection_center(closest_detection)):
            closest_detection = det
    return closest_detection
        
def execute(change):
    global follow_speed
    global turn_gain
    global follow_speed_pid
    target_value_speed = 0
    #Update image value
    image = change['new']

    # Execute a conflict model to determine if it is blocking
    # execute collision model to determine if blocked
    # collision_output = collision_model(preprocess(image)).detach().cpu()
    # Apply the "softmax" function to normalize the output vector to have a sum of 1 (this makes it a probability distribution)
    # prob_blocked = float(F.softmax(collision_output.flatten(), dim=0)[0])
    # blocked_widget.value = prob_blocked
    
    # Blocking does not execute the following object detection program, directly return to the next round of data refresh
    # turn left if blocked
    # if prob_blocked > 0.5:
    #     robot.left(0.3)
    #     image_widget.value = bgr8_to_jpeg(image)
    #     return
        
    # compute all detected objects
    detections = model(image)
    # First, mark the non-tracking objects that appear with blue lines.
    # Draw all detections on image
    for det in detections[0]:
        bbox = det['bbox']
        cv2.rectangle(image, (int(300 * bbox[0]), int(300 * bbox[1])), (int(300 * bbox[2]), int(300 * bbox[3])), (255, 0, 0), 2)
    # Select the object you want to track, select the value of label_widget,1 is person,
    # select detections that match selected class label
    matching_detections = [d for d in detections[0] if d['label'] == int(label_widget.value)]
    
    # Then mark the object to be tracked with green lines.
    # get detection closest to center of field of view and draw it
    det = closest_detection(matching_detections)
    if det is not None:
        bbox = det['bbox']
        cv2.rectangle(image, (int(300 * bbox[0]), int(300 * bbox[1])), (int(300 * bbox[2]), int(300 * bbox[3])), (0, 255, 0), 4)

    # otherwise go forward if no target detected
    '''
    blob:https---------A string of tags generated by the blob object in html5 after being assigned to the video tag.
    '''
    if det is None:
        # robot.forward(float(follow_speed))
        robot.stop()
    # otherwsie steer towards target
    else:
        # move robot forward and steer proportional target's x-distance from center
        center = detection_center(det)

        #Follow speed PID adjustment
        follow_speed_pid.SystemOutput = 90000 * (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
        if label_widget.value == 1:
            follow_speed_pid.SetStepSignal(30000)
        elif label_widget.value == 53 or label_widget.value == 55: # 53:Apple 55:orange
            follow_speed_pid.SetStepSignal(10000)
        else:
            follow_speed_pid.SetStepSignal(10000)
        follow_speed_pid.SetInertiaTime(0.2, 0.1)

        #Steering gain PID adjustment
        turn_gain_pid.SystemOutput = center[0]
        turn_gain_pid.SetStepSignal(0)
        turn_gain_pid.SetInertiaTime(0.2, 0.1)

        #Limit steering gain to the effective range
        if label_widget.value == 1:
            target_value_turn_gain = 0.6 + abs(turn_gain_pid.SystemOutput)
        elif label_widget.value == 53 or label_widget.value == 55:
            target_value_turn_gain = 0.5 + abs(turn_gain_pid.SystemOutput)
        else:
            target_value_turn_gain = 0.5 + abs(turn_gain_pid.SystemOutput)
        if target_value_turn_gain < 0:
            target_value_turn_gain = 0
        elif target_value_turn_gain > 2:
            target_value_turn_gain = 2

        #Keep the output motor speed within the effective driving range
        target_value_speed = 0.46 + follow_speed_pid.SystemOutput / 90000
        target_value_speedl = target_value_speed + target_value_turn_gain * center[0]
        target_value_speedr = target_value_speed - target_value_turn_gain * center[0]
        if target_value_speedl<0.5:
            target_value_speedl=0
        elif target_value_speedl>1:
            target_value_speedl = 1
        if target_value_speedr<0.5:
            target_value_speedr=0
        elif target_value_speedr>1:
            target_value_speedr = 1

        robot.set_motors(target_value_speedl, target_value_speedr)
        
        if label_widget.value != 1:
            #Vertical angle camera PID adjustment
            object_yservo_pid.SystemOutput =bbox[1]*300+150*(bbox[3] - bbox[1])
            # print('Y-axis object center position value:')
            # print(object_yservo_pid.SystemOutput)
            object_yservo_pid.SetStepSignal(150)
            object_yservo_pid.SetInertiaTime(0.2, 0.1)
            #Limit the vertical angle camera angle to the specified range
            target_value_object_yservo = int(2048 + object_yservo_pid.SystemOutput)
            servo_device.Servo_serial_control(2, target_value_object_yservo)
    # Update image display to widget
    image_widget.value = bgr8_to_jpeg(image)
    
execute({'new': camera.value})

VBox(children=(HBox(children=(Image(value=b'', format='jpeg', height='300', width='300'),)), IntText(value=47,…

In [13]:
#camera.unobserve_all()
#camera.observe(execute, names='value')

import threading
import inspect
import ctypes
''' definition of the close thread function'''
def _async_raise(tid, exctype):
  """raises the exception, performs cleanup if needed"""
  tid = ctypes.c_long(tid)
  if not inspect.isclass(exctype):
    exctype = type(exctype)
  res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
  if res == 0:
    raise ValueError("invalid thread id")
  elif res != 1:
    # """if it returns a number greater than one, you're in trouble,
    # and you should call it again with exc=NULL to revert the effect"""
    ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
    raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(thread):
  _async_raise(thread.ident, SystemExit)
'''The function of thread 1, which continuously calls the detection function'''
def test():
    while True:
        execute({'new': camera.value})        

thread1 = threading.Thread(target=test)
thread1.setDaemon(False)
thread1.start()


Exception in thread Thread-7:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-13-f96b5e2a7d5d>", line 26, in test
    execute({'new': camera.value})
  File "<ipython-input-9-38856c137e31>", line 139, in execute
    servo_device.Servo_serial_control(2, target_value_object_yservo)
NameError: name 'servo_device' is not defined



In [12]:
import time
stop_thread(thread1)
camera.unobserve_all()
time.sleep(1.0)
robot.stop()