In [None]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

import torch

from detection.testing import _get_checkpoint_filename, _get_results_folder
from detection.visualization import show_loss_history

from train_configurations.utils import get_standard_test_dataset
from train_configurations import (regnet_y_400mf, regnet_y_800mf,
                                  tracknet_v2, tracknet_v2_2f,
                                  tracknet_v2_4f, tracknet_v2_6f,
                                  tracknet_v2_rnn, tracknet_v2_rnn_scheduler)

plt.style.use('default')

figures_folder = '../TFM/pictures'
# figures_folder = "C:/Users/calva/Documents/Uni/Master AI/Tesi/TFM/pictures"

if not os.path.exists(figures_folder):
    os.makedirs(figures_folder)

%load_ext autoreload
%autoreload 2

# sequence of train configurations
training_configurations = [regnet_y_400mf, regnet_y_800mf,
                           tracknet_v2_2f, tracknet_v2,
                           tracknet_v2_4f, tracknet_v2_6f,
                           tracknet_v2_rnn, tracknet_v2_rnn_scheduler]


# get the name of the train configuration as string
def tc_name(training_configuration):
    """Return the short name of a train configuration module.

    The short name is the last dotted component of the module's ``__name__``.
    The plain ``tracknet_v2`` module is reported as ``tracknet_v2_3f``.
    """
    _, _, short_name = training_configuration.__name__.rpartition('.')
    return 'tracknet_v2_3f' if short_name == 'tracknet_v2' else short_name

# Detection

## Architectures and train configuration explanations

Table for the architectures

In [None]:
# Per-configuration settings, listed in the same order as `training_configurations`.
n = [3, 3, 2, 3, 4, 6, 1, 1]
h = [0, 0, 0, 0, 0, 0, 3, 3]
epochs = [40, 40, 20, 42, 23, 20, 35, 35]
batch_size = [16, 16, 8, 8, 8, 8, 8, 8]

# One record per configuration; the keys become the column headers of the table.
table = [
    {'configuration': tc_name(tc),
     'n': n_value,
     'h': h_value,
     'epochs': epoch_count,
     'batch size': batch}
    for tc, n_value, h_value, epoch_count, batch
    in zip(training_configurations, n, h, epochs, batch_size)
]

df = pd.DataFrame(table)
display(df)
print(df.to_latex(index=False))


## Examples

First let's define some utility functions:

In [None]:
def get_trained_model(training_configuration, training_phase=None):
    """Load the trained model of a train configuration together with its test dataset.

    Parameters
    ----------
    training_configuration
        Train configuration module exposing a ``Config`` class.
    training_phase : str, optional
        Sub-folder of the checkpoint folder to load from. When given, the
        dataset is also requested with ``is_rnn=True``.

    Returns
    -------
    tuple
        ``(dataset, model)``: the dataset returned by
        ``get_standard_test_dataset`` and the model in eval mode, with the
        checkpoint loaded on the CPU.
    """
    config = training_configuration.Config()

    if training_phase is None:
        checkpoint_folder = config._checkpoint_folder
    else:
        checkpoint_folder = os.path.join(config._checkpoint_folder, training_phase)

    # Resolve the checkpoint file inside the results folder of this run.
    results_folder = _get_results_folder(checkpoint_folder, None)
    checkpoint_filename = _get_checkpoint_filename(checkpoint_folder)
    checkpoint_path = os.path.join(results_folder, checkpoint_filename)

    model = config.get_model()
    model.eval()
    model.load(checkpoint_path, device='cpu')

    uses_rnn_dataset = training_phase is not None
    dataset = get_standard_test_dataset(training_configuration, 'prova', is_rnn=uses_rnn_dataset)
    return dataset, model


def get_output(model, dataset, i):
    """Run ``model`` on the ``i``-th dataset element and return the squeezed output.

    The input of the element is cast to float32 and given a leading batch
    dimension. The result is returned as a NumPy array with all singleton
    dimensions removed. No gradients are tracked.
    """
    inputs, _target = dataset[i]

    with torch.no_grad():
        batch = inputs.to(torch.float32).unsqueeze(0)
        prediction = model(batch)
        result = prediction.numpy().squeeze()
    return result


def get_frame(dataset, i):
    """Return the last input frame of the ``i``-th dataset element.

    The last three channels of the input are taken, cast to float32 and
    returned as a channels-last NumPy array.
    """
    inputs, _target = dataset[i]
    last_frame = inputs[-3:].to(torch.float32).numpy()
    # (channels, height, width) -> (height, width, channels)
    return last_frame.transpose((1, 2, 0))

