In [None]:
import csv
import os

import matplotlib.pyplot as plt
import numpy as np

from can_llamas_drive.process_videos import get_video_metadata


def get_crash_labels(filepath: str) -> dict[str, list[int]]:
    crash_metadata = get_video_metadata(filepath)
    crash_labels = {}
    for data in crash_metadata:
        crash_labels[data["name"]] = data["labels"].split(", ")
        crash_labels[data["name"]] = list(map(int, crash_labels[data["name"]]))

    return crash_labels


def get_stop_labels(directory: str) -> dict[str, list[int]]:
    paths = (
        (os.path.splitext(file_name)[0], os.path.join(directory, file_name))
        for file_name in os.listdir(directory)
        if os.path.isfile(os.path.join(directory, file_name))
    )
    stop_labels = {}
    for file_name, path in paths:
        with open(path) as file:
            csv_reader = csv.reader(file)
            stop_labels[file_name] = list(csv_reader)[0]
        stop_labels[file_name] = list(map(int, stop_labels[file_name]))

    return stop_labels


results = {
    "crash": "./output/llava-v1.5-13b/crash/csv",
    "normal": "./output/llava-v1.5-13b/normal/csv",
}

crash_labels = get_crash_labels("./datasets/CarCrash/videos/Crash-1500.txt")
stop_labels = get_stop_labels(results["crash"])

combined_labels = {}
for key, values in stop_labels.items():
    combined_labels[key] = {
        "stop": np.array(values),
        "crash": np.array(crash_labels[key]),
    }

delta = {}
for key, values in combined_labels.items():
    first_crash = np.argwhere(values["crash"])[0][0]
    first_stop = np.argwhere(values["stop"] == 1)
    first_stop = first_stop[0][0] if first_stop.any() else -1

    delta[key] = first_crash - first_stop if first_stop != -1 else None

bins = np.arange(-5.0, 5.1, 0.5)
x = np.array(list(delta.values()))
total = x.size
x = x[x != None].astype(int)
x = x / 10
counts, bins = np.histogram(x, bins)

true_positives = ((x > 0) & (x < 3)).sum()
false_positives_crash = (x > 3).sum()
false_negatives = total - true_positives - false_positives_crash

fig, ax = plt.subplots()
ax.hist(x, bins=bins)
ax.invert_xaxis()
ax.set_title("Distribution of initial triggers")
ax.set_xlabel("Time-to-collision (s)")
plt.show()

deltas = []

for labels in stop_labels.values():
    first_stop = np.argwhere(np.array(labels) == 1)
    first_stop = first_stop[0][0] if first_stop.any() else -1
    deltas.append(first_stop)

bins = np.arange(0, 5.1, 0.5)
x = np.array(deltas)
x = x[x != None].astype(int)
x = x / 10
counts, bins = np.histogram(x, bins)

fig, ax = plt.subplots()
ax.hist(x, bins=bins)
ax.set_title("Distribution of initial triggers")
ax.set_xlabel("Time (s)")
plt.show()

## Normal recordings

In [None]:
stop_labels = get_stop_labels(results["normal"])
deltas = []

for labels in stop_labels.values():
    first_stop = np.argwhere(np.array(labels) == 1)
    first_stop = first_stop[0][0] if first_stop.any() else None
    deltas.append(first_stop)

bins = np.arange(0, 5.1, 0.5)
x = np.array(deltas)
total = x.size
x = x[x != None].astype(int)
x = x / 10
counts, bins = np.histogram(x, bins)

false_positives_normal = x.size
true_negatives = total - false_positives_normal

false_positives = false_positives_crash + false_positives_normal

fig, ax = plt.subplots()
ax.hist(x, bins=bins)
ax.set_title("Distribution of initial triggers")
ax.set_xlabel("Time (s)")
plt.show()

In [None]:
print(
    f"tp: {true_positives} fp: {false_positives_crash} fn: {false_negatives} tn: {true_negatives}"
)