<a href="https://colab.research.google.com/github/thegregbeyond/FreeFuse-AI-Calbright-Project/blob/main/Image_Extraction%2C_Object_Detection%2C_%26_Labeling_RoboFlow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 1) Extract Video Stills

In [None]:
# Extract Video Stills

# === Import Libraries ===

from pathlib import Path
import cv2
from google.colab import drive

# === Configuration ===
# Folder on the mounted Drive that is scanned for source videos.
INPUT_DIR = Path('/content/drive/MyDrive/FreeFuse_Project/Source_Videos/Free-Use Videos from Pexels')
# Folder that receives the extracted stills.
OUTPUT_DIR = Path('/content/drive/MyDrive/FreeFuse_Project/Extracted_Stills')
CAPTURE_INTERVAL_S = 3      # seconds between captures
START_TIME_S = 1            # skip first N seconds of each video
# File suffixes treated as videos; suffixes are lower-cased before the lookup.
VIDEO_EXTS = {'.mp4', '.mov', '.avi'}

# === Functions ===
def mount_drive():
    """Attach Google Drive at /content/drive, announcing progress on stdout."""
    mount_point = '/content/drive'
    print("Mounting Google Drive...")
    # force_remount=True is passed on every call, as in the other cells.
    drive.mount(mount_point, force_remount=True)
    print("Drive mounted.")

