## Real-time Visual Navigation using Webcam in VS Code

This notebook uses your local machine's webcam to find a safe path in real-time.

**Instructions:**
1.  **Install the dependencies first** by running the `pip install -r requirements.txt` cell at the end of this notebook.
2.  Run the remaining cells in order, from top to bottom; they load the models and define the helper functions.
3.  Run the "Main Real-time Processing Loop" cell to start your webcam.
4.  A new window titled "Real-time Navigation" will appear.
5.  **To stop the program, click on the video window and press the 'q' key.**

# Load

In [2]:
import cv2
import numpy as np
import torch
from PIL import Image
from ultralytics import YOLO
from transformers import OneFormerProcessor, OneFormerForUniversalSegmentation
import time

In [3]:
print("Loading models... This may take a moment.")
# Prefer CUDA whenever PyTorch can see a GPU; otherwise everything runs on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

# --- Object detection: YOLO, loaded from the "yolo11n.pt" weights ---
yolo_model = YOLO("yolo11n.pt")
yolo_model.to(device)

# --- Semantic segmentation: OneFormer (processor + model from the same checkpoint) ---
oneformer_processor = OneFormerProcessor.from_pretrained("shi-labs/oneformer_ade20k_swin_large")
oneformer_model = OneFormerForUniversalSegmentation.from_pretrained("shi-labs/oneformer_ade20k_swin_large")
oneformer_model.to(device)

# A class counts as walkable when its label (case-insensitive) contains any of these
# keywords as a substring. The labels come from the model's own id -> label table.
walkable_keywords = ["floor", "path", "road", "sidewalk", "pavement", "ground"]
walkable_ids = [
    class_id
    for class_id, label in oneformer_model.config.id2label.items()
    if any(keyword in label.lower() for keyword in walkable_keywords)
]
print(f"Found walkable class IDs: {walkable_ids}")

print("Models loaded successfully.")

Loading models... This may take a moment.
Using device: cuda


  return func(*args, **kwargs)


Found walkable class IDs: [3, 6, 11, 13, 52, 94]
Models loaded successfully.


# Helper functions

In [4]:
def analyze_scene(frame, yolo_model, oneformer_processor, oneformer_model, walkable_ids, device):
    """Segment one frame, detect objects in it, and derive the safe-path mask.

    Args:
        frame: Image array; it is converted with ``cv2.COLOR_BGR2RGB`` before
            segmentation, i.e. it is treated as BGR.
        yolo_model: Detector; called on ``frame`` and queried for ``names``.
        oneformer_processor: Pre/post-processor paired with ``oneformer_model``.
        oneformer_model: Segmentation model run under ``torch.no_grad()``.
        walkable_ids: Class IDs of the semantic map that count as walkable.
        device: Device the processed inputs are moved to before inference.

    Returns:
        Tuple ``(safe_path_mask, human_boxes, obstacle_boxes)``:
        ``safe_path_mask`` is a uint8 array at frame resolution holding 1 where
        the surface is walkable and not covered by any detection box, else 0;
        the two lists hold integer ``[x1, y1, x2, y2]`` arrays for detections
        named 'person' and for every other detection, respectively.
    """
    # --- Step 1: semantic segmentation -> binary walkable mask ---
    rgb_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    model_inputs = oneformer_processor(
        images=rgb_image, task_inputs=["semantic"], return_tensors="pt"
    ).to(device)
    with torch.no_grad():
        model_outputs = oneformer_model(**model_inputs)
    # PIL reports (width, height); the post-processor is given (height, width).
    img_width, img_height = rgb_image.size
    semantic_map = oneformer_processor.post_process_semantic_segmentation(
        model_outputs, target_sizes=[(img_height, img_width)]
    )[0]
    walkable_mask = np.isin(semantic_map.cpu().numpy(), walkable_ids).astype(np.uint8)

    # --- Step 2: object detection, split into people and everything else ---
    human_boxes, obstacle_boxes = [], []
    for detection in yolo_model(frame, verbose=False):
        for det_box in detection.boxes:
            label = yolo_model.names[int(det_box.cls[0])]
            corners = det_box.xyxy[0].cpu().numpy().astype(int)
            bucket = human_boxes if label == 'person' else obstacle_boxes
            bucket.append(corners)

    # --- Step 3: remove every detection box from the walkable area ---
    obstacle_mask = np.zeros_like(walkable_mask)
    for x1, y1, x2, y2 in human_boxes + obstacle_boxes:
        cv2.rectangle(obstacle_mask, (x1, y1), (x2, y2), 1, -1)  # thickness -1 = filled
    # On 0/1 uint8 masks, bitwise_not yields 255/254; AND-ing with the 0/1 walkable
    # mask therefore keeps a 1 only where the pixel is walkable and outside every box.
    safe_path_mask = cv2.bitwise_and(walkable_mask, cv2.bitwise_not(obstacle_mask))

    return safe_path_mask, human_boxes, obstacle_boxes

