# owl

In [None]:
import cv2
import os
import numpy as np
import torch
from transformers import OwlViTProcessor, OwlViTForObjectDetection
from PIL import Image
from tqdm import tqdm

def label_video(
    video_path,
    output_path,
    text_prompt,
    conf_threshold=0.1
):
    """
    Label video with OWL-ViT zero-shot object detection.

    Each frame is run through the detector, a box and class label are drawn for
    every detection returned by post-processing, and the annotated frames are
    written to ``output_path``. Per-class detection counts are printed at the end.

    Args:
        video_path: Path to input video
        output_path: Path to save labeled video
        text_prompt: Comma-separated list of objects to detect (e.g., "bowl, cube")
        conf_threshold: Confidence threshold for detections

    Returns:
        None. An empty prompt, an unreadable input video or an unwritable
        output are reported with a printed message and an early return.
    """
    # Parse class names, dropping blanks produced by stray commas ("bowl, cube,")
    class_names = [name.strip() for name in text_prompt.split(',') if name.strip()]
    if not class_names:
        print(f"Error: No object names found in prompt {text_prompt!r}")
        return

    # Load OWL-ViT model
    print("Loading OWL-ViT model...")
    processor = OwlViTProcessor.from_pretrained("google/owlvit-base-patch32")
    model = OwlViTForObjectDetection.from_pretrained("google/owlvit-base-patch32")
    model.eval()

    # Move to GPU if available
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    print(f"Using device: {device}")

    # Open video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return

    # Colors for different classes (BGR format)
    colors = [
        (0, 255, 0),    # Green for first class
        (255, 0, 0),    # Blue for second class
        (0, 0, 255),    # Red for third class
        (255, 255, 0),  # Cyan for fourth class
        (255, 0, 255),  # Magenta for fifth class
    ]

    # Statistics
    detection_stats = {name: 0 for name in class_names}
    frame_count = 0
    out = None

    # try/finally so the capture and writer are released even if inference fails
    try:
        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Create video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"Error: Could not open video writer for {output_path}")
            return

        print(f"Processing {total_frames} frames from {video_path}...")
        print(f"Looking for: {text_prompt}")

        # The reported frame count only sizes the progress bar; the loop itself
        # runs until read() fails so an inaccurate count cannot drop frames.
        with tqdm(total=total_frames if total_frames > 0 else None) as progress:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Convert BGR to RGB for PIL
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                pil_image = Image.fromarray(frame_rgb)

                # Process inputs
                inputs = processor(text=class_names, images=pil_image, return_tensors="pt")
                inputs = {k: v.to(device) for k, v in inputs.items()}

                # Run inference
                with torch.no_grad():
                    outputs = model(**inputs)

                # Get predictions; PIL size is (width, height), reversed here to (height, width)
                target_sizes = torch.Tensor([pil_image.size[::-1]]).to(device)
                results = processor.post_process_object_detection(
                    outputs=outputs,
                    target_sizes=target_sizes,
                    threshold=conf_threshold
                )[0]

                # Draw boxes
                annotated_frame = frame.copy()

                for label, box in zip(results["labels"], results["boxes"]):
                    # Drawing calls below take plain integer pixel coordinates
                    x1, y1, x2, y2 = (int(v) for v in box.tolist())
                    cls_id = label.item()
                    class_name = class_names[cls_id]
                    color = colors[cls_id % len(colors)]

                    # Update statistics
                    detection_stats[class_name] += 1

                    # Draw bounding box
                    cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)

                    # Draw label background
                    label_text = class_name
                    (text_width, text_height), baseline = cv2.getTextSize(
                        label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2
                    )
                    label_height = text_height + baseline + 5
                    # Keep the label inside the frame when the box touches the top edge
                    label_bottom = max(y1, label_height)
                    cv2.rectangle(
                        annotated_frame,
                        (x1, label_bottom - label_height),
                        (x1 + text_width, label_bottom),
                        color,
                        -1
                    )

                    # Draw label text
                    cv2.putText(
                        annotated_frame,
                        label_text,
                        (x1, label_bottom - 5),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.6,
                        (255, 255, 255),
                        2
                    )

                # Write frame
                out.write(annotated_frame)
                frame_count += 1
                progress.update(1)
    finally:
        cap.release()
        if out is not None:
            out.release()

    print(f"\nDone! Processed {frame_count} frames.")
    print(f"\nDetection statistics:")
    for class_name, count in detection_stats.items():
        print(f"  {class_name}: {count} detections")
    print(f"\nSaved labeled video to: {output_path}")

