# Classification for stop or behavior signals in front of Road Following model

## Classification Model is the Controller followed by Road following model either from Jetbot or Jetracer

In [1]:
import torch
from torch2trt import TRTModule
import torchvision


device = torch.device('cuda')

#for classification with behavior per class/index -
CATEGORIES_classification = ['background1','redlight','greenlight','bottle']
model_trt_classification = TRTModule()
model_trt_classification.load_state_dict(torch.load('trt_classification_behavior_model.pth.pth')) #category jetracer model for jetbot model from training and build with TRT


# Road following script from Jetbot, if from Jetraceer with categories add ['apex','bottle']
CATEGORIES = []
category_roadindex= 0 # len(CATEGORIES) Categories can not be switched while driving 
model_trt = TRTModule()
model_trt.load_state_dict(torch.load('best_steering_model_xy_trt.pth')) #jetbot road following model from Jetbot road following training and build TRT (no category)

<All keys matched successfully>

### Creating the Pre-Processing Function

We have now loaded our model, but there's a slight issue. The format that we trained our model doesnt exactly match the format of the camera. To do that, we need to do some preprocessing. This involves the following steps:

1. Convert from HWC layout to CHW layout
2. Normalize using same parameters as we did during training (our camera provides values in [0, 255] range and training loaded images in [0, 1] range so we need to scale by 255.0
3. Transfer the data from CPU memory to GPU memory
4. Add a batch dimension

In [2]:
import torchvision.transforms as transforms
import torch.nn.functional as F
import cv2
import PIL.Image
import numpy as np

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()

def preprocess(image):
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device).half()
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

In [3]:
from jetbot import Camera, bgr8_to_jpeg

camera = Camera()

In [4]:
import ipywidgets
import ipywidgets.widgets as widgets
import time
import IPython
from IPython.display import display
import traitlets

target_widget = widgets.Image(format='jpeg', width=224, height=224)
x_slider = ipywidgets.FloatSlider(min=-1.5, max=1.5, description='x')
y_slider = ipywidgets.FloatSlider(min=-1.5, max=1.5, orientation='horizontal', description='y')

def display_xy(camera_image):
    image = np.copy(camera_image)
    x = x_slider.value
    y = y_slider.value
    x = int(x * 224 / 2 + 112)
    y = int(y * -224 / 2 + 112)
    image = cv2.circle(image, (x, y), 8, (0, 255, 0), 3)
    image = cv2.circle(image, (112, 224), 8, (0, 0,255), 3)
    image = cv2.line(image, (x,y), (112,224), (255,0,0), 3)
    jpeg_image = bgr8_to_jpeg(image)
    return jpeg_image

#time.sleep(1)
traitlets.dlink((camera, 'value'), (target_widget, 'value'), transform=display_xy)




<traitlets.traitlets.directional_link at 0x7ed4647b00>

We'll also create our robot instance which we'll need to drive the motors.

In [5]:
from jetbot import Robot

robot = Robot()

FPS frames per second,
* blocked: probability of blocked road (automatic updated), Manu. time...: how many frames bot should pause (20FPS*0.05s); Manu. bl thr...: manual threshold (start with 0.95), Manu.tunr: manual slider for 0 degree of backwards turn, 0 is straight back, speed is fixed for 0.3

The steering slider will display our estimated steering value.  Please remember, this value isn't the actual angle of the target, but simply a value that is
nearly proportional.  When the actual angle is ``0``, this will be zero, and it will increase / decrease with the actual angle.  

In [6]:
import ipywidgets

#category_widget.value=0
score_widgets = []

category_widget = ipywidgets.Dropdown(options=CATEGORIES_classification, description='category')

#TB  sliding average for reducing noise level of angle
#category_widget.value=0

def shift_and_add(new, test_list):
# using list slicing and "+" operator 
# shift last element to first 
    test_list = test_list[-1:] + test_list[:-1] 
    test_list[0]= new
    array_ave=sum(test_list)/len(test_list)
    
    return array_ave, test_list 


def start_category(change):
    global category_block
    if not CATEGORIES_classification: 
        category_block=0
    else:
        category_block=CATEGORIES_classification.index(category_widget.value)
        print(category_block)
    return category_block

