In [1]:
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection
import torch
from torch.utils.data import DataLoader, Dataset
from PIL import Image, ImageDraw
import cv2
import numpy as np
import os
from concurrent.futures import ThreadPoolExecutor
from itertools import chain
from tqdm import tqdm
import json
import copy

In [None]:
# Location of the input videos; if it cannot be listed as a directory it is used as a single video path.
VIDEO_PATH = "./videos"
# Placeholder: destination of the JSON dump (only referenced by the disabled export cell).
JSON_PATH = "?"
# Placeholder: output file passed to cv2.VideoWriter for the video with bounding boxes.
SAMPLE_PATH = "?"
# Checkpoint name handed to from_pretrained for both the processor and the model.
MODEL_PATH = "google/owlv2-base-patch16-ensemble"
# Batch size given to the DataLoader.
BATCH_SIZE = 1
# Upper bound (exclusive) of the sampled timestamps, in seconds.
VIDEO_LENGTH = 20 # in seconds
# Spacing between sampled timestamps, in seconds.
VIDEO_STEP = 1 # in seconds
# Divisor used to cut the flat frame list into the first video's frames and the rest.
NUM_VIDEOS = 2
# Weight applied to the IR heatmap when it is blended onto the RGB frame.
HEATMAP_INTENSITY = 5

In [21]:
# Timestamps (in seconds) at which one frame is sampled from every video.
timestamps = np.arange(0, VIDEO_LENGTH, VIDEO_STEP)

# VIDEO_PATH may be a directory of videos or a single video file.
# The listing is sorted because os.listdir() returns entries in arbitrary
# order, and later cells split the frames into RGB / IR sets purely by the
# position of each video in this list.
if os.path.isdir(VIDEO_PATH):
    video_paths = [os.path.join(VIDEO_PATH, video) for video in sorted(os.listdir(VIDEO_PATH))]
else:
    video_paths = [VIDEO_PATH]


