# VLA Navigation - Data Collection

This notebook collects training data for fine-tuning OpenVLA on JetBot navigation tasks.

## Data Collection Process

1. Control the JetBot using a gamepad controller
2. Type a natural language instruction (e.g., "go to the red ball") into the instruction box, or pick one of the quick-instruction buttons
3. Each time you press the save button (gamepad button 0 or the on-screen "Save Sample" button), the robot's current motor commands are recorded along with the camera image
4. Data is saved as image + JSON metadata pairs

## Data Format

For each sample, we save:
- `<uuid>.jpg` - Camera image (224x224)
- `<uuid>.json` - Metadata including instruction and motor commands

```json
{
    "instruction": "go to the red ball",
    "action": {
        "left_speed": 0.3,
        "right_speed": 0.5
    },
    "timestamp": 1234567890.123
}
```

## Tips for Good Data Collection

- Vary robot positions and orientations
- Use diverse instructions for the same behavior
- Include edge cases (turning, stopping, reversing)
- Aim for 500-1000 samples for initial fine-tuning

### Import Libraries

In [None]:
# IPython Libraries for display and widgets
import traitlets
import ipywidgets.widgets as widgets
from IPython.display import display

# Camera and Motor Interface for JetBot
from jetbot import Robot, Camera, bgr8_to_jpeg

# Basic Python packages
from uuid import uuid1
import os
import json
import glob
import datetime
import numpy as np
import cv2
import time

### Create Dataset Directory

In [None]:
# Folder that receives one <uuid>.jpg / <uuid>.json pair per saved sample.
DATASET_DIR = 'dataset_vla'

# Flip to True to delete every previously collected sample before starting.
CLEAR_DATASET = False

if CLEAR_DATASET:
    import shutil
    if os.path.exists(DATASET_DIR):
        shutil.rmtree(DATASET_DIR)
        print(f'Cleared existing dataset: {DATASET_DIR}')

# Create the folder if needed; an existing folder is kept as-is.
try:
    os.makedirs(DATASET_DIR)
except FileExistsError:
    print(f'Directory already exists: {DATASET_DIR}')
else:
    print(f'Created directory: {DATASET_DIR}')

# Every sample has exactly one JSON file, so counting those counts samples.
existing_samples = len(
    glob.glob(
        os.path.join(DATASET_DIR, '*.json')
    )
)
print(f'Existing samples: {existing_samples}')

### Initialize Camera and Robot

In [None]:
# Initialize camera
# Frames are requested at 224x224, the image size listed in the data format
# section at the top of this notebook.
camera = Camera.instance(width=224, height=224)
time.sleep(1)  # Wait for camera to initialize

# Initialize robot
# `robot` is the motor interface used by the gamepad mixer and the stop
# handlers defined in later cells.
robot = Robot()

print('Camera and robot initialized')

### Create Display Widgets

In [None]:
# Preset instructions offered as one-click buttons under the text box.
quick_instructions = [
    'navigate forward avoiding obstacles',
    'go to the left',
    'go to the right',
    'turn around',
    'stop',
    'go backwards',
]

# Live camera preview: every new camera.value is passed through bgr8_to_jpeg
# and pushed into the image widget (one-way link, camera -> widget).
image_widget = widgets.Image(format='jpeg', width=224, height=224)
traitlets.dlink((camera, 'value'), (image_widget, 'value'), transform=bgr8_to_jpeg)

# Free-text instruction stored with each saved sample.
instruction_widget = widgets.Text(
    description='Instruction:',
    placeholder='Enter instruction...',
    value='navigate forward avoiding obstacles',
    layout=widgets.Layout(width='400px'),
    style={'description_width': 'initial'},
)

def set_instruction(instruction):
    """Build a click handler that loads `instruction` into the instruction box.

    A factory is used so each quick button captures its own instruction text
    instead of sharing the loop variable.
    """
    def _on_click(_button):
        # The clicked button itself is irrelevant; only the captured text matters.
        instruction_widget.value = instruction

    return _on_click

