In [35]:
import gdown
import zipfile
import os

def download_from_gdrive(drive_url: str, output_path: str) -> str:
    """
    Download a file from Google Drive with gdown and return its local path.

    Args:
        drive_url: URL handed to ``gdown.download``.
        output_path: Local path the downloaded file is written to.

    Returns:
        ``output_path`` once the download has succeeded.

    Raises:
        RuntimeError: If ``gdown.download`` raises, or reports failure by
            returning ``None``.
    """
    manual_hint = (
        f"👉 Try downloading manually in your browser and place the file as '{output_path}'.\n"
    )

    try:
        print(f"📥 Downloading from Google Drive...\nURL: {drive_url}")
        downloaded = gdown.download(drive_url, output_path, quiet=False)
    except Exception as e:
        # Chain the original exception so the real cause stays in the traceback.
        raise RuntimeError(
            "❌ Google Drive download failed (the file may be blocked or over its download quota).\n"
            + manual_hint
            + f"Error details: {e}"
        ) from e

    # NOTE(review): some gdown versions report a failed download by printing a
    # message and returning None instead of raising - confirm against the
    # installed version. Without this check a failure would look like success.
    if downloaded is None:
        raise RuntimeError(
            "❌ Google Drive download failed (the file may be blocked or over its download quota).\n"
            + manual_hint
            + "Error details: gdown.download returned None"
        )

    return output_path


def extract_zip(zip_path: str, extract_to: str) -> str:
    """
    Unpack the archive at ``zip_path`` into ``extract_to``, then delete the archive.

    Returns ``extract_to``. Raises ValueError when ``zip_path`` is not a zip file.
    """
    # Reject anything that is not a zip up front so the happy path stays flat.
    if not zipfile.is_zipfile(zip_path):
        raise ValueError("❌ The downloaded file is not a valid zip archive.")

    print(f"📦 Extracting {zip_path} to {extract_to} ...")
    with zipfile.ZipFile(zip_path, mode='r') as archive:
        archive.extractall(path=extract_to)

    # The archive is removed once its contents have been unpacked.
    os.remove(zip_path)
    print(f"✅ Extraction complete! Files are in: {extract_to}")
    return extract_to

def setup_dataset(drive_url: str, output_zip: str, extract_dir: str) -> str:
    """
    Fetch the dataset archive and unpack it in one call.

    Downloads ``drive_url`` to ``output_zip`` via ``download_from_gdrive``, then
    passes the resulting path to ``extract_zip`` with ``extract_dir`` as target.
    Returns whatever ``extract_zip`` returns (the dataset root directory).
    """
    return extract_zip(download_from_gdrive(drive_url, output_zip), extract_dir)


In [36]:
import cv2
import os
import glob
import random
import zipfile

def get_random_color():
    """Return a random colour as a 3-tuple of ints in 0..255 (used as BGR)."""
    # Three independent draws, one per channel.
    return tuple(random.randint(0, 255) for _ in range(3))

