In [None]:
# Fix MediaPipe Installation (optional on Jetson)
# You can skip this cell; the OpenCV color-tracking method below works without MediaPipe.

print("🔧 Attempting MediaPipe install via python3 -m pip (Jetson)")

# Use python3 -m pip so it installs to the Jupyter interpreter
!python3 -m pip uninstall -y mediapipe || true
!python3 -m pip install mediapipe==0.8.11 || true

# Test the installation
try:
    import mediapipe as mp
    print("✅ MediaPipe installed successfully!")
    print(f"MediaPipe version: {mp.__version__}")
except Exception as e:
    print(f"❌ MediaPipe not available: {e}")
    print("This is common on Jetson aarch64. It's safe to skip this cell.")
    print("Use the OpenCV color-tracking cells below instead.")

🔧 Fixing MediaPipe installation...
/bin/bash: pip: command not found
/bin/bash: pip: command not found
❌ MediaPipe installation failed: No module named 'mediapipe.python._framework_bindings'
We'll use an alternative gesture detection method.


In [3]:
import cv2
print(cv2.__version__)

4.1.1


In [None]:
# Clean folders
!rm -rf trt_pose torch2trt

# Clone repos
!git clone https://github.com/NVIDIA-AI-IOT/trt_pose.git
!git clone https://github.com/NVIDIA-AI-IOT/torch2trt.git

# Install required dependencies for trt_pose
!pip3 install tqdm cython pycocotools matplotlib

# Install torch2trt first (required for trt_pose)
!cd torch2trt && python3 setup.py install --plugins

# Install trt_pose
!cd trt_pose && python3 setup.py install

# Test installation
# Add this cell to see what's in trt_pose
import trt_pose
print("TRT Pose version:", trt_pose.__version__)
print("Available modules:", dir(trt_pose))

# Check if the function exists elsewhere
try:
    from trt_pose.models import get_pose_model
    print("✅ get_pose_model found!")
except ImportError as e:
    print("❌ get_pose_model not found:", e)
    
# Check what's in the models module
try:
    import trt_pose.models
    print("Models module contents:", dir(trt_pose.models))
except ImportError as e:
    print("Models module error:", e)

