# 1. Data collection

In [1]:
import traitlets
import ipywidgets.widgets as widgets
from IPython.display import display
from jetbot import Camera, bgr8_to_jpeg

# Open the camera at 224x224.
camera = Camera.instance(width=224, height=224)

image = widgets.Image(format='jpeg', width=224, height=224)  # the widget's width and height don't have to match the camera's

# One-way link: each new camera value is passed through bgr8_to_jpeg and
# assigned to the image widget, which keeps the preview live.
camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)

display(image)

Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x0…

In [2]:
import os

blocked_dir = 'dataset/blocked'
free_dir = 'dataset/free'


def ensure_directories(*directories):
    """Create every directory in ``directories`` that does not exist yet.

    Each directory is handled on its own, so one that already exists never
    prevents the remaining ones from being created. A short notice is
    printed for every directory that was already present.
    """
    for directory in directories:
        try:
            os.makedirs(directory)
        except FileExistsError:
            print('Directory %s not created because it already exists' % directory)


ensure_directories(free_dir, blocked_dir)

Directories not created becasue they already exist


In [3]:
# Every counter box and button shares the same fixed size.
button_layout = widgets.Layout(width='128px', height='64px')

# "free" class: a counter seeded with the number of files already saved,
# plus the button that adds one more snapshot.
free_count = widgets.IntText(
    value=len(os.listdir(free_dir)),
    layout=button_layout,
)
free_button = widgets.Button(
    description='add free',
    button_style='success',
    layout=button_layout,
)

# "blocked" class: same pair of widgets.
blocked_count = widgets.IntText(
    value=len(os.listdir(blocked_dir)),
    layout=button_layout,
)
blocked_button = widgets.Button(
    description='add blocked',
    button_style='danger',
    layout=button_layout,
)

# One row per class: counter on the left, button on the right.
display(widgets.HBox([free_count, free_button]))
display(widgets.HBox([blocked_count, blocked_button]))

