### 重置摄像头

In [1]:
# Restart the nvargus-daemon service through systemctl; `sudo -S` reads the
# password that `echo` pipes in on stdin, and printf adds a trailing newline.
# NOTE(review): the sudo password is hard-coded here — confirm that is acceptable.
!echo 'jetson' | sudo -S systemctl restart nvargus-daemon && printf '\n'

[sudo] password for jetson: 


### 初始化

In [5]:
import torch
import torchvision
from torch2trt import torch2trt
from torch2trt import TRTModule
TASK = "test"
CATEGORIES = ['apex']
SIGNAL_CATEGORIES = ['禁行','红绿灯','只许直行','只许左转','只许右行','禁止直行','禁止左转','禁止右转','人行道','无信号']
CROSS_CATEGORIES = ['道路','十字路口','左三叉路口','右三叉路口','左右三叉路口']
device = torch.device('cuda')


def _squeezenet_for(categories):
    """Return a non-pretrained SqueezeNet 1.1 with one output channel per category."""
    net = torchvision.models.squeezenet1_1(pretrained=False)
    class_count = len(categories)
    net.classifier[1] = torch.nn.Conv2d(512, class_count, kernel_size=1)
    net.num_classes = class_count
    return net


def _build_trt_engine(net, weights_file, engine_file):
    """Load weights into ``net``, convert it with torch2trt and reload the result.

    The network is moved to CUDA, switched to eval mode and to half precision
    before its weights are loaded.  The converted module's state dict is
    written to ``engine_file`` and read back into a fresh ``TRTModule``.

    Returns:
        ``(net, sample, engine)``: the prepared network, the zero tensor used
        as the conversion example input, and the reloaded ``TRTModule``.
    """
    net = net.cuda().eval().half()
    net.load_state_dict(torch.load(weights_file))

    sample = torch.zeros((1, 3, 224, 224)).cuda().half()
    converted = torch2trt(net, [sample], fp16_mode=True)
    torch.save(converted.state_dict(), engine_file)

    engine = TRTModule()
    engine.load_state_dict(torch.load(engine_file))
    return net, sample, engine


# Road-following model: ResNet-18 with 2 outputs per entry in CATEGORIES.
modelroad = torchvision.models.resnet18(pretrained=False)
modelroad.fc = torch.nn.Linear(512, 2 * len(CATEGORIES))
modelroad, data, modelroad_trt = _build_trt_engine(
    modelroad, TASK+'/path/model.pth', TASK+'/road_model_trt.pth')

# Traffic-sign classification model.
modelsignal, data, modelsignal_trt = _build_trt_engine(
    _squeezenet_for(SIGNAL_CATEGORIES),
    TASK+'/signal/model.pth', TASK+'/signal_model_trt.pth')

# Junction classification model.
modelcross, data, modelcross_trt = _build_trt_engine(
    _squeezenet_for(CROSS_CATEGORIES),
    TASK+'/cross/model.pth', TASK+'/cross_model_trt.pth')

from jetracer.nvidia_racecar import NvidiaRacecar

car = NvidiaRacecar()

### 初始化摄像头

In [6]:
from jetcam.csi_camera import CSICamera

# Capture 224x224 frames, the same spatial size as the (1, 3, 224, 224)
# sample tensors used when the models were converted.
camera = CSICamera(width=224, height=224, capture_fps=65)

### 开始跑

In [7]:
from utils import preprocess
import numpy as np
import threading
import ipywidgets
import time

# Steering and throttle calibration applied to the racecar object.
car.steering_gain = -0.55
car.steering_offset = -0.13
car.throttle_gain = 0.5
# Disabled alternatives (commented out):
# car.throttle_gain = 0.5
# car.throttle = 0.25
# Speed field; run() multiplies its value by 0.01 to obtain the throttle.
speed_widget = ipywidgets.IntText(description='速度', value=20)
# Start/stop button; runclick() flips its caption and the state flag.
run_button = ipywidgets.Button(description='启动')
# Shared flag read by run(): 1 = hold the car stopped, 0 = drive.
state = 1


        
    