if __name__ == "__main__":
    # Configuration
    VIDEO_INPUT = "pi0.5_scene1_ep0.mp4"
    VIDEO_OUTPUT = "pi0.5_scene1_ep0_labeled.mp4"

    # Objects to detect (comma-separated)
    PROMPT = "bowl, cube"

    # Confidence threshold (0.0 to 1.0)
    CONF_THRESHOLD = 0.1

    # Get absolute paths. __file__ is not defined when this code is executed as
    # a notebook cell or in a REPL, so fall back to the working directory there.
    try:
        current_dir = os.path.dirname(os.path.abspath(__file__))
    except NameError:
        current_dir = os.getcwd()
    video_full_path = os.path.join(current_dir, VIDEO_INPUT)
    output_full_path = os.path.join(current_dir, VIDEO_OUTPUT)

    # Check if video exists
    if not os.path.exists(video_full_path):
        print(f"Error: Video not found at {video_full_path}")
        # SystemExit instead of exit(): exit() is the interactive-shell helper
        raise SystemExit(1)

    label_video(
        video_path=video_full_path,
        output_path=output_full_path,
        text_prompt=PROMPT,
        conf_threshold=CONF_THRESHOLD
    )

# yolo

In [None]:
import cv2
import os
import numpy as np
from ultralytics import YOLO
from tqdm import tqdm

def label_video(
    video_path,
    output_path,
    text_prompt,
    conf_threshold=0.25
):
    """
    Label video with YOLO-World zero-shot object detection.

    Each frame is run through the detector, a box and class label (no
    confidence score) are drawn for every detection, and the annotated frames
    are written to ``output_path``.

    Args:
        video_path: Path to input video
        output_path: Path to save labeled video
        text_prompt: Comma-separated list of objects to detect (e.g., "bowl, cube")
        conf_threshold: Confidence threshold for detections

    Returns:
        None. An empty prompt, an unreadable input video or an unwritable
        output are reported with a printed message and an early return.
    """
    # Parse the vocabulary, dropping blanks produced by stray commas
    class_names = [name.strip() for name in text_prompt.split(',') if name.strip()]
    if not class_names:
        print(f"Error: No object names found in prompt {text_prompt!r}")
        return

    # Load YOLO-World model (zero-shot)
    print("Loading YOLO-World model...")
    model = YOLO('yolov8m-world.pt')  # medium checkpoint; 'yolov8l-world.pt' is the larger alternative

    # Set custom vocabulary (the objects you want to detect)
    model.set_classes(class_names)

    # Open video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return

    # Colors for different classes (BGR format)
    colors = [
        (0, 255, 0),    # Green for first class
        (255, 0, 0),    # Blue for second class
        (0, 0, 255),    # Red for third class
        (255, 255, 0),  # Cyan for fourth class
        (255, 0, 255),  # Magenta for fifth class
    ]

    frame_count = 0
    out = None

    # try/finally so the capture and writer are released even if inference fails
    try:
        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Create video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"Error: Could not open video writer for {output_path}")
            return

        print(f"Processing {total_frames} frames from {video_path}...")
        print(f"Looking for: {text_prompt}")

        # The reported frame count only sizes the progress bar; the loop itself
        # runs until read() fails so an inaccurate count cannot drop frames.
        with tqdm(total=total_frames if total_frames > 0 else None) as progress:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Run inference
                results = model(frame, conf=conf_threshold, verbose=False)

                # Draw boxes manually without confidence scores
                annotated_frame = frame.copy()
                result = results[0]

                if result.boxes is not None:
                    boxes = result.boxes.xyxy.cpu().numpy()  # Get bounding boxes
                    classes = result.boxes.cls.cpu().numpy().astype(int)  # Get class IDs

                    for box, cls_id in zip(boxes, classes):
                        x1, y1, x2, y2 = map(int, box)
                        class_name = class_names[cls_id]
                        color = colors[cls_id % len(colors)]

                        # Draw bounding box
                        cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)

                        # Draw label background
                        label = class_name
                        (text_width, text_height), baseline = cv2.getTextSize(
                            label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2
                        )
                        label_height = text_height + baseline + 5
                        # Keep the label inside the frame when the box touches the top edge
                        label_bottom = max(y1, label_height)
                        cv2.rectangle(
                            annotated_frame,
                            (x1, label_bottom - label_height),
                            (x1 + text_width, label_bottom),
                            color,
                            -1
                        )

                        # Draw label text
                        cv2.putText(
                            annotated_frame,
                            label,
                            (x1, label_bottom - 5),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            0.6,
                            (255, 255, 255),
                            2
                        )

                # Write frame
                out.write(annotated_frame)
                frame_count += 1
                progress.update(1)
    finally:
        cap.release()
        if out is not None:
            out.release()

    print(f"\nDone! Processed {frame_count} frames.")
    print(f"Saved labeled video to: {output_path}")