def extract_stills(input_dir: Path, output_dir: Path, interval_s: float, start_s: float):
    """
    Extract still frames from every video in input_dir and save them as JPEGs.

    A frame is saved once the reported playback position reaches start_s
    seconds, and again each time a further interval_s seconds have passed.
    Output files are named '<video stem>_<MMSS>.jpg', where MMSS is the
    minutes and two-digit seconds of the frame's position in the video.

    Args:
        input_dir: Folder scanned (non-recursively) for regular files whose
            lower-cased suffix is in VIDEO_EXTS.
        output_dir: Folder that receives the stills; created if missing.
        interval_s: Seconds between captures.
        start_s: Seconds to skip at the start of each video.

    Returns:
        None. Progress and failures are reported on stdout.

    Raises:
        FileNotFoundError: If input_dir does not exist.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    # is_file() keeps folders that merely look like videos ("clips.mp4/") out
    # of the list; sorting makes the processing order reproducible.
    video_files = sorted(
        f for f in input_dir.iterdir()
        if f.is_file() and f.suffix.lower() in VIDEO_EXTS
    )

    if not video_files:
        print(f"No videos found in {input_dir}")
        return

    print(f"Found {len(video_files)} videos in {input_dir}\n")
    for idx, video_file in enumerate(video_files, start=1):
        print(f"[{idx}/{len(video_files)}] Processing '{video_file.name}'")
        cap = cv2.VideoCapture(str(video_file))
        try:
            if not cap.isOpened():
                print(f"  ✗ Could not open {video_file.name}, skipping.")
                continue

            # Some containers report an FPS of 0; fall back to 30 in that case.
            fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
            start_frame = int(start_s * fps)
            if start_frame > 0:
                cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
                print(f"  → Skipped first {start_s}s ({start_frame} frames)")

            next_capture = start_s
            saved_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # NOTE(review): position is taken from the backend after the
                # read; confirm it tracks the frame just decoded on your build.
                current_s = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000.0
                if current_s < next_capture:
                    continue

                mm = int(current_s // 60)
                ss = int(current_s % 60)
                # NOTE: names have one-second resolution, so an interval_s
                # below 1 makes captures within the same second overwrite
                # each other.
                out_name = f"{video_file.stem}_{mm:02d}{ss:02d}.jpg"
                out_path = output_dir / out_name
                # imwrite signals failure through its return value, not an
                # exception, so only report/count frames that were written.
                if cv2.imwrite(str(out_path), frame):
                    print(f"  ✔ Saved frame '{out_name}'")
                    saved_count += 1
                else:
                    print(f"  ✗ Failed to write frame '{out_name}'")
                next_capture += interval_s
        finally:
            # Release the decoder even if reading or writing raised.
            cap.release()
        print(f"  Completed '{video_file.name}', saved {saved_count} frames.\n")

    print("All videos processed.")

# === Main Execution ===
# Drive is mounted first because INPUT_DIR and OUTPUT_DIR both live under
# /content/drive.
mount_drive()
extract_stills(INPUT_DIR, OUTPUT_DIR, CAPTURE_INTERVAL_S, START_TIME_S)

## 2) Object Detection

In [None]:
# Get Object Annotations from RoboFlow:
'''
IMPORTANT:
This project assumes that the API Key is stored in
Colab Secrets under the name 'ROBOFLOW_API_KEY'. To
configure, click the key icon in Colab, and add the
name and value.
'''

# Install RoboFlow SDK
!pip install inference-sdk -q

# Imports & Configurable Parameters
import os
import pandas as pd
from pathlib import Path
from inference_sdk import InferenceHTTPClient
from google.colab import drive, userdata

# === Configurable Parameters ===
# Folder whose .jpg/.jpeg/.png files are sent to the workflow.
stills_folder_path   = '/content/drive/MyDrive/FreeFuse_Project/Extracted_Stills'
# Destination of the draft annotation table (one row per kept detection).
output_csv_path      = '/content/drive/MyDrive/FreeFuse_Project/Extracted_Stills/draft_annotations.csv'
# At most this many detections are recorded per image.
max_objects_per_image = 10
# Detections whose confidence is below this value are dropped.
confidence_threshold  = 0.4

# RoboFlow workflow settings
workflow_id          = 'freefusetestwf'       # <-- set your workflow ID here
workspace_name       = 'student-workspace'    # <-- set your workspace name here
# ===============================

# Mount Google Drive
# The stills folder and the output CSV both live under /content/drive.
print("Connecting to Google Drive...")
drive.mount('/content/drive', force_remount=True)
print("Google Drive connected.")

# Retrieve API key
# The key is read from the Colab secret named 'ROBOFLOW_API_KEY'. A missing
# secret is reported here and becomes a hard failure in the check below.
try:
    ROBOFLOW_API_KEY = userdata.get('ROBOFLOW_API_KEY')
except userdata.SecretNotFoundError:
    print("ERROR: Secret 'ROBOFLOW_API_KEY' not found.")
    ROBOFLOW_API_KEY = None

# Initialize RoboFlow client
# An empty or missing key aborts the cell before the client is created.
if not ROBOFLOW_API_KEY:
    raise ValueError("RoboFlow API key is missing. Aborting.")
client = InferenceHTTPClient(
    api_url="https://serverless.roboflow.com",
    api_key=ROBOFLOW_API_KEY
)
print("Roboflow client initialized.")

# Prepare to collect annotations
from PIL import Image  # imported once here rather than on every loop iteration


def _timestamp_sec_from_frame_id(frame_id):
    """Return the capture time in seconds encoded at the end of frame_id.

    Stills are named '<video stem>_<MMSS>' (minutes followed by two-digit
    seconds), so 'clip_0130' means 1 min 30 s = 90 s, not 130 s. Returns
    None when the suffix is not purely numeric, so that one oddly named
    file cannot abort the whole run.
    """
    suffix = frame_id.rsplit('_', 1)[-1]
    if not (suffix.isascii() and suffix.isdigit()):
        return None
    minutes, seconds = divmod(int(suffix), 100)
    return minutes * 60 + seconds


all_annotations = []
image_files = []
try:
    # Sorted so rows appear in a reproducible order in the CSV.
    image_files = sorted(
        f for f in os.listdir(stills_folder_path)
        if f.lower().endswith(('.jpg','.jpeg','.png'))
    )
except FileNotFoundError:
    raise FileNotFoundError(f"Input folder not found: {stills_folder_path}")

if not image_files:
    print("No images found. Exiting.")
else:
    print(f"Processing {len(image_files)} images...")

    for idx, img_name in enumerate(image_files, start=1):
        print(f"[{idx}/{len(image_files)}] {img_name}")
        img_path = os.path.join(stills_folder_path, img_name)
        frame_id = os.path.splitext(img_name)[0]

        try:
            # Call the workflow
            result = client.run_workflow(
                workspace_name=workspace_name,
                workflow_id=workflow_id,
                images={"image": img_path},
                use_cache=True
            )
        except Exception as e:
            print(f"  Error invoking RoboFlow on {img_name}: {e}")
            continue

        # Parse response: expected shape is
        # [{'output': {'predictions': {'predictions': [...]}}}].
        # Missing 'predictions' keys mean "no detections"; any other shape is
        # reported and skipped instead of raising.
        first = result[0] if isinstance(result, list) and result else None
        output = first.get('output') if isinstance(first, dict) else None
        wrapper = output.get('predictions', {}) if isinstance(output, dict) else None
        preds = wrapper.get('predictions', []) if isinstance(wrapper, dict) else None
        if not isinstance(preds, list):
            print(f"  Unexpected response for {img_name}")
            continue

        # Keep confident detections, best first, capped per image. sorted()
        # leaves the response object itself untouched.
        kept = sorted(
            (p for p in preds if p.get('confidence', 0) >= confidence_threshold),
            key=lambda p: p.get('confidence', 0),
            reverse=True,
        )[:max_objects_per_image]
        if not kept:
            continue  # nothing to record, so no need to open the image

        # Load dimensions once per image; the context manager closes the file.
        try:
            with Image.open(img_path) as im:
                w, h = im.size
        except OSError as e:
            print(f"  Could not read image size for {img_name}: {e}")
            continue

        timestamp_sec = _timestamp_sec_from_frame_id(frame_id)
        if timestamp_sec is None:
            print(f"  Warning: no MMSS timestamp in '{frame_id}'; leaving timestamp_sec empty")

        for count, p in enumerate(kept, start=1):
            conf = p.get('confidence', 0)

            # Convert center/wh to pixel bounds
            xc, yc, bw, bh = p.get('x',0), p.get('y',0), p.get('width',0), p.get('height',0)
            # Clamp every edge into [0, w] / [0, h] so a box lying outside
            # the image collapses to zero size instead of going negative.
            x_min = min(max(0, int(xc - bw/2)), w)
            y_min = min(max(0, int(yc - bh/2)), h)
            x_max = max(0, min(w, int(xc + bw/2)))
            y_max = max(0, min(h, int(yc + bh/2)))
            area = max(0, x_max - x_min) * max(0, y_max - y_min)

            # Build an object_id
            object_id = f"{frame_id}_obj{count}"

            all_annotations.append({
                'frame_id': frame_id,
                'image_file_name': img_name,
                'timestamp_sec': timestamp_sec,
                'image_width_px': w,
                'image_height_px': h,
                'object_id': object_id,
                'object_name': str(p.get('class','unknown')).lower().replace(' ','_'),
                'object_category': 'N/A',
                'x_min': x_min,
                'y_min': y_min,
                'x_max': x_max,
                'y_max': y_max,
                'bb_area_px': area,
                'confidence': round(conf,4),
                'review_status': 'pending',
                'reviewer_notes': ''
            })

# Write the collected rows to the draft CSV (skipped when nothing was kept)
if not all_annotations:
    print("No annotations collected.")
else:
    # Column order is fixed explicitly so the CSV layout is stable.
    df = pd.DataFrame(
        all_annotations,
        columns=[
            'frame_id',
            'image_file_name',
            'timestamp_sec',
            'image_width_px',
            'image_height_px',
            'object_id',
            'object_name',
            'object_category',
            'x_min',
            'y_min',
            'x_max',
            'y_max',
            'bb_area_px',
            'confidence',
            'review_status',
            'reviewer_notes',
        ],
    )
    df.to_csv(output_csv_path, index=False)
    print(f"Saved {len(df)} rows to {output_csv_path}")

## 3) Label Stills

In [None]:
from pathlib import Path
import pandas as pd
import cv2
from google.colab import drive

# === Configuration ===
# Folder holding the source stills that the CSV rows refer to by file name.
STILLS_DIR = Path('/content/drive/MyDrive/FreeFuse_Project/Extracted_Stills')
# Annotation table read by load_annotations().
ANNOTATIONS_CSV = STILLS_DIR / 'draft_annotations.csv'
# Folder that receives the '<stem>_annotated.jpg' copies.
OUTPUT_DIR = Path('/content/drive/MyDrive/FreeFuse_Project/Labeled_Stills')

# Drawing style. BOX_COLOR is used for both the box outline and the filled
# label background; TEXT_COLOR is used for the label text.
BOX_COLOR = (0, 255, 0)       # BGR
TEXT_COLOR = (255, 255, 255)  # BGR
BOX_THICKNESS = 2
FONT = cv2.FONT_HERSHEY_SIMPLEX
FONT_SCALE = 0.6
LINE_TYPE = cv2.LINE_AA

# === Helper Functions ===
def mount_drive():
    """Attach Google Drive at /content/drive, announcing progress on stdout."""
    mount_point = '/content/drive'
    print("Mounting Google Drive...")
    # force_remount=True is passed on every call, as in the other cells.
    drive.mount(mount_point, force_remount=True)
    print("Drive mounted.")

def load_annotations(csv_path: Path) -> pd.DataFrame:
    """Read the annotation CSV and confirm it carries every expected column.

    Args:
        csv_path: Location of the annotation CSV.

    Returns:
        The CSV contents as a DataFrame.

    Raises:
        FileNotFoundError: If csv_path does not exist.
        ValueError: If one or more expected columns are absent.
    """
    if not csv_path.exists():
        raise FileNotFoundError(f"Annotation file not found: {csv_path}")

    table = pd.read_csv(csv_path)

    expected = {
        'frame_id',
        'image_file_name',
        'timestamp_sec',
        'image_width_px',
        'image_height_px',
        'object_id',
        'object_name',
        'object_category',
        'x_min',
        'y_min',
        'x_max',
        'y_max',
        'bb_area_px',
        'confidence',
        'review_status',
        'reviewer_notes',
    }
    absent = expected.difference(table.columns)
    if not absent:
        return table
    raise ValueError(f"Missing columns in CSV: {absent}")

def annotate_and_save(df: pd.DataFrame, stills_dir: Path, output_dir: Path):
    """Draw bounding boxes on images and save annotated copies.

    Rows are grouped by 'image_file_name'; each image is loaded once, every
    row's box and '<object_name>: <confidence>' label is drawn on it, and the
    result is written to output_dir as '<stem>_annotated.jpg'.

    Args:
        df: Annotation table; reads the columns image_file_name, x_min,
            y_min, x_max, y_max, object_name and confidence.
        stills_dir: Folder containing the images named in the table.
        output_dir: Folder that receives the annotated copies; created if
            missing.

    Returns:
        None. Missing/unreadable images, unusable rows and failed writes are
        reported on stdout and skipped.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    grouped = df.groupby('image_file_name')
    print(f"Found annotations for {len(grouped)} images.")
    for idx, (img_name, group) in enumerate(grouped, start=1):
        print(f"[{idx}/{len(grouped)}] Annotating {img_name}")
        img_path = stills_dir / img_name
        if not img_path.exists():
            print(f"  Image not found: {img_path}")
            continue
        image = cv2.imread(str(img_path))
        if image is None:
            print(f"  Failed to load image: {img_path}")
            continue

        # Draw each annotation
        for _, row in group.iterrows():
            try:
                x_min = int(row['x_min'])
                y_min = int(row['y_min'])
                x_max = int(row['x_max'])
                y_max = int(row['y_max'])
            except (TypeError, ValueError):
                # Blank/NaN coordinates (e.g. a hand-edited CSV row) should
                # not abort the remaining images.
                print("  Skipping row with invalid box coordinates")
                continue
            label = row['object_name']
            conf = row['confidence']
            # Draw bounding box
            cv2.rectangle(image, (x_min, y_min), (x_max, y_max), BOX_COLOR, BOX_THICKNESS)

            text = f"{label}: {conf:.2f}"
            (text_w, text_h), _ = cv2.getTextSize(text, FONT, FONT_SCALE, 1)
            # The label normally sits just above the box. When the box touches
            # the top of the image that spot is off-canvas, so the label is
            # placed just inside the box instead.
            if y_min - text_h - 4 >= 0:
                label_bottom = y_min
            else:
                label_bottom = y_min + text_h + 4
            # Text background
            cv2.rectangle(
                image,
                (x_min, label_bottom - text_h - 4),
                (x_min + text_w, label_bottom),
                BOX_COLOR,
                -1,
            )
            # Text overlay
            cv2.putText(image, text, (x_min, label_bottom - 2), FONT, FONT_SCALE, TEXT_COLOR, 1, LINE_TYPE)

        # Save annotated image
        out_name = img_path.stem + '_annotated.jpg'
        out_path = output_dir / out_name
        # imwrite signals failure through its return value, not an exception.
        if not cv2.imwrite(str(out_path), image):
            print(f"  Failed to write annotated image: {out_path}")
    print("Annotation of images complete.")

# === Main Execution ===
# Drive is mounted first because the CSV and both image folders live under
# /content/drive.
mount_drive()
# Any failure while loading the CSV or drawing is reported as a one-line
# message rather than a traceback.
try:
    annotations_df = load_annotations(ANNOTATIONS_CSV)
    annotate_and_save(annotations_df, STILLS_DIR, OUTPUT_DIR)
except Exception as e:
    print(f"ERROR: {e}")
