In [1]:
import os
import sam3
import torch

sam3_root = os.path.join(os.path.dirname(sam3.__file__), "..")


In [2]:
# Configuration flags for notebook demo
save_frames = True  # set True to save per-frame overlays
visualize_frames = False  # set False to disable cv2.imshow window
output_dir = os.path.join(sam3_root, "outputs", "real_time","bedroom_stream")
output_video = os.path.join(output_dir, "bedroom_stream.mp4")


In [3]:
from sam3.model_builder import build_sam3_stream_predictor

# Initialize predictor (single-GPU streaming)
device = "cuda" if torch.cuda.is_available() else "cpu"
predictor = build_sam3_stream_predictor(device=device)

resp = predictor.handle_request({"type": "start_session"})
session_id = resp["session_id"]
session_id


[0m[32mINFO 2025-11-27 17:45:23,017 2193781 sam3_video_base.py: 124:[0m setting max_num_objects=10000 and num_obj_for_compile=16


'599e3424-c08a-4ccd-8e3b-4fed9dc72dd6'

In [4]:
# Open video and stream frames
import os, cv2
video_path = os.path.join(sam3_root, "assets", "videos", "bedroom.mp4")
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise RuntimeError(f"Failed to open video: {video_path}")
fps = cap.get(cv2.CAP_PROP_FPS) or 30
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)

writer = None
if output_video:
    fourcc = cv2.VideoWriter_fourcc(*"X264")
    writer = cv2.VideoWriter(output_video, fourcc, fps, (width, height))
if save_frames and not os.path.exists(output_dir):
    os.makedirs(output_dir, exist_ok=True)

frame_idx = 0


OpenCV: FFMPEG: tag 0x34363258/'X264' is not supported with codec id 27 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x31637661/'avc1'
[ERROR:0@0.013] global cap_ffmpeg_impl.hpp:3207 open Could not find encoder for codec_id=27, error: Encoder not found
[ERROR:0@0.013] global cap_ffmpeg_impl.hpp:3285 open VIDEOIO/FFMPEG: Failed to initialize VideoWriter


In [None]:
# Streaming loop
from sam3.visualization_utils import render_masklet_frame
import time
processed = 0
start_time = time.time()
try:
    while True:
        ret, frame_bgr = cap.read()
        if not ret:
            break
        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        # Push frame
        predictor.handle_request({"type": "add_frame", "session_id": session_id, "frame": frame_rgb})
        # Add text prompt only on first frame
        if frame_idx == 0:
            predictor.handle_request({"type": "add_prompt", "session_id": session_id, "frame_index": 0, "text": "a kid"})
        # Run per-frame inference
        resp = predictor.handle_request({"type": "run_inference", "session_id": session_id, "frame_index": frame_idx})
        outputs = resp.get("outputs")
        if outputs is not None:
            overlay = render_masklet_frame(frame_rgb, outputs, frame_idx=frame_idx, alpha=0.5)
        else:
            overlay = frame_rgb
        # Save per-frame overlay
        if save_frames:
            out_path = os.path.join(output_dir, f"frame_{frame_idx:05d}.png")
            cv2.imwrite(out_path, cv2.cvtColor(overlay, cv2.COLOR_RGB2BGR))
        # Append to output video
        if writer is not None:
            writer.write(cv2.cvtColor(overlay, cv2.COLOR_RGB2BGR))
        # Visualize
        if visualize_frames:
            cv2.imshow("SAM3 Streaming", cv2.cvtColor(overlay, cv2.COLOR_RGB2BGR))
            # Press ESC to stop
            if cv2.waitKey(1) & 0xFF == 27:
                break
        frame_idx += 1
        processed += 1
finally:
    cap.release()
    if writer is not None:
        writer.release()
    if visualize_frames:
        cv2.destroyAllWindows()
    predictor.handle_request({"type": "close_session", "session_id": session_id})
end_time = time.time()
elapsed = end_time - start_time
fps_est = processed / elapsed if elapsed > 0 else 0.0
print(f"Processed {processed} frames in {elapsed:.2f}s => {fps_est:.2f} FPS")
processed