category_widget.observe(start_category, names='value')

#repeated for initialization
if not CATEGORIES_classification: 
    print("List is empty.")
    category_index=0;
else:
    category_index=CATEGORIES_classification.index(category_widget.value)   
    for category in CATEGORIES_classification:
        score_widget = ipywidgets.FloatSlider(min=0.0, max=1.0, description=category, orientation='vertical')
        score_widgets.append(score_widget) 

    live_execution_widget = ipywidgets.HBox(score_widgets)


In [7]:
steering_slider = ipywidgets.FloatSlider(min=-1.0, max=1.0, description='steering')
speed_slider = ipywidgets.FloatSlider(min=-2, max=2.0, orientation='horizontal', description='speed')

speed_gain_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, step=0.01, description='speed gain')
steering_gain_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, step=0.01, value=0.09, description='steering gain')
steering_dgain_slider = ipywidgets.FloatSlider(min=0.0, max=0.5, step=0.001, value=0.24, description='steering kd')
steering_bias_slider = ipywidgets.FloatSlider(min=-0.3, max=0.3, step=0.01, value=0.0, description='steering bias')

#display(speed_gain_slider, steering_gain_slider, steering_dgain_slider, steering_bias_slider)
robot_control=widgets.VBox([speed_gain_slider, steering_gain_slider, steering_dgain_slider, steering_bias_slider])


#behavior
blocked_slider = ipywidgets.FloatSlider(description='blocked', min=0.0, max=1.2, orientation='horizontal')
stopduration_slider= ipywidgets.IntSlider(min=1, max=1000, step=1, value=40, description='Manu. time stop') #anti-collision stop time
block_threshold= ipywidgets.FloatSlider(min=0, max=1.2, step=0.01, value=0.9, description='Manu. bl threshold') #anti-collision block probability

turn_degree= ipywidgets.FloatSlider(min=-90, max=90, step=0.1, value=0, description='Manu. turn degree')
speedblock_gain_slider = ipywidgets.FloatSlider(min=-1.0, max=1.0, step=0.01, description='speed block')

behavior_control=widgets.VBox([blocked_slider,block_threshold, stopduration_slider, turn_degree,speedblock_gain_slider])

#category_widget = ipywidgets.Dropdown(options=np.array(CATEGORIES_classification), description='category')
prediction_widget = ipywidgets.Text(description='prediction')



predictions_disp=widgets.VBox([category_widget,prediction_widget,robot_control])


display(widgets.HBox([target_widget]))
d2 = IPython.display.display("", display_id=2)
display(widgets.HBox([predictions_disp,behavior_control,live_execution_widget]))
display(y_slider, x_slider, speed_slider, steering_slider)

#create new New View for Output
#once a slider is blue then arrow keys left right can be used


