In [1]:
import cv2
# from ultralytics import YOLO
import ipywidgets.widgets as widgets
from IPython.display import display

#create widgets for the displaying of the image
display_color = widgets.Image(format='jpeg', width='45%') 
display_depth = widgets.Image(format='jpeg', width='45%')  
layout=widgets.Layout(width='100%')



#Convert a NumPy array to JPEG-encoded data for display
def bgr8_to_jpeg(value):
    return bytes(cv2.imencode('.jpg',value)[1])


#Start the camera system
import traitlets
import cv2
import numpy as np
import pyzed.sl as sl
import math
import numpy as np
import sys
import math
import threading
from traitlets.config.configurable import SingletonConfigurable

# Define a Camera class that inherits from SingletonConfigurable
class Camera(SingletonConfigurable):
    color_value = traitlets.Any() # monitor the color_value variable
    def __init__(self):
        super(Camera, self).__init__()
        self.depth_image = np.ones((100,100))
        self.zed = sl.Camera()
        # Create a InitParameters object and set configuration parameters
        init_params = sl.InitParameters()
        init_params.camera_resolution = sl.RESOLUTION.VGA #VGA(672*376), HD720(1280*720), HD1080 (1920*1080) or ...
        init_params.depth_mode = sl.DEPTH_MODE.ULTRA  # Use ULTRA depth mode
        init_params.coordinate_units = sl.UNIT.MILLIMETER  # Use meter units (for depth measurements)
        init_params.camera_fps = 60

        # Open the camera
        status = self.zed.open(init_params)
        if status != sl.ERROR_CODE.SUCCESS: #Ensure the camera has opened succesfully
            print("Camera Open : "+repr(status)+". Exit program.")
            self.zed.close()
            exit(1)

         # Create and set RuntimeParameters after opening the camera
        self.runtime = sl.RuntimeParameters()

        #flag to control the thread
        self.thread_runnning_flag = False

        # Get the height and width
        camera_info = self.zed.get_camera_information()
        self.width = camera_info.camera_configuration.resolution.width
        self.height = camera_info.camera_configuration.resolution.height
        self.image = sl.Mat(self.width,self.height,sl.MAT_TYPE.U8_C4, sl.MEM.CPU)
        self.depth = sl.Mat(self.width,self.height,sl.MAT_TYPE.F32_C1, sl.MEM.CPU)
        self.point_cloud = sl.Mat(self.width,self.height,sl.MAT_TYPE.F32_C4, sl.MEM.CPU) 

    def _capture_frames(self): #For data capturing only

        while(self.thread_runnning_flag==True): #continue until the thread_runnning_flag is set to be False
            if self.zed.grab(self.runtime) == sl.ERROR_CODE.SUCCESS:
                
                # Retrieve Left image
                self.zed.retrieve_image(self.image, sl.VIEW.LEFT)
                # Retrieve depth map. Depth is aligned on the left image
                self.zed.retrieve_measure(self.depth, sl.MEASURE.DEPTH)
    
                self.color_value_BGRA = self.image.get_data()
                self.color_value= cv2.cvtColor(self.color_value_BGRA, cv2.COLOR_BGRA2BGR)
                self.depth_image = np.asanyarray(self.depth.get_data())  
                # print(self.depth_image)

                
    def start(self): #start the data capture thread
        if self.thread_runnning_flag == False: #only process if no thread is running yet
            self.thread_runnning_flag=True #flag to control the operation of the _capture_frames function
            self.thread = threading.Thread(target=self._capture_frames) #link thread with the function
            self.thread.start() #start the thread

    def stop(self): #stop the data capture thread
        if self.thread_runnning_flag == True:
            self.thread_runnning_flag = False #exit the while loop in the _capture_frames
            self.thread.join() #wait the exiting of the thread       

def bgr8_to_jpeg(value):#convert numpy array to jpeg coded data for displaying 
    return bytes(cv2.imencode('.jpg',value)[1])

#create a camera object


In [2]:
import cv2
import ipywidgets.widgets as widgets
from IPython.display import display
import time
import motors
import multiprocessing
import numpy as np

robot = motors.MotorsYukon(mecanum=False)

