# Imports

In [None]:
# Google Colab only
# The setup commands below are wrapped in a string literal, so they do nothing
# when this cell runs as-is. To use them on Colab, remove the surrounding
# triple quotes and run the cell.
'''
!pip install pymongo scenedetect

from google.colab import drive
drive.mount('/content/drive')
'''

In [None]:
# Activate GPU
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
# Report the total RAM of this runtime and whether it counts as high-RAM
from psutil import virtual_memory

ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

# Runtimes with less than 20 GB in total are reported as standard
print('Not using a high-RAM runtime' if ram_gb < 20 else 'You are using a high-RAM runtime!')

In [None]:
import logging
import os
from concurrent.futures import ThreadPoolExecutor

import cv2
import re
import numpy as np
import torch
from pymongo import MongoClient
from scenedetect import VideoManager, SceneManager
from scenedetect.detectors import ContentDetector, ThresholdDetector
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input as preprocess_input_resnet
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input as preprocess_input_vgg
from tqdm import tqdm



# Video Pre-processing

In [None]:
# Configuration
input_dir = 'V3C1-100/'  # directory searched recursively for source videos
output_dir = 'preprocessed_videos/'  # root directory for the re-encoded videos
output_format = 'mp4'  # file extension given to every output video

# Configuration Google Colab
# input_dir = '/content/drive/MyDrive/V3C1-100'
# output_dir = '/content/drive/MyDrive/preprocessed_videos'
# output_format = 'mp4'

resize_width = 640  # output frame width in pixels
resize_height = 480  # output frame height in pixels
convert_to_grayscale = False  # forwarded to preprocess_video as its `grayscale` flag
frame_rate = 24  # Target frame rate
max_workers = 1  # size of the thread pool used by process_videos

# Ensure the output directory exists
os.makedirs(output_dir, exist_ok=True)

