In [1]:
import os
import cv2
import numpy as np
from tqdm import tqdm
from utils import data_handler, frame_utils, metrics


import optuna
from optuna.integration.wandb import WeightsAndBiasesCallback
import wandb

In [2]:
# Dataset locations.
# The data root can be overridden with the C6_DATA_DIR environment variable so
# the notebook also runs on machines where the data does not live under the
# original author's home directory; the default keeps the original paths.
DATA_DIR = os.environ.get(
    "C6_DATA_DIR", "/home/mimo/Desktop/MS CV/C6/week_1/data"
).rstrip("/")

# Annotation XML passed to data_handler.parse_xml_annotations.
XML_annotation_path = f"{DATA_DIR}/ai_challenge_s03_c010-full_annotation.xml"
# Source video of the sequence.
video_path = f"{DATA_DIR}/AICity_data/AICity_data/train/S03/c010/vdo.avi"
# Directory of extracted frames (trailing slash kept, as in the original value).
extracted_frame_dir = f"{DATA_DIR}/extracted_frames/"

In [3]:
# Ground-truth annotations parsed from the XML file; `evaluation` later looks
# them up once per test frame.
frame_with_movement_data = data_handler.parse_xml_annotations(xml_file=XML_annotation_path)

In [4]:
# Split the extracted frames into the paths used to fit the background model
# and the paths used to evaluate it.
(train_frame_paths, test_frame_paths) = data_handler.split_train_test_frames(data_dir=extracted_frame_dir)

In [5]:
def train_gaussian(train_frame_paths, target_brightness=15, color_space="RGB"):
    """Fit a per-pixel Gaussian background model on the training frames.

    Every frame is read from disk, passed through
    ``frame_utils.adjust_brightness`` and ``frame_utils.convert_color_space``,
    and the mean and variance are then taken over the frame axis only, so each
    pixel (and each channel of it) gets its own statistics. The adaptive
    update in ``evaluation`` indexes the model per pixel, which requires this
    shape; the static path keeps working through NumPy broadcasting.

    Args:
        train_frame_paths: Paths of the image files to train on.
        target_brightness: Forwarded to ``frame_utils.adjust_brightness``.
            NOTE(review): the default here is 15 while ``evaluation`` defaults
            to 150 -- confirm which value is intended.
        color_space: Forwarded to ``frame_utils.convert_color_space``.

    Returns:
        tuple: ``(mean, variance)`` arrays, each with the shape of a single
        converted frame.

    Raises:
        ValueError: If ``train_frame_paths`` yields no frames.
        FileNotFoundError: If ``cv2.imread`` cannot read one of the frames.
    """
    train_frames = []
    # Single pass: read, normalise brightness and convert each frame in turn
    # instead of keeping three intermediate copies of the whole training set.
    for frame_path in tqdm(train_frame_paths):
        frame = cv2.imread(frame_path)
        if frame is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError(f"Could not read training frame: {frame_path}")
        frame = frame_utils.adjust_brightness(frame, target_brightness=target_brightness)
        frame = frame_utils.convert_color_space(frame, color_space)
        train_frames.append(frame)

    if not train_frames:
        raise ValueError("train_frame_paths is empty; cannot fit the background model")

    # Stack to (n_frames, *frame_shape) and reduce over the frame axis only.
    stacked_frames = np.stack(train_frames)
    mean = np.mean(stacked_frames, axis=0)
    variance = np.var(stacked_frames, axis=0)

    return mean, variance

In [6]:
from sklearn.metrics import precision_recall_fscore_support

def calculate_metrics(detections, ground_truth, iou_threshold=0.5, iou_fn=None):
    """Score one frame's detections against its ground-truth boxes.

    Each detection is matched to at most one ground-truth box and each
    ground-truth box to at most one detection, so duplicate detections of the
    same object count as false positives instead of inflating the true
    positives. Precision, recall and F1 are computed directly from the
    TP / FP / FN counts; any ratio with a zero denominator is reported as 0.

    Args:
        detections: Iterable of dicts with ``xtl``, ``ytl``, ``xbr``, ``ybr``.
        ground_truth: Sequence of dicts with the same four keys.
        iou_threshold: Minimum IoU for a detection to count as a match.
        iou_fn: Callable taking two ``(x, y, width, height)`` tuples and
            returning their IoU. Defaults to ``metrics.calculate_iou``.

    Returns:
        tuple: ``(precision, recall, f1)`` as floats.
    """
    if iou_fn is None:
        iou_fn = metrics.calculate_iou

    def as_xywh(box):
        # The IoU helper is called with (x, y, width, height) tuples.
        return (
            box["xtl"],
            box["ytl"],
            box["xbr"] - box["xtl"],
            box["ybr"] - box["ytl"],
        )

    gt_boxes = [as_xywh(ann) for ann in ground_truth]
    gt_matched = [False] * len(gt_boxes)

    true_positives = 0
    false_positives = 0
    for det in detections:
        det_box = as_xywh(det)

        # Pick the still-unmatched ground-truth box with the highest IoU that
        # reaches the threshold (first one wins on ties).
        best_index = None
        best_iou = 0.0
        for index, gt_box in enumerate(gt_boxes):
            if gt_matched[index]:
                continue
            iou = iou_fn(det_box, gt_box)
            if iou >= iou_threshold and (best_index is None or iou > best_iou):
                best_index, best_iou = index, iou

        if best_index is None:
            false_positives += 1
        else:
            gt_matched[best_index] = True
            true_positives += 1

    false_negatives = len(gt_boxes) - true_positives

    predicted = true_positives + false_positives
    actual = true_positives + false_negatives
    precision = true_positives / predicted if predicted else 0.0
    recall = true_positives / actual if actual else 0.0
    if precision + recall:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0.0

    return precision, recall, f1