# One button per preset instruction; clicking it fills the instruction box.
quick_buttons = []
for instr in quick_instructions:
    # Labels longer than 20 characters are cut and given a trailing ellipsis.
    btn = widgets.Button(description=instr if len(instr) <= 20 else instr[:20] + '...')
    btn.on_click(set_instruction(instr))
    quick_buttons.append(btn)


def _make_speed_slider(description):
    """Build a -1.0..1.0 slider used to display one motor's commanded speed."""
    return widgets.FloatSlider(
        description=description,
        min=-1.0,
        max=1.0,
        step=0.01,
        readout=True,
        style={'description_width': 'initial'},
    )


# Motor speed displays
left_speed_widget = _make_speed_slider('Left Motor:')
right_speed_widget = _make_speed_slider('Right Motor:')

# Sample counter (read-only; seeded with the number of samples already on disk)
count_widget = widgets.IntText(
    value=existing_samples,
    description='Samples:',
    disabled=True,
)

# Status display
status_widget = widgets.Label(value='Ready to collect data')

### Create Gamepad Controller

Connect a gamepad, determine its index at http://html5gamepad.com, and set `CONTROLLER_INDEX` in the cell below to that index.

In [None]:
# Try different controller indices if index=0 doesn't work
# Change this to 1, 2, or 3 if your controller isn't detected
CONTROLLER_INDEX = 0

# Gamepad widget; its .buttons and .axes lists are what the later cells observe.
controller = widgets.Controller(index=CONTROLLER_INDEX)

# Debug: Show controller state
# NOTE(review): the button/axis counts may print as 0 until the browser has
# reported the gamepad (typically after the first button press) - confirm.
print(f"Controller index: {CONTROLLER_INDEX}")
print(f"Controller detected: {controller}")
print(f"Number of buttons: {len(controller.buttons)}")
print(f"Number of axes: {len(controller.axes)}")

# Live button state display
# Text is replaced by update_button_display whenever a button value changes.
button_display = widgets.Label(value="Press any button on gamepad...")

def update_button_display(change):
    """Show which gamepad button is currently held, if any.

    Observer for each button's ``value`` trait. ``change`` is not inspected;
    every button in ``controller.buttons`` is re-read and the lowest-index
    pressed one is reported.
    """
    pressed_index = next(
        (idx for idx, button in enumerate(controller.buttons) if button.value),
        None,
    )
    if pressed_index is None:
        button_display.value = "No button pressed"
    else:
        button_display.value = f"Button {pressed_index} is pressed!"

# Refresh the label whenever any button changes state.
for btn in controller.buttons:
    btn.observe(update_button_display, names='value')

display(controller)
display(button_display)
print("\nIf no buttons are detected, try changing CONTROLLER_INDEX to 1, 2, or 3")

### Connect Gamepad to Robot Control

- Left stick Y-axis: Forward/backward speed
- Right stick X-axis: Turning
- Button 0 (A/X): Save sample
- Button 1 (B/Circle): Emergency stop

In [None]:
# Last motor commands sent to the robot; read by the sample-saving code.
current_left_speed = current_right_speed = 0.0

# Per-motor gain applied after mixing. The right motor is weaker on this
# robot, so its command is boosted by 1.5x.
LEFT_OFFSET = 1.0
RIGHT_OFFSET = 1.5

# Flip the sign of the stick readings before mixing.
INVERT_FORWARD = True
INVERT_TURN = True

# Stick readings smaller than DEADZONE count as zero; turn input is scaled
# by TURN_SENSITIVITY before it is mixed with the forward command.
DEADZONE = 0.15
TURN_SENSITIVITY = 0.5


def apply_deadzone(value):
    """Return 0.0 for readings inside the deadzone, otherwise `value` unchanged."""
    return 0.0 if abs(value) < DEADZONE else value

