In [None]:
import cv2
import numpy as np
import pyzed.sl as sl
import threading
from traitlets.config.configurable import SingletonConfigurable
import ipywidgets.widgets as widgets
from IPython.display import display
import time
import motors

In [None]:
r"""
  __     __  __       _         _____       _           _      _____            _             _ 
 /_ |   |  \/  |     (_)       |  __ \     | |         | |    / ____|          | |           | |
  | |   | \  / | __ _ _ _ __   | |__) |___ | |__   ___ | |_  | |     ___  _ __ | |_ _ __ ___ | |
  | |   | |\/| |/ _` | | '_ \  |  _  // _ \| '_ \ / _ \| __| | |    / _ \| '_ \| __| '__/ _ \| |
  | |_  | |  | | (_| | | | | | | | \ \ (_) | |_) | (_) | |_  | |___| (_) | | | | |_| | | (_) | |
  |_(_) |_|  |_|\__,_|_|_| |_| |_|  \_\___/|_.__/ \___/ \__|  \_____\___/|_| |_|\__|_|  \___/|_|
 """                                                                                               
                                                                                                
class LineFollowingRobot:
    """
    Initialises each class and have them reference eachother.
    Setting up a clear Chain of Command.
    and starts and stops the main threads declared below
    """
    def __init__(self):
        self.camera = CameraInput()
        self.detector = LineDetector(self.camera)
        self.controller = MotionController(self.detector)
        self.driver = MotorDriver(self.controller)                  

    def start(self):
        print("[Robot] Starting all systems.")
        self.camera.start()
        self.detector.start()
        self.controller.start()
        self.driver.start()

    def stop(self):
        print("[Robot] Stopping all systems.")
        self.driver.stop()       
        self.controller.stop()
        self.detector.stop()
        self.camera.stop()


In [None]:
r"""
  ___       _____                                 _____ _      _                
 |__ \     / ____|                               |  __ (_)    | |               
    ) |   | |     __ _ _ __ ___   ___ _ __ __ _  | |__) |  ___| | ___   _ _ __  
   / /    | |    / _` | '_ ` _ \ / _ \ '__/ _` | |  ___/ |/ __| |/ / | | | '_ \ 
  / /_ _  | |___| (_| | | | | | |  __/ | | (_| | | |   | | (__|   <| |_| | |_) |
 |____(_)  \_____\__,_|_| |_| |_|\___|_|  \__,_| |_|   |_|\___|_|\_\\__,_| .__/ 
                                                                         | |    
                                                                         |_|    

Starts up the camera and stops it safely, while running grabs frames from the camera 
and stores them in a shared variable.

"""


def bgr8_to_jpeg(value):
    return bytes(cv2.imencode('.jpg', value)[1])

class CameraInput:
    def __init__(self):
        """Initialises the Camera into VGA and sets up color_value to be a shared variable for LineDetector"""
        self.zed = sl.Camera()

        init_params = sl.InitParameters()
        init_params.camera_resolution = sl.RESOLUTION.VGA
        init_params.depth_mode = sl.DEPTH_MODE.NONE
        init_params.coordinate_units = sl.UNIT.MILLIMETER

        status = self.zed.open(init_params)
        if status != sl.ERROR_CODE.SUCCESS:
            print("Camera Open : " + repr(status))
            self.zed.close()
            exit(1)

        self.runtime = sl.RuntimeParameters()
        cam_info = self.zed.get_camera_information()
        self.width = cam_info.camera_configuration.resolution.width
        self.height = cam_info.camera_configuration.resolution.height
        self.image = sl.Mat(self.width,self.height,sl.MAT_TYPE.U8_C4, sl.MEM.CPU)

        self.color_value = None
        self.lock = threading.Lock()
        self.thread_running_flag = False

    def _capture_frames(self):
        """Opens the camera safely and captures a raw image"""
        while self.thread_running_flag:
            start_time = time.time()
            if self.zed.grab(self.runtime) == sl.ERROR_CODE.SUCCESS:
                self.zed.retrieve_image(self.image, sl.VIEW.LEFT)
                frame = self.image.get_data()
                frame_bgr = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
                with self.lock:
                    self.color_value = frame_bgr
            elapsed = time.time() - start_time
            time.sleep(max(0.0, 1/30 - elapsed))

    def _update_display(self):
        """Shows the raw image most recently captured in a widget per x.xx seconds"""
        while self.thread_running_flag:
            with self.lock:
                frame = self.color_value
            if frame is not None:
                self.display_color.value = bgr8_to_jpeg(frame)
            time.sleep(0.05)

    def start(self):
        """Starts the Camera thread and widget thread for updated visual feedback."""
        if not self.thread_running_flag:
            self.thread_running_flag = True
            self.capture_thread = threading.Thread(target=self._capture_frames, daemon=True)
            self.capture_thread.start()

            self.display_color = widgets.Image(format='jpeg', width='50%')
            display(self.display_color)

            self.display_thread = threading.Thread(target=self._update_display, daemon=True)
            self.display_thread.start()

    def stop(self):
        """Stops threads safely and closes the camera"""
        if self.thread_running_flag:
            self.thread_running_flag = False
    
            if self.display_thread:
                self.display_thread.join(timeout=1)
    
            if self.capture_thread:
                self.capture_thread.join(timeout=1)
    
            if self.display_color:
                self.display_color.value = None
    
            self.zed.close()


