# Dataset Analysis

This notebook generates figures that explore and illustrate the composition of the train and test data sets.


In [None]:
import os
import sys
import xmltodict
from collections import OrderedDict
import imageio.v3 as iio
from pathlib import Path
from copy import deepcopy
from pprint import pprint

import numpy as np
import scipy.stats as stats
import pandas as pd

import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
import matplotlib.patches as mpatches

from tunamelt import REPO_PATH, log
from tunamelt.data import DataLoader, Dataset
from tunamelt.metrics import calc_box_area

# Notebook-wide constants.
# MAX_LEN: maximum number of frames per video.
MAX_LEN = 300
# NOTE(review): presumably the video frame rate in frames per second - confirm.
FPS = 10.0

# sns.set_theme()

# Output directory for the saved figures; created (with any missing parents)
# if it does not exist yet.
media_path = Path("./media")
media_path.mkdir(parents=True, exist_ok=True)

In [None]:
# DATA PROCESSING FUNCTIONS


def labels_to_pos_frames(label):
    """Count the frame positions of one video that are marked by any track.

    Args:
        label: Mapping with a ``"video_length"`` entry (number of frame
            slots), a ``"tracks"`` list, and a ``"start_frame"`` offset.
            Each track has a ``"frames"`` list whose entries carry a
            ``"frame"`` number.

    Returns:
        The number of distinct frame positions marked by at least one track.
    """
    occupied = [0] * label["video_length"]
    for track in label["tracks"]:
        for entry in track["frames"]:
            # Shift the frame number by the start offset to index this video.
            position = entry["frame"] - label["start_frame"]
            occupied[position] = 1

    return occupied.count(1)


def average(x):
    """Return the arithmetic mean of the values in ``x``.

    Raises:
        ZeroDivisionError: If ``x`` is empty.
    """
    total = sum(x)
    count = len(x)
    return total / count


def calc_avg_target_size(targets):
    """Return the mean bounding-box area of each target.

    Args:
        targets: Iterable of targets; each has a ``"frames"`` list whose
            entries carry a ``"box"`` that ``calc_box_area`` accepts.

    Returns:
        A list with one mean box area per target, in input order.
    """

    def mean_area(track):
        # Gather every box first, then measure them, then average the areas.
        boxes = [entry["box"] for entry in track["frames"]]
        return average([calc_box_area(b) for b in boxes])

    return [mean_area(track) for track in targets]


# POST PROCESS METHODS


def calc_target_density(targets, lengths):
    """Divide each entry of ``targets`` by the matching entry of ``lengths``.

    Args:
        targets: Sequence of numerators.
        lengths: Sequence of denominators, the same length as ``targets``.

    Returns:
        A list of element-wise quotients ``targets[i] / lengths[i]``.
    """
    assert len(targets) == len(lengths)
    return [count / length for count, length in zip(targets, lengths)]


def calc_statistics(lengths, targets, pos_frames):
    """Log summary statistics for one data split.

    Logs three things: the mean over videos of ``pos_frames[i] / lengths[i]``,
    the pandas ``describe()`` summary of ``lengths``, and the sum of
    ``targets``.

    Args:
        lengths: Per-video lengths.
        targets: Per-video target counts.
        pos_frames: Per-video counts of frames that contain a target.
    """
    # Average the per-video fraction of target-bearing frames.
    weight = 1 / len(lengths)
    target_prob = weight * sum(
        n_pos / n_frames for n_pos, n_frames in zip(pos_frames, lengths)
    )
    log.info(f"P(Target in Frame): {target_prob}")
    log.info(pd.Series(lengths).describe())
    log.info(f"N Targets in data split: {sum(targets)}")

In [None]:
# Build one Dataset per split from the label files (cvat-video-1.1 directory)
# and the mp4 videos stored under REPO_PATH/data/PNNL-TUNAMELT.
label_path = f"{REPO_PATH}/data/PNNL-TUNAMELT/labels/cvat-video-1.1/train"
video_path = f"{REPO_PATH}/data/PNNL-TUNAMELT/mp4/train"
train_dataset = Dataset(videos=video_path, labels=label_path)
# label_path / video_path are rebound here, so once this cell has run they
# refer to the test split only.
label_path = f"{REPO_PATH}/data/PNNL-TUNAMELT/labels/cvat-video-1.1/test"
video_path = f"{REPO_PATH}/data/PNNL-TUNAMELT/mp4/test"
test_dataset = Dataset(videos=video_path, labels=label_path)