if __name__ == "__main__":
    # Configuration
    VIDEO_INPUT = "pi0.5_scene1_ep0.mp4"
    VIDEO_OUTPUT = "pi0.5_scene1_ep0_labeled.mp4"

    # Objects to detect (comma-separated)
    PROMPT = "rubiks cube"

    # Confidence threshold (0.0 to 1.0)
    CONF_THRESHOLD = 0.1

    # Get absolute paths. __file__ is not defined when this code is executed as
    # a notebook cell or in a REPL, so fall back to the working directory there.
    try:
        current_dir = os.path.dirname(os.path.abspath(__file__))
    except NameError:
        current_dir = os.getcwd()
    video_full_path = os.path.join(current_dir, VIDEO_INPUT)
    output_full_path = os.path.join(current_dir, VIDEO_OUTPUT)

    # Check if video exists
    if not os.path.exists(video_full_path):
        print(f"Error: Video not found at {video_full_path}")
        # SystemExit instead of exit(): exit() is the interactive-shell helper
        raise SystemExit(1)

    label_video(
        video_path=video_full_path,
        output_path=output_full_path,
        text_prompt=PROMPT,
        conf_threshold=CONF_THRESHOLD
    )


# gemini

In [10]:
from google import genai
from google.genai import types
from dotenv import load_dotenv
import os
import cv2
import json
from pathlib import Path

# load_dotenv() makes variables from a .env file available through os.environ,
# which is where the API key below is read from.
load_dotenv()

# Model used for every Gemini request in this notebook
MODEL_ID = "gemini-robotics-er-1.5-preview"

# Shared API client; the key is None if GEMINI_API_KEY is not set
client = genai.Client(api_key=os.environ.get("GEMINI_API_KEY"))

def convert_np_to_bytes(image, input_is_rgb=False):
    """Encode an image array as PNG and return the raw bytes.

    OpenCV's encoders interpret 3-channel arrays as BGR, which is also the
    order of frames read through ``cv2.VideoCapture``. Such frames are
    therefore encoded as-is; swapping channels first (as this function used to)
    produces a PNG whose red and blue channels are exchanged.

    Args:
        image: Image as a numpy array, BGR by default.
        input_is_rgb: Set to True when ``image`` is in RGB order (e.g. it came
            from PIL); it is then converted to BGR before encoding.

    Returns:
        The PNG-encoded image as ``bytes``.

    Raises:
        ValueError: If OpenCV fails to encode the image.
    """
    if input_is_rgb:
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    success, encoded = cv2.imencode('.png', image)
    if not success:
        raise ValueError("Failed to encode image as PNG")
    return encoded.tobytes()

def parse_json(json_output):
    """Strip a markdown code fence from a model reply and return the payload.

    The first line that is an opening fence ("```json" in any letter case, or
    a bare "```", ignoring surrounding whitespace) marks the start; everything
    after it up to the next "```" is returned. Text without an opening fence
    is returned unchanged.

    Args:
        json_output: Raw text of the reply.

    Returns:
        The text between the fences (including its trailing newline, if any),
        or ``json_output`` itself when no fence is present.
    """
    lines = json_output.splitlines()
    for i, line in enumerate(lines):
        # Tolerate indentation, trailing spaces and case differences on the fence line
        if line.strip().lower() in ("```json", "```"):
            # Drop everything up to and including the opening fence...
            fenced = "\n".join(lines[i + 1:])
            # ...and everything from the closing fence onwards
            return fenced.split("```")[0]
    return json_output