Here we visualize an example of input frames and target heatmap. In this example, we will use the ``tracknet_v2_2f`` training configuration.

The model output for the same sample is shown as well.

In [None]:
# Load the dataset and the trained model of the chosen train configuration.
dataset, model = get_trained_model(tracknet_v2_2f)

dataset_element = 15
frames, heatmap = dataset[dataset_element]

# Channels-first tensor -> channels-last float32 array, then split the
# stacked channels into two 3-channel frames.
frames = np.transpose(frames.to(torch.float32).numpy(), (1, 2, 0))
frames = [frames[..., start:start + 3] for start in (0, 3)]

heatmap = heatmap.to(torch.float32).squeeze().numpy()

heatmap_pred = get_output(model, dataset, dataset_element)

In [None]:
w, h, dpi = 2*640, 2*360, 100
fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(w/dpi, h/dpi), dpi=dpi)
axs = axs.ravel()

# Top row: the input frames, each with a circle around the labelled position.
for i, (frame, ax) in enumerate(zip(frames, axs)):
    ax.set_title(f'Frame {i+1}')
    ax.imshow(frame)

    label_index = i + dataset_element
    x = dataset._label_df['x'][label_index]
    y = dataset._label_df['y'][label_index]
    # Labels are scaled by the frame width/height to get pixel coordinates.
    ax.scatter(x*frame.shape[1], y*frame.shape[0],
               s=150, linewidths=3, facecolors='none', edgecolors='y', zorder=100)

# Bottom row: target and predicted heatmaps drawn over the last input frame.
overlays = [(axs[2], heatmap, 'Target heatmap'),
            (axs[3], heatmap_pred, 'Predicted heatmap')]
for overlay_ax, overlay, title in overlays:
    overlay_ax.imshow(frames[-1])
    overlay_ax.imshow(overlay, alpha=0.7, cmap='gray', vmin=0, vmax=1)
    overlay_ax.set_title(title)

fig.tight_layout()

fig.savefig(os.path.join(figures_folder, 'sample_input_a.png'))

plt.show()

### Output for all models

Here we visualize the output of all the various models.

Visualize the output heatmap for the various tracknet variants superimposed on the last frame of the input sequence

In [None]:
for tc in training_configurations[2:]:
    name = tc_name(tc)
    print(name)

    # RNN variants load the checkpoint of phase 'phase_3_0'. For the other
    # variants the sample index is shifted back by the sequence length.
    if 'rnn' in name:
        training_phase, sample = 'phase_3_0', 18
    else:
        training_phase, sample = None, 18 - tc.Config()._sequence_length

    dataset, model = get_trained_model(tc, training_phase)
    frame = get_frame(dataset, sample)
    output = get_output(model, dataset, sample)

    w, h, dpi = 640, 360, 120
    fig, ax = plt.subplots(figsize=(w/dpi, h/dpi), dpi=dpi)

    # Predicted heatmap drawn semi-transparently over the last input frame.
    ax.imshow(frame)
    ax.imshow(output, alpha=0.7, cmap='gray', vmin=0, vmax=1)

    ax.set_axis_off()
    fig.tight_layout(pad=0)

    figure_path = os.path.join(figures_folder, f"{name}_example.png")
    fig.savefig(figure_path)

    plt.show()

## Loss history

Regnet and Standard configurations

In [None]:
# Every configuration except the last two (the recurrent ones, plotted separately).
for tc in training_configurations[:-2]:
    name = tc_name(tc)

    w, h, dpi = 640, 360, 130
    fig, ax = plt.subplots(figsize=(w/dpi, h/dpi), dpi=dpi)

    # The log scale is set before plotting.
    ax.set_yscale('log')
    ax = show_loss_history(tc, ax=ax)
    ax.set_title(name)

    fig.tight_layout()
    fig.savefig(os.path.join(figures_folder, f'{name}_loss.pdf'))
    plt.show()

Recurrent architecture

In [None]:
# Boundaries in epochs, shifted by 0.5 so that the separators fall between two
# consecutive epochs. Each annotated phase spans two consecutive intervals.
phases = np.array([0, 10, 15, 20, 25, 30, 35]) + 0.5

# Opaque white box behind the "Phase k" annotations.
bbox = {'boxstyle': 'round',
        'facecolor': 'w',
        'edgecolor': 'None',
        'alpha': 1}

