In [None]:
from jetracer.nvidia_racecar import NvidiaRacecar
import matplotlib.pyplot as plt
from jetcam.csi_camera import CSICamera
from torch import nn
from torchvision import transforms
from IPython.display import clear_output
import torch
import PIL
import pathlib
import time
import threading
import os
import numpy as np

In [None]:
# Device that the models and input tensors in the cells below are moved to
# (first CUDA GPU).
device = torch.device('cuda:0')

In [None]:
# Handle to the car; later cells control it by assigning `car.steering`
# and `car.throttle`.
car = NvidiaRacecar()

In [None]:
# 224x224 frames: this matches the 224*224 input features expected by the
# first layer of RaceMonster further down.
camera = CSICamera(width=224, height=224)

In [None]:
class imagereader:
    """
    Hands out camera frames as PIL images, one fresh frame per call.

    `read()` busy-waits until the frame exposed by the camera differs from the
    frame returned by the previous call, so the same frame is not processed
    twice.

    Args:
        source: object exposing a `value` attribute (current frame as an array)
            and a writable `running` attribute. Defaults to the notebook-level
            `camera`.
    """
    def __init__(self, source=None):
        # Fall back to the global camera so that `imagereader()` keeps working.
        self.camera = camera if source is None else source
        self._oldimage = None
        # Setting `running` on construction, before any frame is read.
        self.camera.running = True

    def read(self):
        """
        Block until the camera shows a frame that differs from the last one
        returned, then return it.

        Only the first row of the frame (`frame[0]`) is compared.

        Returns: PIL image built from the frame with its last axis reversed
            (presumably BGR -> RGB -- confirm against the camera's format).
        """
        while True:
            i = self.camera.value
            # First call: there is nothing to compare against, accept the frame.
            # (An explicit check instead of a bare `except:`, which would also
            # swallow KeyboardInterrupt and genuine errors in the comparison.)
            if self._oldimage is None:
                break
            diff = np.abs(i[0] - self._oldimage[0])
            # Differences of 250 or more are ignored.
            # NOTE(review): presumably this discards small negative differences
            # that wrap around when the frames are unsigned 8-bit -- confirm.
            diff = np.where(diff < 250, diff, 0)
            if diff.sum() > 0:
                break
        self._oldimage = i
        return PIL.Image.fromarray(self._oldimage[...,::-1])

In [None]:
# Frame reader shared by all cells below; constructing it sets
# `camera.running = True`.
ir = imagereader()

In [None]:
# Grab a single frame (a PIL image) so that it can be inspected.
img = ir.read()

In [None]:
# Bare expression: shows the captured frame as the cell output.
img

In [None]:
def preprocess_image(img):
    """
    Convert a camera image into the tensor that is fed to the models.

    The image is first converted to grayscale and then turned into a tensor;
    no random augmentation is applied.

    Args:
        img: PIL image

    returns: tensor with the grayscale version of the image
    """
    to_gray = transforms.functional.to_grayscale
    to_tensor = transforms.functional.to_tensor
    return to_tensor(to_gray(img))

In [None]:
class drive:
    """
    ContextManager to drive the car and always stop when the context terminates

    Args:
        car: object with a writable `throttle` attribute
        speed: throttle value applied while the context is active

    Example:
        with drive(car, speed=-0.5) as d:
            ...   # car.throttle == d.speed here
        # car.throttle == 0 here, also when the body raised or was interrupted
    """
    def __init__(self, car, speed=-0.5):
        self.car = car
        self.speed = speed

    def __enter__(self):
        self.car.throttle = self.speed
        # Return the manager so that `with drive(car) as d:` binds something useful.
        return self

    def __exit__(self, *args):
        # Always stop the car; returning None lets exceptions from the body propagate.
        self.car.throttle = 0