def extract_frame_at_timestamp(video_path, timestamp):
    """Read the frame located at ``timestamp`` seconds of a video.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        timestamp: Position in seconds; converted to a frame index using the
            frame rate reported by the capture.

    Returns:
        The frame as a PIL image in RGB channel order, or ``None`` when the
        video cannot be opened or no frame can be read at that position.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None

        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_number = int(timestamp * fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)

        ret, frame = cap.read()
        if not ret:
            return None

        # OpenCV decodes to BGR; PIL expects RGB.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    finally:
        # Always free the capture handle, including when decoding raises.
        cap.release()

    return Image.fromarray(frame_rgb)

def process_video(video_path):
    """Collect the frames of one video at every entry of the global ``timestamps``.

    Timestamps for which no usable frame comes back are skipped, so the
    returned list may be shorter than ``timestamps``.
    """
    sampled = (extract_frame_at_timestamp(video_path, moment) for moment in timestamps)
    return [frame for frame in sampled if frame]

# Extract the frames of all videos concurrently; map() keeps the order of video_paths.
with ThreadPoolExecutor() as pool:
    results = list(pool.map(process_video, video_paths))

# Flatten the per-video frame lists into one list, video after video.
img_list = [frame for video_frames in results for frame in video_frames]

In [23]:
# Index at which the flat frame list is cut: frames before it form the first set.
# NOTE(review): this assumes every video yielded the same number of frames — confirm.
thres = len(img_list) // NUM_VIDEOS

# The first slice is treated as the RGB frames, the remainder as the IR frames.
rgb_list, ir_list = (np.array(part) for part in (img_list[:thres], img_list[thres:]))

In [None]:
def filters(rgb, ir):
    """Overlay a JET heatmap of each IR frame on the matching RGB frame.

    Args:
        rgb: Iterable of RGB frames as arrays accepted by ``cv2.addWeighted``.
        ir: Iterable of IR frames, paired with ``rgb`` by position.

    Returns:
        A list with one blended image per (rgb, ir) pair; pairing stops at
        the shorter of the two inputs.
    """
    heatmap = []
    # Iterate over the arguments (not the notebook-level rgb_list / ir_list)
    # so the function operates on whatever the caller passes in.
    for rgb_frame, ir_frame in zip(rgb, ir):
        # Stretch the IR values to the full 0-255 range before colour mapping.
        ir_normalized = cv2.normalize(ir_frame, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
        # NOTE(review): applyColorMap presumably returns BGR while the frames are RGB — confirm the channel order is intended.
        ir_heatmap = cv2.applyColorMap(ir_normalized, cv2.COLORMAP_JET)
        # The RGB frame keeps weight 1.0; the heatmap is added with weight HEATMAP_INTENSITY.
        highlighted_image = cv2.addWeighted(rgb_frame, 1.0, ir_heatmap, HEATMAP_INTENSITY, 0)
        heatmap.append(highlighted_image)
    return heatmap

filtered_img = filters(rgb_list, ir_list)

In [9]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the processor and the zero-shot detection model named by MODEL_PATH;
# the model is moved to `device`.
image_processor = AutoProcessor.from_pretrained(MODEL_PATH)
model = AutoModelForZeroShotObjectDetection.from_pretrained(MODEL_PATH).to(device)

In [12]:
class CustomDataset(Dataset):
    """Dataset of pre-processed (image, text query) pairs.

    Every pair is run through ``image_processor`` once at construction time;
    the leading dimension of size one is squeezed from each returned tensor
    and the results are kept in memory.

    Args:
        data_dict: Iterable of images, paired with ``text_queries`` by position.
        text_queries: Iterable of text queries, one per image.
        image_processor: Callable invoked as
            ``image_processor(text=..., images=..., return_tensors="pt")`` and
            returning a mapping with "pixel_values", "input_ids" and
            "attention_mask" tensors.
    """

    def __init__(self, data_dict, text_queries, image_processor):
        encoded = [
            image_processor(text=prompt, images=image, return_tensors="pt")
            for image, prompt in zip(data_dict, text_queries)
        ]
        # Drop the leading dimension so the DataLoader can re-batch the samples.
        self.pixel_values = [enc["pixel_values"].squeeze(0) for enc in encoded]
        self.input_ids = [enc["input_ids"].squeeze(0) for enc in encoded]
        self.attention_masks = [enc["attention_mask"].squeeze(0) for enc in encoded]

    def __len__(self):
        """Number of stored samples."""
        return len(self.pixel_values)

    def __getitem__(self, idx):
        """Return the tensors of sample ``idx`` keyed the way the model expects them."""
        return dict(
            pixel_values=self.pixel_values[idx],
            input_ids=self.input_ids[idx],
            attention_mask=self.attention_masks[idx],
        )

# One single-prompt query list per filtered frame.
text_queries = [["a small glowing spherical object"] for _ in filtered_img]
dataset = CustomDataset(filtered_img, text_queries, image_processor)
# Pinned host memory only speeds up host-to-GPU copies; requesting it on a
# CPU-only run has no benefit and makes the DataLoader emit a warning.
data_loader = DataLoader(dataset, batch_size=BATCH_SIZE, pin_memory=(device.type == "cuda"))

In [None]:
# Run detection batch by batch. total_output receives one entry per batch;
# each entry is the list returned by post_process_object_detection for it.
total_output = []
with torch.no_grad():
    for batch in tqdm(data_loader):
        # Build a new dict instead of reassigning entries while iterating over it.
        batch = {name: tensor.to(device) for name, tensor in batch.items()}
        outputs = model(**batch)
        # The post-processor documents target sizes as (height, width) and
        # needs one entry per image in the batch.
        # NOTE(review): assumes every frame has the size of img_list[0] — confirm.
        target_size = (img_list[0].height, img_list[0].width)
        images_in_batch = batch["pixel_values"].shape[0]
        total_output.append(image_processor.post_process_object_detection(
            outputs,
            threshold=0.1,
            target_sizes=[target_size] * images_in_batch))

100%|██████████| 200/200 [00:43<00:00,  4.59it/s]


In [15]:
# Write the first `thres` frames to SAMPLE_PATH with the detected boxes drawn in red.
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
fps = 1
frame_size = (img_list[0].width, img_list[0].height)
out = cv2.VideoWriter(SAMPLE_PATH, fourcc, fps, frame_size)

try:
    # A writer that failed to open drops every frame silently; fail loudly instead.
    if not out.isOpened():
        raise RuntimeError(f"Could not open video writer for SAMPLE_PATH={SAMPLE_PATH!r}")

    # total_output holds one list of per-image results per batch; flattening it
    # pairs every frame with its own result for any batch size.
    for frame, result in zip(img_list[:thres], chain.from_iterable(total_output)):
        # Draw on a copy so the frames in img_list stay untouched and the cell can be re-run.
        annotated = frame.copy()
        draw = ImageDraw.Draw(annotated)

        # tolist() yields plain Python floats regardless of the tensor's device.
        for x_min, y_min, x_max, y_max in result["boxes"].tolist():
            draw.rectangle([(x_min, y_min), (x_max, y_max)], outline="red", width=3)

        # PIL frames are RGB; the OpenCV writer expects BGR.
        frame_cv2 = cv2.cvtColor(np.array(annotated), cv2.COLOR_RGB2BGR)
        out.write(frame_cv2)
finally:
    # Release the writer even when drawing or encoding fails.
    out.release()

The cells below produce the JSON file used by the mathematical script. They are currently disabled: each cell's code is wrapped in a string literal.

In [None]:
""" testing = copy.deepcopy(total_output)
for scene in testing:
    for key, value in scene[0].items():
        if isinstance(value, torch.Tensor):
            scene[0][key] = value.tolist() """

In [None]:
""" box_inx = []

for sublist_index, sublist in enumerate(testing):
    for d in sublist:
        if "boxes" in d:
            box_inx.append(d["boxes"])

end_inx = len(filtered_img)
output_json = {"video3":box_inx[:end_inx],
              "video2":box_inx[end_inx:end_inx*2],
              "video1":box_inx[end_inx*2:end_inx*3]} """

In [None]:
""" with open(JSON_PATH, "w") as json_file:
    json.dump(output_json, json_file, indent=4) """