def update_motors(change=None):
    """Turn the current stick positions into motor commands and apply them.

    Reads axis 1 (forward/backward) and axis 2 (turn) from ``controller``,
    mixes them into left/right speeds, sends them to ``robot`` and mirrors
    them in the module-level speed variables and the two slider widgets.

    Args:
        change: Ignored; present so the function can be used as an observer.
    """
    global current_left_speed, current_right_speed

    throttle_raw = apply_deadzone(controller.axes[1].value)
    steer_raw = apply_deadzone(controller.axes[2].value)

    throttle = -throttle_raw if INVERT_FORWARD else throttle_raw
    steer = (-steer_raw if INVERT_TURN else steer_raw) * TURN_SENSITIVITY

    # Differential mix: turning adds to one side and subtracts from the other.
    # Each side is then scaled by its calibration gain and clamped to [-1, 1].
    left_cmd = max(-1.0, min(1.0, (throttle + steer) * LEFT_OFFSET))
    right_cmd = max(-1.0, min(1.0, (throttle - steer) * RIGHT_OFFSET))

    current_left_speed, current_right_speed = left_cmd, right_cmd
    robot.set_motors(left_cmd, right_cmd)

    left_speed_widget.value = left_cmd
    right_speed_widget.value = right_cmd

# Re-run the mixer whenever either stick axis read by update_motors changes.
for _axis_index in (1, 2):
    controller.axes[_axis_index].observe(update_motors, names='value')

print('Gamepad connected with motor calibration')
print(f'LEFT_OFFSET: {LEFT_OFFSET}, RIGHT_OFFSET: {RIGHT_OFFSET}')

### Data Collection Functions

In [None]:
def save_sample(change):
    """Save the current camera frame with the instruction and motor commands.

    Used as the observer for gamepad button 0 and called directly with
    ``{'new': True}`` by the on-screen save button.

    Args:
        change: Mapping with a ``'new'`` entry. A falsy value (button
            release) is ignored so each press produces exactly one sample.

    Side effects:
        Writes ``<uuid>.jpg`` and ``<uuid>.json`` into ``DATASET_DIR`` and
        refreshes ``count_widget`` and ``status_widget``. If no frame is
        available or the image cannot be written, no metadata file is
        created and the failure is reported in ``status_widget``.
    """
    print(f"save_sample called with change: {change}")  # DEBUG
    if not change['new']:
        print("Ignoring button release")  # DEBUG
        return

    print("Saving sample...")  # DEBUG

    # Snapshot everything up front so the image, action and timestamp all
    # describe the same moment, even if the sticks move during the disk writes.
    instruction = instruction_widget.value
    left_speed = current_left_speed
    right_speed = current_right_speed
    image = camera.value
    timestamp = time.time()

    if image is None:
        # Nothing to write yet; avoid creating metadata without an image.
        status_widget.value = 'Save failed: no camera frame available'
        return

    # Generate unique ID shared by the image and its metadata file
    sample_id = str(uuid1())

    # Save image. cv2.imwrite signals failure by returning False rather than
    # raising, so the result must be checked before metadata is written.
    image_path = os.path.join(DATASET_DIR, f'{sample_id}.jpg')
    if not cv2.imwrite(image_path, image):
        status_widget.value = f'Save failed: could not write {image_path}'
        return
    print(f"Saved image to: {image_path}")  # DEBUG

    # Save metadata
    meta = {
        'instruction': instruction,
        'action': {
            'left_speed': float(left_speed),
            'right_speed': float(right_speed)
        },
        'timestamp': timestamp
    }

    meta_path = os.path.join(DATASET_DIR, f'{sample_id}.json')
    with open(meta_path, 'w') as f:
        json.dump(meta, f, indent=2)
    print(f"Saved metadata to: {meta_path}")  # DEBUG

    # Update counter from disk so it stays correct across notebook restarts
    count_widget.value = len(glob.glob(os.path.join(DATASET_DIR, '*.json')))
    print(f"New count: {count_widget.value}")  # DEBUG

    # Update status; only add an ellipsis when the instruction was truncated
    shown = instruction if len(instruction) <= 30 else instruction[:30] + '...'
    status_widget.value = f'Saved: {shown} L:{left_speed:.2f} R:{right_speed:.2f}'