In [7]:
def _foreground_to_detections(foreground_mask, min_detection_area):
    """Convert a boolean foreground mask into a list of bounding-box dicts."""
    if foreground_mask.ndim == 3:
        # connectedComponentsWithStats needs a single-channel 8-bit image, so
        # the per-channel decisions are collapsed: a pixel is foreground when
        # any of its channels exceeded the threshold.
        foreground_mask = foreground_mask.any(axis=-1)

    _, _, stats, _ = cv2.connectedComponentsWithStats(
        foreground_mask.astype(np.uint8), connectivity=8, ltype=cv2.CV_32S
    )

    detections = []
    # Row 0 of `stats` is the background component, so it is skipped.
    for x, y, w, h, area in stats[1:]:
        if area < min_detection_area:
            continue
        detections.append(
            {
                "xtl": int(x),
                "ytl": int(y),
                "xbr": int(x + w),
                "ybr": int(y + h),
                "confidence": 1,
            }
        )
    return detections


def evaluation(
    train_frame_paths,
    test_frame_paths,
    frame_with_movement_data,
    color_space,
    alpha,
    rho=None,
    iou_threshold=0.5,
    target_brightness=150,
    min_detection_area=150,
):
    """Train the Gaussian background model and score it on the test frames.

    A pixel is classified as foreground when ``|frame - mean|`` is at least
    ``alpha * (sqrt(variance) + 2)``. Connected foreground regions of at least
    ``min_detection_area`` pixels become detections, which are scored per
    frame with ``calculate_metrics``.

    Args:
        train_frame_paths: Frame paths used to fit the model.
        test_frame_paths: Frame paths to evaluate on.
        frame_with_movement_data: Ground truth, indexed with each test-frame
            path. NOTE(review): assumes the annotations are keyed by the same
            path strings -- verify against ``data_handler.parse_xml_annotations``.
        color_space: Forwarded to ``frame_utils.convert_color_space``.
        alpha: Multiplier applied to ``sqrt(variance) + 2`` to get the threshold.
        rho: ``None`` keeps the model fixed. Otherwise, after each frame the
            mean and variance of the pixels classified as background are
            blended with the current frame using weight ``rho``.
        iou_threshold: Forwarded to ``calculate_metrics``.
        target_brightness: Forwarded to ``frame_utils.adjust_brightness`` for
            both the training and the test frames.
        min_detection_area: Smallest connected-component area (in pixels)
            kept as a detection.

    Returns:
        tuple: ``(mAP, mean_F1)``. ``mAP`` is the mean of the per-frame
        precision values (not an area under a precision-recall curve) and
        ``mean_F1`` the mean of the per-frame F1 scores.

    Raises:
        FileNotFoundError: If ``cv2.imread`` cannot read a test frame.
        ValueError: If ``test_frame_paths`` yields no frames.
    """
    # Keyword arguments matter here: the second positional parameter of
    # train_gaussian is target_brightness, not color_space.
    mean, variance = train_gaussian(
        train_frame_paths,
        target_brightness=target_brightness,
        color_space=color_space,
    )
    threshold = alpha * (np.sqrt(variance) + 2)

    print("::EXTRACTING FOREGROUND")
    results = []
    for path in tqdm(test_frame_paths):
        frame = cv2.imread(path)
        if frame is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError(f"Could not read test frame: {path}")
        frame = frame_utils.adjust_brightness(
            frame, target_brightness=target_brightness
        )
        frame = frame_utils.convert_color_space(frame, color_space=color_space)

        if rho is not None and np.shape(mean) != frame.shape:
            # The adaptive update writes per-pixel values, so expand a model
            # that is smaller than a frame into writable per-pixel arrays.
            mean = np.array(np.broadcast_to(mean, frame.shape), dtype=np.float64)
            variance = np.array(
                np.broadcast_to(variance, frame.shape), dtype=np.float64
            )

        foreground_mask = np.abs(frame - mean) >= threshold

        # Score this frame right away, against its own annotations, instead of
        # keeping every mask in memory and pairing them with paths later.
        detections = _foreground_to_detections(foreground_mask, min_detection_area)
        results.append(
            calculate_metrics(detections, frame_with_movement_data[path], iou_threshold)
        )

        if rho is not None:
            # Adapt the model only where the frame was classified as background.
            background = ~foreground_mask
            mean[background] = (
                rho * frame[background] + (1 - rho) * mean[background]
            )
            # The variance update uses the freshly updated mean.
            variance[background] = (
                rho * ((frame[background] - mean[background]) ** 2)
                + (1 - rho) * variance[background]
            )
            threshold = alpha * (np.sqrt(variance) + 2)

    if not results:
        raise ValueError("test_frame_paths is empty; nothing to evaluate")

    precisions, _, f1_scores = zip(*results)
    mAP = np.mean(precisions)
    mean_F1 = np.mean(f1_scores)

    return mAP, mean_F1