def preprocess_video(input_path, output_path, resize_dim, grayscale, frame_rate):
    """Resize and frame-subsample one video and write it to ``output_path``.

    Every ``frame_interval``-th frame of the input is resized to ``resize_dim``
    and written with the ``mp4v`` codec at ``frame_rate`` frames per second.
    Errors are reported with ``print`` and are not propagated, so one bad file
    does not stop a batch run.

    Args:
        input_path: Path of the video to read.
        output_path: Path of the video file to create.
        resize_dim: ``(width, height)`` of the output frames.
        grayscale: If true, frames are converted to single-channel grayscale
            before being written (the writer is opened with ``isColor=False``).
        frame_rate: Target frame rate of the output video.
    """
    cap = None
    out = None
    try:
        print(f"Processing video: {input_path}")
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise ValueError(f"Failed to open video file: {input_path}")

        original_frame_rate = cap.get(cv2.CAP_PROP_FPS)
        if original_frame_rate == 0:
            raise ValueError(f"Failed to get frame rate for video file: {input_path}")

        # Keep every N-th frame. Clamp to 1: when the source is slower than the
        # target rate the floor division yields 0, and `frame_count % 0` below
        # would raise ZeroDivisionError and drop the whole video.
        frame_interval = max(1, int(original_frame_rate // frame_rate))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, frame_rate, resize_dim, not grayscale)

        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % frame_interval == 0:
                # Resize frame
                frame = cv2.resize(frame, resize_dim, interpolation=cv2.INTER_AREA)
                if grayscale:
                    # A writer opened with isColor=False needs 1-channel frames
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

                # Optional per-frame steps that were tried here and are left
                # disabled: colour denoising (fastNlMeansDenoisingColored),
                # histogram equalisation, Canny edge detection, a Farneback
                # optical-flow overlay, HOG features and ORB keypoint drawing.

                out.write(frame)
            frame_count += 1

        print(f"Successfully processed video: {input_path}")
    except Exception as e:
        print(f"Error processing {input_path}: {e}")
    finally:
        # Release the capture and the writer even when an error occurred
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()

def get_video_files(input_directory):
    """Recursively collect the paths of all video files under ``input_directory``.

    A file counts as a video when its name ends with ``.mp4``, ``.avi``,
    ``.mov`` or ``.mkv`` (case-sensitive). Paths come back in ``os.walk`` order.
    """
    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')
    return [
        os.path.join(folder, name)
        for folder, _, names in os.walk(input_directory)
        for name in names
        if name.endswith(video_extensions)
    ]

def process_videos(video_files, output_directory, resize_dim, grayscale, frame_rate):
    """Pre-process every video on a thread pool while showing a progress bar.

    Each input keeps its path relative to the global ``input_dir`` underneath
    ``output_directory``, with the extension replaced by the global
    ``output_format``. The pool size comes from the global ``max_workers``.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        with tqdm(total=len(video_files), desc="Processing Videos", unit="video") as progress:

            def _advance(_finished_job):
                # Invoked once per finished job to move the bar forward
                progress.update()

            pending = []
            for source in video_files:
                stem = os.path.splitext(os.path.relpath(source, input_dir))[0]
                target = os.path.join(output_directory, stem + '.' + output_format)
                os.makedirs(os.path.dirname(target), exist_ok=True)
                pending.append(
                    pool.submit(preprocess_video, source, target, resize_dim, grayscale, frame_rate)
                )

            for job in pending:
                job.add_done_callback(_advance)
            # Block until every submitted job has finished
            for job in pending:
                job.result()

# Run the pre-processing stage over every video found under input_dir.
print("Starting video pre-processing...")
video_files = get_video_files(input_dir)
print(f"Found {len(video_files)} video files.")
resize_dim = (resize_width, resize_height)  # (width, height)
process_videos(video_files, output_dir, resize_dim, convert_to_grayscale, frame_rate)
# NOTE(review): this message is printed unconditionally; preprocess_video
# reports per-file errors itself and does not raise them.
print("All videos processed successfully.")


# Shot Boundary Detection

In [None]:
# Configuration
input_dir = 'preprocessed_videos/'  # directory searched recursively for videos to analyse
output_dir = 'shot_boundaries/'  # receives one <video>_shots.txt file per video
keyframe_dir = 'keyframes/'  # receives one sub-directory of keyframe JPEGs per video

# Configuration Google Colab
# input_dir = '/content/drive/MyDrive/preprocessed_videos'
# output_dir = '/content/drive/MyDrive/shot_boundaries'
# keyframe_dir = '/content/drive/MyDrive/keyframes'

min_scene_length = 15  # Minimum scene length in frames, passed to ContentDetector
threshold = 30.0  # Threshold for the ThresholdDetector; also reused as the optical-flow motion cutoff
min_scene_len = 2  # Minimum scene length in frames for ThresholdDetector; also the length of every extra scene added by the histogram/motion checks
hist_threshold = 0.4  # A histogram correlation below this value marks a transition

# Ensure the output and keyframe directories exist
os.makedirs(output_dir, exist_ok=True)
os.makedirs(keyframe_dir, exist_ok=True)

def calculate_histogram_difference(frame1, frame2):
    """Return the correlation between the channel-0 histograms of two frames.

    Despite the name, the result is a similarity score
    (``cv2.HISTCMP_CORREL``): the more alike the two 256-bin histograms are,
    the higher the value.
    """
    histograms = []
    for image in (frame1, frame2):
        histogram = cv2.calcHist([image], [0], None, [256], [0, 256])
        cv2.normalize(histogram, histogram)  # normalised in place
        histograms.append(histogram)
    first, second = histograms
    return cv2.compareHist(first, second, cv2.HISTCMP_CORREL)

def _as_frame_number(position):
    """Return ``position`` (a FrameTimecode-like object or a number) as an int frame index."""
    get_frames = getattr(position, 'get_frames', None)
    return get_frames() if callable(get_frames) else int(position)


def detect_shot_boundaries(video_path, output_path, keyframe_path):
    """Detect shot boundaries in one video and save boundaries and keyframes.

    Three sources of boundaries are combined:
      1. PySceneDetect's ContentDetector and ThresholdDetector,
      2. a frame-to-frame histogram correlation below ``hist_threshold``,
      3. a mean Farneback optical-flow magnitude above ``threshold``.
    Sources 2 and 3 add a scene ``(frame, frame + min_scene_len)``.

    All boundaries are stored as integer frame numbers so that the output file
    and the keyframe file names use one consistent format.

    Args:
        video_path: Video file to analyse.
        output_path: Text file that receives one ``start, end`` line per scene.
        keyframe_path: Existing directory that receives one JPEG per scene,
            taken at the scene's start frame.
    """
    video_manager = VideoManager([video_path])
    scene_manager = SceneManager()

    # Add ContentDetector and ThresholdDetector
    scene_manager.add_detector(ContentDetector(threshold=30.0, min_scene_len=min_scene_length))
    scene_manager.add_detector(ThresholdDetector(threshold=threshold, min_scene_len=min_scene_len))

    try:
        video_manager.set_downscale_factor()
        video_manager.start()
        scene_manager.detect_scenes(frame_source=video_manager)
        detected_scenes = scene_manager.get_scene_list()
    finally:
        # Free the decoder even if detection fails
        video_manager.release()
    print(f"Detected {len(detected_scenes)} scenes in video {video_path}")

    # Convert the detector output to plain frame numbers; a set removes
    # duplicates as further boundaries are added below.
    scenes = {
        (_as_frame_number(start), _as_frame_number(end))
        for start, end in detected_scenes
    }

    # Additional processing for gradual transitions
    cap = cv2.VideoCapture(video_path)
    try:
        prev_frame = None
        prev_gray = None
        frame_num = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            if prev_frame is not None:
                hist_diff = calculate_histogram_difference(prev_frame, frame)
                if hist_diff < hist_threshold:
                    # Gradual transition detected
                    scenes.add((frame_num, frame_num + min_scene_len))
                # Motion analysis using optical flow (prev_gray is always set
                # together with prev_frame)
                flow = cv2.calcOpticalFlowFarneback(prev_gray, gray_frame, None, 0.5, 3, 15, 3, 5, 1.2, 0)
                mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
                if np.mean(mag) > threshold:
                    scenes.add((frame_num, frame_num + min_scene_len))
            prev_frame = frame
            prev_gray = gray_frame
            frame_num += 1
    finally:
        cap.release()

    # Sort scenes (duplicates were already removed by the set)
    scenes = sorted(scenes)
    print(f"Total scenes after processing: {len(scenes)}")

    # Save shot boundaries to a file, as frame numbers
    with open(output_path, 'w') as f:
        for start_frame, end_frame in scenes:
            f.write(f"{start_frame}, {end_frame}\n")
    print(f"Shot boundaries saved to {output_path}")

    # Extract keyframes for each detected scene
    cap = cv2.VideoCapture(video_path)
    try:
        for start_frame, _ in scenes:
            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
            ret, frame = cap.read()
            if ret:
                keyframe_filename = os.path.join(
                    keyframe_path, f"{os.path.basename(video_path)}_start_{start_frame}.jpg"
                )
                cv2.imwrite(keyframe_filename, frame)
    finally:
        cap.release()
    print(f"Keyframes saved to {keyframe_path}")


def process_videos(video_files, output_directory, keyframe_directory):
    """Run shot boundary detection on each video, one after another.

    For a video named ``<stem>.<ext>`` the boundaries go to
    ``<output_directory>/<stem>_shots.txt`` and the keyframes to the directory
    ``<keyframe_directory>/<stem>``, which is created when missing. A failure
    on one video is printed and the loop moves on to the next.
    """
    for source in tqdm(video_files, desc="Detecting Shot Boundaries", unit="video"):
        stem, _ = os.path.splitext(os.path.basename(source))
        shots_file = os.path.join(output_directory, stem + '_shots.txt')
        frames_folder = os.path.join(keyframe_directory, stem)
        os.makedirs(frames_folder, exist_ok=True)
        try:
            detect_shot_boundaries(source, shots_file, frames_folder)
        except Exception as e:
            print(f"Error processing {source}: {e}")

def get_video_files(input_directory, min_id=126, max_id=149):
    """Recursively collect video files whose numeric id lies in a range.

    A file is returned when its name ends with ``.mp4``, ``.avi``, ``.mov`` or
    ``.mkv`` and the first run of digits in its name (extension excluded) is a
    number between ``min_id`` and ``max_id``, both inclusive.

    Args:
        input_directory: Directory that is walked recursively.
        min_id: Smallest accepted id (default 126).
        max_id: Largest accepted id (default 149).

    Returns:
        List of matching file paths in ``os.walk`` order.
    """
    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')
    video_files = []
    for root, _, files in os.walk(input_directory):
        for file in files:
            if not file.endswith(video_extensions):
                continue
            # Look for the id in the stem only, so that the digit in an
            # extension such as ".mp4" is never taken as the id.
            match = re.search(r'\d+', os.path.splitext(file)[0])
            if match and min_id <= int(match.group()) <= max_id:
                video_files.append(os.path.join(root, file))
    return video_files


# Run shot boundary detection on the selected videos under input_dir.
print("Starting shot boundary detection...")
video_files = get_video_files(input_dir)  # by default only ids 126..149 are selected
print(f"Found {len(video_files)} video files to process.")
print("Video files:", video_files)
process_videos(video_files, output_dir, keyframe_dir)
# NOTE(review): printed unconditionally; process_videos only prints per-video
# failures and does not raise them.
print("Shot boundary detection completed successfully.") 

# Feature Extraction

### Using YOLOv3

In [None]:
import cv2
import numpy as np
import os
import xml.etree.ElementTree as ET
from sklearn.metrics import confusion_matrix, recall_score
import matplotlib.pyplot as plt
import seaborn as sns

# Ensure the correct paths to your YOLO files
yolo_cfg_path = "weights/yolov3.cfg"  # Update this path if necessary
yolo_weights_path = "weights/yolov3.weights"  # Update this path if necessary

# Load YOLO
net = cv2.dnn.readNet(yolo_weights_path, yolo_cfg_path)
layer_names = net.getLayerNames()
# Ask OpenCV for the output layer names directly. Indexing layer_names with the
# result of getUnconnectedOutLayers() breaks on OpenCV releases where that
# call returns an Nx1 array instead of a flat one.
output_layers = list(net.getUnconnectedOutLayersNames())

# The full COCO class names for YOLOv3
# The list is indexed with the class id predicted by the network, so the order
# of the entries must not be changed.
coco_classes = ["person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train", "truck", 
                "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench", 
                "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", 
                "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee", 
                "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", 
                "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup", 
                "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange", 
                "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "sofa", 
                "pottedplant", "bed", "diningtable", "toilet", "tvmonitor", "laptop", "mouse", 
                "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink", 
                "refrigerator", "book", "clock", "vase", "scissors", "teddy bear", "hair drier", 
                "toothbrush"]

# The relevant classes for evaluation
# Objects of any other class are ignored, both in the annotations and in the
# detections.
relevant_classes = ["person", "bird", "truck", "horse", "car"]

# Mapping to unify bus and train as truck
# Applied to ground-truth names and to detected class names before the
# relevant_classes filter.
class_mapping = {
    "bus": "truck",
    "train": "truck"
}

# Path to annotated frames and annotations
# Frames (.jpg) and their annotations (.xml) are read from the same directory.
frame_dir = '00110'
annotation_dir = '00110'

def get_ground_truth_labels(xml_file):
    """Read the relevant annotated objects from one annotation XML file.

    Every ``<object>`` element contributes its ``<name>`` (after applying
    ``class_mapping``) and its ``<bndbox>`` corners, but only when the mapped
    name is listed in ``relevant_classes``.

    Returns:
        ``(labels, boxes)``: two parallel lists, each box being
        ``[xmin, ymin, xmax, ymax]`` as ints.
    """
    labels, boxes = [], []
    for obj in ET.parse(xml_file).getroot().findall('object'):
        name = obj.find('name').text
        name = class_mapping.get(name, name)
        if name not in relevant_classes:
            continue
        labels.append(name)
        corners = obj.find('bndbox')
        boxes.append([int(corners.find(tag).text) for tag in ('xmin', 'ymin', 'xmax', 'ymax')])
    return labels, boxes

def iou(box1, box2):
    """Calculate Intersection Over Union (IOU) of two bounding boxes.

    Args:
        box1: ``[xmin, ymin, xmax, ymax]`` of the first box.
        box2: ``[xmin, ymin, xmax, ymax]`` of the second box.

    Returns:
        Intersection area divided by union area, or ``0.0`` when the union has
        no area (both boxes degenerate), which would otherwise divide by zero.
    """
    x1, y1, x2, y2 = box1
    x1_p, y1_p, x2_p, y2_p = box2

    # Corners of the intersection rectangle; the max(0, ...) below turns
    # non-overlapping boxes into an empty intersection.
    xi1 = max(x1, x1_p)
    yi1 = max(y1, y1_p)
    xi2 = min(x2, x2_p)
    yi2 = min(y2, y2_p)
    inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)

    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x2_p - x1_p) * (y2_p - y1_p)
    union_area = box1_area + box2_area - inter_area

    if union_area <= 0:
        return 0.0
    return inter_area / union_area

# Index-aligned label pairs collected over all frames. "none" in
# predicted_labels marks a ground-truth object without a matching detection;
# "none" in true_labels marks a detection without a matching ground-truth object.
true_labels = []
predicted_labels = []

for frame_name in os.listdir(frame_dir):
    if frame_name.endswith(".jpg"):
        frame_path = os.path.join(frame_dir, frame_name)
        # The annotation has the frame's file name with .xml in place of .jpg
        annotation_path = os.path.join(annotation_dir, frame_name.replace(".jpg", ".xml"))
        
        img = cv2.imread(frame_path)
        if img is None:
            print(f"Error reading image: {frame_path}")
            continue
        height, width, channels = img.shape

        # Get ground truth labels and boxes
        if not os.path.exists(annotation_path):
            print(f"Annotation file not found: {annotation_path}")
            continue
        gt_labels, gt_boxes = get_ground_truth_labels(annotation_path)
        
        # Detecting objects
        # Scale pixels by 0.00392 (about 1/255), resize to 416x416 without
        # cropping, subtract no mean, and swap the R and B channels (the True).
        blob = cv2.dnn.blobFromImage(img, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
        net.setInput(blob)
        outs = net.forward(output_layers)

        class_ids = []
        confidences = []
        boxes = []
        for out in outs:
            for detection in out:
                # Entries from index 5 on are used as the per-class scores
                scores = detection[5:]
                class_id = np.argmax(scores)
                confidence = scores[class_id]
                if confidence > 0.5:
                    # The first four entries are scaled by the image size, i.e.
                    # they are treated as a relative centre x/y and width/height
                    center_x = int(detection[0] * width)
                    center_y = int(detection[1] * height)
                    w = int(detection[2] * width)
                    h = int(detection[3] * height)

                    # Top-left corner of the box
                    x = int(center_x - w / 2)
                    y = int(center_y - h / 2)

                    boxes.append([x, y, w, h])
                    confidences.append(float(confidence))
                    class_ids.append(class_id)

        # Non-maximum suppression: score threshold 0.5, NMS threshold 0.4
        indexes = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4)

        frame_predicted_labels = []
        detected_boxes = []
        for i in range(len(boxes)):
            # Only boxes that survived non-maximum suppression are used
            if i in indexes:
                x, y, w, h = boxes[i]
                # Convert [x, y, w, h] to corner form for iou()
                detected_box = [x, y, x + w, y + h]
                
                # Map detected class_id to relevant classes, considering the mapping
                detected_class = coco_classes[class_ids[i]]
                if detected_class in class_mapping:
                    detected_class = class_mapping[detected_class]
                if detected_class in relevant_classes:
                    frame_predicted_labels.append(detected_class)
                    detected_boxes.append(detected_box)
                    
                    # Drawing boxes on the image for visualization
                    color = (0, 255, 0)
                    cv2.rectangle(img, (x, y), (x + w, y + h), color, 2)
                    cv2.putText(img, detected_class, (x, y + 30), cv2.FONT_HERSHEY_PLAIN, 3, color, 3)

        # Collecting true and predicted labels
        # Each ground-truth box takes the first not-yet-matched detection whose
        # IoU with it exceeds 0.5; a detection is matched at most once.
        matched_predictions = [False] * len(detected_boxes)
        for label, gt_box in zip(gt_labels, gt_boxes):
            true_labels.append(label)
            matched = False
            for i, detected_box in enumerate(detected_boxes):
                if iou(gt_box, detected_box) > 0.5:
                    if not matched_predictions[i]:
                        predicted_labels.append(frame_predicted_labels[i])
                        matched_predictions[i] = True
                        matched = True
                        break
            if not matched:
                # Missed object: no detection was matched to this ground truth
                predicted_labels.append("none")

        # Detections left unmatched are recorded against the true label "none"
        for i, detected_box in enumerate(detected_boxes):
            if not matched_predictions[i]:
                true_labels.append("none")
                predicted_labels.append(frame_predicted_labels[i])

        # Show the image with detections
        # NOTE(review): cv2.imshow opens a window, so this presumably needs an
        # OpenCV build with GUI support and a display — confirm for Colab.
        cv2.imshow("Image", img)
        cv2.waitKey(1)  # Display each frame for a short time

cv2.destroyAllWindows()

# Print the true and predicted labels for debugging
print("True Labels:", true_labels)
print("Predicted Labels:", predicted_labels)

# Evaluate Performance
# Keep only the pairs in which both labels are known classes. true_labels and
# predicted_labels are index-aligned, so they must be filtered together:
# filtering each list on its own would shift the pairs against each other as
# soon as one entry is dropped.
evaluation_labels = relevant_classes + ["none"]
kept_pairs = [
    (true_label, predicted_label)
    for true_label, predicted_label in zip(true_labels, predicted_labels)
    if true_label in evaluation_labels and predicted_label in evaluation_labels
]
filtered_true_labels = [true_label for true_label, _ in kept_pairs]
filtered_predicted_labels = [predicted_label for _, predicted_label in kept_pairs]

# Calculate confusion matrix (rows: true labels, columns: predicted labels)
conf_matrix = confusion_matrix(filtered_true_labels, filtered_predicted_labels, labels=evaluation_labels)
print("Confusion Matrix:")
print(conf_matrix)

# Calculate recall for each class ("none" is not a class to be recalled)
recall = recall_score(filtered_true_labels, filtered_predicted_labels, average=None, labels=relevant_classes)
print("Recall for each class:")
print(recall)

# Weighted average recall
weighted_recall = recall_score(filtered_true_labels, filtered_predicted_labels, average='weighted', labels=relevant_classes)
print("Weighted Recall:")
print(weighted_recall)


# Plotting the confusion matrix
plt.figure(figsize=(10, 7))
sns.heatmap(conf_matrix, annot=True, fmt="d", xticklabels=evaluation_labels, yticklabels=evaluation_labels, cmap="Blues")
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()

### Using CLIP

In [None]:
import os
import torch
from PIL import Image
from transformers import CLIPProcessor, CLIPModel
import torchvision.transforms as T
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from sklearn.metrics import confusion_matrix, recall_score
import matplotlib.pyplot as plt
import seaborn as sns
import xml.etree.ElementTree as ET

# Load models and processors
device = "cuda" if torch.cuda.is_available() else "cpu"  # use the GPU when one is available
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
# Faster R-CNN supplies the bounding boxes that are later classified by CLIP;
# eval() puts it into inference mode.
faster_rcnn = fasterrcnn_resnet50_fpn(pretrained=True).to(device)
faster_rcnn.eval()

# Define the folder containing images
# Frames (.jpg) and their annotations (.xml) are read from the same directory.
frame_dir = '00110'
annotation_dir = '00110'

# Define the queries for evaluation
# queries[i] is the text prompt for query_labels[i]; the two lists must stay
# in the same order.
queries = ["A photo of a person", "A photo of a bird", "A photo of a truck", "A photo of a horse", "A photo of a car"]
query_labels = ["person", "bird", "truck", "horse", "car"]

# Mapping to unify bus and train as truck
class_mapping = {
    "bus": "truck",
    "train": "truck"
}

def get_ground_truth_labels(xml_file):
    """Read the annotated objects that belong to a query class.

    Every ``<object>`` element contributes its ``<name>`` (after applying
    ``class_mapping``) and its ``<bndbox>`` corners, but only when the mapped
    name is listed in ``query_labels``.

    Returns:
        ``(labels, boxes)``: two parallel lists, each box being
        ``[xmin, ymin, xmax, ymax]`` as ints.
    """
    labels, boxes = [], []
    for obj in ET.parse(xml_file).getroot().findall('object'):
        name = obj.find('name').text
        name = class_mapping.get(name, name)
        if name not in query_labels:
            continue
        labels.append(name)
        corners = obj.find('bndbox')
        boxes.append([int(corners.find(tag).text) for tag in ('xmin', 'ymin', 'xmax', 'ymax')])
    return labels, boxes

def iou(box1, box2):
    """Calculate Intersection Over Union (IOU) of two bounding boxes.

    Args:
        box1: ``[xmin, ymin, xmax, ymax]`` of the first box.
        box2: ``[xmin, ymin, xmax, ymax]`` of the second box.

    Returns:
        Intersection area divided by union area, or ``0.0`` when the union has
        no area (both boxes degenerate), which would otherwise divide by zero.
    """
    x1, y1, x2, y2 = box1
    x1_p, y1_p, x2_p, y2_p = box2

    # Corners of the intersection rectangle; the max(0, ...) below turns
    # non-overlapping boxes into an empty intersection.
    xi1 = max(x1, x1_p)
    yi1 = max(y1, y1_p)
    xi2 = min(x2, x2_p)
    yi2 = min(y2, y2_p)
    inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)

    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x2_p - x1_p) * (y2_p - y1_p)
    union_area = box1_area + box2_area - inter_area

    if union_area <= 0:
        return 0.0
    return inter_area / union_area

# Index-aligned label pairs collected over all frames. "none" in
# predicted_labels marks a ground-truth object without a matching detection;
# "none" in true_labels marks a detection without a matching ground-truth object.
true_labels = []
predicted_labels = []

# Convert image to tensor; the transform is built once and reused for every frame
transform = T.Compose([T.ToTensor()])

for frame_name in os.listdir(frame_dir):
    if not frame_name.endswith(".jpg"):
        continue
    frame_path = os.path.join(frame_dir, frame_name)
    annotation_path = os.path.join(annotation_dir, frame_name.replace(".jpg", ".xml"))

    # Image.open raises on an unreadable file (it never returns None), so the
    # failure has to be caught to skip the frame.
    try:
        img = Image.open(frame_path).convert("RGB")
    except OSError:
        print(f"Error reading image: {frame_path}")
        continue

    # Get ground truth labels and boxes
    if not os.path.exists(annotation_path):
        print(f"Annotation file not found: {annotation_path}")
        continue
    gt_labels, gt_boxes = get_ground_truth_labels(annotation_path)

    img_tensor = transform(img).to(device)

    # Generate bounding boxes using Faster R-CNN
    with torch.no_grad():
        predictions = faster_rcnn([img_tensor])
    pred_boxes = predictions[0]['boxes'].cpu().numpy()
    pred_scores = predictions[0]['scores'].cpu().numpy()

    # One (label, confidence, [xmin, ymin, xmax, ymax]) entry per kept box
    detected_labels = []

    for box, score in zip(pred_boxes, pred_scores):
        if score > 0.5:  # Adjust threshold as needed
            xmin, ymin, xmax, ymax = box
            cropped_img = img.crop((xmin, ymin, xmax, ymax))
            inputs = clip_processor(text=queries, images=cropped_img, return_tensors="pt", padding=True).to(device)
            # Inference only: no_grad avoids building an autograd graph for
            # every crop.
            with torch.no_grad():
                outputs = clip_model(**inputs)
            # Probability of each query for this crop (softmax over the queries)
            logits_per_image = outputs.logits_per_image.softmax(dim=1).cpu().numpy().flatten()

            best_idx = logits_per_image.argmax()
            detected_label = query_labels[best_idx]
            confidence = logits_per_image[best_idx]

            detected_labels.append((detected_label, confidence, [xmin, ymin, xmax, ymax]))

    # For visualization and evaluation
    frame_true_labels = []
    frame_predicted_labels = []
    matched_predictions = [False] * len(detected_labels)

    # Each ground-truth box takes the first not-yet-matched detection whose
    # IoU with it exceeds 0.5; a detection is matched at most once.
    for label, gt_box in zip(gt_labels, gt_boxes):
        frame_true_labels.append(label)
        matched = False
        for i, (detected_label, _, detected_box) in enumerate(detected_labels):
            if iou(gt_box, detected_box) > 0.5 and not matched_predictions[i]:
                frame_predicted_labels.append(detected_label)
                matched_predictions[i] = True
                matched = True
                break
        if not matched:
            frame_predicted_labels.append("none")

    # Detections left unmatched are recorded against the true label "none"
    for i, (detected_label, _, detected_box) in enumerate(detected_labels):
        if not matched_predictions[i]:
            frame_true_labels.append("none")
            frame_predicted_labels.append(detected_label)

    true_labels.extend(frame_true_labels)
    predicted_labels.extend(frame_predicted_labels)

# Debug output: the raw label sequences collected above
print("True Labels:", true_labels)
print("Predicted Labels:", predicted_labels)

# Trim both sequences to their common length so they stay index-aligned
min_length = min(map(len, (true_labels, predicted_labels)))
filtered_true_labels, filtered_predicted_labels = (
    true_labels[:min_length],
    predicted_labels[:min_length],
)

# Confusion matrix over the query classes plus the "none" bucket
conf_matrix = confusion_matrix(
    filtered_true_labels,
    filtered_predicted_labels,
    labels=query_labels + ["none"],
)
print("Confusion Matrix:", conf_matrix, sep="\n")

# Per-class recall, reported in query_labels order
recall = recall_score(
    filtered_true_labels,
    filtered_predicted_labels,
    labels=query_labels,
    average=None,
)
print("Recall for each class:", recall, sep="\n")

# Recall averaged over the query classes, weighted by class support
weighted_recall = recall_score(
    filtered_true_labels,
    filtered_predicted_labels,
    labels=query_labels,
    average='weighted',
)
print("Weighted Recall:", weighted_recall, sep="\n")

# Heat map of the confusion matrix
plt.figure(figsize=(10, 7))
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=query_labels + ["none"],
    yticklabels=query_labels + ["none"],
)
plt.title('Confusion Matrix')
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.show()


### Using CLIP and BLIP

In [None]:
import os
import torch
from PIL import Image
from transformers import CLIPProcessor, CLIPModel, BlipProcessor, BlipForConditionalGeneration
import torchvision.transforms as T
from torchvision.models.detection import FasterRCNN, fasterrcnn_resnet50_fpn
import matplotlib.pyplot as plt
from pymongo import MongoClient, errors
import datetime

# Define the folder containing images
folder_path = "keyframes/00102"

# Define the paths to the weights
# Both checkpoints are read from local files.
fasterrcnn_weights_path = "weights/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth"
resnet50_weights_path = "weights/resnet50-0676ba61.pth"

# Check if the files exist
# NOTE(review): assert statements are skipped when Python runs with -O.
assert os.path.exists(fasterrcnn_weights_path), "Faster R-CNN weights file not found!"
assert os.path.exists(resnet50_weights_path), "ResNet50 weights file not found!"

# Print paths to verify
print(f"Faster R-CNN weights path: {fasterrcnn_weights_path}")
print(f"ResNet50 weights path: {resnet50_weights_path}")

# Load the ResNet50 backbone with local weights
from torchvision.models import resnet50
backbone = resnet50(pretrained=False)
# NOTE(review): torch.load unpickles the file, so only trusted checkpoint
# files should be loaded here.
backbone_state_dict = torch.load(resnet50_weights_path, map_location=torch.device('cpu'))

# Remove the fully connected layer weights from the state dictionary
backbone_state_dict.pop("fc.weight", None)
backbone_state_dict.pop("fc.bias", None)

# Load the state dictionary with strict=False to ignore missing keys
backbone.load_state_dict(backbone_state_dict, strict=False)

# Create a custom backbone with FPN from the loaded ResNet50 backbone
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone

# Use the backbone with FPN, ensuring it uses the locally loaded weights
backbone_with_fpn = resnet_fpn_backbone('resnet50', pretrained=False, norm_layer=torch.nn.BatchNorm2d)
backbone_with_fpn.body.load_state_dict(backbone.state_dict(), strict=False)

# Load the Faster R-CNN model with the custom backbone
detection_model = FasterRCNN(backbone=backbone_with_fpn, num_classes=91)  # Use the backbone explicitly
# NOTE(review): this loads a state dict for the whole detection model, which
# presumably also replaces the backbone weights set above — confirm.
detection_model.load_state_dict(torch.load(fasterrcnn_weights_path, map_location=torch.device('cpu')))
detection_model.eval()

# Load the CLIP model and processor
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# Load the BLIP captioning model and processor
blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

# MongoDB setup with error handling
try:
    # Give up after 5 seconds if no server can be selected
    client = MongoClient('mongodb://localhost:27017/', serverSelectionTimeoutMS=5000)
    client.server_info()  # Trigger exception if cannot connect to db
    db = client['object_detection']
    collection = db['detected_objects']
except errors.ServerSelectionTimeoutError as err:
    print("Failed to connect to MongoDB server:", err)
    exit(1)

# Transform for the object detection model
transform = T.Compose([T.ToTensor()])

# Function to generate captions using BLIP
def generate_caption(image):
    """Return the caption that the BLIP model generates for ``image``.

    The image is prepared with ``blip_processor``, a token sequence is
    generated with ``blip_model`` and the first sequence is decoded to text
    with special tokens removed.
    """
    blip_inputs = blip_processor(images=image, return_tensors="pt")
    generated_ids = blip_model.generate(**blip_inputs)
    return blip_processor.decode(generated_ids[0], skip_special_tokens=True)

# Minimum Faster R-CNN score for a detection to be kept. It has its own name
# so that it does not overwrite the module-level `threshold` that shot
# boundary detection reads.
detection_score_threshold = 0.5

# Process each image in the folder
for filename in os.listdir(folder_path):
    if not filename.lower().endswith(('.png', '.jpg', '.jpeg')):
        continue
    image_path = os.path.join(folder_path, filename)
    # Force 3-channel RGB: PNG files may be RGBA, palette or grayscale
    image = Image.open(image_path).convert("RGB")

    # Transform image for the detection model
    image_tensor = transform(image)

    # Get bounding boxes
    with torch.no_grad():
        detections = detection_model([image_tensor])[0]

    # Filter out low-confidence detections
    boxes = [
        box
        for box, score in zip(detections['boxes'], detections['scores'])
        if score > detection_score_threshold
    ]

    detected_objects = []

    # Use BLIP to generate captions for objects within bounding boxes
    for box in boxes:
        # Plain Python ints, usable for cropping, plotting and MongoDB alike
        xmin, ymin, xmax, ymax = box.int().tolist()
        if xmax <= xmin or ymax <= ymin:
            # Truncating to int can collapse a very thin box to zero size
            continue
        cropped_image = image.crop((xmin, ymin, xmax, ymax))
        caption = generate_caption(cropped_image)
        inputs = clip_processor(text=[caption], images=cropped_image, return_tensors="pt", padding=True)
        with torch.no_grad():
            outputs = clip_model(**inputs)
            # With a single caption a softmax over the texts is always 1.0 and
            # carries no information. Store the CLIP cosine similarity between
            # crop and caption instead: logits_per_image is that similarity
            # multiplied by exp(logit_scale).
            similarity = outputs.logits_per_image / clip_model.logit_scale.exp()
        detected_label = caption  # Use the generated caption as the label
        confidence = float(similarity[0, 0])

        detected_objects.append({
            "box": [xmin, ymin, xmax, ymax],
            "label": detected_label,
            "confidence": confidence
        })

        # Prepare the data to be stored in MongoDB
        detected_object = {
            "filename": filename,
            "label": detected_label,
            "confidence": confidence,
            "box": [xmin, ymin, xmax, ymax],
            # Timezone-aware UTC timestamp (utcnow() is deprecated)
            "timestamp": datetime.datetime.now(datetime.timezone.utc)
        }

        # Insert the data into MongoDB
        collection.insert_one(detected_object)
        print(f"Image: {filename}, Detected {detected_label} with confidence {confidence:.4f} within box {box}")

    # Optionally, display the image with detected bounding boxes and labels
    plt.imshow(image)
    plt.axis('off')
    ax = plt.gca()
    for obj in detected_objects:
        xmin, ymin, xmax, ymax = obj['box']
        detected_label = obj['label']
        confidence = obj['confidence']
        rect = plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin, fill=False, color='red', linewidth=2)
        ax.add_patch(rect)
        plt.text(xmin, ymin, f'{detected_label} {confidence:.2f}', bbox=dict(facecolor='yellow', alpha=0.5))

    plt.show()


In [None]:
# Configuration
keyframe_dir = 'keyframes/'  # root directory that is walked for keyframe .jpg files
db_name = 'video_features_db'  # MongoDB database that receives the features
collection_name = 'features'  # collection inside db_name
batch_size = 32  # NOTE(review): not referenced by the code in this cell
yolo_model_path = 'yolov5s.pt'  # Using the smallest version of YOLOv5 for demonstration

# Configuration Google Colab
# keyframe_dir = '/content/drive/MyDrive/keyframes'
# db_name = 'video_features_db'
# collection_name = 'features'
# batch_size = 32
# yolo_model_path = '/content/drive/MyDrive/yolov5s.pt'  # Using the smallest version of YOLOv5 for demonstration

# Initialize logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Initialize MongoDB client
client = MongoClient('localhost', 27017)
db = client[db_name]
collection = db[collection_name]

# Initialize pre-trained models
# include_top=False with pooling='avg' makes each model output one pooled
# feature vector per image instead of class predictions.
vgg_model = VGG16(weights='imagenet', include_top=False, pooling='avg')
resnet_model = ResNet50(weights='imagenet', include_top=False, pooling='avg')

# Load YOLOv5 model
# Loaded through torch.hub from the 'ultralytics/yolov5' repository, using the
# weights file at yolo_model_path.
yolo_model = torch.hub.load('ultralytics/yolov5', 'custom', path=yolo_model_path)

def extract_features(model, preprocess_input, img):
    """Compute a flat feature vector for one image with a Keras model.

    The image is resized to 224x224, cast to float32, passed through
    ``preprocess_input``, given a batch dimension and fed to
    ``model.predict``.

    Returns:
        The flattened prediction, or ``None`` when any step raised; the error
        is logged in that case.
    """
    try:
        resized = cv2.resize(img, (224, 224)).astype('float32')
        batch = np.expand_dims(preprocess_input(resized), axis=0)
        return model.predict(batch).flatten()
    except Exception as e:
        logging.error(f"Error extracting features: {e}")
        return None

def detect_objects_yolo(img):
    """Run the YOLOv5 model on ``img`` and return its detections.

    Returns:
        A list with one dict per row of ``results.pandas().xyxy[0]``, or an
        empty list when detection raised; the error is logged in that case.
    """
    try:
        detections_frame = yolo_model(img).pandas().xyxy[0]
        return detections_frame.to_dict(orient="records")
    except Exception as e:
        logging.error(f"Error detecting objects with YOLO: {e}")
        return []

def process_keyframes(keyframe_directory, model, preprocess_input, model_name):
    """Extract and store features for every ``.jpg`` keyframe in a directory tree.

    For each keyframe the YOLOv5 detections and the CNN feature vector are
    computed and inserted into the MongoDB ``collection`` as one document. The
    name of the directory holding the file is stored as ``video_id`` and the
    file name without its extension as ``frame_id``.

    Args:
        keyframe_directory: Root directory that is walked recursively.
        model: Keras model handed to ``extract_features``.
        preprocess_input: Preprocessing function that belongs to ``model``.
        model_name: Name stored in the ``model`` field of each document.
    """
    try:
        for root, _, files in os.walk(keyframe_directory):
            for file in tqdm(files, desc=f"Extracting features using {model_name}", unit="frame"):
                if not file.endswith('.jpg'):
                    continue
                file_path = os.path.join(root, file)
                img = cv2.imread(file_path)
                if img is None:
                    # cv2.imread returns None for a missing or corrupt file
                    logging.warning(f"Skipping unreadable image: {file_path}")
                    continue

                # cv2.imread yields BGR, while both the YOLOv5 hub model and
                # the Keras preprocess_input functions expect RGB input.
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

                # YOLO Object Detection
                objects = detect_objects_yolo(img)

                # CNN Feature Extraction
                features = extract_features(model, preprocess_input, img)
                if features is None:
                    continue

                video_id, frame_id = os.path.basename(root), os.path.splitext(file)[0]
                feature_data = {
                    'video_id': video_id,
                    'frame_id': frame_id,
                    'model': model_name,
                    'features': features.tolist(),
                    'objects': objects
                }
                collection.insert_one(feature_data)
    except Exception as e:
        # Any other failure (for example a database error) ends the run
        logging.error(f"Error processing keyframes: {e}")

def process_videos(keyframe_directory):
    """Extract keyframe features with VGG16 first and with ResNet50 second."""
    extractors = (
        (vgg_model, preprocess_input_vgg, 'VGG16'),
        (resnet_model, preprocess_input_resnet, 'ResNet50'),
    )
    for cnn, preprocess, label in extractors:
        process_keyframes(keyframe_directory, cnn, preprocess, label)

# Run feature extraction over all keyframes under keyframe_dir.
logging.info("Starting feature extraction with YOLOv5 integration...")
process_videos(keyframe_dir)
# NOTE(review): logged unconditionally; process_keyframes logs its errors and
# does not raise them.
logging.info("Feature extraction with YOLOv5 integration completed successfully.")
