In [None]:
import json
from functools import partial
from pathlib import Path
from PIL import Image
import numpy as np

import cv2
import os
import time
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as T
from torch.utils.data import ConcatDataset
from torch.utils.data import Dataset, random_split, DataLoader
from torchvision.utils import draw_bounding_boxes

from contextlib import redirect_stdout
from pathlib import Path
print(torch.cuda.is_available())


import torchvision.transforms as transforms
import cv2
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.utils import draw_bounding_boxes

In [None]:
# MODELS - https://github.com/pytorch/vision/tree/main/torchvision/models/detection
from torchvision.models.detection import (
    fasterrcnn_resnet50_fpn_v2, FasterRCNN_ResNet50_FPN_V2_Weights,
    fasterrcnn_mobilenet_v3_large_fpn, FasterRCNN_MobileNet_V3_Large_FPN_Weights,
    fasterrcnn_mobilenet_v3_large_320_fpn, FasterRCNN_MobileNet_V3_Large_320_FPN_Weights,
    ssd300_vgg16, SSD300_VGG16_Weights,
    ssdlite320_mobilenet_v3_large, SSDLite320_MobileNet_V3_Large_Weights,
    retinanet_resnet50_fpn_v2, RetinaNet_ResNet50_FPN_V2_Weights,
    fcos_resnet50_fpn, FCOS_ResNet50_FPN_Weights
)
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.retinanet import RetinaNetClassificationHead
from torchvision.models.detection.fcos import FCOSHead

In [None]:
from model_inference import compare_two_models, visualize_detections, display_text_block
from compare_videos import play_top_bottom, play_side_by_side

In [None]:
# ---- device ----
# Use CUDA when torch reports it as available, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


def visualize_detections(img_pil, model, class_names, score_threshold=0.5, box_width=4):
    """
    Run `model` on a single image and draw every detection above the threshold.

    img_pil: PIL.Image (RGB)
    model: detector called as model([tensor]); its first output must be a dict
        with "boxes", "labels" and "scores" entries
    class_names: dict mapping class_id -> string, e.g. {1: 'chuck'}
        (ids missing from the dict are shown as the raw id)
    score_threshold: only detections with score > score_threshold are drawn
    box_width: line width handed to draw_bounding_boxes
    returns: PIL.Image with boxes drawn (the unmodified image when nothing
        passes the threshold)
    """
    # Send the input to wherever the model actually lives so the two can never
    # disagree; the notebook-level `device` is only a fallback for a model that
    # exposes no parameters.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device

    img_tensor = transforms.ToTensor()(img_pil).to(target_device)  # [C,H,W]

    model.eval()
    with torch.no_grad():
        outputs = model([img_tensor])  # one result dict per input image

    result = outputs[0]
    scores = result["scores"]
    keep = scores > score_threshold

    boxes = result["boxes"][keep].detach().to("cpu")
    labels = result["labels"][keep].detach().to("cpu")
    kept_scores = scores[keep].detach().to("cpu")

    # draw_bounding_boxes wants a uint8 image. Round before casting: a plain
    # .byte() truncates, so a value that float round-off left just under an
    # integer (e.g. 253.9999) would come out one level too dark.
    img_cpu = (img_tensor.detach().to("cpu") * 255).round().clamp(0, 255).to(torch.uint8)

    to_pil = torchvision.transforms.ToPILImage()

    # Nothing to draw: skip the drawing call altogether instead of handing it
    # an empty box tensor on every frame without detections.
    if boxes.shape[0] == 0:
        return to_pil(img_cpu)

    # "<class name> - <score>" captions, built from plain Python numbers
    label_names = [
        f"{class_names.get(label_id, str(label_id))} - {score:.2f}"
        for label_id, score in zip(labels.tolist(), kept_scores.tolist())
    ]

    img_vis = draw_bounding_boxes(
        img_cpu, boxes, labels=label_names, width=box_width, colors="black"
    )

    return to_pil(img_vis)


# # OPTIONAL: stub for display_text_block if you don't already have it
def display_text_block(
    frame_bgr,
    lines,
    x_start=10,
    y_start=None,
    x_gap=0,
    y_gap=20,
    text_color=(255, 255, 255),
    background_color=(0, 0, 0),
):
    """
    Stamp a block of text lines onto a copy of the frame and blend it back in.

    frame_bgr: image array exposing .shape and .copy() (the frame is not modified)
    lines: iterable of strings, one per text row
    x_start, y_start: position of the first row; y_start=None means 90 px above
        the bottom edge of the frame
    x_gap, y_gap: offset added to the position after each row
    text_color, background_color: colors for the glyphs and the backing box
    returns: cv2.addWeighted blend of the annotated copy (weight 0.6) and the
        original frame (weight 0.4)
    """
    frame_h, _ = frame_bgr.shape[:2]
    cursor_x = x_start
    cursor_y = frame_h - 90 if y_start is None else y_start

    # Shared text settings: font face, scale, stroke thickness, box padding
    font, scale, stroke, pad = cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2, 5

    canvas = frame_bgr.copy()
    for text in lines:
        (text_w, text_h), _baseline = cv2.getTextSize(text, font, scale, stroke)

        # Backing box sized to the text plus padding (thickness -1)
        top_left = (cursor_x - pad, cursor_y - text_h - pad)
        bottom_right = (cursor_x + text_w + pad, cursor_y + pad)
        cv2.rectangle(canvas, top_left, bottom_right, background_color, -1)

        cv2.putText(
            canvas, text, (cursor_x, cursor_y), font, scale, text_color, stroke, cv2.LINE_AA
        )

        cursor_x, cursor_y = cursor_x + x_gap, cursor_y + y_gap

    opacity = 0.6
    return cv2.addWeighted(canvas, opacity, frame_bgr, 1 - opacity, 0)

# 1 video

In [None]:
# ---- single-video configuration ----
class_map = {1: "object"}  # label id -> name shown next to each box
num_classes = 2  # 1 class + background; adjust to your training setup

# Checkpoint location: <model_dir>/ftmo_<model_type>.pth
model_type = "v1"
model_dir = "../models" # "frcnn_mobilenet_inter"
model_name = f"ftmo_{model_type}.pth"
model_path = Path(model_dir) / model_name

# Raw clips are read from .../raw, annotated clips go to .../<model_type>
input_video_dir = Path("../videos_plant_not_working") / "raw"
output_video_dir = Path("../videos_plant_not_working") / model_type

# Clip to process (file stem) and the stem used for the annotated copy
video_name = "111"
output_video_name = f"{video_name}_{model_type}"

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
##### resnet50_fpn_v2 backbone #####
print(f"\033[91m{model_path}\033[0m")

# Build the architecture without fetching any pretrained weights: the
# fine-tuned checkpoint loaded below replaces every parameter (load_state_dict
# is strict by default, so a partial checkpoint would raise rather than
# silently leave random weights behind).
model = fasterrcnn_resnet50_fpn_v2(weights=None, weights_backbone=None)
# get number of input features for the classifier
in_features = model.roi_heads.box_predictor.cls_score.in_features
# replace the default head with one sized for our classes
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# weights_only=True restricts unpickling to tensors/containers, which is all a
# state dict needs; map to CPU first so the load also works without a GPU.
state_dict = torch.load(model_path, weights_only=True, map_location="cpu")
model.load_state_dict(state_dict)

# Assign the result so the cell does not print the full model repr.
model = model.to(device).eval()

In [None]:
# ---- video I/O ----
video_path = input_video_dir / f"{video_name}.avi"

# Pass plain strings to OpenCV: not every build accepts pathlib.Path here.
cap = cv2.VideoCapture(str(video_path))
assert cap.isOpened(), f"Failed to open {video_path}"

# Some containers report 0 FPS; fall back to 30 in that case.
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Create the destination folder up front so the writer has somewhere to write.
output_video_dir.mkdir(parents=True, exist_ok=True)

output_video_path = output_video_dir / f"{output_video_name}.avi"
fourcc = cv2.VideoWriter_fourcc(*"XVID")
video_writer_out = cv2.VideoWriter(str(output_video_path), fourcc, fps, (width, height))
# A writer that failed to open drops frames silently, so check it explicitly.
assert video_writer_out.isOpened(), f"Failed to open writer for {output_video_path}"

# Sliding window of per-frame FPS samples, filled by the main loop
fps_list = []

In [None]:
# ---- main loop ----
# Defined up front so the summary below still works if no frame could be read.
running_fps = 0.0

try:
    while True:
        start = time.time()
        success, frame_bgr = cap.read()
        if not success:
            break

        # OpenCV -> PIL (RGB)
        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        img_pil = Image.fromarray(frame_rgb)

        # run detection + visualization (returns PIL)
        frame_vis_pil = visualize_detections(img_pil, model, class_map)

        # PIL -> OpenCV (BGR)
        frame_vis = cv2.cvtColor(np.array(frame_vis_pil), cv2.COLOR_RGB2BGR)

        # the writer was opened for (width, height); resize if the frame differs
        if (frame_vis.shape[1], frame_vis.shape[0]) != (width, height):
            frame_vis = cv2.resize(frame_vis, (width, height), interpolation=cv2.INTER_LINEAR)

        # FPS: instantaneous rate for this frame, averaged over the last 30 samples
        elapsed = time.time() - start
        fps_list.append(1.0 / max(elapsed, 1e-6))
        if len(fps_list) > 30:
            fps_list.pop(0)
        running_fps = float(np.mean(fps_list))

        # overlay stats
        frame_vis = display_text_block(frame_vis, [f"FPS : {running_fps:.2f}"])

        # write
        video_writer_out.write(frame_vis)
finally:
    # Release both handles even if inference raises mid-video, so the output
    # file is finalized and the input is not left locked.
    cap.release()
    video_writer_out.release()

print(f"Mean FPS over last window: {running_fps:.2f}")
print(f"Output video saved at {output_video_path}")
# Output recorded from a previous run:
# Mean FPS over last window: 6.37
# Output video saved at ../videos_plant_not_working/v1/111_v1.avi

# Multiple videos
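Set `model_type` in the configuration cell below to one of `v1`, `v2`, or `v2_kd`, then re-run the cells that follow to process every video in the input folder with that model.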

In [None]:
# ---- multi-video configuration ----
class_map = {1: "chuck"}  # label id -> name shown next to each box
num_classes = 2  # 1 class + background; adjust to your training setup

model_dir = "../models" # "frcnn_mobilenet_inter"
model_type = "v2"
model_name = f"ftmo_{model_type}.pth"
model_path = Path(model_dir) / model_name

input_video_dir = Path("../videos_plant_not_working/raw")
output_video_dir = Path(f"../videos_plant_not_working/{model_type}")

# File names (with extension) of the clips to process. Only regular,
# non-hidden files are kept so stray folders such as ".ipynb_checkpoints" do
# not reach the video reader, and the list is sorted so runs are reproducible.
raw_videos = sorted(
    entry.name
    for entry in input_video_dir.iterdir()
    if entry.is_file() and not entry.name.startswith(".")
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
##### resnet50_fpn_v2 backbone #####
print(f"\033[91m{model_path}\033[0m")

# Build the architecture without fetching any pretrained weights: the
# fine-tuned checkpoint loaded below replaces every parameter (load_state_dict
# is strict by default, so a partial checkpoint would raise rather than
# silently leave random weights behind).
model = fasterrcnn_resnet50_fpn_v2(weights=None, weights_backbone=None)
# get number of input features for the classifier
in_features = model.roi_heads.box_predictor.cls_score.in_features
# replace the default head with one sized for our classes
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# weights_only=True restricts unpickling to tensors/containers, which is all a
# state dict needs; map to CPU first so the load also works without a GPU.
state_dict = torch.load(model_path, weights_only=True, map_location="cpu")
model.load_state_dict(state_dict)

# Assign the result so the cell does not print the full model repr.
model = model.to(device).eval()

In [None]:
# Create the destination folder once so every writer has somewhere to write.
output_video_dir.mkdir(parents=True, exist_ok=True)

for raw_video in raw_videos:
    # ---- video I/O ----
    video_path = input_video_dir / raw_video
    print(video_path)

    # Pass plain strings to OpenCV: not every build accepts pathlib.Path here.
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        # One unreadable entry should not abort the rest of the batch.
        cap.release()
        print(f"Failed to open {video_path} - skipping")
        continue

    # Some containers report 0 FPS; fall back to 30 in that case.
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # raw_video carries its extension ("111.avi"); name the output
    # "<stem>_<model_type>.avi" so the file keeps a container extension.
    output_video_path = output_video_dir / f"{Path(raw_video).stem}_{model_type}.avi"
    fourcc = cv2.VideoWriter_fourcc(*"XVID")
    video_writer_out = cv2.VideoWriter(str(output_video_path), fourcc, fps, (width, height))

    fps_list = []
    # Defined up front so the summary below still works if no frame could be read.
    running_fps = 0.0

    try:
        # A writer that failed to open drops frames silently, so check it explicitly.
        if not video_writer_out.isOpened():
            raise RuntimeError(f"Failed to open writer for {output_video_path}")

        # ---- main loop ----
        while True:
            start = time.time()
            success, frame_bgr = cap.read()
            if not success:
                break

            # OpenCV -> PIL (RGB)
            frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
            img_pil = Image.fromarray(frame_rgb)

            # run detection + visualization (returns PIL)
            frame_vis_pil = visualize_detections(img_pil, model, class_map)

            # PIL -> OpenCV (BGR)
            frame_vis = cv2.cvtColor(np.array(frame_vis_pil), cv2.COLOR_RGB2BGR)

            # the writer was opened for (width, height); resize if the frame differs
            if (frame_vis.shape[1], frame_vis.shape[0]) != (width, height):
                frame_vis = cv2.resize(frame_vis, (width, height), interpolation=cv2.INTER_LINEAR)

            # FPS: instantaneous rate for this frame, averaged over the last 30 samples
            elapsed = time.time() - start
            fps_list.append(1.0 / max(elapsed, 1e-6))
            if len(fps_list) > 30:
                fps_list.pop(0)
            running_fps = float(np.mean(fps_list))

            # overlay stats
            frame_vis = display_text_block(frame_vis, [f"FPS : {running_fps:.2f}"])

            # write
            video_writer_out.write(frame_vis)
    finally:
        # Release both handles even if inference raises mid-video, so the
        # output file is finalized and the input is not left locked.
        cap.release()
        video_writer_out.release()

    print(f"Mean FPS over last window: {running_fps:.2f}")
    print(f"Output video saved at {output_video_path}")