In [5]:
def check_for_blockage(boxes, region_x_start, region_x_end, frame_height):
    """Report whether any box intrudes into a horizontal band of the lower half-frame.

    Args:
        boxes: Iterable of ``(x1, y1, x2, y2)`` boxes.
        region_x_start: Left edge (x) of the region of interest.
        region_x_end: Right edge (x) of the region of interest.
        frame_height: Full frame height; only boxes whose bottom edge lies
            strictly below the vertical midpoint are considered.

    Returns:
        True if at least one box reaches into the lower half of the frame and
        its x-extent strictly overlaps ``(region_x_start, region_x_end)``;
        False otherwise (including when ``boxes`` is empty).
    """
    midline = frame_height / 2
    return any(
        # Reaches the lower (navigation) half AND overlaps the region on the x axis
        # (1-D interval overlap: left < other.right and right > other.left).
        bottom > midline and left < region_x_end and right > region_x_start
        for left, _top, right, bottom in boxes
    )

In [6]:
def make_navigation_decision(safe_path_mask, human_boxes, obstacle_boxes, frame_shape):
    """Choose a navigation command from the safe-path mask and the detected boxes.

    The lower half of the frame is split into left / centre / right thirds and
    each third is scored by its number of non-zero safe-path pixels.

    Args:
        safe_path_mask: 2-D single-channel mask, non-zero where the path is safe.
        human_boxes: ``(x1, y1, x2, y2)`` boxes of detected people.
        obstacle_boxes: ``(x1, y1, x2, y2)`` boxes of all other detections.
        frame_shape: ``(height, width)`` of the frame.

    Returns:
        One of "Move Forward", "Waiting for human...",
        "Obstacle Blockage. Scan 360", "Turn Left", "Turn Right" or
        "STOP - FULLY BLOCKED".
    """
    H, W = frame_shape
    left_boundary = W // 3
    right_boundary = 2 * W // 3

    # Only the lower half of the frame is scored.
    navigation_area = safe_path_mask[H // 2:, :]
    left_region = navigation_area[:, :left_boundary]
    center_region = navigation_area[:, left_boundary:right_boundary]
    right_region = navigation_area[:, right_boundary:]

    left_score = cv2.countNonZero(left_region)
    center_score = cv2.countNonZero(center_region)
    right_score = cv2.countNonZero(right_region)

    # Thresholds for making decisions
    center_threshold = center_region.size * 0.20  # more than 20% of the centre must be clear to move forward
    side_threshold = left_region.size * 0.15      # more than 15% of a side (left-third size) must be clear to turn

    # 1. Primary check: is the way forward clear according to the mask?
    if center_score > center_threshold:
        return "Move Forward"

    # 2. Centre is blocked: look for a detected object in the central column.
    #    People take priority over other obstacles.
    if check_for_blockage(human_boxes, left_boundary, right_boundary, H):
        return "Waiting for human..."

    if check_for_blockage(obstacle_boxes, left_boundary, right_boundary, H):
        # Blocked by a non-person detection: suggest a more drastic action.
        return "Obstacle Blockage. Scan 360"

    # 3. Blocked by something that was not detected as an object (e.g. a wall):
    #    turn towards the side with more free space. A tie is resolved to the
    #    left, so two equally open sides never fall through to a full stop.
    if left_score >= right_score and left_score > side_threshold:
        return "Turn Left"

    if right_score > left_score and right_score > side_threshold:
        return "Turn Right"

    # 4. Neither side has enough free space.
    return "STOP - FULLY BLOCKED"

In [7]:
def visualize_output(frame, safe_path_mask, human_boxes, obstacle_boxes, decision):
    """Render the safe path, detection boxes, guide lines and decision onto a frame.

    Args:
        frame: 3-D image array to annotate; it is blended into a new image
            rather than drawn on directly.
        safe_path_mask: Mask whose pixels equal to 1 are tinted green, or None
            to skip the tint.
        human_boxes: ``(x1, y1, x2, y2)`` boxes drawn with colour (255, 0, 0)
            and labelled "Human".
        obstacle_boxes: ``(x1, y1, x2, y2)`` boxes drawn with colour
            (0, 0, 255) and labelled "Obstacle".
        decision: Text shown after "Decision: " in the top-left corner.

    Returns:
        The annotated image.
    """
    frame_h, frame_w, _channels = frame.shape

    # Green tint over safe pixels, blended 70% frame / 30% overlay.
    overlay = np.zeros_like(frame)
    if safe_path_mask is not None:
        overlay[safe_path_mask == 1] = (0, 255, 0)
    output_frame = cv2.addWeighted(frame, 0.7, overlay, 0.3, 0)

    # Boxes and captions: people first, then all other detections.
    box_styles = (
        (human_boxes, "Human", (255, 0, 0)),
        (obstacle_boxes, "Obstacle", (0, 0, 255)),
    )
    for boxes, caption, color in box_styles:
        for x1, y1, x2, y2 in boxes:
            cv2.rectangle(output_frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(output_frame, caption, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    # Vertical guide lines at one third and two thirds of the width, lower half only.
    for guide_x in (frame_w // 3, 2 * frame_w // 3):
        cv2.line(output_frame, (guide_x, frame_h // 2), (guide_x, frame_h), (255, 255, 0), 2)

    # Current decision in the top-left corner.
    cv2.putText(output_frame, f"Decision: {decision}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2, cv2.LINE_AA)

    return output_frame

# Main Real-time Processing Loop

In [8]:
# --- Configuration ---
# Run the models on every Nth frame only; the frames in between are displayed
# with the most recent analysis results so the window keeps updating.
# A lower number gives fresher results but is slower. A higher number is faster but less responsive.
FRAME_SKIP = 5

# Use 0 for the default built-in webcam
cap = cv2.VideoCapture(0)

if not cap.isOpened():
    print("Error: Could not open webcam.")
else:
    print("Webcam opened successfully. Press 'q' on the video window to quit.")
    frame_count = 0
    last_decision = "Initializing..."

    try:
        # Read one frame up front to learn the frame size, so the placeholders used
        # for the frames shown before the first analysis have the right shape.
        # The read happens inside the try block so the camera is released even if
        # it raises.
        ret, sample_frame = cap.read()
        if ret:
            H_init, W_init, _ = sample_frame.shape
            # Empty placeholders for the analysis results
            safe_path_mask = np.zeros((H_init, W_init), dtype=np.uint8)
            human_boxes = []
            obstacle_boxes = []
        else:
            print("Error: Could not read the first frame from webcam.")

        # A live camera cannot be rewound, so no seek is attempted: the sample frame
        # is simply dropped and the loop continues with the next frame.
        while ret:  # Skipped entirely when the first read failed
            ret, frame = cap.read()
            if not ret:
                print("End of video stream or webcam error.")
                break

            frame_count += 1

            # --- Performance Boost: Only process every Nth frame ---
            if frame_count % FRAME_SKIP == 0:
                # 1. Analyze the scene to get fresh data
                safe_path_mask, human_boxes, obstacle_boxes = analyze_scene(
                    frame, yolo_model, oneformer_processor, oneformer_model, walkable_ids, device
                )

                # 2. Make a new navigation decision and cache it for the skipped frames
                last_decision = make_navigation_decision(
                    safe_path_mask, human_boxes, obstacle_boxes, frame.shape[:2]
                )

            # 3. Create the visualization using the most recent data
            #    (cached results from the last processed frame on skipped frames)
            output_frame = visualize_output(
                frame, safe_path_mask, human_boxes, obstacle_boxes, last_decision
            )

            # 4. Display the frame in a new window
            cv2.imshow('Real-time Navigation', output_frame)

            # 5. Check for 'q' key to exit (the mask keeps only the low byte of the key code)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    except Exception as e:
        # Report the failure instead of leaving the notebook with an open camera;
        # cleanup happens in the finally block either way.
        print(f"An error occurred during the loop: {e}")
    finally:
        # Single cleanup path: release the camera and close the display window
        cap.release()
        cv2.destroyAllWindows()
        print("Webcam released and windows closed.")

Webcam opened successfully. Press 'q' on the video window to quit.


  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


End of video stream or webcam error.
Webcam released and windows closed.


In [9]:
!pip install -r requirements.txt