def extract_if_zip(path: str) -> str:
    """
    Return a folder path for ``path``, extracting it first if it is a .zip file.

    If ``path`` is an existing file whose name ends in ".zip" (any letter case),
    it is extracted into a sibling folder named after the archive without its
    extension, and that folder is returned. If the folder already exists the
    extraction is skipped. The archive itself is left in place.

    Any other ``path`` (a folder, a non-zip file, a missing path) is returned
    unchanged.

    Args:
        path: Path to a .zip archive or to an already extracted folder.

    Returns:
        The extraction folder for a zip archive, otherwise ``path`` itself.
    """
    zip_suffix = ".zip"
    if not (os.path.isfile(path) and path.lower().endswith(zip_suffix)):
        return path

    # Drop only the trailing extension. str.replace(".zip", "") would also
    # strip ".zip" from parent folder names and point at the wrong directory.
    extract_dir = path[:-len(zip_suffix)]

    if os.path.exists(extract_dir):
        # An existing folder is taken as a completed earlier extraction.
        print(f"✅ Zip already extracted at {extract_dir}")
    else:
        print(f"📦 Extracting {path} -> {extract_dir}")
        with zipfile.ZipFile(path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
    return extract_dir

def adjust_bbox(xmin, ymin, xmax, ymax, shrink_factor=0.05):
    """
    Pull every edge of a box inward by a fraction of the box size.

    The horizontal inset is ``int(width * shrink_factor)`` and the vertical
    inset is ``int(height * shrink_factor)``; each is applied to both sides.
    Returns the new ``(xmin, ymin, xmax, ymax)`` tuple.
    """
    inset_x = int((xmax - xmin) * shrink_factor)
    inset_y = int((ymax - ymin) * shrink_factor)
    shrunk = (xmin + inset_x, ymin + inset_y, xmax - inset_x, ymax - inset_y)
    return shrunk

def annotate_videos(data_dir: str, source_dir: str, output_dir: str):
    """
    Draw annotation boxes on every .mov video in ``source_dir`` and save the results.

    For each ``<name>.mov`` that has a matching ``<name>.txt`` next to it, this writes:
      * ``<output_dir>/<name>_annotated.mp4`` - the video with boxes and labels drawn,
      * ``<output_dir>/samples/<name>_frame<idx>.jpg`` - up to 5 randomly chosen frames,
      * ``<output_dir>/<name>_preview.jpg`` - the first frame read back from the
        annotated video.

    Videos without an annotation file, videos that cannot be opened, and videos
    whose output writer cannot be created are reported and skipped.

    Args:
        data_dir: Not used by this function; kept so existing calls keep working.
        source_dir: Folder holding the .mov/.txt pairs, or a .zip of that folder
            (it is passed through ``extract_if_zip``).
        output_dir: Folder that receives all outputs; created if missing.

    Returns:
        None.
    """
    # Ensure output directories exist
    os.makedirs(output_dir, exist_ok=True)
    samples_dir = os.path.join(output_dir, "samples")
    os.makedirs(samples_dir, exist_ok=True)

    # Handle zip extraction if needed
    source_dir = extract_if_zip(source_dir)

    # Find all .mov video files in the source directory
    video_files = glob.glob(os.path.join(source_dir, "*.mov"))
    if not video_files:
        print(f"❌ No .mov files found in '{source_dir}'. Please check the path.")
        return
    print(f"✅ Found {len(video_files)} videos to process.")

    # -------------------- Main Processing Loop --------------------
    for video_path in video_files:
        base_name = os.path.basename(video_path)
        file_name_without_ext = os.path.splitext(base_name)[0]

        print(f"\n{'='*50}")
        print(f"🚀 Starting to process: {base_name}")
        print(f"{'='*50}")

        # Define corresponding annotation and output paths
        anno_path = os.path.join(source_dir, f"{file_name_without_ext}.txt")
        out_video_path = os.path.join(output_dir, f"{file_name_without_ext}_annotated.mp4")

        # Check if the annotation file exists
        if not os.path.exists(anno_path):
            print(f"⚠️ Warning: Annotation file not found for {base_name}. Skipping.")
            continue

        # -------------------- 1. Parse Annotation File --------------------
        annotations = _parse_annotation_file(anno_path)
        print(f"✅ Parsed annotations for {len(annotations)} unique frames from {os.path.basename(anno_path)}")

        # -------------------- 2. Open Video --------------------
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            cap.release()
            print(f"❌ Could not open video: {video_path}. Check codec support or file integrity.")
            continue

        out = None
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            print(f"🎞 Video Info: FPS={fps:.2f}, Size=({frame_w}x{frame_h}), Frames={frame_count}")

            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            out = cv2.VideoWriter(out_video_path, fourcc, fps, (frame_w, frame_h))
            if not out.isOpened():
                # Without this check every out.write() would be silently dropped.
                print(f"❌ Could not create output video: {out_video_path}. Skipping.")
                continue

            # Select random frames for saving samples. The reported frame count
            # may be zero or negative when the backend cannot determine it;
            # random.sample rejects a negative size, so clamp to zero (no
            # samples are saved in that case).
            known_frames = max(frame_count, 0)
            sample_frames_to_save = set(random.sample(range(known_frames), min(5, known_frames)))

            # -------------------- 3. Annotate Frames --------------------
            frame_idx = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                _draw_boxes(frame, annotations.get(frame_idx, ()))
                out.write(frame)

                # Save sample frames
                if frame_idx in sample_frames_to_save:
                    sample_file = os.path.join(samples_dir, f"{file_name_without_ext}_frame{frame_idx}.jpg")
                    cv2.imwrite(sample_file, frame)
                    print(f"📸 Saved sample frame: {sample_file}")

                frame_idx += 1

                if frame_idx % 100 == 0:
                    print(f"  ... Annotated {frame_idx}/{frame_count} frames ...")
        finally:
            # Release the reader and writer even if a frame fails mid-way, so
            # the output file is finalised and the handles are not leaked.
            cap.release()
            if out is not None:
                out.release()

        print(f"✅ Annotated video saved at: {out_video_path}")

        # Save preview frame (first frame of annotated video)
        preview_cap = cv2.VideoCapture(out_video_path)
        try:
            ret, frame = preview_cap.read()
            if ret:
                frame_file = os.path.join(output_dir, f"{file_name_without_ext}_preview.jpg")
                cv2.imwrite(frame_file, frame)
                print(f"📸 Preview saved: {frame_file}")
        finally:
            preview_cap.release()

    print("\n🎉 All videos processed successfully! 🎉")


def _parse_annotation_file(anno_path):
    """
    Read one annotation .txt file into a per-frame lookup.

    Each usable line is whitespace-separated with at least 10 fields: fields
    1-4 are the integer box corners (xmin, ymin, xmax, ymax), field 5 is the
    frame number, and fields 9 onward are label words (surrounding double
    quotes are stripped and the words joined with spaces). Lines with fewer
    fields, or with non-integer box/frame fields, are skipped.

    Returns a dict mapping ``frame number - 1`` to a list of
    ``(xmin, ymin, xmax, ymax, label, color)`` tuples, where the box has been
    shrunk by ``adjust_bbox`` and every distinct label gets one random colour.
    """
    annotations = {}
    label_to_color = {}
    with open(anno_path, "r") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 10:
                continue

            try:
                xmin, ymin, xmax, ymax = map(int, parts[1:5])
                frame_id = int(parts[5]) - 1  # 0-based frame index
            except ValueError:
                # One malformed line should not abort the whole batch.
                continue

            label = " ".join(p.strip('"') for p in parts[9:])
            if label not in label_to_color:
                label_to_color[label] = get_random_color()
            label_color = label_to_color[label]

            # Apply shrinking
            xmin, ymin, xmax, ymax = adjust_bbox(xmin, ymin, xmax, ymax)

            annotations.setdefault(frame_id, []).append((xmin, ymin, xmax, ymax, label, label_color))
    return annotations


def _draw_boxes(frame, boxes):
    """Draw each box outline plus a filled label tag with white text onto ``frame`` in place."""
    for (xmin, ymin, xmax, ymax, label, color) in boxes:
        cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), color, 2)
        text = f"{label}"
        (tw, th), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        # Filled tag sits directly above the box's top edge.
        cv2.rectangle(frame, (xmin, ymin - th - 10), (xmin + tw, ymin), color, -1)
        cv2.putText(frame, text, (xmin, ymin - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)


In [37]:
# DRIVE_URL = "https://drive.google.com/uc?id=1fJshjFiK0LNfrLHbGPmhYm9b8WzUKDtY"
# OUTPUT_ZIP = "okutama_action.zip"
# EXTRACT_DIR = "okutama_action"

# dataset_path = setup_dataset(DRIVE_URL, OUTPUT_ZIP, EXTRACT_DIR)
# print(f"📂 Dataset ready at: {dataset_path}")

In [38]:
# ----------------- Example Usage -----------------
# Seed Python's RNG so the label colours and the sampled frame indices picked
# inside annotate_videos are the same on every Restart & Run All.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

data_dir = "okutama-action"
source_dir = os.path.join(data_dir, "Sample.zip")   # or "Sample" if already extracted
output_dir = os.path.join(data_dir, "outputs")

annotate_videos(data_dir, source_dir, output_dir)

✅ Zip already extracted at okutama-action\Sample
✅ Found 1 videos to process.

🚀 Starting to process: 1.1.1.mov
✅ Parsed annotations for 2272 unique frames from 1.1.1.txt
🎞 Video Info: FPS=29.97, Size=(3840x2160), Frames=2272
  ... Annotated 100/2272 frames ...
  ... Annotated 200/2272 frames ...
  ... Annotated 300/2272 frames ...
  ... Annotated 400/2272 frames ...
  ... Annotated 500/2272 frames ...
📸 Saved sample frame: okutama-action\outputs\samples\1.1.1_frame522.jpg
  ... Annotated 600/2272 frames ...
  ... Annotated 700/2272 frames ...
  ... Annotated 800/2272 frames ...
  ... Annotated 900/2272 frames ...
  ... Annotated 1000/2272 frames ...
  ... Annotated 1100/2272 frames ...
📸 Saved sample frame: okutama-action\outputs\samples\1.1.1_frame1181.jpg
📸 Saved sample frame: okutama-action\outputs\samples\1.1.1_frame1199.jpg
  ... Annotated 1200/2272 frames ...
  ... Annotated 1300/2272 frames ...
  ... Annotated 1400/2272 frames ...
  ... Annotated 1500/2272 frames ...
📸 Saved sa