def query_gemini(image_bytes):
    """Ask the Gemini model to point at the objects in an image.

    Args:
        image_bytes: PNG-encoded image data.

    Returns:
        The parsed JSON reply. The prompt requests a list of
        ``{"point": [y, x], "label": ...}`` items with coordinates normalized
        to 0-1000; the actual shape depends on what the model returns.

    Raises:
        ValueError: If the reply carries no text, or the text is not valid
            JSON (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    # Alternative prompt that requests bounding boxes instead of points:
    # PROMPT = """
    #         Return bounding boxes as a JSON array with labels. Never return masks or code fencing.
    #         Find all the objects on the table.
    #         The label returned should be an identifying name for the object detected.
    #         If an object is present multiple times, name each according to their UNIQUE CHARACTERISTIC
    #         (colors, size, position, etc.)
    #         The format should be as follows:
    #         [{"box_2d": [ymin, xmin, ymax, xmax], "label": <label for the object>}]
    #         normalized to 0-1000. The values in box_2d must only be integers.
    #         """

    PROMPT = """
            Point to all the objects on the table. The label returned
            should be an identifying name for the object detected. Don't label the color of the object.
            The answer should follow the json format: [{"point": <point>,
            "label": <label1>}, ...]. The points are in [y, x] format
            normalized to 0-1000.
    """

    image_response = client.models.generate_content(
        model=MODEL_ID,
        contents=[
            types.Part.from_bytes(
                data=image_bytes,
                mime_type='image/png',
            ),
            PROMPT
        ],
        config = types.GenerateContentConfig(
            temperature=0.5,
            thinking_config=types.ThinkingConfig(thinking_budget=0)
        )
    )

    # Fail with a clear message instead of an AttributeError inside parse_json
    # when the reply has no text part.
    response_text = image_response.text
    if not response_text:
        raise ValueError("Gemini returned no text content")

    return json.loads(parse_json(response_text))

def scale_bounding_boxes(json_output, image):
    """Convert 0-1000 normalized boxes to pixel coordinates of ``image``.

    Args:
        json_output: Iterable of dicts with a ``'box_2d'`` entry indexed as
            [ymin, xmin, ymax, xmax] and a ``'label'`` entry.
        image: Object whose ``shape[0]`` is the height and ``shape[1]`` the width.

    Returns:
        A new list of dicts with integer (truncated) ``'box_2d'`` values and
        the label copied through unchanged.
    """
    rows, cols = image.shape[0], image.shape[1]
    y_factor = rows / 1000
    x_factor = cols / 1000

    def _to_pixels(entry):
        box = entry['box_2d']
        return {
            'box_2d': [
                int(box[0] * y_factor),
                int(box[1] * x_factor),
                int(box[2] * y_factor),
                int(box[3] * x_factor),
            ],
            'label': entry['label'],
        }

    return [_to_pixels(entry) for entry in json_output]

def scale_points(json_output, image):
    """Convert 0-1000 normalized points to pixel coordinates of ``image``.

    Args:
        json_output: Iterable of dicts with a ``'point'`` entry indexed as
            [y, x] and a string ``'label'`` entry.
        image: Object whose ``shape[0]`` is the height and ``shape[1]`` the width.

    Returns:
        A new list of dicts with integer (truncated) ``'point'`` values and
        the label converted to lowercase.
    """
    rows, cols = image.shape[0], image.shape[1]
    y_factor = rows / 1000
    x_factor = cols / 1000

    def _to_pixels(entry):
        point = entry['point']
        return {
            'point': [int(point[0] * y_factor), int(point[1] * x_factor)],
            'label': entry['label'].lower(),  # labels are normalized to lowercase
        }

    return [_to_pixels(entry) for entry in json_output]

