In [2]:
import os
import torch
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import cv2
from IPython import display
import time
from sam2.build_sam import build_sam2_camera_predictor


In [3]:
# # use bfloat16 for the entire notebook
torch.autocast(device_type="cuda", dtype=torch.bfloat16).__enter__()

if torch.cuda.get_device_properties(0).major >= 8:
    # turn on tfloat32 for Ampere GPUs (https://pytorch.org/docs/stable/notes/cuda.html#tensorfloat-32-tf32-on-ampere-devices)
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True

### Building the SAM 2 camera predictor


In [4]:
sam2_checkpoint = "../checkpoints/sam2_hiera_tiny.pt"
model_cfg = "sam2_hiera_t.yaml"

predictor = build_sam2_camera_predictor(model_cfg, sam2_checkpoint)

In [5]:
def show_mask(mask, ax, obj_id=None, random_color=False):
    if random_color:
        color = np.concatenate([np.random.random(3), np.array([0.6])], axis=0)
    else:
        cmap = plt.get_cmap("tab10")
        cmap_idx = 0 if obj_id is None else obj_id
        color = np.array([*cmap(cmap_idx)[:3], 0.6])
    h, w = mask.shape[-2:]
    mask_image = mask.reshape(h, w, 1) * color.reshape(1, 1, -1)
    ax.imshow(mask_image)


def show_points(coords, labels, ax, marker_size=200):
    pos_points = coords[labels == 1]
    neg_points = coords[labels == 0]
    ax.scatter(
        pos_points[:, 0],
        pos_points[:, 1],
        color="green",
        marker="*",
        s=marker_size,
        edgecolor="white",
        linewidth=1.25,
    )
    ax.scatter(
        neg_points[:, 0],
        neg_points[:, 1],
        color="red",
        marker="*",
        s=marker_size,
        edgecolor="white",
        linewidth=1.25,
    )


def show_bbox(bbox, ax, marker_size=200):
    tl, br = bbox[0], bbox[1]
    w, h = (br - tl)[0], (br - tl)[1]
    x, y = tl[0], tl[1]
    print(x, y, w, h)
    ax.add_patch(plt.Rectangle((x, y), w, h, fill=None, edgecolor="blue", linewidth=2))

#### Select an video stream (video or camera)


In [None]:
# from IPython.display import HTML

# # Replace with your public IP address
# video_stream_url = "http://66.27.122.32:5050/video_feed?key=12903hjk1230"

# HTML(f"""
# <iframe src="{video_stream_url}" width="640" height="480"></iframe>
# """)

In [34]:
cap = cv2.VideoCapture("../webcam_test.mp4")
ret, frame = cap.read()
width, height = frame.shape[:2][::-1]

if_init = False
frame_count = 0  # Frame counter for FPS calculation

# Start timing
start_time = time.time()

with torch.inference_mode(), torch.autocast("cuda", dtype=torch.bfloat16):
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        height, width = frame.shape[:2]
        frame_count += 1  # Increment frame count

        # Only perform initialization on the first frame
        if not if_init:
            predictor.load_first_frame(frame)
            if_init = True
            obj_id = 1  # Example object ID
            frame_idx = 0

            # Define the point prompt at one-third from the right, centered vertically
            point = [int(width * 2 / 3), int(height / 2)]
            points = [point]
            labels = [1]  # Positive prompt

            # Initialize segmentation with the point prompt
            _, out_obj_ids, out_mask_logits = predictor.add_new_prompt(frame_idx, obj_id, points=points, labels=labels)

        else:
            # Track the object in subsequent frames
            out_obj_ids, out_mask_logits = predictor.track(frame)
            
        # Process output mask only if it's non-empty
        if out_mask_logits.shape[0] > 0:
            mask = (out_mask_logits[0, 0] > 0).cpu().numpy().astype("uint8") * 255
        else:
            mask = np.zeros((height, width), dtype="uint8")

        # Invert and prepare the mask for overlay (not displayed)
        inverted_mask_colored = cv2.cvtColor(cv2.bitwise_not(mask), cv2.COLOR_GRAY2BGR)
        overlayed_frame = cv2.addWeighted(frame, 0.7, inverted_mask_colored, 0.3, 0)