In [None]:
r"""
  ____     _      _              _____       _            _             
 |___ \   | |    (_)            |  __ \     | |          | |            
   __) |  | |     _ _ __   ___  | |  | | ___| |_ ___  ___| |_ ___  _ __ 
  |__ <   | |    | | '_ \ / _ \ | |  | |/ _ \ __/ _ \/ __| __/ _ \| '__|
  ___) |  | |____| | | | |  __/ | |__| |  __/ ||  __/ (__| || (_) | |   
 |____(_) |______|_|_| |_|\___| |_____/ \___|\__\___|\___|\__\___/|_|   
                                                                        

Grabs the shared image from the Camera and processes it for the Motor Controller
It puts the processed image into a shared variable.

The image processed  is masking out a bell curve shape and keeping all pixels within the bell curve visible.
Then it does a yellow mask where all yellow pixels are turned into white and making black the rest of the image black.

That image is then stored as a shared variable for another class.
                                                                                                                                             

"""


class LineDetector:
    def __init__(self, camera: CameraInput):
        """"Initialises Line Detector with reference to Camera Pickup for getting raw images."""
        self.camera = camera
        self.processed_image = None
        self.running = False
        self.lock = threading.Lock()
        self.thread = None

    def _detection_loop(self):
        """ Uses a bell shaped mask -> Yellow Mask -> Saves output for Motor Controller"""
        target_fps = 30
        target_interval = 1 / target_fps
        
        while self.running:
            start_time = time.time()
            
            with self.camera.lock:
                val = self.camera.color_value
            frame = val.copy() if val is not None else None
            
            if frame is None:
                time.sleep(0.03)
                continue

            height, width, _ = frame.shape
            bell_mask = np.zeros((height, width), dtype=np.uint8)

            center_x = width // 2
            bottom_y = height - 1
            height_range = int(height * 0.5)
            start_y = bottom_y - height_range

            for y in range(start_y, bottom_y + 1):
                distance = (bottom_y - y) / height_range
                current_width = int(width * (1 - distance**2))
                if current_width > 0:
                    left = max(0, center_x - current_width // 2)
                    right = min(width, center_x + current_width // 2)
                    bell_mask[y, left:right] = 255

            bell_filtered = cv2.bitwise_and(frame, frame, mask=bell_mask)
            hsv = cv2.cvtColor(bell_filtered, cv2.COLOR_BGR2HSV)
            lower_yellow = np.array([20, 100, 100])
            upper_yellow = np.array([40, 255, 255])
            yellow_mask = cv2.inRange(hsv, lower_yellow, upper_yellow)

            with self.lock:
                self.processed_image = yellow_mask

            elapsed = time.time() - start_time
            time.sleep(max(0.0, target_interval - elapsed))

    def get_processed_image(self):
        """Allows for safe variable sharing for the Motor Controller class"""
        with self.lock:
            return self.processed_image.copy() if self.processed_image is not None else None

    def start(self):
        """Starts the single Image filtering thread"""
        self.running = True
        self.thread = threading.Thread(target=self._detection_loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Stops the thread safely"""
        self.running = False
        if self.thread:
                self.thread.join(timeout=1)


In [None]:
r"""
  _  _     __  __       _                _____            _             _ _           
 | || |   |  \/  |     | |              / ____|          | |           | | |          
 | || |_  | \  / | ___ | |_ ___  _ __  | |     ___  _ __ | |_ _ __ ___ | | | ___ _ __ 
 |__   _| | |\/| |/ _ \| __/ _ \| '__| | |    / _ \| '_ \| __| '__/ _ \| | |/ _ \ '__|
    | |_  | |  | | (_) | || (_) | |    | |___| (_) | | | | |_| | | (_) | | |  __/ |   
    |_(_) |_|  |_|\___/ \__\___/|_|     \_____\___/|_| |_|\__|_|  \___/|_|_|\___|_|   
                                                                                      

Grabs the shared processed image from the shared variables and using centroid calculations and boundaries
it decides how to move the rover. The decision is saved as a motion state in a shared variable.
                                                                                      
"""



class MotionController:
    """
    Controls movement from the processed image, decides the motion in stages of 1-5
    for forward and turning, also passed the direction left or right.
    Finally visualises the Filtered image and annotates what is being calculated.
    """

    def __init__(self, detector):
        """
        Initializes the controller referencing the LineDetector.
        it sets up default motion state, history, locking, and widget
        """
        self.detector = detector
        self.running = False
        self.visualizer_running = False
        self.motion_history = []
        self.motion_state = None
        self.lock = threading.Lock()

        self.forward_stage = 3
        self.turn_stage = 0
        self.direction = "center"
        self.fallback_mode = False

        self.visual_centroids = []
        self.image_widget = widgets.Image(format='jpeg')
        display(self.image_widget)

        self.control_thread = None       
        self.visualizer_thread = None

    def _calculate_centroid(self, line):
        """
        Given a horizontal line (row of pixels) it returns the average x-position
        of all white pixels. None if it doesnt find any.
        """
        white_pixels = np.where(line > 0)[0]
        if len(white_pixels) == 0:
            return None
        x_centroid = np.mean(white_pixels)
        return (x_centroid, len(line) // 2)

    def _scan_centroids(self, image, start_percent, end_percent):
        """
        Scans multiple horizontal rows from a starting %
        to an ending % of image height. Bottom to top (1-100%)
        Returns a list of centroids found across those rows.
        """
        height, _ = image.shape
        centroids = []
        for i in range(start_percent, end_percent + 1):
            y = int(height * (i / 100))
            centroid = self._calculate_centroid(image[y, :])
            if centroid:
                centroids.append((centroid[0], y))
        return centroids

    def _control_loop(self):
        """
        Core control loop running in a separate thread.
        - Retrieves the processed image
        - Computes centroids
        - Chooses motion direction based on offset of all centroids
        - Handles fallback mode when main scan fails meaning it cant find any at 1-15%
        - Stores the decision in a motion_state dict
        - Repeats
        """
        while self.running:
            start_time = time.time()
            
            processed_image = self.detector.get_processed_image()
            if processed_image is None:
                time.sleep(0.05)
                continue

            height, width = processed_image.shape

            #Y goes downward
            main_centroids = self._scan_centroids(processed_image, 85, 100)

            # Fallback triggers if not enough main centroids
            self.fallback_mode = len(main_centroids) < 3

            if self.fallback_mode:
                all_centroids = self._scan_centroids(processed_image, 1, 100)
                fallback_centroids = [c for c in all_centroids if c[1] < int(height * 0.85)]

                with self.lock:
                    self.visual_centroids = fallback_centroids.copy()
                
                if len(fallback_centroids) > 5:
                    avg_x = np.mean([c[0] for c in fallback_centroids])
                    offset_ratio = (avg_x - width / 2) / width
                    if offset_ratio < -0.05:
                        self.direction = "left"
                    elif offset_ratio > 0.05:
                        self.direction = "right"
                    else:
                        self.direction = "center"
                    self.forward_stage = 2
                    self.turn_stage = 3
                else:
                    # Check history of the last x turns and commits to a memory risk turn.
                    left_count = self.motion_history.count("left")
                    right_count = self.motion_history.count("right")
                    
                    if left_count > right_count:
                        self.direction = "left"
                    elif right_count > left_count:
                        self.direction = "right"
                    else:
                        last_turn = self.motion_history[-1] if self.motion_history else "right"
                        self.direction = "left" if last_turn == "right" else "right"

                    self.forward_stage = 1
                    self.turn_stage = 4
            else:
                with self.lock:
                    self.visual_centroids = main_centroids.copy() 

                # Main Logic for adjusting turns or going forward.
                avg_x = np.mean([c[0] for c in main_centroids])
                center_offset = (avg_x - width / 2) / width * 100
                
                if abs(center_offset) > 8:
                    self.turn_stage = min(5, self.turn_stage + 1)
                    self.forward_stage = max(1, self.forward_stage - 1)
                    self.direction = "left" if center_offset < 0 else "right"
                elif abs(center_offset) < 2:
                    self.forward_stage = min(5, self.forward_stage + 1)
                    self.turn_stage = max(0, self.turn_stage - 1)
                    self.direction = "center"

            motion_state = {
                "forwardStage": self.forward_stage,
                "turnStage": self.turn_stage,
                "direction": self.direction
            }

            self.motion_history.append(self.direction)
            if len(self.motion_history) > 5:
                self.motion_history.pop(0)

            with self.lock:
                self.motion_state = motion_state

            elapsed = time.time() - start_time
            time.sleep(max(0.0, 1/20 - elapsed))

    def _visualizer_loop(self):
        """
        Threaded loop to draw the processed image and overlay:
        - Detected centroids
        - Center line
        - Offset %
        - Motion state (direction, forward/turn stage, fallback mode)
        Displays result in a live IPython widget.
        """
        while self.visualizer_running:
            start_time = time.time()
            
            img = self.detector.get_processed_image()
            with self.lock:
                state = self.motion_state.copy() if self.motion_state else None
                centroids = self.visual_centroids.copy()
            fallback = self.fallback_mode
    
            if img is None or state is None:
                time.sleep(0.05)
                continue
    
            height, width = img.shape
            center_x = width // 2
    
            with self.lock:
                centroids = self.visual_centroids.copy()

    
            if centroids:
                avg_x = np.mean([c[0] for c in centroids])
                avg_y = int(np.mean([c[1] for c in centroids]))
                center_offset = ((avg_x - center_x) / width) * 100
            else:
                center_offset = 0.0
    
            vis = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    
            cv2.line(vis, (center_x, 0), (center_x, height), (255, 255, 255), 1)
            for (x, y) in centroids:
                cv2.circle(vis, (int(x), int(y)), 4, (0, 255, 0), -1)
    
            if centroids:
                cv2.circle(vis, (int(avg_x), avg_y), 8, (0, 0, 255), -1)
    
            x0, y0 = 10, 20
            lh = 18
            cv2.putText(vis, f"Direction: {state['direction']}", (x0, y0), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
            cv2.putText(vis, f"Forward: {state['forwardStage']}", (x0, y0 + lh), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
            cv2.putText(vis, f"Turn: {state['turnStage']}", (x0, y0 + lh * 2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
    
            fb_text = "Fallback: ON" if fallback else "Fallback: OFF"
            fb_color = (0, 0, 255) if fallback else (100, 255, 100)
            cv2.putText(vis, fb_text, (x0, y0 + lh * 3), cv2.FONT_HERSHEY_SIMPLEX, 0.5, fb_color, 1)
    
            offset_str = f"Offset: {center_offset:+.1f}%"
            cv2.putText(vis, offset_str, (x0, y0 + lh * 4), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 1)
    
            _, jpeg = cv2.imencode('.jpg', vis)
            self.image_widget.value = jpeg.tobytes()

            elapsed = time.time() - start_time
            time.sleep(max(0.0, 1/20 - elapsed))

    def get_motion_state(self):
        """Returns a copy of the latest motion state for the motor driver class"""
        with self.lock:
            return self.motion_state.copy() if self.motion_state else None

    def start(self):
        """Starts the motion decision loop and the visualizer in background threads."""
        if not self.running:  
            self.running = True
            self.control_thread = threading.Thread(target=self._control_loop, daemon=True)
            self.control_thread.start()
        if not self.visualizer_running: 
            self.visualizer_running = True
            self.visualizer_thread = threading.Thread(target=self._visualizer_loop, daemon=True)
            self.visualizer_thread.start()

    def stop(self):
        """Stops both the control loop and visualizer"""
        self.visualizer_running = False
        self.running = False
    
        if self.visualizer_thread:
            self.visualizer_thread.join(timeout=1)  
            self.visualizer_thread = None
    
        if self.control_thread:
            self.control_thread.join(timeout=1)  
            self.control_thread = None
        



In [None]:
r"""
  _____    __  __       _               _____       _                
 | ____|  |  \/  |     | |             |  __ \     (_)               
 | |__    | \  / | ___ | |_ ___  _ __  | |  | |_ __ ___   _____ _ __ 
 |___ \   | |\/| |/ _ \| __/ _ \| '__| | |  | | '__| \ \ / / _ \ '__|
  ___) |  | |  | | (_) | || (_) | |    | |__| | |  | |\ V /  __/ |   
 |____(_) |_|  |_|\___/ \__\___/|_|    |_____/|_|  |_| \_/ \___|_|   
                                                                     

Grabs the Motor Controllers decision from a shared variable and mimics the 
decision into Rover movement.
 
 """      


class MotorDriver:
    def __init__(self, controller):
        """Initializes motor driver with hard coded speeds and a reference to the controller."""
        self.controller = controller
        self.motors = motors.MotorsYukon(mecanum=False)

        self.running = False
        self.thread = None

        self.forward_speeds = {i: i * 0.2 for i in range(6)}
        self.turn_speeds = {i: i * 0.1 for i in range(6)}
        self.speed_multiplier = 1.0

    def _control_loop(self):
        """Repeatedly updates motors based on motion states."""
        while self.running:
            start_time = time.time()
            
            state = self.controller.get_motion_state()
            if not state:
                self.stop_motors()
                time.sleep(0.05)
                continue

            f = self.forward_speeds[state["forwardStage"]]
            t = self.turn_speeds[state["turnStage"]]
            d = state["direction"]

            f *= self.speed_multiplier
            t *= self.speed_multiplier

            left = f
            right = f

            if d == "left":
                left -= t
                right += t
            elif d == "right":
                left += t
                right -= t

            left = max(0.0, min(1.0, left))
            right = max(0.0, min(1.0, right))

            self.motors.frontLeft(left)
            self.motors.backLeft(left)
            self.motors.frontRight(right)
            self.motors.backRight(right)

            elapsed = time.time() - start_time
            time.sleep(max(0.0, 1/20 - elapsed))

    def start(self):
        """Starts the motor control thread"""
        self.running = True
        self.thread = threading.Thread(target=self._control_loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Stops the motor control loop safely and the Bot"""
        self.running = False
        if self.thread:
            self.thread.join(timeout=1)
        self.stop_motors()

    def stop_motors(self):
        """Stops all motors"""
        self.motors.stop()

    def set_speed_multiplier(self, value):
        """Optional to update speed variable (may not be used)"""
        self.speed_multiplier = max(0.0, min(2.0, value))




        

In [None]:
r"""
    __      _____ _             _     _    _                  _____ _    _ _____ 
   / /     / ____| |           | |   | |  | |          _     / ____| |  | |_   _|
  / /_    | (___ | |_ __ _ _ __| |_  | |  | |_ __    _| |_  | |  __| |  | | | |  
 | '_ \    \___ \| __/ _` | '__| __| | |  | | '_ \  |_   _| | | |_ | |  | | | |  
 | (_) |   ____) | || (_| | |  | |_  | |__| | |_) |   |_|   | |__| | |__| |_| |_ 
  \___(_) |_____/ \__\__,_|_|   \__|  \____/| .__/           \_____|\____/|_____|
                                            | |                                  
                                            |_|                                  
After initialising all classes this should be ran to start and stop the Robot.

"""

robot_instance = {"robot": None}

start_button = widgets.Button(description="GO", button_style='success')
stop_button = widgets.Button(description="STOP", button_style='danger')
status_label = widgets.Label(value="Status: Idle")
speed_slider = widgets.FloatSlider(
    value=1.0,
    min=0.1,
    max=2.0,
    step=0.1,
    description='Speed:',
    continuous_update=True,
    style={'description_width': 'initial'}
)


display(speed_slider)
display(widgets.HBox([start_button, stop_button]))
display(status_label)

def on_start_clicked(b):
    """Starts the Line Follower Robots threads:
    - self.camera.start()
    - self.detector.start()
    - self.controller.start()
    - self.driver.start()
    """
    if robot_instance["robot"] is None:
        robot = LineFollowingRobot()
        robot_instance["robot"] = robot
        robot.start()
        status_label.value = "Status: Running Now"
    else:
        status_label.value = "Status: Already running"


def on_stop_clicked(b):
    """Stops the Line Follower Robots threads:
    - self.camera.stop()
    - self.detector.stop()
    - self.controller.stop()
    - self.driver.stop()
    """    
    robot = robot_instance["robot"]
    if robot is not None:
        robot.stop()
        robot_instance["robot"] = None
        status_label.value = "Status: Robot stopped"
    else:
        status_label.value = "Status: Nothing to stop"




def on_slider_change(change):
    robot = robot_instance["robot"]
    if robot is not None:
        try:
            robot.driver.set_speed_multiplier(change["new"])
            status_label.value = f"Speed set to {change['new']:.1f}x"
        except Exception as e:
            status_label.value = f"Couldnt change speed: {e}"


speed_slider.observe(on_slider_change, names="value")
start_button.on_click(on_start_clicked)
stop_button.on_click(on_stop_clicked)