# aligned_data is looked up by split name; the cells below iterate each result
# as (ignored, label) pairs.
train_data = train_dataset.aligned_data["train"]
test_data = test_dataset.aligned_data["test"]

In [None]:
# Per-video statistics for the train split. Each list receives exactly one
# entry per video, in the iteration order of train_data.
train_lengths = []  # label["video_length"] of each video
train_n_targets = []  # number of tracks in each video
train_n_pos_frames = []  # number of frames containing a target, per video
train_targets_by_size = []  # per video: list of mean box areas, one per track
for _, label in train_data:
    train_lengths.append(label["video_length"])
    train_n_targets.append(len(label["tracks"]))  # n targets in video
    # n frames with target in video
    train_n_pos_frames.append(labels_to_pos_frames(label))
    train_targets_by_size.append(calc_avg_target_size(label["tracks"]))

In [None]:
# Per-video statistics for the test split. Each list receives exactly one
# entry per video, in the iteration order of test_data.
test_lengths = []  # label["video_length"] of each video
test_n_targets = []  # number of tracks in each video
test_n_pos_frames = []  # number of frames containing a target, per video
test_targets_by_size = []  # per video: list of mean box areas, one per track
for _, label in test_data:
    test_lengths.append(label["video_length"])
    test_n_targets.append(len(label["tracks"]))  # n targets in video
    # n frames with target in video
    test_n_pos_frames.append(labels_to_pos_frames(label))
    test_targets_by_size.append(calc_avg_target_size(label["tracks"]))

### Video Lengths


In [None]:
# Histogram of video lengths (in frames), train and test drawn side by side
# with the first two colours of the seaborn palette.
fig = plt.figure()
ax = fig.add_subplot(111)

ax.set_xlabel("Number of Frames")
ax.set_ylabel("Count")
ax.hist(
    x=[train_lengths, test_lengths],
    bins=10,
    color=[sns.color_palette()[0], sns.color_palette()[1]],
    label=["Train", "Test"],
)

# Named `fig_format` rather than `format` so the builtin `format()` is not
# shadowed for every cell that runs afterwards.
fig_format = "svg"
ax.legend()
plt.savefig(f"{str(media_path)}/video_length.{fig_format}", format=fig_format)
plt.show()

# Log per-split summary statistics alongside the figure.
log.info("-- Train --")
calc_statistics(train_lengths, train_n_targets, train_n_pos_frames)
log.info("-- Test --")
calc_statistics(test_lengths, test_n_targets, test_n_pos_frames)

### Create Size Distribution Plot


In [None]:
# Size-distribution figure: the empirical CDF of per-target mean box area is
# drawn on the left y-axis (ax, dotted lines) and its numerical derivative,
# the empirical PDF, on a twin right y-axis (ax2, solid lines).
fig = plt.figure()
ax = fig.add_subplot(111)
ax2 = ax.twinx()
ax2.grid(linestyle="dotted", zorder=0)
# ax2.grid(False)

# Flatten the per-video lists into one list of per-target sizes, then pair the
# sorted sizes with the heights i / n for i = 0 .. n - 1.
all_train_targets_size = [x for sub in train_targets_by_size for x in sub]
train_ecdfx = np.sort(all_train_targets_size)
train_ecdfy = np.arange(len(all_train_targets_size)) / float(
    len(all_train_targets_size)
)

all_test_targets_size = [x for sub in test_targets_by_size for x in sub]
test_ecdfx = np.sort(all_test_targets_size)
test_ecdfy = np.arange(len(all_test_targets_size)) / float(len(all_test_targets_size))

ax.plot(train_ecdfx, train_ecdfy, linestyle="dotted", color=sns.color_palette()[0])
ax.plot(test_ecdfx, test_ecdfy, linestyle="dotted", color=sns.color_palette()[1])

# Differentiate the eCDF with respect to size to obtain the ePDF.
# only keep finite-valued indices for plotting and poly fitting
# NOTE(review): non-finite gradient values presumably come from repeated
# sizes (zero spacing between neighbouring x values) - confirm.
train_epdfy = np.gradient(train_ecdfy, train_ecdfx)
test_epdfy = np.gradient(test_ecdfy, test_ecdfx)
train_valid_idxs = np.isfinite(train_ecdfx) & np.isfinite(train_epdfy)
test_valid_idxs = np.isfinite(test_ecdfx) & np.isfinite(test_epdfy)