HBox(children=(IntText(value=30, layout=Layout(height='64px', width='128px')), Button(button_style='success', …

HBox(children=(IntText(value=11, layout=Layout(height='64px', width='128px')), Button(button_style='danger', d…

In [4]:
from uuid import uuid1

def save_snapshot(directory):
    """Write the image widget's current JPEG bytes to a new file in ``directory``.

    The file name is a freshly generated UUID1 with a ``.jpg`` suffix, so
    repeated snapshots never overwrite each other.
    """
    file_name = '%s.jpg' % uuid1()
    with open(os.path.join(directory, file_name), 'wb') as snapshot_file:
        snapshot_file.write(image.value)

def save_free():
    """Save a snapshot into the "free" folder and refresh its counter widget."""
    save_snapshot(free_dir)
    saved_files = os.listdir(free_dir)
    free_count.value = len(saved_files)
    
def save_blocked():
    """Save a snapshot into the "blocked" folder and refresh its counter widget."""
    save_snapshot(blocked_dir)
    saved_files = os.listdir(blocked_dir)
    blocked_count.value = len(saved_files)
    
# on_click hands its handler one argument. The save functions take none,
# so thin lambdas accept that argument and discard it.
free_button.on_click(lambda _clicked: save_free())
blocked_button.on_click(lambda _clicked: save_blocked())

In [5]:
# Show the live preview together with both counter/button rows so snapshots
# can be captured while watching the camera image.
display(image)
display(widgets.HBox([free_count, free_button]))
display(widgets.HBox([blocked_count, blocked_button]))

Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x0…

HBox(children=(IntText(value=30, layout=Layout(height='64px', width='128px')), Button(button_style='success', …

HBox(children=(IntText(value=11, layout=Layout(height='64px', width='128px')), Button(button_style='danger', d…

In [6]:
# Pack the collected dataset folder into dataset.zip (-r: recurse, -q: quiet).
!zip -r -q dataset.zip dataset

# 2. Train model

In [6]:
import torch
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms

In [None]:
# Unpack dataset.zip into the working directory (-q: quiet).
# Only needed when the 'dataset' folder is not already present here.
!unzip -q dataset.zip

In [7]:
# Per-image pipeline: random colour jitter, resize to 224x224, conversion to
# a tensor, then per-channel normalisation.
# Note: this pipeline (including the random jitter) is applied to every
# sample read through `dataset`, whichever split it ends up in.
image_transform = transforms.Compose([
    transforms.ColorJitter(0.1, 0.1, 0.1, 0.1),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# ImageFolder derives one class per sub-folder of 'dataset'.
dataset = datasets.ImageFolder('dataset', image_transform)

In [8]:
# Hold out 10 randomly chosen images for testing; everything else is used
# for training.
test_size = 10
train_size = len(dataset) - test_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

In [9]:
# Both loaders use identical batching options, so they are written once.
loader_options = dict(batch_size=16, shuffle=True, num_workers=4)

train_loader = torch.utils.data.DataLoader(train_dataset, **loader_options)

test_loader = torch.utils.data.DataLoader(test_dataset, **loader_options)

In [None]:
# Start from AlexNet with pretrained weights (transfer learning); only the
# final layer is replaced in the next cell.
model = models.alexnet(pretrained=True)

In [None]:
# Swap the last classifier layer for a fresh 2-output layer, one output per
# dataset folder ('blocked' and 'free'). The input width is kept unchanged.
model.classifier[6] = torch.nn.Linear(model.classifier[6].in_features, 2)

In [None]:
# Prefer the GPU, but fall back to the CPU so this training section also runs
# on a machine without CUDA instead of failing when the model is moved.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

In [None]:
NUM_EPOCHS = 30
BEST_MODEL_PATH = 'best_model.pth'
best_accuracy = 0.0

optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

for epoch in range(NUM_EPOCHS):

    # --- training pass ---------------------------------------------------
    # train() enables dropout in the classifier; it must be re-enabled every
    # epoch because the evaluation pass below switches it off.
    model.train()
    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = F.cross_entropy(outputs, labels)
        loss.backward()
        optimizer.step()

    # --- evaluation pass -------------------------------------------------
    # eval() disables dropout so the measured accuracy is deterministic for a
    # given set of weights; no_grad() skips building autograd graphs.
    model.eval()
    test_error_count = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            # Count misclassified samples directly; this stays correct for
            # any number of classes.
            test_error_count += int((outputs.argmax(1) != labels).sum())

    test_accuracy = 1.0 - float(test_error_count) / float(len(test_dataset))
    print('%d: %f' % (epoch, test_accuracy))

    # Keep only the weights of the best-scoring epoch on disk.
    if test_accuracy > best_accuracy:
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        best_accuracy = test_accuracy

# 3. Buzzer control

In [11]:
import os
# Grant read/write/execute permission on the serial device node to all users.
# NOTE(review): the recorded output of this cell is 256, i.e. a non-zero
# status, so the command appears to have failed when it was last run
# (sudo may need a password that a notebook cannot supply) — confirm.
os.system("sudo chmod 777 /dev/ttyTHS1")

256

In [12]:
import RPi.GPIO as GPIO

BEEP_pin = 6 # buzzer pin number (interpreted in BCM numbering, see setmode below)

# setmode must come before setup so the pin number is interpreted correctly.
GPIO.setmode(GPIO.BCM)
GPIO.setup(BEEP_pin, GPIO.OUT, initial = GPIO.LOW) # configure as output, driven LOW (buzzer off) at start



In [13]:
import time

def Beep(beeptime):
    """Sound the buzzer for ``beeptime`` seconds, blocking while it sounds.

    The pin is always driven LOW again on the way out, even when the sleep is
    interrupted (e.g. the kernel is interrupted) or rejects the duration
    (``time.sleep`` raises ``ValueError`` for a negative value), so the buzzer
    can never be left switched on.
    """
    GPIO.output(BEEP_pin, GPIO.HIGH)  # buzzer on
    try:
        time.sleep(beeptime)  # sound time
    finally:
        GPIO.output(BEEP_pin, GPIO.LOW)  # buzzer off

In [14]:
# Quick audible check: sound the buzzer for 0.3 seconds.
Beep(0.3)

# 4. Automatic avoid

In [1]:
import torch
import torchvision

# Rebuild the architecture only (pretrained=False); the trained weights are
# loaded from 'best_model.pth' in the next cell.
model = torchvision.models.alexnet(pretrained=False)
# Use the same 2-output final layer as in training so the saved weights fit.
model.classifier[6] = torch.nn.Linear(model.classifier[6].in_features, 2)

In [2]:
# The model is still on the CPU at this point, so load the checkpoint onto the
# CPU as well. Without map_location, tensors saved from a GPU are first
# restored onto the GPU (extra device memory, and a failure when CUDA is
# unavailable) before being copied into the model.
model.load_state_dict(torch.load('best_model.pth', map_location='cpu'))

In [3]:
device = torch.device('cuda')
# eval() switches off the dropout layers in AlexNet's classifier. A freshly
# built model is in training mode, which would make every live prediction
# randomly perturbed.
model = model.to(device).eval()

In [4]:
import cv2
import numpy as np

# Same per-channel mean/std as the training transform, scaled by 255 because
# preprocess() converts the frame to float without rescaling it to 0-1.
# NOTE(review): assumes camera frames hold 0-255 pixel values — confirm.
mean = 255.0 * np.array([0.485, 0.456, 0.406])
stdev = 255.0 * np.array([0.229, 0.224, 0.225])

normalize = torchvision.transforms.Normalize(mean, stdev)

def preprocess(camera_value):
    """Turn one camera frame into a normalised, batched tensor on ``device``.

    Steps: BGR -> RGB colour order, move the channel axis to the front,
    convert to a float tensor, apply ``normalize``, transfer to ``device``
    and add a leading batch axis.
    Presumably ``camera_value`` is a height x width x 3 array as delivered by
    the camera; verify against the caller.
    """
    rgb_frame = cv2.cvtColor(camera_value, cv2.COLOR_BGR2RGB)
    channels_first = rgb_frame.transpose((2, 0, 1))
    normalized = normalize(torch.from_numpy(channels_first).float())
    return normalized.to(device)[None, ...]

In [5]:
import traitlets
from IPython.display import display
import ipywidgets.widgets as widgets
from jetbot import Camera, bgr8_to_jpeg

# Camera at 224x224 and 10 frames per second.
camera = Camera.instance(width=224, height=224,fps=10)
image = widgets.Image(format='jpeg', width=224, height=224)
# Vertical 0.0-1.0 slider used purely as a display for the "blocked" probability.
blocked_slider = widgets.FloatSlider(description='blocked', min=0.0, max=1.0, orientation='vertical')

# One-way link: each new camera value is converted by bgr8_to_jpeg and shown in the image widget.
camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)

display(widgets.HBox([image, blocked_slider]))

HBox(children=(Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C…

In [9]:
from jetbot import Robot

# Handle used by the cells below to drive the robot (stop / left / right / forward).
robot = Robot()

In [7]:
import torch.nn.functional as F
import time

def _avoid_obstacle(speed=0.5, step_seconds=0.5):
    """Side-step an obstacle: turn left, drive forward, turn right, then stop.

    The robot's motion calls are not timed, so each one is held for
    ``step_seconds`` before the next is issued. Issued back-to-back, only
    the last command would have any visible effect.
    """
    for move in (robot.left, robot.forward, robot.right):
        move(speed)
        time.sleep(step_seconds)
    robot.stop()


def update(change):
    """Classify one camera frame and drive the robot accordingly.

    Parameters:
        change: mapping whose ``'new'`` entry is the latest camera frame
            (the shape of a traitlets change notification).

    Returns:
        0 if the frame was classified as blocked (probability > 0.7); the
          robot stops, beeps and performs the avoidance manoeuvre.
        1 otherwise; the robot drives forward.

    Side effects: updates ``blocked_slider`` with the blocked probability.
    Note that the blocked branch blocks the caller for the duration of the
    beep plus the manoeuvre.
    """
    x = preprocess(change['new'])

    # Inference only: skip autograd bookkeeping for every frame.
    with torch.no_grad():
        # softmax normalises the two outputs so they sum to 1 (a probability distribution)
        y = F.softmax(model(x), dim=1)

    # Output index 0 is read as the "blocked" probability.
    prob_blocked = float(y.flatten()[0])
    blocked_slider.value = prob_blocked

    if prob_blocked > 0.7:
        robot.stop()
        Beep(0.3)
        _avoid_obstacle()
        return 0

    robot.stop()
    robot.forward(0.5)
    return 1
    
#     if prob_blocked < 0.7:
#         robot.forward(0.6)
#     else:
#         robot.left(0.6)
    #if prob_blocked < 0.78:
        #robot.stop()
        #robot.forward(0.7)
    #else:
        #robot.stop()
        #robot.left(0.7)
    
    #time.sleep(0.001)
        
update({'new': camera.value})  # we call the function once to initialize

In [None]:
# Keep classifying the current frame until the path is reported free.
# update() returns 0 while blocked (it also runs the avoidance manoeuvre) and
# 1 once the robot is driving forward again.
while True:
    if update({'new': camera.value}) == 0:
        print('robot blocked.')
    else:
        print('automatic avoided.')
        break

In [None]:
camera.observe(update, names='value')  # attach 'update' to the camera's 'value' traitlet so it runs whenever the value changes

In [None]:
camera.unobserve(update, names='value')  # detach the callback so new frames no longer drive the robot
time.sleep(1)  # NOTE(review): presumably gives an in-flight update() call time to finish — confirm
robot.stop()  # then make sure the motors are halted

In [None]:
camera_link.unlink()  # stop pushing frames to the browser widget (the camera itself keeps running)

In [None]:
camera_link.link()  # re-establish the link so frames are pushed to the browser widget again

# Testing

In [45]:
# Manual check of the avoidance manoeuvre: stop and beep, then turn left,
# drive forward and turn right, each held for half a second and followed by
# a stop.
robot.stop()
Beep(0.3)

for drive in (robot.left, robot.forward, robot.right):
    drive(2)
    time.sleep(0.5)
    robot.stop()

In [16]:
# Manual stop, handy while experimenting with the motion cells.
robot.stop()

In [42]:
# Short forward nudge: drive for 0.4 seconds, then stop.
robot.forward(1)
time.sleep(0.4)
robot.stop()