Cloning into 'trt_pose'...
remote: Enumerating objects: 1521, done.[K
remote: Total 1521 (delta 0), reused 0 (delta 0), pack-reused 1521 (from 1)[K
Receiving objects: 100% (1521/1521), 2.02 MiB | 2.78 MiB/s, done.
Resolving deltas: 100% (847/847), done.
Cloning into 'torch2trt'...
remote: Enumerating objects: 4452, done.[K
remote: Counting objects: 100% (766/766), done.[K
remote: Compressing objects: 100% (158/158), done.[K
remote: Total 4452 (delta 650), reused 625 (delta 608), pack-reused 3686 (from 1)[K
Receiving objects: 100% (4452/4452), 7.77 MiB | 7.57 MiB/s, done.
Resolving deltas: 100% (2584/2584), done.
running install
running bdist_egg
running egg_info
creating torch2trt.egg-info
writing torch2trt.egg-info/PKG-INFO
writing dependency_links to torch2trt.egg-info/dependency_links.txt
writing top-level names to torch2trt.egg-info/top_level.txt
writing manifest file 'torch2trt.egg-info/SOURCES.txt'
reading manifest file 'torch2trt.egg-info/SOURCES.txt'
writing manifest file

In [None]:
# Alternative: TRT Pose for Full Body Gesture Control
# This is an advanced option using NVIDIA's optimized pose estimation

import json
import torch
import torch2trt
from trt_pose.coco import coco_category_to_topology
from trt_pose.models import get_pose_model
import cv2
import numpy as np
from jetbot import Robot

class TRTPoseController:
    def __init__(self):
        self.robot = Robot()
        
        # Load pre-trained model (you'll need to download this)
        # This is a more advanced setup for full body pose estimation
        print("TRT Pose Controller initialized")
        print("Note: This requires downloading pre-trained models")
        print("For now, use the MediaPipe version above for immediate results")
    
    def load_model(self, model_path):
        """Load TRT optimized pose model"""
        # This would load a pre-trained TRT model
        # Implementation depends on specific model files
        pass
    
    def detect_pose_gestures(self, frame):
        """Detect gestures using full body pose estimation"""
        # This would use TRT Pose for more complex gesture recognition
        # including body posture, arm positions, etc.
        pass

# Uncomment to use TRT Pose (requires model files)
# trt_controller = TRTPoseController()


## Simple Color-Based Gesture Control (Jetson-friendly)

This approach uses plain OpenCV to track a colored object (e.g., a bright glove or colored card) and converts its screen position into JetBot movement commands:

- Track color in HSV space using a mask and contours
- Compute the largest contour's centroid
- Map centroid position to movement: left/right/forward/stop

Tips:
- Use a distinct color (e.g., neon green, bright blue). Avoid background colors.
- You can tune the HSV ranges live.
- Works reliably on aarch64 Jetson without extra ML dependencies.


In [None]:
# Simple HSV Color Tracker with JetBot mapping
import cv2
import numpy as np
from jetbot import Robot

# Configure HSV bounds (example: green). Adjust as needed.
LOWER_HSV = np.array([40, 70, 70])    # H in [0,179], S,V in [0,255]
UPPER_HSV = np.array([80, 255, 255])

# Movement parameters
FORWARD_SPEED = 0.25
TURN_SPEED = 0.22
STOP_AREA_THRESHOLD = 300  # minimum contour area to consider valid target
CENTER_TOLERANCE = 60      # pixels tolerance around center to go straight

robot = Robot()

# Utility to compute centroid safely
def compute_centroid(contour):
    m = cv2.moments(contour)
    if m["m00"] == 0:
        return None
    cx = int(m["m10"] / m["m00"]) 
    cy = int(m["m01"] / m["m00"]) 
    return (cx, cy)

# Decide movement based on centroid position relative to frame center
def decide_motion(frame_width, frame_height, centroid):
    if centroid is None:
        return "stop"
    cx, cy = centroid
    center_x = frame_width // 2

    if abs(cx - center_x) <= CENTER_TOLERANCE:
        return "forward"
    return "left" if cx < center_x else "right"

# Execute robot motion
last_action = None

def apply_motion(action):
    global last_action
    if action == last_action:
        return
    # Stop first to avoid combining motions
    robot.stop()
    if action == "forward":
        robot.set_motors(FORWARD_SPEED, FORWARD_SPEED)
    elif action == "left":
        robot.set_motors(-TURN_SPEED, TURN_SPEED)
    elif action == "right":
        robot.set_motors(TURN_SPEED, -TURN_SPEED)
    else:
        robot.stop()
    last_action = action

print("HSV tracker ready. Use the next cell to run the control loop.")


In [None]:
# Run control loop (press stop cell to end safely)
import time

# Camera index 0 is typical on Jetson; change if needed
CAM_INDEX = 0

cap = cv2.VideoCapture(CAM_INDEX)
if not cap.isOpened():
    raise RuntimeError("Could not open camera. Try CAM_INDEX=1 or check /dev/video*")

print("Starting control. Show the target color to the camera.")

try:
    while True:
        ok, frame = cap.read()
        if not ok:
            print("Frame grab failed; stopping.")
            break

        frame_height, frame_width = frame.shape[:2]
        # Convert to HSV and threshold
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        mask = cv2.inRange(hsv, LOWER_HSV, UPPER_HSV)
        mask = cv2.erode(mask, None, iterations=2)
        mask = cv2.dilate(mask, None, iterations=2)

        # Find largest contour
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        target_contour = None
        max_area = STOP_AREA_THRESHOLD
        for c in contours:
            area = cv2.contourArea(c)
            if area > max_area:
                max_area = area
                target_contour = c

        centroid = compute_centroid(target_contour) if target_contour is not None else None
        action = decide_motion(frame_width, frame_height, centroid)
        apply_motion(action)

        # Visualize (optional; comment these two lines for headless)
        if target_contour is not None and centroid is not None:
            cv2.drawContours(frame, [target_contour], -1, (0,255,0), 2)
            cv2.circle(frame, centroid, 5, (0,0,255), -1)
        cv2.imshow('mask', mask)
        cv2.imshow('frame', frame)

        # Quit if q pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            print("Quit key pressed.")
            break

        # Keep loop responsive
        time.sleep(0.01)

finally:
    robot.stop()
    cap.release()
    cv2.destroyAllWindows()
    print("Stopped and cleaned up.")


In [None]:
# Safety stop (run this if anything goes wrong)
try:
    robot.stop()
    print("Robot stopped.")
except NameError:
    from jetbot import Robot
    Robot().stop()
    print("Robot stopped (new instance).")


In [None]:
# Inline camera preview (runs in this notebook)
from IPython.display import display, clear_output
from PIL import Image
import io

# Use same camera index as the control loop
CAM_INDEX = 0

cap = cv2.VideoCapture(CAM_INDEX)
if not cap.isOpened():
    raise RuntimeError("Could not open camera. Try CAM_INDEX=1 or check /dev/video*")

print("Showing live camera. Stop the cell (Kernel → Interrupt) to end.")

try:
    while True:
        ok, frame = cap.read()
        if not ok:
            print("Frame grab failed; stopping.")
            break

        # Convert BGR to RGB for display
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = Image.fromarray(rgb)

        # Encode as JPEG for efficient inline display
        buf = io.BytesIO()
        img.save(buf, format='JPEG')
        buf.seek(0)

        clear_output(wait=True)
        display(img)

except KeyboardInterrupt:
    pass
finally:
    cap.release()
    print("Preview stopped.")


In [None]:
# Auto-detect and open Jetson camera (USB or CSI)
import cv2

def build_csi_gstreamer_pipeline(width=1280, height=720, framerate=30, flip_method=0):
    return (
        f"nvarguscamerasrc ! "
        f"video/x-raw(memory:NVMM), width={width}, height={height}, format=NV12, framerate={framerate}/1 ! "
        f"nvvidconv flip-method={flip_method} ! "
        f"video/x-raw, format=BGRx ! videoconvert ! "
        f"video/x-raw, format=BGR ! appsink"
    )

# Try V4L2 USB cameras first, then CSI via GStreamer

def open_any_camera(max_index=4):
    # Try USB cams at indices 0..max_index-1
    for idx in range(max_index):
        cap = cv2.VideoCapture(idx, cv2.CAP_V4L2)
        if cap.isOpened():
            ok, frame = cap.read()
            if ok:
                print(f"Opened USB camera at index {idx} (V4L2)")
                return cap, f"USB index {idx}"
        cap.release()
    # Try CSI camera via GStreamer
    gst = build_csi_gstreamer_pipeline()
    cap = cv2.VideoCapture(gst, cv2.CAP_GSTREAMER)
    if cap.isOpened():
        ok, frame = cap.read()
        if ok:
            print("Opened CSI camera via GStreamer pipeline")
            return cap, "CSI GStreamer"
    cap.release()
    return None, None

cap_auto, method = open_any_camera()
if cap_auto is None:
    raise RuntimeError("No camera opened. Close other camera-using cells/processes, then retry. If using CSI, ensure nvargus is running.")

print("Grabbing a test frame...")
ok, frame = cap_auto.read()
print("Frame OK:" if ok else "Frame failed:", frame.shape if ok else None)

# Show one frame inline (optional)
if ok:
    from IPython.display import display
    import PIL.Image as Image
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    display(Image.fromarray(rgb))

cap_auto.release()
print(f"Closed camera ({method}). You can now set CAM_INDEX appropriately or use the CSI pipeline.")