train_x = train_ecdfx[train_valid_idxs]
test_x = test_ecdfx[test_valid_idxs]
train_y = train_epdfy[train_valid_idxs]
test_y = test_epdfy[test_valid_idxs]

ax2.plot(train_x, train_y, color=sns.color_palette()[0])
ax2.plot(test_x, test_y, color=sns.color_palette()[1])

# Disabled experiment: degree-4 polynomial fit of the ePDF.
# train_fit = np.polyfit(train_x, train_y, 4)
# test_fit = np.polyfit(test_x, test_y, 4)
# train_poly = np.poly1d(train_fit)
# test_poly = np.poly1d(test_fit)
# ax2.plot(train_x, train_poly(train_x), color=sns.color_palette()[3])
# ax2.plot(test_x, test_poly(test_x), color=sns.color_palette()[4])

# Proxy handles: the legend encodes only the split colour; the dotted (eCDF)
# versus solid (ePDF) line styles are not listed in it.
plt.legend(
    handles=[
        Line2D(
            [0], [0], color=sns.color_palette()[0], linestyle="solid", label="Train"
        ),
        Line2D([0], [0], color=sns.color_palette()[1], linestyle="solid", label="Test"),
    ]
)
ax.set_ylabel("eCDF")
ax2.set_ylabel("ePDF")
ax.set_xlabel("Bounding Box Area")
# pyplot call, so it applies to the current axes.
# NOTE(review): ax and ax2 come from twinx(), so the log scale presumably
# applies to both through the shared x-axis - confirm.
plt.xscale("log")
plt.savefig(f"{str(media_path)}/size_distribution.svg", format="svg")
plt.show()

### Bounding Box Area Statistics


In [None]:
# Log, for each split, the number of per-video entries followed by summary
# statistics of the flattened per-target sizes.
log.info(len(train_targets_by_size))
df = pd.Series(all_train_targets_size)
log.info(df.describe())

log.info(len(test_targets_by_size))
df = pd.Series(all_test_targets_size)
log.info(df.describe())

# Start from a fresh figure so the violins are never drawn on top of whatever
# figure is still current (this matters outside the inline notebook backend).
plt.figure()
vplot = plt.violinplot(
    dataset=[all_train_targets_size, all_test_targets_size],
    vert=True,
    widths=0.40,
    showmeans=True,
    showmedians=True,
    showextrema=False,
    points=1000,
)

# Colour the mean and median markers differently so they can be told apart.
vplot["cmeans"].set_color("m")
vplot["cmedians"].set_color("y")
plt.yscale("log")
plt.ylabel("Target Size")
plt.grid(True, axis="y", which="both", linestyle="dotted")
plt.savefig(f"{str(media_path)}/violin_plot.png", format="png")
plt.show()

In [None]:
# Log, for each split, the number of per-video entries followed by summary
# statistics of the flattened per-target sizes.
log.info(len(train_targets_by_size))
df = pd.Series(all_train_targets_size)
log.info(df.describe())

log.info(len(test_targets_by_size))
df = pd.Series(all_test_targets_size)
log.info(df.describe())

# Start from a fresh figure so the boxes are never drawn on top of whatever
# figure is still current (this matters outside the inline notebook backend).
plt.figure()
# Whiskers span the 5th to 95th percentiles and outlier markers are hidden.
# NOTE(review): Matplotlib 3.9 deprecates `labels` in favour of `tick_labels`;
# switch once the minimum supported Matplotlib version is confirmed.
plt.boxplot(
    x=[all_train_targets_size, all_test_targets_size],
    labels=["Train", "Test"],
    notch=False,  # whether to do angled notch thing or not
    sym="",
    whis=(5.0, 95.0),
    widths=0.40,
    showmeans=True,
    medianprops={"color": "k"},
    meanprops={"marker": "o", "markerfacecolor": "k", "markeredgecolor": "k"},
)

plt.yscale("log")
plt.ylabel("Target Size")
plt.grid(True, axis="y", which="both", linestyle="dotted")
plt.savefig(f"{str(media_path)}/box_plot.png", format="png")
plt.show()