In [9]:
# Baseline run: rho is left at its default (None), so the model is not
# adapted during evaluation; RGB colour space with alpha = 4.
evaluation(
    train_frame_paths,
    test_frame_paths,
    frame_with_movement_data,
    color_space="RGB",
    alpha=4,
)

In [None]:
# Search space for the random hyper-parameter search.

# Colour spaces passed to `evaluation` as `color_space`.
available_colorspace = [
    "RGB",
    "HSV",
    "Lab",
    "YCrCb",
    "XYZ",
    "Luv",
    "HLS",
    "YUV",
    "GRAY",
]
# Threshold multiplier alpha: 1, 6, 11, 16, 21, 26.
available_alpha = list(range(1, 31, 5))
# Adaptation rate rho: 0.0, 0.1, ..., 0.9. The model update weights the
# current frame by rho and the previous model by (1 - rho), so rho has to stay
# within [0, 1]; values above 1 would give the old model a negative weight.
available_rho = [i / 10 for i in range(10)]
# Target brightness: 100, 120, ..., 200.
available_target_brightness = list(range(100, 201, 20))

In [None]:
# RANDOM SEARCH
import random
from tabulate import tabulate

# Hyper-parameter combinations that have already been tried, so that no
# configuration is evaluated twice.
used_combinations = set()


def objective():
    """Run one random-search trial and return its score.

    Draws a not-yet-tried (colour space, alpha, rho, target brightness)
    combination from the ``available_*`` lists, records it in
    ``used_combinations``, evaluates the background model with exactly those
    values and prints the result as a table.

    Returns:
        The mean precision reported by ``evaluation`` multiplied by 100, or
        ``None`` when the evaluation of this combination failed.

    Raises:
        RuntimeError: If every combination of the search space has already
            been tried.
    """
    total_combinations = (
        len(set(available_colorspace))
        * len(set(available_alpha))
        * len(set(available_rho))
        * len(set(available_target_brightness))
    )
    if len(used_combinations) >= total_combinations:
        # Without this guard the rejection loop below would never terminate.
        raise RuntimeError(
            "Search space exhausted: every hyper-parameter combination has "
            "already been tried."
        )

    # Rejection sampling: redraw until the combination is new.
    while True:
        combination = (
            random.choice(available_colorspace),
            random.choice(available_alpha),
            random.choice(available_rho),
            random.choice(available_target_brightness),
        )
        if combination not in used_combinations:
            break
    used_combinations.add(combination)

    (
        selected_colorspace,
        selected_alpha,
        selected_rho,
        selected_target_brightness,
    ) = combination

    try:
        # Evaluate with the sampled values, not fixed defaults.
        ap, f1_score_max = evaluation(
            train_frame_paths=train_frame_paths,
            test_frame_paths=test_frame_paths,
            frame_with_movement_data=frame_with_movement_data,
            color_space=selected_colorspace,
            alpha=selected_alpha,
            rho=selected_rho,
            target_brightness=selected_target_brightness,
        )
    except Exception as exc:
        # Keep the search going, but report the failure instead of hiding it.
        # KeyboardInterrupt is deliberately not caught so the run can be stopped.
        print(f"Trial failed for {combination}: {exc!r}")
        return None

    data = [
        ["Colorspace", selected_colorspace],
        ["Alpha", selected_alpha],
        ["Rho", selected_rho],
        ["Target Brightness", selected_target_brightness],
        ["MAP", f"{ap * 100 if ap is not None else 0}"],
        ["F1 Score Max", f"{f1_score_max * 100 if f1_score_max is not None else 0}"]
    ]

    # Print the data in table format
    print(tabulate(data, headers=["Parameter", "Value"]))

    return ap * 100 if ap is not None else None


# Seed the RNG so that re-running this cell draws the same sequence of
# hyper-parameter combinations; change the seed to explore a different sample.
RANDOM_SEARCH_SEED = 42
random.seed(RANDOM_SEARCH_SEED)

# Number of random-search trials to run.
num_trials = 30
for i in range(num_trials):
    print(f"Trial {i+1}/{num_trials}:")
    objective()
    print("----------------------")