# Relative height of the annotations inside the y-range (0 = bottom, 1 = top).
t = 0.08

for tc in training_configurations[-2:]:
    w, h, dpi = 640, 360, 130
    fig, ax = plt.subplots(figsize=(w/dpi, h/dpi), dpi=dpi)

    ax = show_loss_history(tc, ax=ax, epoch_range=(1, 35))

    # Remember the y-range chosen by the loss plot; it is restored after the
    # separators are drawn so that the decorations do not rescale the axis.
    ylim = ax.get_ybound()

    # Dashed separators at the inner boundaries; every second one is drawn thicker.
    for i, p in enumerate(phases[1:-1]):
        ax.plot([p, p], [ylim[0], ylim[1]], 'k--', linewidth=0.5 if i%2==0 else 0.7)

    ax.set_ylim(ylim)

    # Shade and label the three phases.
    for i, c in enumerate(['r', 'g', 'b']):
        xpos = 10*i+6 if i==0 else 10*i+8
        ax.annotate(f'Phase {i+1}', [xpos, t*ylim[1]+(1-t)*ylim[0]], bbox=bbox)
        ax.fill_between([phases[2*i], phases[2*(i+1)]], ylim[0], ylim[1], color=c, alpha=0.15)

    ax.legend(loc='upper right', framealpha=1)
    ax.set_title(tc_name(tc))

    fig.tight_layout()
    fig.savefig(os.path.join(figures_folder, f'{tc_name(tc)}_loss.pdf'))
    plt.show()

## Analyze results

Compute the position error from the CSV file produced by model testing.

First define the utility function ``get_output_df`` to get a dataframe from the csv file:

In [None]:
def get_output_df(training_configuration, training_phase=None):
    """Read ``output_val.csv`` of a train configuration into a dataframe.

    The file is looked up in the results folder of the configuration's
    checkpoint folder, or of its ``training_phase`` sub-folder when one is given.
    """
    base_folder = training_configuration.Config()._checkpoint_folder

    if training_phase is None:
        checkpoint_folder = base_folder
    else:
        checkpoint_folder = os.path.join(base_folder, training_phase)

    csv_path = os.path.join(_get_results_folder(checkpoint_folder, None), 'output_val.csv')
    return pd.read_csv(csv_path)

Build a dataframe for each train configuration. Take the last training phase for the RNN variants

In [None]:
# One output dataframe per train configuration, in the same order as
# `training_configurations`. The RNN variants read the 'phase_3_0' results.
df_out = [
    get_output_df(tc, 'phase_3_0' if 'rnn' in tc_name(tc) else None)
    for tc in training_configurations
]

Compute the position error as the magnitude of the difference between the true position and the estimated position

In [None]:
image_size = (360, 640)

# Add an 'error' column to every dataframe: the Euclidean distance between the
# true and the predicted position, after scaling x by image_size[1] and y by
# image_size[0].
for df in df_out:
    dx = image_size[1] * (df['x_true'].values - df['x_pred'].values)
    dy = image_size[0] * (df['y_true'].values - df['y_pred'].values)
    df['error'] = np.sqrt(dx**2 + dy**2)

### Error histograms

Here we visualize the error distributions for the various models

Summary dataframe with most important statistics of the error:
 - mean
 - standard deviation
 - median
 - percentage with error < 1 px
 - percentage with error < 5 px
 - percentage with error < 10 px

In [None]:
thresholds = [1, 5, 10]  # an error counts only if strictly smaller than the threshold

# One record per train configuration: basic statistics of the error plus, for
# each threshold t, the fraction of samples with error < t (key f'{t} px').
summary = []
for tc, df in zip(training_configurations, df_out):
    result = {'train_configuration': tc_name(tc),
              'mean': np.mean(df['error']),
              'std': np.std(df['error']),
              'median': np.median(df['error'])}

    # Count the samples directly. A density histogram with edges
    # np.arange(max_error) leaves the largest errors out of the normalisation
    # and degenerates when the maximum error is below 2 px.
    for t in thresholds:
        result[f'{t} px'] = (df['error'] < t).mean()

    summary.append(result)

df_summary = pd.DataFrame(summary)
df_summary

In [None]:
def to_apply(x):
    """Format floats with 3 significant digits; return any other value unchanged."""
    # isinstance also accepts NumPy float scalars, which `type(x) is float` rejects.
    if isinstance(x, float):
        return f'{x:.3g}'
    return x


