# filter by different prediction scores and merge the overlapping rally predictions

In [253]:
import dataclasses
from dataclasses import dataclass
from typing import List, Optional, Iterable
import numpy as np
import json
import pandas as pd
from matplotlib import pyplot as plt
import itertools

In [254]:
# origin
# thresholds_by_method = {
#     "Isodata": 0.3329599041272786,
#     "Li": 0.14066436881847458,
#     "Mean": 0.03914363852704741,
#     "Minimum": 0.24126479546077917,
#     "Otsu": 0.32968507881776077,
#     "Triangle": 0.0316759756516376,
#     "Yen": 0.07097387936585164,
#     "prediction score = ": 0.9,
# }

thresholds_by_method = {
    "Otsu ≈ Isodata": 0.32968507881776077,
    # "Isodata": 0.3329599041272786,
    "Li": 0.14066436881847458,
    "Mean ≈ Triangle": 0.03914363852704741,
    # "Triangle": 0.0316759756516376,
    "Minimum": 0.24126479546077917,
    "Yen": 0.07097387936585164,
    "prediction score = ": 0.9,
}

# thresholds_by_method = {
#     "prediction score": 0.6,
#     "prediction score 1": 0.65,
#     "prediction score 2": 0.7,
#     "prediction score 3": 0.75,
#     "prediction score 4": 0.8,
#     "prediction score 5": 0.85,
#     "prediction score 6": 0.9,
#     "prediction score 7": 0.95,
# }

score_thresholds = list(thresholds_by_method.values())
methods = list(thresholds_by_method.keys())


In [255]:
dfs_by_method = []
match_name = "ginting_axelsen"
source_video_fps = 25
with open(f'../data/output/badminton/{match_name}_detection_raw.json') as f:
    d = json.load(f)    

def ss_to_hhmmss(seconds):
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    seconds = int(seconds % 60)
    return f"{hours:02}:{minutes:02}:{seconds:02}"

In [256]:
for t in score_thresholds:
    pred_segs = []
    segs = d["results"][match_name]
    for seg in segs:
        pred_segs.append({"start": seg["segment"][0], "end": seg["segment"][1], "score": seg["score"]})
    df = pd.DataFrame(pred_segs)
    # filter
    df = df[df['score'] > t]
    df = df.reset_index()
    df = df.assign(start_hhmmss=df['start'].apply(ss_to_hhmmss))
    df = df.assign(end_hhmmss=df['end'].apply(ss_to_hhmmss))
    # merge
    merged_intervals = []

    # Start with the first interval
    current_start = df['start'].values[0]
    current_end = df['end'].values[0]

    # Iterate through the DataFrame rows
    for i in range(1, len(df)):
        row_start = df.loc[i, 'start']
        row_end = df.loc[i, 'end']

        # If the current interval overlaps with the previous one, merge them
        if row_start <= current_end:
            current_end = max(current_end, row_end)  # Update the end of the merged interval
        else:
            # No overlap, add the previous interval and start a new one
            merged_intervals.append([current_start, current_end])
            current_start = row_start
            current_end = row_end

    # Append the last interval
    merged_intervals.append([current_start, current_end])

    # Create a new DataFrame from the merged intervals
    merged_df = pd.DataFrame(merged_intervals, columns=['start', 'end'])
    merged_df = merged_df.assign(start_hhmmss=merged_df['start'].apply(ss_to_hhmmss))
    merged_df = merged_df.assign(end_hhmmss=merged_df['end'].apply(ss_to_hhmmss))
    dfs_by_method.append(merged_df)

# Get precision and recall for processed predictions by comparing with ground truth rallies

