In [None]:
import traitlets
import cv2
from time import sleep
from jetbot import Robot, Camera
from threading import Thread
from os import makedirs
import xbox as xbox_controller
import ipywidgets.widgets as widgets

TICKS_PER_SECOND: int = 30
JOYSTICK_DZ_THRESH: float = 0.15
PRECISION: int = 3
IMAGE_WIDTH: int = 820
IMAGE_HEIGHT: int = 616
COLLECTION_TPS: int = 10

FRAMES_DIRECTORY = './test1'


class MotorValues(traitlets.HasTraits):
    motor_speed = traitlets.Float()
    
    
class Collector(traitlets.HasTraits):
    def __init__(self, frames_directory: str, frame_index: int):
        self.frames_directory = frames_directory
        self.frame_index = frame_index
        self.should_shutdown = False
        
        self.robot: Robot = Robot()
        print('')
        self.camera: Camera = Camera.instance()
        self.camera.width = IMAGE_WIDTH
        self.camera.height = IMAGE_HEIGHT
        self.image_latest = widgets.Image(format='png', width=IMAGE_WIDTH, height=IMAGE_HEIGHT)
        
        self.controls = {'left_motor': MotorValues(),
                         'right_motor': MotorValues()}
        self.controller = xbox_controller.Joystick()
        self.ls_x = 0
        self.ls_y = 0

        # Dynamically link motor controller values to class attribute
        self.left_link = traitlets.dlink((self.controls['left_motor'], 'motor_speed'), (self.robot.left_motor, 'value'))
        self.right_link = traitlets.dlink((self.controls['right_motor'], 'motor_speed'), (self.robot.right_motor, 'value'))
        
        # Dynamically link camera output to class attribute
        self.camera_link = traitlets.dlink((self.camera, 'value'), (self.image_latest, 'value'), transform=lambda value: bytes(cv2.imencode('.png', value)[1]))
        
        # Display latest frame from camera
        display(self.image_latest)
        
        self.threads = {'movement': Thread(name='thread_movement', target=self.monitor_status),
                       'camera': Thread(name='thread_camera', target=self.record_frames)}
        
        for thread in self.threads.values():
            thread.start()
        
    def shutdown(self):
        print('Shutdown procedure initiated...')
        self.controller.close()
        self.left_link.unlink()
        self.right_link.unlink()
        self.camera_link.unlink()
        self.robot.stop()
        print('Shutdown complete')
        exit(0)
        
    def save_frame(self, steering_vector: float = 0):
        """Saves a frame into a file in the frames_directory."""
        file_path = f'{self.frames_directory}/{self.frame_index}-{steering_vector}.png'
        with open(file_path, 'wb') as f:
            f.write(self.image_latest.value)
            
    def calculate_steering_vector(self):
        pass
        
    def record_frames(self):
        while not self.should_shutdown:
            self.save_frame()
            self.frame_index += 1
            
            # Throttle number of frames saved
            sleep(1. / COLLECTION_TPS)
            
    
    def monitor_status(self):
        while not self.should_shutdown:
            # Handle left joystick axes positions: right and up are positive (after inverting y axis)
            self.ls_x = round(self.controller.leftX(), PRECISION)
            self.ls_y = round(self.controller.leftY(), PRECISION)

            # Compensate for controller deadzone
            if abs(self.ls_x) < JOYSTICK_DZ_THRESH:
                self.ls_x = 0.0
            if abs(self.ls_y) < JOYSTICK_DZ_THRESH:
                self.ls_y = 0.0

            # Calculate intermediaries for tank control conversion
            v = (1 - abs(self.ls_x)) * (self.ls_y / 1) + self.ls_y
            w = (1 - abs(self.ls_y)) * (self.ls_x / 1) + self.ls_x

            # Handle right trigger position, normalize value
            right_trigger = round(self.controller.rightTrigger(), PRECISION)

            # Translate intermediaries and apply right trigger speed multiplier to compute tank control values
            self.controls['left_motor'].motor_speed = (v + w) / 2 * right_trigger
            self.controls['right_motor'].motor_speed = (v - w) / 2 * right_trigger

                
if __name__ == '__main__':
    makedirs(FRAMES_DIRECTORY, exist_ok=True)
    try:
        collector = Collector(FRAMES_DIRECTORY, 0)
    except Exception as e:
        collector.shutdown()
        print(e)
        
    try:
        for thread in collector.threads.values():
            thread.join()
    except KeyboardInterrupt:
        collector.should_shutdown = True
        for thread in collector.threads.values():
            thread.join()
            
        collector.shutdown()

Image(value=b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\xe0\x00\x00\x00\xe0\x08\x02\x00\x00\x00\x95O\xfd…

In [2]:
# Cell for manual robot shutdown
collector.left_link.unlink()
collector.right_link.unlink()
collector.camera_link.unlink()
collector.controller.close()
collector.robot.stop()

NameError: name 'collector' is not defined