def plot_points(image, json_output):
    """Draw labeled points on a copy of ``image``.

    Args:
        image: Image array to annotate; it is copied, not modified.
        json_output: Iterable of dicts with a ``'point'`` entry as pixel
            [y, x] and a string ``'label'`` entry.

    Returns:
        The annotated copy of ``image``.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.8
    thickness = 2
    text_offset = 8  # horizontal gap between the dot and its label

    annotated_image = image.copy()
    for item in json_output:
        y, x = item['point']
        label = item['label']
        cv2.circle(annotated_image, (x, y), 5, (0, 255, 255), -1)

        (text_width, text_height), baseline = cv2.getTextSize(label, font, font_scale, thickness)
        text_x = x + text_offset
        # The background spans the full text width measured from where the text
        # starts, and extends below the baseline so descenders stay covered.
        cv2.rectangle(
            annotated_image,
            (text_x, y - text_height),
            (text_x + text_width, y + baseline),
            (0, 0, 0),
            -1,
        )
        cv2.putText(annotated_image, label, (text_x, y), font, font_scale, (255, 255, 255), thickness)
    return annotated_image

def plot_bounding_boxes(image, json_output):
    """Draw labeled bounding boxes on a copy of ``image``.

    Args:
        image: Image array to annotate; it is copied, not modified.
        json_output: Sequence of dicts with a ``'box_2d'`` entry as pixel
            [ymin, xmin, ymax, xmax] and a string ``'label'`` entry.

    Returns:
        The annotated copy of ``image``. Box colors cycle through a fixed
        palette by detection index.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.75
    thickness = 2

    # Color names as given by the original author; tuples are passed to OpenCV as-is.
    palette = (
        (0, 0, 255),      # red
        (0, 255, 0),      # green
        (255, 0, 0),      # blue
        (0, 255, 255),    # yellow
        (0, 165, 255),    # orange
        (255, 192, 203),  # pink (NOTE(review): this is pink in RGB order - confirm intended BGR value)
        (128, 0, 128),    # purple
        (42, 42, 165),    # brown
        (128, 128, 128),  # gray
        (255, 255, 0),    # cyan
    )

    canvas = image.copy()
    for index, detection in enumerate(json_output):
        box_color = palette[index % len(palette)]
        top, left, bottom, right = detection['box_2d']
        caption = detection['label']

        # Box outline
        cv2.rectangle(canvas, (left, top), (right, bottom), box_color, 2)

        # Caption on a filled strip anchored to the box's bottom-left corner
        caption_size = cv2.getTextSize(caption, font, font_scale, thickness)[0]
        caption_width, caption_height = caption_size
        strip_top_left = (left, bottom - caption_height - 10)
        strip_bottom_right = (left + caption_width, bottom)
        cv2.rectangle(canvas, strip_top_left, strip_bottom_right, box_color, -1)
        cv2.putText(canvas, caption, (left, bottom - 10), font, font_scale, (0, 0, 0), thickness)

    return canvas

In [6]:
def label_video_gemini(
    video_path,
    output_path,
    sample_every_n_frames=1  # Process every Nth frame (1 = all frames)
):
    """
    Label video using Gemini object detection.

    Gemini is queried on every ``sample_every_n_frames``-th frame; the most
    recent successful detections are drawn on every frame until the next
    successful query replaces them.

    Args:
        video_path: Path to input video
        output_path: Path to save labeled video
        sample_every_n_frames: Query interval in frames (1 = every frame)

    Returns:
        None. An unreadable input or unwritable output is reported with a
        printed message and an early return.

    Raises:
        ValueError: If ``sample_every_n_frames`` is less than 1.
    """
    # Reject a bad interval up front rather than failing on the modulo mid-video
    if sample_every_n_frames < 1:
        raise ValueError("sample_every_n_frames must be >= 1")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return

    out = None
    # try/finally so the capture and writer are released even on unexpected errors
    try:
        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Create video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"Error: Could not open video writer for {output_path}")
            return

        print(f"Processing {total_frames} frames...")

        frame_idx = 0
        last_detections = None  # Cache detections between sampled frames

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Only query Gemini every N frames (API rate limits + cost)
            if frame_idx % sample_every_n_frames == 0:
                try:
                    image_bytes = convert_np_to_bytes(frame)
                    json_output = query_gemini(image_bytes)
                    last_detections = scale_points(json_output, frame)
                    print(f"Frame {frame_idx}: {[d['label'] for d in last_detections]}")
                except Exception as e:
                    # Best effort: keep the previous detections and carry on
                    print(f"Frame {frame_idx}: Error - {e}")

            # Draw detections on frame
            if last_detections:
                annotated_frame = plot_points(frame, last_detections)
            else:
                annotated_frame = frame

            out.write(annotated_frame)
            frame_idx += 1

            if frame_idx % 30 == 0:
                print(f"Processed {frame_idx}/{total_frames} frames")
    finally:
        cap.release()
        if out is not None:
            out.release()

    print(f"Done! Saved to {output_path}")