# Copy of the summary prepared for LaTeX: fractions become percentages, the
# other columns are pre-formatted as strings.
df_s2 = df_summary.copy()
for k in df_s2.columns:
    if 'px' in k:
        df_s2[k] *= 100
    else:
        df_s2[k] = df_s2[k].apply(to_apply)

# float_format only affects the columns that are still numeric (the percentages).
print(df_s2.to_latex(index=False, float_format=lambda x: f'{x:.2f}'))

Visualize error distribution histograms (one figure per configuration)

In [None]:
def print_summary(summary: dict):
    """Print one ``key: value`` line per entry of a summary record.

    Keys containing ``'px'`` are printed as percentages. Every other key except
    ``'train_configuration'`` is printed with two significant digits.
    """
    for key, value in summary.items():
        if 'px' in key:
            text = f'{100*value:.2g}%'
        elif key != 'train_configuration':
            text = f'{value:.2g}'
        else:
            text = value
        print(f"{key}: {text}")


hist_range = thresholds[-1]

# Unit-wide bins. Errors above `hist_range` are clipped to it before plotting.
error_bins = np.arange(hist_range+2)

for tc, df, s in zip(training_configurations, df_out, summary):
    print_summary(s)

    w, h, dpi = 640, 480, 120
    fig, ax = plt.subplots(figsize=(w/dpi, h/dpi), dpi=dpi)

    clipped_errors = df['error'].clip(upper=hist_range)
    ax.hist(clipped_errors, bins=error_bins, density=True, align='left', rwidth=0.8)

    # Legend built from empty plots: only the labels are shown, so the legend
    # acts as a small statistics box.
    legend_labels = ['error < ' + f'{k}:'.rjust(6) + f'{100*s[k]:.1f}%'.rjust(6) + ' '
                     for k in s if 'px' in k]
    legend_labels += [f'{k}:'.ljust(14) + f'{s[k]:.1f}'.rjust(5) + ' '
                      for k in ('median', 'mean', 'std')]
    for legend_label in legend_labels:
        ax.plot([], [], ' ', label=legend_label)

    ax.legend(handlelength=0, prop={'family': 'monospace'}, loc='upper center')

    # Title, axis labels and a common y-range for every configuration.
    ax.set_title(tc_name(tc))
    ax.set_xlabel('Error magnitude')
    ax.set_ylabel('Occurrence')
    ax.set_ylim(0, 1)

    fig.tight_layout()
    fig.savefig(os.path.join(figures_folder, f'{tc_name(tc)}_error.pdf'))

    plt.show()

Visualize error distribution histograms (all in one figure)

In [None]:
hist_range = thresholds[-1]

# Unit-wide bins. Errors above `hist_range` are clipped to it before plotting.
error_bins = np.arange(hist_range+2)

w, h, dpi = 1200, 600, 100
fig, axs = plt.subplots(nrows=2, ncols=4, figsize=(w/dpi, h/dpi), dpi=dpi)

# Column-major order: consecutive configurations are stacked vertically.
axs = axs.T.ravel()

for i, (tc, df, s, ax) in enumerate(zip(training_configurations, df_out, summary, axs)):
    clipped_errors = df['error'].clip(upper=hist_range)
    ax.hist(clipped_errors, bins=error_bins, density=True, align='left', rwidth=0.8)

    # Legend built from empty plots: only the labels are shown.
    legend_labels = ['error < ' + f'{k}:'.rjust(6) + f'{100*s[k]:.1f}%'.rjust(6) + ' '
                     for k in s if 'px' in k]
    legend_labels += [f'{k}:'.ljust(14) + f'{s[k]:.1f}'.rjust(5) + ' '
                      for k in ('median', 'mean', 'std')]
    for legend_label in legend_labels:
        ax.plot([], [], ' ', label=legend_label)

    ax.legend(handlelength=0, prop={'family': 'monospace'}, loc='upper center')

    ax.set_title(tc_name(tc))

    # Only the first column keeps its y labels and only the bottom row its x labels.
    col, row = divmod(i, 2)
    if col == 0:
        ax.set_ylabel('Occurrence')
    else:
        ax.set_yticklabels([])

    if row == 1:
        ax.set_xlabel('Error magnitude')
    else:
        ax.set_xticklabels([])

    ax.set_ylim(0, 1)

fig.tight_layout()
fig.savefig(os.path.join(figures_folder, 'errors.pdf'))

plt.show()

### Error as function of maximum heatmap value

Here we do a scatterplot of the error and the maximum heatmap value

In [None]:
from scipy.optimize import curve_fit