# End timing
end_time = time.time()
cap.release()

# Calculate FPS
total_time = end_time - start_time
fps = frame_count / total_time
print(f"Processed {frame_count} frames in {total_time:.2f} seconds (FPS: {fps:.2f})")

Processed 333 frames in 15.44 seconds (FPS: 21.56)


In [None]:
import cv2
import numpy as np
import torch
import time
from flask import Flask, Response
import threading

# URL for accessing the raw webcam feed from the local machine
video_feed_url = "http://66.27.122.32:5050/video_feed?key=12903hjk1230"

# Flask app to serve the processed video stream
app = Flask(__name__)

# Frame lock and processed frame storage
frame_lock = threading.Lock()
processed_frame = None

# Function to capture and process the frames from the raw feed
def process_frames():
    global processed_frame
    cap = cv2.VideoCapture(video_feed_url)

    if not cap.isOpened():
        raise RuntimeError("Unable to open video feed from URL.")

    if_init = False
    frame_count = 0  # Frame counter for FPS calculation

    # Start timing
    start_time = time.time()

    with torch.inference_mode(), torch.autocast("cuda", dtype=torch.bfloat16):
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            height, width = frame.shape[:2]
            frame_count += 1  # Increment frame count

            # Only perform initialization on the first frame
            if not if_init:
                predictor.load_first_frame(frame)
                if_init = True
                obj_id = 1  # Example object ID
                frame_idx = 0

                # Define the point prompt at one-third from the right, centered vertically
                point = [int(width * 2 / 3), int(height / 2)]
                points = [point]
                labels = [1]  # Positive prompt

                # Initialize segmentation with the point prompt
                _, out_obj_ids, out_mask_logits = predictor.add_new_prompt(frame_idx, obj_id, points=points, labels=labels)
            else:
                # Track the object in subsequent frames
                out_obj_ids, out_mask_logits = predictor.track(frame)

            # Process output mask only if it's non-empty
            if out_mask_logits.shape[0] > 0:
                mask = (out_mask_logits[0, 0] > 0).cpu().numpy().astype("uint8") * 255
            else:
                mask = np.zeros((height, width), dtype="uint8")

            # Invert and prepare the mask for overlay (not displayed)
            inverted_mask_colored = cv2.cvtColor(cv2.bitwise_not(mask), cv2.COLOR_GRAY2BGR)
            overlayed_frame = cv2.addWeighted(frame, 0.7, inverted_mask_colored, 0.3, 0)

            # Update the processed frame for Flask
            with frame_lock:
                processed_frame = overlayed_frame

    # End timing
    end_time = time.time()
    cap.release()

    # Calculate FPS
    total_time = end_time - start_time
    fps = frame_count / total_time
    print(f"Processed {frame_count} frames in {total_time:.2f} seconds (FPS: {fps:.2f})")

# Start a background thread to process the frames continuously
processing_thread = threading.Thread(target=process_frames)
processing_thread.daemon = True
processing_thread.start()

# Flask endpoint to stream the processed video
@app.route('/processed_feed')
def processed_feed():
    def generate_processed_frames():
        global processed_frame
        while True:
            with frame_lock:
                if processed_frame is None:
                    continue
                _, buffer = cv2.imencode('.jpg', processed_frame)
                frame = buffer.tobytes()
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')

    return Response(generate_processed_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5001)


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5001
 * Running on http://204.12.253.6:5001
 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5001
 * Running on http://204.12.253.6:5001
[33mPress CTRL+C to quit[0m
INFO:werkzeug:[33mPress CTRL+C to quit[0m


In [None]:
from IPython.display import HTML

# URL for the processed video feed served by the Flask app in the remote Jupyter notebook
processed_feed_url = "http://0.0.0.0:5001/processed_feed"

HTML(f"""
<iframe src="{processed_feed_url}" width="640" height="480"></iframe>
""")