# Run it: query Gemini on every 10th frame to save API calls
label_video_gemini(
    "pi0.5_scene1_ep0.mp4",
    "pi0.5_scene1_ep0_gemini_labeled.mp4",
    sample_every_n_frames=10,
)

Processing 151 frames...
Frame 0: ['bowl', "rubik's cube"]
Frame 10: ['bowl', "rubik's cube"]
Frame 20: ["rubik's cube", 'bowl']
Processed 30/151 frames
Frame 30: ["rubik's cube", 'bowl']
Frame 40: ["rubik's cube", 'bowl']
Frame 50: ["rubik's cube", 'bowl']
Processed 60/151 frames
Frame 60: ['bowl', "rubik's cube"]
Frame 70: ["rubik's cube", 'bowl']
Frame 80: ['bowl', "rubik's cube"]
Processed 90/151 frames
Frame 90: ['bowl', "rubik's cube"]
Frame 100: ['bowl', "rubik's cube"]
Frame 110: ['bowl', "rubik's cube"]
Processed 120/151 frames
Frame 120: ['bowl', "rubik's cube"]
Frame 130: ["rubik's cube", 'blue bowl']
Frame 140: ["rubik's cube"]
Processed 150/151 frames
Frame 150: ["rubik's cube in a bowl"]
Done! Saved to pi0.5_scene1_ep0_gemini_labeled.mp4


In [11]:
import numpy as np

def label_combined_video(video_path, output_path, sample_every_n_frames=10):
    """Label a side-by-side video whose halves are labeled independently.

    Each frame is split down the middle into a left ("external") and right
    ("wrist") half. On every ``sample_every_n_frames``-th frame both halves are
    sent to Gemini separately; the most recent successful detections for each
    half are drawn on every frame, and the halves are rejoined before writing.

    Args:
        video_path: Path to input video
        output_path: Path to save labeled video
        sample_every_n_frames: Query interval in frames (1 = every frame)

    Returns:
        None. An unreadable input or unwritable output is reported with a
        printed message and an early return.

    Raises:
        ValueError: If ``sample_every_n_frames`` is less than 1.
    """
    # Reject a bad interval up front rather than failing on the modulo mid-video
    if sample_every_n_frames < 1:
        raise ValueError("sample_every_n_frames must be >= 1")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return

    out = None
    # try/finally so the capture and writer are released even on unexpected errors
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        half_width = width // 2

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"Error: Could not open video writer for {output_path}")
            return

        frame_idx = 0
        last_external_detections = None
        last_wrist_detections = None

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            external_frame = frame[:, :half_width]
            wrist_frame = frame[:, half_width:]

            if frame_idx % sample_every_n_frames == 0:
                print(f"Frame {frame_idx}")
                try:
                    # Label external (left) side
                    ext_bytes = convert_np_to_bytes(external_frame)
                    ext_json = query_gemini(ext_bytes)
                    last_external_detections = scale_points(ext_json, external_frame)

                    # Label wrist (right) side
                    wrist_bytes = convert_np_to_bytes(wrist_frame)
                    wrist_json = query_gemini(wrist_bytes)
                    last_wrist_detections = scale_points(wrist_json, wrist_frame)
                except Exception as e:
                    # Best effort: keep whatever detections we already have
                    print(f"Frame {frame_idx}: Error - {e}")

            # Draw on each half
            if last_external_detections:
                external_frame = plot_points(external_frame, last_external_detections)
            if last_wrist_detections:
                wrist_frame = plot_points(wrist_frame, last_wrist_detections)

            # Combine back together
            combined = np.hstack([external_frame, wrist_frame])
            out.write(combined)
            frame_idx += 1
    finally:
        cap.release()
        if out is not None:
            out.release()

# Use it (no sampling interval is passed, so the function's default applies)
label_combined_video(
    video_path="pi0.5_scene1_ep0.mp4",
    output_path="pi0.5_scene1_ep0_both_labeled.mp4",
)

Frame 0
Frame 10
Frame 20
Frame 30
Frame 40
Frame 50
Frame 60
Frame 70
Frame 80
Frame 90
Frame 100
Frame 110
Frame 120
Frame 130
Frame 140
Frame 150
