<a href="https://colab.research.google.com/github/rianachatterjee04/GenAssist/blob/main/Centerpoint_testing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Installing Dependencies

In [None]:
!pip install ultralytics

!pip install yt-dlp

!pip install opencv-python



Proximity Detector

In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
import os

class ProximityDetector:
    def __init__(self):
        # Only need YOLO for object detection
        self.walkable_model = YOLO('walkable_model.pt')  # For walkable areas
        self.object_model = YOLO('yolov8n-seg.pt')
        # Define immediate proximity zone (area right in front)
        self.proximity_height = 0.4  # Bottom 40% of frame
        self.proximity_width = 0.4   # Center 40% of frame width

    def is_in_proximity(self, box, frame_shape):
        """Check if object is very close to robot's path"""
        height, width = frame_shape[:2]

        # Get box coordinates
        x1, y1, x2, y2 = box
        box_center_x = (x1 + x2) / 2
        box_bottom = y2

        # Define immediate front zone
        zone_left = width * (0.5 - self.proximity_width/2)
        zone_right = width * (0.5 + self.proximity_width/2)
        zone_top = height * (1 - self.proximity_height)

        # Check if object is in immediate front zone
        if (zone_left < box_center_x < zone_right and box_bottom > zone_top):
            relative_distance = (box_bottom - zone_top) / (height - zone_top)
            return True, relative_distance
        return False, 0

    def process_frame(self, frame):
        height, width = frame.shape[:2]
        processed = frame.copy()

        # Detect and track objects
        results = self.model.track(frame, persist=True)[0]

        if results.boxes is not None:
            boxes = results.boxes.xyxy.cpu().numpy()
            classes = results.boxes.cls.cpu().numpy()

            # Track closest obstacle
            closest_distance = 0
            has_close_obstacle = False

            for box, cls in zip(boxes, classes):
                class_name = self.model.names[int(cls)]

                # Only consider person, bicycle, car as obstacles
                if class_name in ['person', 'bicycle', 'car']:
                    in_proximity, distance = self.is_in_proximity(box, frame.shape)

                    if in_proximity and distance > 0.3:  # Object significantly in front
                        has_close_obstacle = True
                        if distance > closest_distance:
                            closest_distance = distance
                            # Draw red box around closest obstacle
                            cv2.rectangle(processed,
                                        (int(box[0]), int(box[1])),
                                        (int(box[2]), int(box[3])),
                                        (0, 0, 255), 2)

            # Add warning text if obstacle detected
            if has_close_obstacle:
                warning_text = "Obstacle ahead, please move"
                text_size = cv2.getTextSize(warning_text, cv2.FONT_HERSHEY_SIMPLEX, 1.2, 3)[0]
                text_x = (width - text_size[0]) // 2
                cv2.putText(processed, warning_text,
                          (text_x, height - 50),
                          cv2.FONT_HERSHEY_SIMPLEX, 1.2,
                          (0, 0, 255), 3)

        return processed

Test on Video

In [None]:
import cv2
import os

# Open your video
cap = cv2.VideoCapture("outside_1.MOV")  # Use the correct filename with .MOV
if not cap.isOpened():
    print("Could not open video")
    print("Current directory:", os.getcwd())
    print("Files in directory:", os.listdir())
    exit()

# Get video properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))