def linear(x, a, b):
    """First-degree polynomial ``a*x + b``, used as a model function for ``curve_fit``."""
    slope_term = a * x
    return slope_term + b

def quadratic(x, a, b, c):
    """Second-degree polynomial ``a*x*x + b*x + c``, used as a model function for ``curve_fit``."""
    square_term = a * x * x
    linear_term = b * x
    return square_term + linear_term + c

for tc, df in zip(training_configurations[2:], df_out[2:]):
    w, h, dpi = 640, 360, 120
    fig, ax = plt.subplots(figsize=(w/dpi, h/dpi), dpi=dpi)

    # sns.histplot(x=df['max_values'], y=df['error'], bins=(np.linspace(0, 1, 50), np.arange(15)), ax=ax, cbar=True)
    ax.scatter(x=df['max_values'], y=df['error'], alpha=0.5)

    # Fit only the samples whose maximum heatmap value exceeds 0.2.
    idx = df['max_values'] > 0.2
    n_fit_points = int(idx.values.sum())
    x_fit, y_fit = df['max_values'][idx], df['error'][idx]

    # curve_fit raises when there are fewer data points than free parameters,
    # so each fit is drawn only when enough samples are available.
    if n_fit_points >= 2:
        params, _ = curve_fit(linear, x_fit, y_fit)
        x = np.linspace(0, 1, 2)
        ax.plot(x, linear(x, *params), 'k-')

    if n_fit_points >= 3:
        params, _ = curve_fit(quadratic, x_fit, y_fit)
        x = np.linspace(0, 1, 101)
        ax.plot(x, quadratic(x, *params), 'r-')

    ax.set_xlabel('max heatmap value')
    ax.set_ylabel('position error')

    ax.set_xlim(-0.05, 1.05)
    ax.set_title(tc_name(tc))

    plt.show()

# Trajectories fitting

In [None]:
# NOTE(review): importing `get_frame` here rebinds the notebook-level helper
# `get_frame(dataset, i)` defined in the "Examples" section. If the
# "Output for all models" cell is re-run after this one, it will call this
# imported function instead.
from trajectories.data_reading import get_candidates, get_frame, get_heatmap

from trajectories.fitting import fit_trajectories
from trajectories.filtering import build_trajectory_graph, find_shortest_paths, build_path_mapping
from trajectories.visualization import create_trajectory_video, visualize_trajectory_graph, show_neighboring_trajectories

## Trajectory graph example

In [None]:
# Load the candidates for the `tracknet_v2` configuration.
# presumably the detections produced by that model — TODO confirm in `get_candidates`
starting_frame, candidates, n_candidates, values = get_candidates(tracknet_v2)
# One frame index per entry of `candidates`, starting at `starting_frame`.
frame_sequence = list(range(starting_frame, starting_frame + len(candidates)))

# Each step consumes the output of the previous one:
# fit -> trajectory graph -> shortest paths -> path mapping.
fitting_info = fit_trajectories(candidates, n_candidates, starting_frame)
trajectory_graph = build_trajectory_graph(fitting_info)
shortest_paths = find_shortest_paths(trajectory_graph)
path_mapping = build_path_mapping(fitting_info, shortest_paths)

In [None]:
# NOTE(review): `shortest_paths[0][0]` and `889` are passed positionally; their
# meaning is defined by `visualize_trajectory_graph` — TODO confirm and name them.
ax = visualize_trajectory_graph(trajectory_graph, shortest_paths[0][0], 889)
fig = ax.figure
# NOTE(review): unlike the other figures saved in this notebook, this one is
# written to the current working directory rather than to `figures_folder`.
fig.savefig(f'graph_example.pdf')
plt.close(fig)

## Fit examples

In [None]:
from IPython.display import clear_output

# Rendering options passed to every `create_trajectory_video` call below.
dpi = 150
dark_mode = False

Big examples for success and failure

In [None]:
# (output file, starting frame, num_prev, num_next): the success example
# first, then the failure example.
big_examples = [
    ('trajectories/example.png', 995, 4, 3),
    ('trajectories/example_fail.png', 1501, 2, 3),
]

for filename, sf, num_prev, num_next in big_examples:
    output_path = os.path.join(figures_folder, filename)
    # num_frames=0 is passed to every call in this notebook that saves a PNG.
    fig, ax = create_trajectory_video(tracknet_v2, output_path,
                                      starting_frame=sf,
                                      num_prev=num_prev, num_next=num_next,
                                      num_frames=0,
                                      fitting_info=fitting_info, path_mapping=path_mapping,
                                      dark_mode=dark_mode, dpi=dpi)
    plt.close(fig)