def emergency_stop(change):
    """Stop the robot immediately when the observed button is pressed.

    Besides halting the motors, the recorded speed state and the speed
    sliders are reset to zero. ``save_sample`` reads ``current_left_speed``
    and ``current_right_speed``, so leaving the last stick values in place
    would label a stationary robot with a non-zero action.

    Args:
        change: Mapping with a ``'new'`` entry; falsy values (button release)
            are ignored.
    """
    global current_left_speed, current_right_speed

    if not change['new']:
        return

    robot.stop()

    # Keep the recorded state in step with the now-stopped motors.
    current_left_speed = 0.0
    current_right_speed = 0.0
    left_speed_widget.value = 0.0
    right_speed_widget.value = 0.0

    status_widget.value = 'EMERGENCY STOP'

# Debug helper: log every button press together with the button's index
def debug_button(index):
    """Return an observer that prints a message when button `index` is pressed."""
    def _log_press(change):
        # Releases (falsy 'new') are not logged.
        if not change['new']:
            return
        print(f"Button {index} pressed!")

    return _log_press

# Unobserve any existing handlers
# Makes the cell safe to re-run: without this, each run would stack another
# copy of the observers below and one press would save several samples.
# NOTE(review): unobserve_all() is called without a trait name, so it also
# drops the update_button_display observers registered in the gamepad cell;
# the live "Button N is pressed!" label stops updating - confirm intended.
for b in controller.buttons:
    b.unobserve_all()

# Add debug observers for all buttons
# Registered first, so the debug message is printed before the action handler
# attached to the same button runs.
for i, b in enumerate(controller.buttons):
    b.observe(debug_button(i), names='value')

# Button 0 (A/X): Save sample
controller.buttons[0].observe(save_sample, names='value')

# Button 1 (B/Circle): Emergency stop
controller.buttons[1].observe(emergency_stop, names='value')

print('Data collection buttons configured:')
print('  - Button 0 (A/X): Save sample')
print('  - Button 1 (B/Circle): Emergency stop')
print('  - DEBUG: All button presses will be logged')

### Display Data Collection Interface

In [None]:
# Create manual save/stop buttons
def manual_save(b):
    """Click handler for the on-screen save button: store a sample right away."""
    # save_sample only looks at change['new'], so mimic a button-press event.
    save_sample({'new': True})

# On-screen alternative to the gamepad save button
save_button = widgets.Button(
    icon='save',
    button_style='success',
    description='Save Sample',
)
save_button.on_click(manual_save)

# On-screen stop button
stop_button = widgets.Button(
    description='STOP',
    button_style='danger',
    icon='stop'
)

def manual_stop(b):
    """Click handler for the on-screen STOP button.

    Halts the motors and resets the recorded speed state and sliders to zero.
    ``save_sample`` reads ``current_left_speed`` / ``current_right_speed``,
    so stale stick values would otherwise be stored for a stationary robot.

    Args:
        b: The clicked button (unused).
    """
    global current_left_speed, current_right_speed

    robot.stop()

    # Keep the recorded state in step with the now-stopped motors.
    current_left_speed = 0.0
    current_right_speed = 0.0
    left_speed_widget.value = 0.0
    right_speed_widget.value = 0.0

    status_widget.value = 'STOPPED'

stop_button.on_click(manual_stop)

# Layout the interface with manual buttons included.
# Rows, top to bottom: camera preview, instruction box, two rows of quick
# instructions (three buttons, then the rest), speed sliders, sample counter,
# status line, manual save/stop buttons.
display(
    widgets.VBox(children=[
        widgets.HBox(children=[image_widget]),
        instruction_widget,
        widgets.HBox(children=quick_buttons[:3]),
        widgets.HBox(children=quick_buttons[3:]),
        left_speed_widget,
        right_speed_widget,
        widgets.HBox(children=[count_widget]),
        status_widget,
        widgets.HBox(children=[save_button, stop_button]),
    ])
)

