# Generating the Dataset for Task 1

For task 1 (line following), the robot relies on a CNN that must be trained using a dataset of images containing the rope that it will be following. To achieve this, we capture images using the camera on the robot, crop them to only include the data we need (i.e. the rope), extract all yellow-coloured objects from the image (which we assume will only be the rope), and determine if the rope is in the centre of the image or not. We can then use this result to move the robot forward, left, or right, all whilst saving captured images, which are used to build up the dataset. In total, we have captured almost 3000 images to train our CNN on.

## Imports

In [1]:
import cv2
import numpy as np
import ipywidgets as widgets
from ipywidgets import interact
from IPython.display import display
from IPython.display import FileLink
import time
import motors
import pyzed.sl as sl
import math
import sys
import threading
import matplotlib.pyplot as plt
import os
import shutil

## Extracting and Centering the Rope

This function is used to extract any yellow objects from the image, and return a new image with only yellow objects visible, and all others colours set to black. This simplifies the dataset for the CNN to make a prediction on.

In [None]:
def extract_yellow(frame):
    # Convert image to HSV format
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Lower and upper bounds of the colour yellow in HSV format
    lower_bound = np.array([30,40,120])
    upper_bound = np.array([75, 255, 255])

    # Extract yellow colours
    mask = cv2.inRange(hsv, lower_bound, upper_bound)

    # Makes all other colours black (so only yellow is visible)
    return cv2.bitwise_and(frame, frame, mask=mask), mask

The following function takes the yellow-extracted image, draws a bounding box around the largest object (which we expect to be the rope), then calculates the centre of the bounding box in the horizontal axis. Using the result of this function, we can compare the rope's centre value to the robot's centre value (using the width of its captured images) to determine whether the robot is centred on the rope or needs to re-adjust.

In [None]:
def centre_rope(frame, mask):
    # Extract edges of object
    contours,_ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    if contours:
        largest_contour = max(contours, key=cv2.contourArea)

        # Draw bounding box around extracted object (rope)
        x,_,w,_ = cv2.boundingRect(largest_contour)
        
        # Calculate horizontal centre of bounding box
        rope_centre_x = x + w // 2

        return rope_centre_x, mask
    
    return None, mask

## Camera

The following code is mostly sourced from the Lab Tutorials (especially Tutorial 2 Part B). This code deals with initialising the camera and the captured footage in widgets for easy monitoring and debugging. The class is extended to allow getting the last taken image so it can be saved as a .PNG file

In [None]:
# Create widgets for the displaying colour and depth images
display_color = widgets.Image(format='jpeg', width='30%')
display_depth = widgets.Image(format='jpeg', width='30%')
layout=widgets.Layout(width='100%')
sidebyside = widgets.HBox([display_color, display_depth],layout=layout)
display(sidebyside)