In [None]:
# Fixed (non-trained) 1-D convolution used by auto_stop: an all-ones kernel of
# width 3 with zero bias, so every output is the sum of three neighbouring inputs.
stopmodel = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=3, padding=0)
stopmodel.weight.requires_grad = False    # must come first: allows the in-place fill below
stopmodel.weight.fill_(1)
stopmodel.bias.data = torch.zeros(1)
stopmodel.to(device)

In [None]:
def auto_stop(X, threshold=1.3):
    """
    Decide whether the car should stop, based on the bottom row of the image.

    The last row along the third axis of X is passed through `stopmodel`; the
    car should stop when even the smallest response exceeds `threshold`.

    Args:
        X: tensor of the current image of the car, indexed as X[:, :, row, col]
        threshold: value that the smallest `stopmodel` response has to exceed

    Returns: bool, True when the trajectory is no longer visible
    """
    bottom_row = X[:, :, -1, :]
    weakest_response = stopmodel(bottom_row).min()
    return (weakest_response > threshold).item()

In [None]:
# demonstrate auto_stop on one live frame:
# read frame -> preprocess to tensor -> add a leading batch axis -> move to device
auto_stop(preprocess_image(ir.read()).unsqueeze(0).to(device))

In [None]:
class RaceMonster(nn.Module):
    """
    Small fully connected network that maps an image to one number per image
    (used as the steering value in the loops below).

    Architecture: flatten -> Linear(n_inputs, n_hidden) -> ReLU -> Linear(n_hidden, 1).

    Args:
        n_inputs: number of values per image after flattening
            (default 224*224, i.e. one 224x224 single-channel image)
        n_hidden: width of the hidden layer (default 100)

    The layer attribute names (`w1`, `w2`) are part of the saved state dict and
    must not be renamed.
    """
    def __init__(self, n_inputs=224*224, n_hidden=100):
        super().__init__()
        self.w1 = nn.Linear(n_inputs, n_hidden)
        self.w2 = nn.Linear(n_hidden, 1)
        self.relu = nn.ReLU()

    def forward(self, X):
        """
        Args:
            X: batch of images; everything after the first axis is flattened

        Returns: 1-D tensor with one prediction per image in the batch
        """
        # reshape instead of view: also accepts non-contiguous input
        # (e.g. a transposed or sliced tensor), where view would raise.
        flat = X.reshape(len(X), -1)
        return self.w2(self.relu(self.w1(flat))).view(-1)

In [None]:
# Restore the trained steering model and move it to the inference device.
filename = 'Perceptron.all.state'
model = RaceMonster()
# map_location='cpu': load the weights on the CPU regardless of the device they
# were saved from (a checkpoint saved on a GPU that does not exist here would
# otherwise fail to load); the model is moved to `device` afterwards.
# NOTE: torch.load unpickles the file -- only load checkpoints from a trusted source.
model.load_state_dict(torch.load(filename, map_location='cpu'))
model.to(device)

In [None]:
# steering only: the throttle is not touched, the model just turns the wheels.
# Runs until the kernel is interrupted.
count = 0
with torch.no_grad():                         # no training
    start = time.time()
    while True:
        count += 1
        # wait=True: keep the previous fps line on screen until the new one is
        # printed, so the output does not flicker.
        clear_output(wait=True)
        print(f'fps {count / (time.time() - start)}')   # print fps
        image = ir.read()                    # read image
        X = preprocess_image(image)          # preprocess
        X = X.unsqueeze(0)                   # (1,1,224,224)
        X = X.to(device)                     # to gpu
        direction = model(X).cpu().item()    # prediction
        car.steering = direction             # steer the car

In [None]:
# drive for a bit: steer with the model until auto_stop reports that the
# trajectory is no longer visible; `drive` resets the throttle to 0 on the way out.
with drive(car, speed=-0.5), torch.no_grad():     # no training
    keep_driving = True
    while keep_driving:
        image = ir.read()
        X = preprocess_image(image).unsqueeze(0).to(device)
        direction = model(X).cpu().item()
        car.steering = direction
        # checked after steering, on the same frame that was just used
        keep_driving = not auto_stop(X)