# Main driving loop
def run(car,camera,speed_widget,run_button):
    """Drive the car from camera frames; never returns.

    While the module-level ``state`` flag is 1 the car is held at zero
    throttle and steering.  Otherwise each frame is preprocessed and fed to
    up to three models:

    * the sign model (every 5th frame, only while on the road) records a
      traffic light and any mandatory / forbidden direction;
    * the junction model (every 5th frame offset by one, or every frame once
      inside a junction) decides which directions the junction offers;
    * the road model steers the car whenever it is on the road.

    Args:
        car: object whose ``throttle`` and ``steering`` attributes are set.
        camera: object whose ``read()`` returns the current frame.
        speed_widget: widget whose ``value`` * 0.01 is used as the throttle.
        run_button: unused; kept so existing callers keep working.
    """
    global state

    # Function-scope import: the file never binds the name ``F`` that the
    # softmax calls below need.
    import torch.nn.functional as F

    # Resolve label positions once, so a label missing from the category
    # lists fails here, before the car starts moving.
    sig_light = SIGNAL_CATEGORIES.index('红绿灯')
    sig_only_straight = SIGNAL_CATEGORIES.index('只许直行')
    sig_only_left = SIGNAL_CATEGORIES.index('只许左转')
    # SIGNAL_CATEGORIES spells this label '只许右行'; looking up '只许右转'
    # raised ValueError.
    sig_only_right = SIGNAL_CATEGORIES.index('只许右行')
    sig_no_straight = SIGNAL_CATEGORIES.index('禁止直行')
    sig_no_left = SIGNAL_CATEGORIES.index('禁止左转')
    sig_no_right = SIGNAL_CATEGORIES.index('禁止右转')
    cross_road = CROSS_CATEGORIES.index('道路')
    cross_four_way = CROSS_CATEGORIES.index('十字路口')
    cross_left = CROSS_CATEGORIES.index('左三叉路口')
    cross_right = CROSS_CATEGORIES.index('右三叉路口')
    cross_left_right = CROSS_CATEGORIES.index('左右三叉路口')

    loop = 0
    # Traffic-light state: 0 = none seen, 1 = seen (stop at the next
    # junction), 2 = the stop has already been performed.
    # NOTE(review): nothing resets this to 0, so only the first traffic
    # light is ever honoured — confirm that is intended.
    signalRGB = 0
    # Direction to take at the junction: 0 straight, -1 left, 1 right,
    # 2 = not decided yet.
    chooseDirect = 2
    # Sets, not dicts: they are filled with add() and combined with set
    # difference below.
    forbiddenDirect = set()
    enableDirect = set()
    # 1 = on the road, 0 = inside (or possibly inside) a junction.
    inRoad = 0
    # Probability a class must exceed before it is trusted.
    condition = 0.9

    def checkSignal(output):
        """Record a mandatory or forbidden direction from sign probabilities."""
        nonlocal chooseDirect
        if output[sig_only_straight] > condition:
            chooseDirect = 0
        elif output[sig_only_left] > condition:
            chooseDirect = -1
        elif output[sig_only_right] > condition:
            chooseDirect = 1
        else:
            # The three prohibitions are not mutually exclusive.
            if output[sig_no_straight] > condition:
                forbiddenDirect.add(0)
            if output[sig_no_left] > condition:
                forbiddenDirect.add(-1)
            if output[sig_no_right] > condition:
                forbiddenDirect.add(1)

    def checkCrossing(output):
        """Set the directions offered by the junction type that was recognised."""
        nonlocal enableDirect
        if output[cross_four_way] > condition:
            enableDirect = {-1, 1, 0}
        elif output[cross_left] > condition:
            enableDirect = {-1, 0}
        elif output[cross_right] > condition:
            enableDirect = {1, 0}
        elif output[cross_left_right] > condition:
            enableDirect = {-1, 1}

    def inCrossing():
        """Perform a pending traffic-light stop, then pick and apply a direction."""
        nonlocal signalRGB, chooseDirect, enableDirect
        if signalRGB == 1:
            # Traffic light: brake briefly, stand still, then resume.
            signalRGB = 2
            car.throttle = -0.1
            time.sleep(0.5)
            car.throttle = 0
            time.sleep(4.5)
            car.throttle = speed_widget.value*0.01
        if chooseDirect == 2:
            # Keep only the directions that are not forbidden.
            enableDirect = enableDirect - forbiddenDirect
            # A set has no "first" element (the original indexed it with
            # [0]); pick deterministically: straight, then left, then right.
            for candidate in (0, -1, 1):
                if candidate in enableDirect:
                    chooseDirect = candidate
                    break
        if chooseDirect != 2:
            car.steering = 0.45*chooseDirect

    while True:
        if state == 1:
            # Stopped: keep the car still and poll the flag again shortly.
            car.throttle = 0
            car.steering = 0
            time.sleep(0.5)
            continue

        # Prepare the camera frame.
        image = camera.read()
        image = preprocess(image).half()

        if loop % 5 == 0 and inRoad == 1:
            # Sign recognition: every 5th frame, skipped inside a junction.
            output = modelsignal_trt(image)
            output = F.softmax(output, dim=1).detach().cpu().numpy().flatten()

            if signalRGB == 0 and output[sig_light] > condition:
                # Assignment; the original wrote `signalRGB==1`, a no-op.
                signalRGB = 1
            if chooseDirect == 2:
                # Direction still undecided: look for direction signs.
                checkSignal(output)

        if loop % 5 == 1 or inRoad == 0:
            # Junction recognition: every 5th frame, offset from the sign
            # check, and on every frame once inside a junction.
            output = modelcross_trt(image)
            output = F.softmax(output, dim=1).detach().cpu().numpy().flatten()

            if not enableDirect:
                checkCrossing(output)

            # Inside a junction.
            if enableDirect:
                inRoad = 0
                inCrossing()

            # Not (or no longer) in a junction: back to road following.
            # NOTE(review): this also runs on every check that finds no
            # junction, discarding a direction or prohibition recorded by
            # the sign check on the previous frame — confirm intent.
            if not enableDirect or output[cross_road] > condition:
                inRoad = 1
                chooseDirect = 2
                forbiddenDirect = set()
                enableDirect = set()

        loop += 1
        # Only on the road does the road model decide the steering.
        if inRoad == 1:
            output = modelroad_trt(image).detach().cpu().numpy().flatten()
            car.steering = float(output[0])
            car.throttle = speed_widget.value*0.01
        
        
# Background thread that will execute run(); it is only created here and is
# started further down.
execute_thread = threading.Thread(target=run, args=(car,camera,speed_widget,run_button))

def runclick(c):
    """Toggle between driving and stopped when the button is clicked.

    Prints the button's current caption, swaps the caption between
    '启动' and '停止', and sets the module-level ``state`` flag to 0 when
    starting or 1 when stopping.

    Args:
        c: argument supplied by the click callback; unused.
    """
    global state
    caption = run_button.description
    print(caption)
    starting = caption == '启动'
    run_button.description = '停止' if starting else '启动'
    state = 0 if starting else 1

# Call runclick() on every button click.
run_button.on_click(runclick)

# Group the start/stop button and the speed field in one HBox container.
data_collection_widget = ipywidgets.HBox([
    run_button,
    speed_widget
])

display(data_collection_widget)
# Start the driving loop; run() keeps the car stopped while state is 1.
execute_thread.start()


HBox(children=(Button(description='启动', style=ButtonStyle()), IntText(value=20, description='速度')))