# Create video writer (use the same extension as input)
out = cv2.VideoWriter('output.MOV', cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

# Process each frame
frame_count = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Simply write the original frame
    out.write(frame)
    frame_count += 1

    # Optional: print progress
    if frame_count % 30 == 0:
        print(f"Processed {frame_count} frames")

# Release resources
cap.release()
out.release()

# Verify file creation
if os.path.exists('output.MOV'):
    print(f"Video saved successfully. File size: {os.path.getsize('output.MOV')} bytes")
else:
    print("Failed to create output video")

# Download the video
from google.colab import files
files.download('output.MOV')

Processed 30 frames
Processed 60 frames
Processed 90 frames
Processed 120 frames
Processed 150 frames
Processed 180 frames
Processed 210 frames
Processed 240 frames
Processed 270 frames
Processed 300 frames
Processed 330 frames
Processed 360 frames
Processed 390 frames
Processed 420 frames
Processed 450 frames
Processed 480 frames
Processed 510 frames
Processed 540 frames
Processed 570 frames
Processed 600 frames
Processed 630 frames
Processed 660 frames
Processed 690 frames
Processed 720 frames
Processed 750 frames
Processed 780 frames
Processed 810 frames
Processed 840 frames
Processed 870 frames
Processed 900 frames
Processed 930 frames
Processed 960 frames
Processed 990 frames
Processed 1020 frames
Processed 1050 frames
Processed 1080 frames
Processed 1110 frames
Processed 1140 frames
Processed 1170 frames
Processed 1200 frames
Processed 1230 frames
Processed 1260 frames
Processed 1290 frames
Processed 1320 frames
Processed 1350 frames
Processed 1380 frames
Processed 1410 frames
Pr

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Object detection/Image Segmentation

In [None]:
import os
import cv2
import numpy as np
from ultralytics import YOLO
import glob

def draw_mask_safely(frame, mask, color, alpha=0.5):
    """Safely draw a mask on a frame with validation"""
    try:
        # Validate mask
        if mask is None or len(mask) < 3:  # Need at least 3 points for a polygon
            return frame

        # Convert to correct format
        mask_points = np.array(mask, dtype=np.int32)
        if mask_points.shape[0] < 3:  # Double-check after conversion
            return frame

        # Create overlay
        overlay = frame.copy()
        cv2.fillPoly(overlay, [mask_points], color)
        return cv2.addWeighted(frame, 1-alpha, overlay, alpha, 0)

    except Exception as e:
        print(f"Warning: Could not draw mask: {e}")
        return frame

# Ensure videos directory exists
os.makedirs("videos", exist_ok=True)

# Load both models
walkable_model = YOLO('walkable_model.pt')
object_model = YOLO('yolov8n-seg.pt')

# Set up video capture
video_path = "outside_1.MOV"  # Direct path to the video
cap = cv2.VideoCapture(video_path)

# Check if video is opened successfully
if not cap.isOpened():
    print(f"Error: Could not open video file {video_path}")
    print("Current directory:", os.getcwd())
    print("Files in directory:", os.listdir())
    exit()

# Get video properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))

# Create output video writer
output_path = 'processed_video.mp4'
out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

# Process entire video
frame_count = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    processed_frame = frame.copy()

    # 1. Detect walkable zones with your fine-tuned model
    walkable_results = walkable_model.predict(frame, show=False)
    if walkable_results[0].masks is not None:
        for mask in walkable_results[0].masks.xy:
            # Safely draw walkable zones
            processed_frame = draw_mask_safely(
                processed_frame,
                mask,
                color=(0, 255, 0),  # Green for walkable zones
                alpha=0.3
            )
            # Add outline if mask is valid
            if len(mask) >= 3:
                cv2.polylines(processed_frame, [np.int32(mask)], True, (0, 255, 0), 2)

    # 2. Detect and track objects with original model
    object_results = object_model.track(frame, persist=True)[0]
    if object_results.boxes is not None and object_results.masks is not None:
        masks = object_results.masks.xy
        track_ids = object_results.boxes.id.cpu().numpy() if object_results.boxes.id is not None else None
        classes = object_results.boxes.cls.cpu().numpy()

        for i, (mask, cls) in enumerate(zip(masks, classes)):
            # Safely draw objects
            processed_frame = draw_mask_safely(
                processed_frame,
                mask,
                color=(255, 165, 0),  # Orange for objects
                alpha=0.5
            )

            # Add tracking ID and label
            if track_ids is not None and len(mask) > 0:
                track_id = int(track_ids[i])
                class_name = object_model.names[int(cls)]
                label = f"ID {track_id} - {class_name}"
                x_min, y_min = mask.min(axis=0)
                cv2.putText(processed_frame, label,
                          (int(x_min), int(y_min) - 10),
                          cv2.FONT_HERSHEY_SIMPLEX,
                          0.6,
                          (255, 255, 255),
                          2)

    out.write(processed_frame)
    frame_count += 1
    if frame_count % 30 == 0:
        print(f"Processed frame {frame_count}")

cap.release()
out.release()

# Download the processed video
from google.colab import files
files.download('processed_video.mp4')


0: 384x640 1 walkable-zone, 403.7ms
Speed: 23.7ms preprocess, 403.7ms inference, 34.6ms postprocess per image at shape (1, 3, 384, 640)
[31m[1mrequirements:[0m Ultralytics requirement ['lap>=0.5.12'] not found, attempting AutoUpdate...



ModuleNotFoundError: No module named 'lap'

Combines Both

In [None]:
import os
import cv2
import numpy as np
from ultralytics import YOLO

class ProximityDetector:
    def __init__(self):
        # Load models
        self.walkable_model = YOLO('walkable_model.pt')  # For walkable areas
        self.object_model = YOLO('yolov8n-seg.pt')

        # Define immediate proximity zone (area right in front)
        self.proximity_height = 0.4  # Bottom 40% of frame
        self.proximity_width = 0.4   # Center 40% of frame width

    def draw_mask_safely(self, frame, mask, color, alpha=0.5):
        """Safely draw a mask on a frame with validation"""
        try:
            # Validate mask
            if mask is None or len(mask) < 3:  # Need at least 3 points for a polygon
                return frame

            # Convert to correct format
            mask_points = np.array(mask, dtype=np.int32)
            if mask_points.shape[0] < 3:  # Double-check after conversion
                return frame

            # Create overlay
            overlay = frame.copy()
            cv2.fillPoly(overlay, [mask_points], color)
            return cv2.addWeighted(frame, 1-alpha, overlay, alpha, 0)

        except Exception as e:
            print(f"Warning: Could not draw mask: {e}")
            return frame

    def is_in_proximity(self, box, frame_shape):
        """Check if object is very close to robot's path"""
        height, width = frame_shape[:2]
        # Get box coordinates
        x1, y1, x2, y2 = box
        box_center_x = (x1 + x2) / 2
        box_bottom = y2

        # Define immediate front zone
        zone_left = width * (0.5 - self.proximity_width/2)
        zone_right = width * (0.5 + self.proximity_width/2)
        zone_top = height * (1 - self.proximity_height)

        # Check if object is in immediate front zone
        if (zone_left < box_center_x < zone_right and box_bottom > zone_top):
            relative_distance = (box_bottom - zone_top) / (height - zone_top)
            return True, relative_distance
        return False, 0

    def process_frame(self, frame):
        height, width = frame.shape[:2]
        processed = frame.copy()

        # 1. Detect walkable zones
        walkable_results = self.walkable_model.predict(frame, show=False)
        if walkable_results[0].masks is not None:
            for mask in walkable_results[0].masks.xy:
                # Safely draw walkable zones
                processed = self.draw_mask_safely(
                    processed,
                    mask,
                    color=(0, 255, 0),  # Green for walkable zones
                    alpha=0.3
                )
                # Add outline if mask is valid
                if len(mask) >= 3:
                    cv2.polylines(processed, [np.int32(mask)], True, (0, 255, 0), 2)

        # 2. Detect and track objects
        object_results = self.object_model.track(frame, persist=True)[0]

        # Track closest obstacle
        closest_distance = 0
        has_close_obstacle = False

        if object_results.boxes is not None and object_results.masks is not None:
            boxes = object_results.boxes.xyxy.cpu().numpy()
            classes = object_results.boxes.cls.cpu().numpy()
            masks = object_results.masks.xy
            track_ids = object_results.boxes.id.cpu().numpy() if object_results.boxes.id is not None else None

            for i, (box, cls, mask) in enumerate(zip(boxes, classes, masks)):
                # Safely draw objects
                processed = self.draw_mask_safely(
                    processed,
                    mask,
                    color=(255, 165, 0),  # Orange for objects
                    alpha=0.5
                )

                # Add tracking ID and label
                if track_ids is not None and len(mask) > 0:
                    track_id = int(track_ids[i])
                    class_name = self.object_model.names[int(cls)]
                    label = f"ID {track_id} - {class_name}"
                    x_min, y_min = mask.min(axis=0)
                    cv2.putText(processed, label,
                              (int(x_min), int(y_min) - 10),
                              cv2.FONT_HERSHEY_SIMPLEX,
                              0.6,
                              (255, 255, 255),
                              2)

                # Check proximity for specific obstacle classes
                class_name = self.object_model.names[int(cls)]
                if class_name in ['person', 'bicycle', 'car']:
                    in_proximity, distance = self.is_in_proximity(box, frame.shape)
                    if in_proximity and distance > 0.3:  # Object significantly in front
                        has_close_obstacle = True
                        if distance > closest_distance:
                            closest_distance = distance

                        # Draw red box around closest obstacle
                        cv2.rectangle(processed,
                            (int(box[0]), int(box[1])),
                            (int(box[2]), int(box[3])),
                            (0, 0, 255), 2)

            # Add warning text if obstacle detected
            if has_close_obstacle:
                warning_text = "Obstacle ahead, please move"
                text_size = cv2.getTextSize(warning_text, cv2.FONT_HERSHEY_SIMPLEX, 1.2, 3)[0]
                text_x = (width - text_size[0]) // 2
                cv2.putText(processed, warning_text,
                          (text_x, height - 50),
                          cv2.FONT_HERSHEY_SIMPLEX, 1.2,
                          (0, 0, 255), 3)

        return processed

Test on Video

In [None]:
def process_video(input_path, output_path):
    # Ensure videos directory exists
    os.makedirs("videos", exist_ok=True)

    # Create detector
    detector = ProximityDetector()

    # Set up video capture
    cap = cv2.VideoCapture(input_path)

    # Check if video is opened successfully
    if not cap.isOpened():
        print(f"Error: Could not open video file {input_path}")
        print("Current directory:", os.getcwd())
        print("Files in directory:", os.listdir())
        return

    # Get video properties
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Create output video writer
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

    # Process entire video
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Process frame
        processed_frame = detector.process_frame(frame)

        # Write processed frame
        out.write(processed_frame)
        frame_count += 1

        if frame_count % 30 == 0:
            print(f"Processed frame {frame_count}")

    # Release resources
    cap.release()
    out.release()

# Main execution
if __name__ == "__main__":
    input_video = "first_floor.MOV"
    output_video = 'processed_video.mp4'

    # Process the video
    process_video(input_video, output_video)

    # Download the processed video
    from google.colab import files
    files.download(output_video)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Speed: 4.0ms preprocess, 180.6ms inference, 3.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 1 potted plant, 200.1ms
Speed: 4.2ms preprocess, 200.1ms inference, 10.9ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 330

0: 384x640 1 walkable-zone, 185.7ms
Speed: 5.3ms preprocess, 185.7ms inference, 3.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 190.1ms
Speed: 4.8ms preprocess, 190.1ms inference, 7.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 walkable-zone, 188.8ms
Speed: 5.0ms preprocess, 188.8ms inference, 4.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 1 potted plant, 209.8ms
Speed: 4.3ms preprocess, 209.8ms inference, 11.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 walkable-zone, 182.7ms
Speed: 5.4ms preprocess, 182.7ms inference, 4.4ms postprocess per image at shape (1, 3, 384,

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>