# Loading models

In [None]:
import torchvision
import torch
import torchvision
from torch2trt import TRTModule
import time

In [None]:
device = torch.device('cuda')

road_following_model = TRTModule()
road_following_model.load_state_dict(torch.load('models/road_following_model_trt.pth'))

#

block_free_model = TRTModule()
block_free_model.load_state_dict(torch.load('models/block_free_model_trt.pth'))

#

LR_model_trt = TRTModule()
LR_model_trt.load_state_dict(torch.load('models/LR_best_model_trt.pth'))

# Camera init

In [None]:
import torchvision.transforms as transforms
import torch.nn.functional as F
import cv2
import PIL.Image
import numpy as np

from jetbot import Camera, bgr8_to_jpeg

camera = Camera.instance(fps=10)
is_camera_control = True

# Linking display (for debugging)

In [None]:
from IPython.display import display
import ipywidgets
import traitlets

image_widget = ipywidgets.Image()

traitlets.dlink((camera, 'value'), (image_widget, 'value'), transform=bgr8_to_jpeg)

display(image_widget)

# Face detect and mosaic

In [None]:
width = 224
height = 224
fps = 10

fcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')
out = cv2.VideoWriter('webcam.avi', fcc, fps, (width, height))

In [None]:
import os

face_cascade = cv2.CascadeClassifier('models/haarcascade_frontalface_default.xml')
count = 0

rate = 15

def face_detect(ndarray_image):
    #input should be unnormalized images. (Ndarray)    
    
    image = ndarray_image    
    faces = face_cascade.detectMultiScale(cv2.cvtColor(image,cv2.COLOR_BGR2GRAY),1.3,5)   
    global count
    
    for (x,y,w,h) in faces:
        
        if (w or h) == 0: #if nothing found or garbage
            return image
        
        cv2.rectangle(image,(x,y),(x+w,y+h),(255,0,0),2)
        
        roi = image[y:y+h, x:x+w]

        #making picture mosaic
        roi = cv2.resize(roi, (w//rate, h//rate))
        roi = cv2.resize(roi, (w, h), interpolation=cv2.INTER_AREA)

        image[y:y+h, x:x+w] = roi
        
    return image

# Image preprocessing (Ndarray to Tensor(half))

In [None]:
def preprocess(image):
    
    mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
    std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()
    
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device).half()
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

# Road following / block_free functions

In [None]:
def road_following(processed_image):
    global angle, angle_last
    
    xy = road_following_model(processed_image).detach().float().cpu().numpy().flatten()
    x = xy[0]
    y = (0.5 - xy[1]) / 2.0
    
    #jetbot_spped = speed_gain_slider.value
    jetbot_spped = 0.20
    
    angle = np.arctan2(x, y)
#     pid = angle * steering_gain_slider.value + (angle - angle_last) * steering_dgain_slider.value
    pid = angle * 0.05 + (angle - angle_last) * 0.00
    angle_last = angle
    
#     steer_bias = pid + steering_bias_slider.value
    steer_bias = pid + 0.0
    
    robot.left_motor.value = max(min(jetbot_spped + steer_bias, 1.0), 0.0)
    robot.right_motor.value = max(min(jetbot_spped - steer_bias, 1.0), 0.0)

In [None]:
def block_free_detect(processed_image):
    global is_camera_control
    
    bf_detection = block_free_model(processed_image)
    bf_detection = F.softmax(bf_detection, dim=1)
    
    prob_blocked = float(bf_detection.flatten()[0])
    
    if prob_blocked < 0.6: 
      road_following(processed_image)
    
    else:        
      is_camera_control = False
      robot.set_motors(0.1, 0.1)
      left_right_detect(processed_image)

In [None]:
LR_list = []
def left_right_detect(processed_image):
    lr_detection = LR_model_trt(processed_image)
    lr_detection = F.softmax(lr_detection, dim=1)
    
    prob_right = float(lr_detection.flatten()[0])
    
    if prob_right < 0.5:
        #right     
        if len(LR_list) <3: #indeterminable
            LR_list.append('R')
    else:
        #left
        
        if len(LR_list) <3: #indeterminable
            LR_list.append('L')
    
    if len(LR_list) == 3 :
        result = max(LR_list, key=LR_list.count)
        
        if result == 'L':
            left_avoidance()
        elif result == 'R':
            right_avoidance()

# Avoidance function

In [None]:
def left_avoidance():
    robospeed = 0.14
    robot.left(0.185)
    time.sleep(0.5)
    
    while robospeed > 0.12:
        robospeed = robospeed - 0.001
        robot.set_motors(0.19, robospeed)
        time.sleep(0.15)

    
    robot.set_motors(0, 0)
    
    LR_list.clear() #list clear
    

def right_avoidance():
    robospeed = 0.14
    robot.right(0.185)
    time.sleep(0.5)
    
    while robospeed > 0.12:
        robospeed = robospeed - 0.001
        robot.set_motors(robospeed, 0.16)
        time.sleep(0.15)

    
    robot.set_motors(0, 0)
    
    LR_list.clear() #list clear

# Main excute

In [None]:
from jetbot import Robot

robot = Robot()

In [None]:
angle = 0.0
angle_last = 0.0
count = 10
def execute(change):
    global is_camera_control,count
    #change['new'] is ndarray.
    image = change['new']
    
    frame = face_detect(image)
    out.write(frame)
    
    if is_camera_control:
        block_free_detect(preprocess(image))
        count = 10
    else:
        count-=1
        if count==0:
            is_camera_control = True
        
execute({'new': camera.value})

# Passing camera images to excute function

In [None]:
camera.observe(execute, names='value')

# Execute finish

In [None]:
out.release()
camera.unobserve(execute, names='value')
time.sleep(0.1)  # add a small sleep to make sure frames have finished processing
robot.stop()
camera.stop()