In [None]:
# Uncomment on first run to install the required packages into the active kernel's environment:
# %pip install ultralytics opencv-python yt-dlp

In [None]:
import os
import cv2
import json
import numpy as np
from ultralytics import SAM

# --- Configuration ---
# Source video: replace the placeholder with a real YouTube URL
# (a clip shorter than 30 seconds is recommended while testing).
video_url = "YOUTUBE_VIDEO_URL_HERE"

# Output layout: everything is written beneath `output_dir`, split into an
# "img" sub-directory and a "data" sub-directory.
output_dir = "output"
img_dir, data_dir = (os.path.join(output_dir, leaf) for leaf in ("img", "data"))

# SAM checkpoint to load (e.g., 'sam_l.pt', 'sam_b.pt').
# The file must already exist locally or be downloadable by ultralytics.
model_name = "sam_b.pt"


In [None]:
# --- Setup ---
# Create the output directories up front. `exist_ok=True` makes this cell safe
# to re-run: directories that already exist are left untouched.
os.makedirs(img_dir, exist_ok=True)
os.makedirs(data_dir, exist_ok=True)

# Load the SAM checkpoint selected by `model_name` in the configuration cell.
# NOTE(review): in the visible notebook `model` is only referenced by the
# commented-out Step 2 cell; the auto_annotate cell is given checkpoint names
# directly. Confirm this load is still needed before keeping its cost.
try:
    model = SAM(model_name)
except Exception as e:
    # Print a friendly hint first, then re-raise with a bare `raise` so the
    # original traceback is propagated unchanged and the notebook stops here.
    print(f"Error loading model '{model_name}'. Ensure the file exists or is accessible.")
    print(f"Details: {e}")
    raise
else:
    print(f"Successfully loaded model: {model_name}")

print("Setup completed successfully!")

In [None]:
# --- Step 1: Download Video and Extract Frames ---
# Downloads `video_url` to a temporary file, writes every decoded frame to
# `img_dir` as frame_<NNN>.png, and always removes the temporary video afterwards.
import yt_dlp

# Fail fast with an actionable message if the configuration placeholder was
# never replaced; otherwise the problem only surfaces as a yt-dlp download error.
if not video_url or video_url == "YOUTUBE_VIDEO_URL_HERE":
    raise ValueError(
        "Set `video_url` in the configuration cell to a real video URL before running this cell."
    )

print(f"Downloading video from: {video_url}")

# Download video to a temporary file
video_filename = "temp_video.mp4"
ydl_opts = {
    'format': 'best[height<=720]',  # Download reasonable quality to save time
    'outtmpl': video_filename,
}

try:
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([video_url])
    print(f"Video downloaded successfully as: {video_filename}")
except Exception as e:
    print(f"Error downloading video: {e}")
    raise  # bare raise keeps the original traceback intact

# cv2.imwrite returns False (it does not raise) when the target directory is
# missing, so make sure it exists even if the setup cell was skipped.
os.makedirs(img_dir, exist_ok=True)

# Extract frames from the downloaded video
frame_count = 0
cap = cv2.VideoCapture(video_filename)
try:
    # VideoCapture does not raise on an unreadable file; without this check the
    # loop below would silently extract zero frames.
    if not cap.isOpened():
        raise RuntimeError(f"Could not open downloaded video: {video_filename}")

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    # Zero-pad to at least 3 digits (the original naming), widening when the
    # reported frame count needs more so that lexicographic file order still
    # matches frame order for videos longer than 999 frames.
    pad_width = max(3, len(str(total_frames)))

    print(f"Video info: {total_frames} frames at {fps:.2f} FPS")
    print("Extracting frames...")

    # Extract every frame (you can modify this to extract every Nth frame if needed)
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1
        frame_number_str = f"{frame_count:0{pad_width}d}"

        # Save frame as image; surface write failures instead of ignoring them
        img_path = os.path.join(img_dir, f"frame_{frame_number_str}.png")
        if not cv2.imwrite(img_path, frame):
            raise IOError(f"Failed to write frame image: {img_path}")

        if frame_count % 10 == 0:  # Print progress every 10 frames
            print(f"Extracted frame {frame_count}/{total_frames}")
finally:
    # Always release the capture and clean up the temporary video file, even on
    # failure, so a later run cannot pick up a stale temp_video.mp4.
    cap.release()
    if os.path.exists(video_filename):
        os.remove(video_filename)

print("\nFrame extraction completed!")
print(f"Total frames extracted: {frame_count}")
print(f"Images saved to: {img_dir}")

In [None]:
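# # --- Step 2 (disabled): Run SAM Predictions on Extracted Frames ---
# # NOTE: this entire cell is commented out and does not run. Uncomment it to
# # write one JSON file of segment polygons per frame into data_dir.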
# import glob

# print("Running SAM predictions on extracted frames...")

# # Get list of all extracted frame images
# frame_files = sorted(glob.glob(os.path.join(img_dir, "frame_*.png")))
# total_files = len(frame_files)

# print(f"Found {total_files} frames to process")

# for i, img_path in enumerate(frame_files):
#     # Extract frame number from filename
#     frame_filename = os.path.basename(img_path)
#     frame_number_str = frame_filename.replace("frame_", "").replace(".png", "")
    
#     print(f"Processing frame {i+1}/{total_files}: {frame_filename}")
    
#     try:
#         # Run SAM prediction on the image
#         results = model.predict(source=img_path, save=False, save_json=False)
        
#         # results is a list containing one Results object for the image
#         if not results:
#             print(f"  Warning: No results for {frame_filename}")
#             # Create empty JSON file
#             json_data = []
#         else:
#             frame_results = results[0]  # Get the Results object
#             json_data = []  # List to hold segmentation data for this frame
            
#             # Check if segmentation masks are present in the results
#             if hasattr(frame_results, 'masks') and frame_results.masks is not None:
#                 masks = frame_results.masks
                
#                 # Extract polygon segments
#                 if hasattr(masks, 'segments') and masks.segments is not None:
#                     segments_list_np = masks.segments
#                     num_segments = len(segments_list_np)
#                     print(f"  Found {num_segments} segments")
                    
#                     for j in range(num_segments):
#                         segment_np = segments_list_np[j]
#                         if segment_np is not None:
#                             # Convert the numpy polygon array to a list for JSON
#                             polygon_coords = segment_np.tolist()
                            
#                             segment_info = {
#                                 "polygon": polygon_coords
#                             }
#                             json_data.append(segment_info)
#                 else:
#                     print(f"  No segments found for {frame_filename}")
#             else:
#                 print(f"  No masks found for {frame_filename}")
        
#         # Save the JSON data for the frame
#         json_path = os.path.join(data_dir, f"frame_{frame_number_str}.json")
#         with open(json_path, 'w') as f:
#             json.dump(json_data, f, indent=4)
            
#         print(f"  Saved segmentation data to: frame_{frame_number_str}.json")
        
#     except Exception as e:
#         print(f"  Error processing {frame_filename}: {e}")
#         # Create empty JSON file for failed frames
#         json_data = []
#         json_path = os.path.join(data_dir, f"frame_{frame_number_str}.json")
#         with open(json_path, 'w') as f:
#             json.dump(json_data, f, indent=4)

# print(f"\nSAM prediction completed!")
# print(f"Segmentation data saved to: {data_dir}")
# print(f"You can now inspect the results in the {output_dir} directory")

In [None]:
# Reference documentation for the call below:
#   https://docs.ultralytics.com/models/sam/#sam-comparison-vs-yolo
#   https://docs.ultralytics.com/datasets/segment/#ultralytics-yolo-format
from ultralytics.data.annotator import auto_annotate

# Keyword arguments for the annotation run over the extracted frames.
# Commented-out entries are alternatives that were tried or may be enabled.
annotation_options = {
    "data": img_dir,
    "det_model": "yolo11x.pt",
    # "sam_model": "sam_b.pt",  # sam base model; a bit slow on mac...
    "sam_model": "mobile_sam.pt",  # mobile sam model; faster on mac
    # "max_det": 10,
    "output_dir": data_dir,
}

auto_annotate(**annotation_options)