#create widgets for the displaying of the image
display_color = widgets.Image(format='jpeg', width='45%') 
display_depth = widgets.Image(format='jpeg', width='45%')  
layout=widgets.Layout(width='100%')

sidebyside = widgets.HBox([display_color, display_depth],layout=layout) #horizontal 
display(sidebyside) #display the widget
global trigger_movement
trigger_movement = 0

last_seen_direction = "straight"

previous_positions = []
previous_dir = ""

#callback function, invoked when traitlets detects the changing of the color image
# Tracking state (global)
def func(change):
    global camera, trigger_movement, last_seen_direction, previous_positions, previous_dir

    frame = change['new']
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Define range for yellow color in HSV
    lower_yellow = np.array([20, 100, 100])
    upper_yellow = np.array([40, 255, 255])

    # Threshold the HSV image to get only yellow colors
    mask = cv2.inRange(hsv, lower_yellow, upper_yellow)

    # Focus on bottom half of the frame (ROI)
    height, width = mask.shape
    mask[:height//2, :] = 0  # Ignore top half

    # Detect curve direction
    height, width = mask.shape
    scan_rows = [int(height*0.95), int(height*0.90)]
    
    x_positions = []
    # previous_dx = 0
    for y in scan_rows:
        row = mask[y, :]
        white_indices = np.where(row == 255)[0]
        if len(white_indices) > 0:
            mean_x = int(np.mean(white_indices))
            x_positions.append(mean_x)



    if len(x_positions) == len(scan_rows):
        dx = x_positions[-1] - x_positions[0]
        # print(dx)

        if len(previous_positions) >= 7:
            previous_positions.pop(0)

        if dx < -2:
            previous_positions.append(-1)
        elif dx > 2:
            previous_positions.append(1)
        else:
            previous_positions.append(0)

        sum_of_dxes = np.sum(previous_positions)
        # print(f"sum of dxes {sum_of_dxes}, previous_positions: {previous_positions}")

        if sum_of_dxes >= 1:
            last_seen_direction = "right"
        elif sum_of_dxes <= -1:
            last_seen_direction = "left"
        else:
            last_seen_direction = "straight"

# ----------------------------NEW------------------------------------


        
    # if len(x_positions) == len(scan_rows):
    #     dx = x_positions[-1] - x_positions[0]
    #     print(dx)
        
    #     if dx < -2:
    #         last_seen_direction = "left"
    #         previous_dir = "left"
    #     elif dx > 2:
    #         last_seen_direction = "right"
    #         previous_dir = "right"
    #     else:
    #         last_seen_direction = previous_dir
      
        
# ------------------------------------------------------------------
        
    
    # if len(x_positions) == len(scan_rows):
    #     dx = x_positions[-1] - x_positions[0]
    #     print(dx)

    #     if abs(previous_dx - dx and (previous_dx > 0 and dx < 0) or (previous_dx < 0 and dx > 0)) < 100:
        
    #         if dx < -10:
    #             last_seen_direction = "left"
    #         elif dx > 10:
    #             last_seen_direction = "right"
    #         else:
    #             last_seen_direction = "straight"
    #     else:

    #         if previous_dx < 0:
    #             last_seen_direction = "left"
    #         elif previous_dx > 0:
    #             last_seen_direction = "right"
    #         else:
    #             last_seen_direction = "straight"
            
    #     previous_dx = dx

    # if len(x_positions) == len(scan_rows):
    #     dx = x_positions[-1] - x_positions[0]
    #     print(dx)
    #     if len(previous_positions) < 3:
    #         previous_positions.append(dx)
    #     else:
    #         previous_positions.pop(0)
    #         previous_positions.append(dx)

    #     new_dx = 0
    #     neg_count = 0
    #     if len(previous_positions) == 3:
    #         for i in range(0, len(previous_positions)):
    #             if previous_positions[i] < 0:
    #                 neg_count += 1
    #         if neg_count >= 2:
    #             last_seen_direction = "left"
    #         elif neg_count < 2:
    #             last_seen_direction = "right"
    #         else:
    #             last_seen_direction = "straight"
    

            
    # Draw yellow circles to visualize scan points
    if len(x_positions) == len(scan_rows):
        for i in range(len(scan_rows)):
            frame = cv2.circle(frame, (x_positions[i], scan_rows[i]), 5, (255, 255, 0), -1)


    

    # Calculate centroid of the yellow region
    moments = cv2.moments(mask)
    if moments["m00"] != 0:
        cx = int(moments["m10"] / moments["m00"])
        cy = int(moments["m01"] / moments["m00"])
        # print(f"Centroid: ({cx}, {cy})")

        # Draw red dot on centroid
        frame = cv2.circle(frame, (cx, cy), 10, (0, 0, 255), -1)
    
        # Draw center line and safe zone band
        center_x = width // 2 + 50
        safe_band = int((width * 0.4) / 2)
        frame = cv2.line(frame, (center_x, 0), (center_x, height), (255, 0, 0), 2)  # Blue center line
        frame = cv2.line(frame, (center_x - safe_band, 0), (center_x - safe_band, height), (0, 255, 0), 2)  # Left green line
        frame = cv2.line(frame, (center_x + safe_band, 0), (center_x + safe_band, height), (0, 255, 0), 2)  # Right green line


        # Visual feedback
        frame = cv2.circle(frame, (cx, cy), 10, (0, 0, 255), -1)

        # Movement logic
        # Define the "safe zone" width (center 40% of the screen)
        safe_zone_percentage = 0.4
        safe_band = int((width * safe_zone_percentage) / 2)
        
        # Calculate error
        # center_x = width // 2
        error = cx - center_x
        
        # print(f"Centroid: {cx}, Error: {error}")
        
        # Movement logic
        base_speed = 1500
        turn_speed = 0.7
        
        if abs(error) <= safe_band:
            # Line is within the central 40% go forward
            robot.forward(base_speed)
        elif error > 0:
            # Line is to the right → turn right
            robot.spinRight(turn_speed)
        elif error < 0:
            # Line is to the left → turn left
            robot.spinLeft(turn_speed)
    else:
        print(f"Line lost, turning in last seen direction: {last_seen_direction}")
        if last_seen_direction == "left":
            robot.spinLeft(0.3)
        elif last_seen_direction == "right":
            robot.spinRight(0.3)
        else:
            robot.stop()


    # Display the frame and mask
    scale = 0.4
    resized_frame = cv2.resize(frame, None, fx=scale, fy=scale)
    resized_mask = cv2.resize(cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR), None, fx=scale, fy=scale)

    display_color.value = bgr8_to_jpeg(resized_frame)
    display_depth.value = bgr8_to_jpeg(resized_mask)
          