class Camera():
    def __init__(self):
        super(Camera, self).__init__()

        # Initialise camera
        self.zed = sl.Camera()
        
        # Provide initial configuration
        init_params = sl.InitParameters()
        init_params.camera_resolution = sl.RESOLUTION.VGA
        init_params.depth_mode = sl.DEPTH_MODE.ULTRA
        init_params.coordinate_units = sl.UNIT.MILLIMETER

        # Open the camera
        status = self.zed.open(init_params)

        #Ensure the camera has opened succesfully
        if status != sl.ERROR_CODE.SUCCESS: 
            print("Camera Open : "+repr(status)+". Exit program.")
            self.zed.close()
            exit(1)

         # Create and set RuntimeParameters after opening the camera
        self.runtime = sl.RuntimeParameters()

        # Flag to control the thread
        self.thread_running_flag = False

        # Get the height and width of camera images
        camera_info = self.zed.get_camera_information()
        self.width = camera_info.camera_configuration.resolution.width
        self.height = camera_info.camera_configuration.resolution.height
        self.image = sl.Mat(self.width,self.height,sl.MAT_TYPE.U8_C4, sl.MEM.CPU)
        self.depth = sl.Mat(self.width,self.height,sl.MAT_TYPE.F32_C1, sl.MEM.CPU)

        self.output = None
        self.color_value = None

    def _capture_frames(self):
        while(self.thread_running_flag==True): 
           
           # Ensures camera is active and working
            if self.zed.grab(self.runtime) == sl.ERROR_CODE.SUCCESS:

                # Retrieve Left image
                self.zed.retrieve_image(self.image, sl.VIEW.LEFT)

                # Retrieve depth map. Depth is aligned on the left image
                self.zed.retrieve_measure(self.depth, sl.MEASURE.DEPTH)

                # Display colour image in widget
                self.color_value = self.image.get_data()
                cv2.putText(self.color_value, 'o', (self.width//2,self.height//2), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
                display_color.value = bgr8_to_jpeg(self.color_value)

                # Display depth image in widget
                self.depth_image = np.asanyarray(self.depth.get_data())
                depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(self.depth_image, alpha=0.03), cv2.COLORMAP_JET) 
                cv2.putText(depth_colormap, str(self.depth_image[self.height//2,self.width//2]), (self.width//2,self.height//2), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
                display_depth.value = bgr8_to_jpeg(depth_colormap)
    
    def start(self):
        # Starts capture_frames thread
        if self.thread_running_flag == False: 
            self.thread_running_flag=True 
            self.thread = threading.Thread(target=self._capture_frames)
            self.thread.start()      

    def stop(self): 
        # Ends all running threads
        if self.thread_running_flag == True:
            self.thread_running_flag = False
            self.thread.join()   
            self.output.release() 

    def get_image(self):
        # Gets the most recently captured image
        image = self.color_value
        return image

def bgr8_to_jpeg(value):
    # Convert numpy array to jpeg coded data for displaying in widget
    return bytes(cv2.imencode('.jpg',value)[1])

HBox(children=(Image(value=b'', format='jpeg', width='30%'), Image(value=b'', format='jpeg', width='30%')), la…

## Movement

This function determines whether the robot should move forward, left, or right, depending on whether the rope is centrally aligned with the robot (within a threshold). With each movement, an image is captured and stored in the root folder so it can be cumulated into a complete dataset for training the CNN. Each image is named with the direction the robot moved when it saw that image, which provides a very simple labelling method that hugely speeds up the creation of the dataset.

In [5]:
def move_robot(rope_centre_x, frame_width, image, count, threshold=50):
    # Calculate the centre of the robot's viewpoint
    screen_centre_x = frame_width // 2
    
    if rope_centre_x is None:
        # Stop if rope is lost
        robot.stop()
        return
        
    if abs(rope_centre_x - screen_centre_x) < threshold:
        # Continue forward if rope is approximately centred
        robot.forward(0.2)
        time.sleep(0.2)

        # Save image
        cv2.imwrite(f"forward_{str(count)}.png", image)
        return
        
    elif rope_centre_x < screen_centre_x:
        # Move left if rope is positioned to the left of the centre of the image
        robot.left(0.1)
        time.sleep(0.1)

        # Save image
        cv2.imwrite(f"left_{str(count)}.png", image)
        return
        
    else: 
        # Move right if rope is positioned to the right of the centre of the image
        robot.right(0.1)
        time.sleep(0.1)

        # Save image
        cv2.imwrite(f"right_{str(count)}.png", image)
        return

## Putting it all together

Finally, we combine all logic implemented so far to initialise the robot and camera, then allow it to move based on the position of the rope, capturing images as it goes.

In [7]:
robot = motors.MotorsYukon(mecanum=False)
camera = Camera()
camera.start()

[2025-03-26 09:55:38 UTC][ZED][INFO] Logging level INFO
[2025-03-26 09:55:38 UTC][ZED][INFO] Logging level INFO
[2025-03-26 09:55:38 UTC][ZED][INFO] Logging level INFO
[2025-03-26 09:55:39 UTC][ZED][INFO] [Init]  Depth mode: ULTRA
[2025-03-26 09:55:40 UTC][ZED][INFO] [Init]  Camera successfully opened.
[2025-03-26 09:55:40 UTC][ZED][INFO] [Init]  Camera FW version: 1523
[2025-03-26 09:55:40 UTC][ZED][INFO] [Init]  Video mode: VGA@100
[2025-03-26 09:55:40 UTC][ZED][INFO] [Init]  Serial Number: S/N 32675467


In [None]:
count = 0
while True:
    img = camera.get_image()
    
    if img is not None:
        
        # Crop the image 
        h, w = img.shape[:2]
        margin_width = 200
        margin_height = 220
        cropped_image = img[margin_height:,margin_width:w-margin_width]

        # Calculate the centre of the image
        h, w = cropped_image.shape[:2]
        centre_x = w // 2
        centre_y = h // 2

        # Extract yellow from cropped image
        frame_width = cropped_image.shape[1]
        frame, mask = extract_yellow(cropped_image)
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)        

        # Calculate centre of rope in image
        rope_centre_x, mask = centre_rope(frame, mask)
        
        # Display result using matplotlib
        plt.imshow(frame_rgb)
        plt.axis('off')
        plt.show()

        # Move the robot based on rope position
        move_robot(rope_centre_x, frame_width, frame_rgb, count)

        # Increment count to prevent images being overwritten
        count += 1

## Flip left and right images to programmatically expand dataset

In order to quickly aquire more left and right images in the dataset, we can augment the images already captured by flipping them. In this way, left images become right images, and right images become left images. This effectively doubles the amount of left and right images in our dataset, which helps to deal with data sparsity.

In [None]:
count = 0
for filename in os.listdir("Task 1 Images/left"):
    # Get the image
    left_filepath = os.path.join("Task 1 Images/left", filename)
    image = cv2.imread(left_filepath)
    image = cv2.flip(image, 1) 

    if image is None:
        continue

    # Change filepath to point to Right directory
    file_without_extension = filename[:-4]
    filepath = os.path.join("Task 1 Images/right", f"right_{count}_flipped.png")

    # Save image
    cv2.imwrite(filepath+'_flipped.png', image)

    # Increment count to prevent overwrites
    count += 1

In [None]:
count = 0
for filename in os.listdir("Task 1 Images/right"):
    # Get the image
    left_filepath = os.path.join("Task 1 Images/right", filename)
    image = cv2.imread(left_filepath)
    image = cv2.flip(image, 1) 

    if image is None:
        continue

    # Change filepath to point to Left directory
    file_without_extension = filename[:-4]
    filepath = os.path.join("Task 1 Images/left", f"left_{count}_flipped.png")

    # Save image
    cv2.imwrite(filepath+'_flipped.png', image)

    # Increment to prevent overwrites
    count += 1

## Saving Dataset Locally

To upload the captured dataset to GitHub, it first needed to be downloaded on the local machine. We did this so that all team members could access the dataset, and it would persist week-after-week.

The following code is used to append a '_2' to each image captured in the week. This was done to prevent newly captured images from overwriting images that were captured in previous Tutorials when uploading to GitHub.

In [8]:
for filename in os.listdir("Task 1 Images/left"):
    file_without_extension = filepath[:-4]
    filepath = os.path.join("Task 1 Images/left", filename)
    os.rename(filepath, file_without_extension+'_2.png')

In [None]:
for filename in os.listdir("Task 1 Images/right"):
    file_without_extension = filepath[:-4]
    filepath = os.path.join("Task 1 Images/right", filename)
    os.rename(filepath, file_without_extension+'_2.png')

In [None]:
for filename in os.listdir("Task 1 Images/forward"):
    file_without_extension = filepath[:-4]
    filepath = os.path.join("Task 1 Images/forward", filename)
    os.rename(filepath, file_without_extension+'_2.png')

This final piece of code compresses the dataset and presents it as a link that can be downloaded from within the notebook. This allows us to quickly download the dataset from the robots so it can be prepared for upload to GitHub. Shutil is used to compress the folder, whilst IPython FileLink is used to download it.

In [7]:
folder_path = 'Task 1 Images'
output_zip = 'images_cropped.zip'

# Compress dataset into Zip file
shutil.make_archive(output_zip.replace('.zip', ''), 'zip', folder_path)

# Create download link
FileLink(output_zip)