clear_output()

Shot example (near team)

In [None]:
for sf in [982, 990, 991, 995]:
    # Frames up to 990 use 3 previous / 4 next, later frames 4 previous / 3 next.
    num_prev, num_next = (3, 4) if sf <= 990 else (4, 3)

    filename = f'trajectories/{sf}.png'
    output_path = os.path.join(figures_folder, filename)
    fig, ax = create_trajectory_video(tracknet_v2, output_path,
                                      starting_frame=sf,
                                      num_prev=num_prev, num_next=num_next,
                                      num_frames=0,
                                      fitting_info=fitting_info, path_mapping=path_mapping,
                                      dark_mode=dark_mode, dpi=dpi)
    plt.close(fig)
    clear_output()

Bounce example (floor)

In [None]:
for sf in [1005, 1006, 1015]:
    # Frames up to 1005 use 3 previous / 4 next, later frames 4 previous / 3 next.
    num_prev, num_next = (3, 4) if sf <= 1005 else (4, 3)

    filename = f'trajectories/{sf}.png'
    output_path = os.path.join(figures_folder, filename)
    fig, ax = create_trajectory_video(tracknet_v2, output_path,
                                      starting_frame=sf,
                                      num_prev=num_prev, num_next=num_next,
                                      num_frames=0,
                                      fitting_info=fitting_info, path_mapping=path_mapping,
                                      dark_mode=dark_mode, dpi=dpi)
    plt.close(fig)
    clear_output()

Shot example (far team)

In [None]:
# Both frames are rendered with num_prev=4 and num_next=3.
# NOTE(review): the original cell branched on `sf <= 1005`, a threshold copied
# from the bounce cell that is never true for these frames, so the branch was
# dead code. If frame 1023 was meant to use num_prev=3 / num_next=4 (like the
# first frames of the other examples), restore the branch with threshold 1023.
num_prev = 4
num_next = 3

for sf in [1023, 1024]:
    filename = f'trajectories/{sf}.png'
    fig, ax = create_trajectory_video(tracknet_v2, os.path.join(figures_folder, filename),
                                      fitting_info=fitting_info, path_mapping=path_mapping,
                                      starting_frame=sf,
                                      dark_mode=dark_mode, dpi=dpi,
                                      num_prev=num_prev, num_next=num_next,
                                      num_frames=0)
    plt.close(fig)
    clear_output()

for sf in [1537, 1546, 1548, 1554]:
    # Frames after 1546 use 4 previous / 3 next. Earlier frames use 4 next and
    # 3 previous, except frame 1537 which uses only 2 previous.
    if sf > 1546:
        num_prev, num_next = 4, 3
    else:
        num_prev, num_next = (2 if sf == 1537 else 3), 4

    filename = f'trajectories/{sf}.png'
    output_path = os.path.join(figures_folder, filename)
    fig, ax = create_trajectory_video(tracknet_v2, output_path,
                                      starting_frame=sf,
                                      num_prev=num_prev, num_next=num_next,
                                      num_frames=0,
                                      fitting_info=fitting_info, path_mapping=path_mapping,
                                      dark_mode=dark_mode, dpi=dpi)
    plt.close(fig)
    clear_output()

Failure example

In [None]:
for sf in [1494, 1495, 1498, 1500, 1501]:
    filename = f'trajectories/fail_{sf}.png'

    # The value for the `display` argument is kept in `display_mode`: a global
    # named `display` would shadow IPython's `display()` function, which the
    # architecture-table cell calls.
    if sf == 1494:
        # The first frame uses its own settings.
        display_mode = 'k_min params'
        display_prev = None
        num_next = 4
        num_prev = 1
    else:
        display_mode = 'k_min k_max params'
        display_prev = 'k_max'
        num_next = 3
        num_prev = 2

    fig, ax = create_trajectory_video(tracknet_v2, os.path.join(figures_folder, filename),
                                      fitting_info=fitting_info, path_mapping=path_mapping,
                                      starting_frame=sf,
                                      dark_mode=dark_mode, dpi=dpi,
                                      num_next=num_next, num_prev=num_prev,
                                      alpha_prev=1, alpha_next=1,
                                      display=display_mode, display_prev=display_prev, display_next='k_min',
                                      show_outside_range=True,
                                      num_frames=0)
    clear_output()
    plt.close(fig)