In [None]:
def view_samples(n=5):
    """Display the last n collected samples.

    Samples are ordered by the modification time of their JSON file, newest
    first. For each one the instruction and action are printed and the image
    is drawn with matplotlib. Samples whose image is missing or unreadable
    are reported and skipped instead of aborting the whole listing.

    Args:
        n: Maximum number of samples to show.
    """
    json_files = sorted(glob.glob(os.path.join(DATASET_DIR, '*.json')),
                        key=os.path.getmtime, reverse=True)[:n]

    for json_path in json_files:
        with open(json_path, 'r') as f:
            meta = json.load(f)

        # The image shares the JSON file's base name (the sample UUID).
        base_name = os.path.splitext(os.path.basename(json_path))[0]
        image_path = os.path.join(DATASET_DIR, f'{base_name}.jpg')

        if not os.path.exists(image_path):
            print(f"\nSkipping sample without image: {json_path}")
            continue

        # cv2.imread returns None (it does not raise) when the file cannot be
        # decoded; passing that to cvtColor would crash.
        img = cv2.imread(image_path)
        if img is None:
            print(f"\nSkipping unreadable image: {image_path}")
            continue

        # OpenCV loads BGR; matplotlib expects RGB.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        print(f"\nInstruction: {meta['instruction']}")
        print(f"Action: L={meta['action']['left_speed']:.2f}, R={meta['action']['right_speed']:.2f}")

        # Imported only once there is something to draw.
        from matplotlib import pyplot as plt
        plt.figure(figsize=(3, 3))
        plt.imshow(img)
        plt.axis('off')
        plt.show()

# Uncomment to view samples
# view_samples(5)

### Dataset Statistics

In [None]:
def dataset_stats():
    """Print a short summary of the samples stored in ``DATASET_DIR``.

    Reports the number of samples, the number of distinct instructions with
    up to five examples, and the mean and standard deviation of the recorded
    left and right motor speeds. Prints a notice and returns early when no
    metadata files exist.
    """
    meta_paths = glob.glob(os.path.join(DATASET_DIR, '*.json'))
    if len(meta_paths) == 0:
        print('No samples collected yet')
        return

    instructions, left_speeds, right_speeds = [], [], []
    for meta_path in meta_paths:
        with open(meta_path, 'r') as handle:
            record = json.load(handle)
        instructions.append(record['instruction'])
        action = record['action']
        left_speeds.append(action['left_speed'])
        right_speeds.append(action['right_speed'])

    print(f'Total samples: {len(meta_paths)}')
    unique_instructions = set(instructions)
    print(f'Unique instructions: {len(unique_instructions)}')
    print('\nInstruction examples:')
    for example in list(unique_instructions)[:5]:
        print(f'  - {example}')
    print('\nAction statistics:')
    # The labels are padded so both "mean=" columns line up.
    for label, speeds in (('Left speed: ', left_speeds), ('Right speed:', right_speeds)):
        print(f'  {label} mean={np.mean(speeds):.3f}, std={np.std(speeds):.3f}')

dataset_stats()

### Cleanup

In [None]:
# Stop robot and camera when done
# NOTE(review): the gamepad observers registered earlier are still attached,
# so moving a stick after this cell presumably drives the motors again -
# confirm, and disconnect the gamepad or restart the kernel if so.
robot.stop()
camera.stop()
print('Robot and camera stopped')

### Export Dataset

Run this cell to zip the dataset for transfer to a GPU server for training.

In [None]:
def timestr():
    """Return the current local time formatted as ``YYYY-MM-DD_HH-MM-SS``."""
    now = datetime.datetime.now()
    return f'{now:%Y-%m-%d_%H-%M-%S}'

# Timestamped archive name, so exports made at different times get
# different file names.
zip_filename = f'vla_dataset_{timestr()}.zip'
# IPython shell escape (not plain Python): the {...} parts are filled in from
# notebook variables. zip's -r recurses into the dataset directory and -q
# suppresses its per-file output.
!zip -r -q {zip_filename} {DATASET_DIR}
print(f'Dataset exported to: {zip_filename}')
print('Download this file and transfer to your GPU server for training.')