In [257]:
# compare each with the ground truth segments
# TODO: remove duplicate code
def ss_to_mmss(num_of_seconds) -> str:
    """Converts start_ss (in seconds) to minute:second format."""
    minutes = int(num_of_seconds // 60)  # Calculate minutes
    seconds = int(num_of_seconds % 60)  # Calculate remaining seconds
    return f"{minutes:02}:{seconds:02}"  # Format as mm:ss

@dataclass
class GroundTruthSegment:
    start_ss: float
    end_ss: float
    start_mmss: str
    end_mmss: str
    is_matched: bool
    score_board: str


@dataclass
class PredictionSegment:
    start_ss: float
    end_ss: float
    start_mmss: str
    end_mmss: str
    best_iou: float
    best_match_gt_segment: Optional[GroundTruthSegment]

def get_gts() -> List[GroundTruthSegment]:
    gts = []
    df_gt = pd.read_csv('data/RallySeg_GT.csv')
    df_gt['start_ss'] = df_gt['Start'] / source_video_fps
    df_gt['end_ss'] = df_gt['End'] / source_video_fps
    for s, e, score_board in zip(df_gt['start_ss'], df_gt['end_ss'], df_gt['Score']):
        gts.append(GroundTruthSegment(s, e, ss_to_mmss(s), ss_to_mmss(e), False, score_board))
    return gts

def get_preds(df_preds: pd.DataFrame) -> List[PredictionSegment]:
    pred_segments = []
    for s, e in zip(df_preds['start'], df_preds['end']):
        pred_segments.append(PredictionSegment(s, e, ss_to_mmss(s), ss_to_mmss(e), 0.0, None))
    return pred_segments


def find_best_iou(pred: PredictionSegment, gts: List[GroundTruthSegment]) -> (float, GroundTruthSegment):
    best_iou = 0.0
    best_match_gt_segment = None
    for gt in gts:
        iou = calc_iou(pred, gt)
        if iou > best_iou:
            best_iou = iou
            best_match_gt_segment = gt
    return best_iou, best_match_gt_segment

def calc_iou(pred: PredictionSegment, gt: GroundTruthSegment) -> float:
    inter_start = max(pred.start_ss, gt.start_ss)
    inter_end = min(pred.end_ss, gt.end_ss)

    # Calculate the length of the intersection
    intersection = max(0, inter_end - inter_start)

    # Calculate the start and end of the union
    union_start = min(pred.start_ss, gt.start_ss)
    union_end = max(pred.end_ss, gt.end_ss)

    # Calculate the length of the union
    union = union_end - union_start

    # Calculate IoU
    if union == 0:
        return 0.0  # Handle case when both segments are points
    iou = intersection / union
    return iou

def create_confusion_matrix(preds: List[PredictionSegment], gts: List[GroundTruthSegment], iou_threshold: float) -> (
        float, float, float):
    # assign iou to each pred segment
    for pred in preds:
        # which gt, for that the pred has the largest iou, the gt wil be then remove from gts
        best_iou, best_gt_match = find_best_iou(pred, gts)
        pred.best_iou = best_iou
        if best_gt_match:
            pred.best_match_gt_segment = dataclasses.replace(best_gt_match)
            gts.remove(best_gt_match)
    # count TP, FP, FN
    tp, fp, fn = 0, 0, 0
    for pred in preds:
        # true positive
        if pred.best_iou >= iou_threshold:
            tp += 1
        # false positive
        else:
            fp += 1
    fn = len(gts)
    # based on this tutorial: https://towardsdatascience.com/what-is-average-precision-in-object-detection-localization-algorithms-and-how-to-calculate-it-3f330efe697b
    # fn does not include preds that have iou smaller than threshold_iou
    # under_threshold = [p for p in preds if 0.0 < p.best_iou < iou_threshold]
    # fn = fn + len(under_threshold)
    return tp, fp, fn


def calc_precision_and_recall(tp, fp, fn):
    return tp / (tp + fp), tp / (tp + fn)

def calc_precisions_and_recalls(thresholds, preds: List[PredictionSegment]) -> (List[float], List[float]):
    precisions = []
    recalls = []
    for i, t in enumerate(thresholds):
        gts = get_gts()
        if i == 0:
            # for debug
            print(f"preds / gts len: {len(preds)}/{len(gts)}")
        tp, fp, fn = create_confusion_matrix(preds, gts, t)
        precision, recall = calc_precision_and_recall(tp, fp, fn)
        precisions.append(precision)
        recalls.append(recall)
    return precisions, recalls


# Draw the precision and recall curves

In [258]:
def get_preds_grundprojekt() -> List[PredictionSegment]:
    df_preds = pd.read_csv('data/ginting_axelsen_grundprojekt.csv')
    df_preds = df_preds[df_preds['pred_is_rally'] == 1]
    df_preds = df_preds.assign(start_ss=(df_preds['start'] / source_video_fps).round(2))
    df_preds = df_preds.assign(end_ss=(df_preds['end'] / source_video_fps).round(2))
    raw_pred_segments = []
    for s, e in zip(df_preds['start_ss'], df_preds['end_ss']):
        raw_pred_segments.append(PredictionSegment(s, e, ss_to_mmss(s), ss_to_mmss(e), 0.0, None))
    pred_segments = []
    merged_segment = None
    for i in range(len(raw_pred_segments) - 1):
        current = raw_pred_segments[i]
        next = raw_pred_segments[i + 1]
        if not merged_segment:
            merged_segment = dataclasses.replace(current)
        if current.end_ss == next.start_ss:
            merged_segment.end_ss = next.end_ss
        else:
            pred_segments.append(dataclasses.replace(merged_segment))
            merged_segment = None
     # the last predicted rally
    pred_segments.append(dataclasses.replace(merged_segment))
    return pred_segments

In [259]:
def draw_curves(curves_data: List[List[float]],
                thresholds: Iterable[float],
                legends: List[str], 
                score_thresholds: List[float],
                y_label: str,
                title: str):
    colors = itertools.cycle(plt.cm.tab10.colors)  # Use tab10 colormap for variety of colors

    # sort by the first precision
    sorted_data = sorted(zip([precisions[0] for precisions in curves_data], curves_data, legends, score_thresholds))

    plt.figure(figsize=(10, 6))

    # Iterate over each curve in sorted order
    for first_precision, precisions, legend, score_threshold in sorted_data:
        color = next(colors)
        label = legend if score_threshold is None else f"{legend} ({score_threshold:.2f})"
        plt.plot(thresholds, precisions, marker='o', color=color, label=label)  
        
        # Move the legend to the left of the first point with more space
        plt.text(thresholds[0] - 0.02, precisions[0], label, fontsize=9, ha='right', va='bottom', color=color)

    plt.xlabel("IoU thresholds")
    
    # Move y-axis label and ticks to the right
    plt.ylabel(y_label, labelpad=15)  # Add padding for better spacing
    plt.gca().yaxis.tick_right()
    plt.gca().yaxis.set_label_position("right")
    
    plt.title(title)
    plt.grid()
    plt.show()

In [None]:
# draw precision curves
thresholds_iou = np.arange(start=0.5, stop=0.91, step=0.1)
precisions_by_mehod = []
recalls_by_method = []
for df, method in zip(dfs_by_method, methods):
    print(f"method: {method}")
    precisions, recalls = calc_precisions_and_recalls(thresholds_iou, get_preds(df))
    precisions_by_mehod.append(precisions)
    recalls_by_method.append(recalls)

# grundprojekt results
precisions, recalls = calc_precisions_and_recalls(thresholds_iou, get_preds_grundprojekt())
precisions_by_mehod.append(precisions)
recalls_by_method.append(recalls)
methods.append("Grundprojekt")
score_thresholds.append(None)

title = "Precision by prediction score threshold"
draw_curves(curves_data=precisions_by_mehod,
            thresholds=thresholds_iou,
            legends=methods,
            score_thresholds=score_thresholds,
            y_label="Precision",
            title=title,
            )


In [None]:
# draw recall curves
title = "Recall by prediction score threshold"
draw_curves(curves_data=recalls_by_method,
            thresholds=thresholds_iou,
            legends=methods,
            score_thresholds=score_thresholds,
            y_label="Recall",
            title=title,
            )
for method, recall in zip(methods, recalls_by_method):
    print(f"{method}: {recall[-1]}")