HBox(children=(Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C…

HBox(children=(VBox(children=(Dropdown(description='category', options=('background1', 'redlight', 'greenlight…

FloatSlider(value=0.0, description='y', max=1.5, min=-1.5)

FloatSlider(value=0.0, description='x', max=1.5, min=-1.5)

FloatSlider(value=0.0, description='speed', max=2.0, min=-2.0)

FloatSlider(value=0.0, description='steering', max=1.0, min=-1.0)

Next, we'll create a function that will get called whenever the camera's value changes. This function will do the following steps

1. Pre-process the camera image
2. Execute the neural network
3. Compute the approximate steering value
4. Control the motors using proportional / derivative control (PD)

In [8]:
import time
import math


count=0
count_stops=0
stop_time=50 #(for how many frames the bot should go backwards, see and of script)
angle = 0.0
angle_last = 0.0
go_on=1
max_x=camera.width
min_x=0
max_y=camera.height
min_y=0
x=0
y=0
test_list=[0,0,0,0,0,0]
to_radian=math.pi/180
category_block=0

def execute(change):
    global angle, angle_last, count, count_stops,stop_time,go_on,block_threshold,max_x, min_x,max_y,min_y,x,y, test_list,to_radian, turn_degree, category_block,category_roadindex
    count +=1
    t1 = time.time()
    
    image = change['new']
    image_preproc = preprocess(image)
    #anti_collision model)-----
    output=model_trt_classification(image_preproc)
    output = F.softmax(output, dim=1).detach().cpu().numpy().flatten()
    category_index = output.argmax()
    prediction_widget.value = CATEGORIES_classification[category_index]
        
    #chose class or add behavior------------------------
    for i, score in enumerate(list(output)):
        score_widgets[i].value = score
            
    prob_blocked = float(output.flatten()[category_block]) #second class
    #prediction_widget.value=str(category_block)
    blocked_slider.value = prob_blocked
    stop_time=stopduration_slider.value
   
    if go_on==1:
        if prob_blocked > block_threshold.value: #in case collision avoidance model hast recognized an object (not specific one) (prob_blocked) then stop for some time about ~ frames*0.05ms 
            count_stops +=1
            go_on=2
            #anti_collision------- behaviour is defined after the second else we loss one frame:
        else: #start road following if no object
            go_on=1
            count_stops=0
            #-------------------
            xy = model_trt(image_preproc).detach().float().cpu().numpy().flatten()  
            x = float(xy[2*category_roadindex])
            y = float(xy[2*category_roadindex + 1] )  
            x = int(max_x * (x / 2.0 + 0.5))
            y = int(max_y * (y / 2.0 + 0.5))
            x_joysticklike=((x-max_x/2.0)-min_x)/(max_x-min_x)
            y_joysticklike=((max_y-y)-min_y)/(max_y-min_y)     
            x_slider.value = x_joysticklike
            y_slider.value = y_joysticklike
            angle = np.arctan2(x_joysticklike, y_joysticklike)
            speed_slider.value = speed_gain_slider.value #
    else:
        count_stops=count_stops+1
        if count_stops<stop_time: #how many frames bot should pause, anti_collision-----------------add behavior here, backward -0.5, turn values from 0-224
            #here could be added a behavior function (def) for the different classed------------------------ returning angle of movement in degree_choice
            degree_choice=turn_degree.value #in degree keep between -45 (left) to 45 degree (right)  
            angle=degree_choice*to_radian            
            speed_slider.value=speedblock_gain_slider.value #-0.3 # speedblock_gain_slider.value set speed zero or negative bewtween -1 to 1
            x_slider.value = (speed_slider.value*math.tan(angle))
            y_slider.value = abs(speed_slider.value)
        else:
            go_on=1
            count_stops=0
        
    
     
    #---------        
       
    #angle, test_list=shift_and_add(angle, test_list) #reduce noise by rolling average of 6 frames 
    pid = angle * steering_gain_slider.value + (angle - angle_last) * steering_dgain_slider.value
    angle_last = angle 
    steering_slider.value = pid + steering_bias_slider.value
    if speed_slider.value>=0:
        robot.left_motor.value = max(min(speed_slider.value + steering_slider.value, 1.0), 0.0)
        robot.right_motor.value = max(min(speed_slider.value - steering_slider.value, 1.0), 0.0)
    else:
        robot.left_motor.value = max(min(speed_slider.value + steering_slider.value, 0), -1.0)
        robot.right_motor.value = max(min(speed_slider.value - steering_slider.value, 0), -1.0)
        
    #---------    
    
    t2 = time.time()
    s = f"""{int(1/(t2-t1))} FPS"""
    d2.update(IPython.display.HTML(s) )
    
    
execute({'new': camera.value})

>WARNING: This code will move the robot!! Please make sure your robot has clearance and it is on Lego or Track you have collected data on. The road follower should work, but the neural network is only as good as the data it's trained on!

In [9]:
camera.observe(execute, names='value')

Awesome! If your robot is plugged in it should now be generating new commands with each new camera frame. 

You can now place JetBot on  Lego or Track you have collected data on and see whether it can follow track.

If you want to stop this behavior, you can unattach this callback by executing the code below.

In [10]:
import time
camera.unobserve(execute, names='value')
time.sleep(0.1)  # add a small sleep to make sure frames have finished processing
robot.stop()

### Conclusion
That's it for this live demo! Hopefully you had some fun seeing your JetBot moving smoothly on track follwing the road!!!

If your JetBot wasn't following road very well, try to spot where it fails. The beauty is that we can collect more data for these failure scenarios and the JetBot should get even better :)