camera = Camera()
camera.start() # start capturing the data
camera.observe(func, names=['color_value'])
# camera.stop()

HBox(children=(Image(value=b'', format='jpeg', width='45%'), Image(value=b'', format='jpeg', width='45%')), la…

[2025-05-07 09:40:21 UTC][ZED][INFO] Logging level INFO
[2025-05-07 09:40:21 UTC][ZED][INFO] Logging level INFO
[2025-05-07 09:40:21 UTC][ZED][INFO] Logging level INFO
[2025-05-07 09:40:22 UTC][ZED][INFO] [Init]  Depth mode: ULTRA
[2025-05-07 09:40:23 UTC][ZED][INFO] [Init]  Camera successfully opened.
[2025-05-07 09:40:23 UTC][ZED][INFO] [Init]  Camera FW version: 1523
[2025-05-07 09:40:23 UTC][ZED][INFO] [Init]  Video mode: VGA@60
[2025-05-07 09:40:23 UTC][ZED][INFO] [Init]  Serial Number: S/N 37413003


Exception in thread Thread-5 (_capture_frames):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.10/dist-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_12879/373053748.py", line 76, in _capture_frames
  File "/usr/local/lib/python3.10/dist-packages/traitlets/traitlets.py", line 716, in __set__
    self.set(obj, value)
  File "/usr/local/lib/python3.10/dist-packages/traitlets/traitlets.py", line 706, in set
    obj._notify_trait(self.name, old_value, new_value)
  File "/usr/local/lib/python3.10/dist-packages/traitlets/traitlets.py", line 1513, in _notify_trait
    self.notify_change(
  File "/usr/local/lib/python3.10/dist-packages/traitlets/traitlets.py", line 1525, in notify_change
    return self._notify_observe

In [3]:
robot.stop()
camera.stop()

In [4]:
robot.stop()