# Setup

**Imports**

In [None]:
import numpy as np
from scipy.signal import argrelmax
import matplotlib.pyplot as plt
from loadmydata.load_human_locomotion import (
    load_human_locomotion_dataset,
    get_code_list,
)

In [None]:
THRESHOLD_IoU = 0.75

**Utility functions**

In [None]:
def _check_step_list(step_list):
    """Some sanity checks."""
    for step in step_list:
        assert len(step) == 2, f"A step consists of a start and an end: {step}."
        start, end = step
        assert start < end, f"start should be before end: {step}."

In [None]:
def inter_over_union(interval_1, interval_2):
    """Intersection over union (IoU) for two intervals.

    Arguments:
        interval_1 -- pair (start, end)
        interval_2 -- pair (start, end)
    Returns:
        float -- length of the overlap divided by the length of the union;
        0.0 when the intervals do not overlap (this includes intervals that
        only touch at a boundary and zero-length intervals).
    """
    a, b = interval_1
    c, d = interval_2
    overlap = min(b, d) - max(a, c)
    # Without a strictly positive overlap the ratio is 0 whatever the union
    # is. Returning early also avoids dividing by zero when both intervals
    # have zero length.
    if overlap <= 0:
        return 0.0
    # The intervals overlap, so their union is a single interval.
    union = max(b, d) - min(a, c)
    return overlap / union

In [None]:
def _step_detection_precision(step_list_true, step_list_pred):
    """Precision of a step detection on a single signal.

    Precision is the number of correctly predicted steps divided by the number
    of predicted steps. A predicted step is correct if its "intersection over
    union" with an annotated step is strictly greater than THRESHOLD_IoU.
    An annotated step can only be detected once: if several predicted steps
    correspond to the same annotated step, all but one are considered as false.
    Predicted steps are processed in order and each one is matched to the first
    annotated step that is still free and overlaps it enough.

    Both arguments are lists of steps, for instance:
        - step_list_true: [[357, 431], [502, 569], [633, 715], [778, 849], [907, 989]]
        - step_list_pred: [[293, 365], [422, 508], [565, 642], [701, 789]]
    Arguments:
        step_list_true {List} -- list of true steps
        step_list_pred {List} -- list of predicted steps
    Returns:
        float -- precision, between 0.0 and 1.0 (0.0 for an empty prediction)
    """
    _check_step_list(step_list_pred)

    n_predicted = len(step_list_pred)
    if n_predicted == 0:  # nothing was predicted
        return 0.0

    matched_true_indexes = set()  # annotated steps already claimed by a prediction
    n_hits = 0
    for candidate in step_list_pred:
        for position, annotated in enumerate(step_list_true):
            if position in matched_true_indexes:
                continue  # this annotated step has already been detected
            if inter_over_union(candidate, annotated) > THRESHOLD_IoU:
                matched_true_indexes.add(position)
                n_hits += 1
                break  # a predicted step validates at most one annotated step
    return n_hits / n_predicted

In [None]:
def _step_detection_recall(step_list_true, step_list_pred):
    """Recall of a step detection on a single signal.

    Recall is the number of detected annotated steps divided by the total
    number of annotated steps. An annotated step is detected if its
    "intersection over union" with a predicted step is strictly greater than
    THRESHOLD_IoU.
    A predicted step can only account for one annotated step: if several
    annotated steps are detected with the same predicted step, all but one are
    considered undetected.

    Both arguments are lists of steps, for instance:
        - step_list_true: [[357, 431], [502, 569], [633, 715], [778, 849], [907, 989]]
        - step_list_pred: [[293, 365], [422, 508], [565, 642], [701, 789]]
    Arguments:
        step_list_true {List} -- list of true steps
        step_list_pred {List} -- list of predicted steps
    Returns:
        float -- recall, between 0.0 and 1.0 (0.0 when there is no annotated
        step)
    """
    _check_step_list(step_list_pred)

    # No annotated step: return 0.0 instead of dividing by zero, in the same
    # way precision returns 0.0 for an empty prediction.
    if len(step_list_true) == 0:
        return 0.0

    n_detected_true = 0
    predicted_index_set = set()  # indexes of predicted steps already used

    for step_true in step_list_true:
        for (index, step_pred) in enumerate(step_list_pred):
            if (index not in predicted_index_set) and (
                inter_over_union(step_pred, step_true) > THRESHOLD_IoU
            ):
                n_detected_true += 1
                predicted_index_set.add(index)
                break  # this annotated step is detected, move on to the next
    return n_detected_true / len(step_list_true)

In [None]:
def f1_score_step_detection(y_true, y_pred) -> float:
    """
    Calculate the F-score (harmonic mean of precision and recall) for each
    instance (each signal) and return the unweighted average over instances.
    The lists y_true and y_pred are lists of lists of steps, for instance:
        - y_true: [[[907, 989]], [[357, 431], [502, 569]], [[633, 715], [778, 849]]]
        - y_pred: [[[293, 365]], [[422, 508], [565, 642]], [[701, 789]]]
    Instances are paired with `zip`, so extra instances in the longer list are
    ignored.
    Arguments:
        y_true {List} -- true steps
        y_pred {List} -- predicted steps
    Returns:
        float -- f-score, between 0.0 and 1.0 (0.0 when there is no instance
        to score)
    """
    # to prevent throwing an exception when passing empty lists
    if len(y_true) == 0:
        return 0.0

    fscore_list = []

    for (step_list_true, step_list_pred) in zip(y_true, y_pred):
        prec = _step_detection_precision(step_list_true, step_list_pred)
        rec = _step_detection_recall(step_list_true, step_list_pred)
        if prec + rec < 1e-6:
            # precision and recall are both (numerically) zero: the harmonic
            # mean is undefined, use 0.0
            fscore_list.append(0.0)
        else:
            fscore_list.append((2 * prec * rec) / (prec + rec))

    # No (true, predicted) pair was available, e.g. y_pred is empty: the mean
    # of an empty list would be NaN.
    if not fscore_list:
        return 0.0

    return float(np.mean(fscore_list))

In [None]:
def sparsify_codes(z_1D, atom_length: int):
    """Set to zero the codes that are too close to each other.

    Only the codes found by `argrelmax` with `order=atom_length` (local maxima
    over a neighbourhood of `atom_length` samples on each side) keep their
    value; every other code is set to zero.
    `z_1D` is assumed to be univariate. The input array is not modified.
    """
    (peak_indexes,) = argrelmax(z_1D, order=atom_length)
    sparse_codes = np.zeros_like(z_1D)
    sparse_codes[peak_indexes] = z_1D[peak_indexes]
    return sparse_codes


def sparse_codes_to_list_of_steps(z_1D, atom_length: int):
    """Return a list of steps from a 1D activation vector.

    The activations are first sparsified with `sparsify_codes`. Each remaining
    non-zero activation at index `i` yields the step `[i, i + atom_length]`.
    """
    sparse_codes = sparsify_codes(z_1D=z_1D, atom_length=atom_length)
    (starts,) = np.nonzero(sparse_codes)
    ends = starts + atom_length
    # One row per step: first column is the start, second column is the end.
    return np.column_stack((starts, ends)).tolist()

In [None]:
def fig_ax(figsize=(15, 5)):
    """Create a figure with a single axes whose x-axis is autoscaled tightly.

    Returns the tuple `(fig, ax)`.
    """
    figure, axes = plt.subplots(figsize=figsize)
    axes.autoscale(axis='x', tight=True, enable=True)
    return figure, axes

# Convolutional dictionary learning (CDL)

## Data

This data set consists of signals collected with inertial measurement units (accelerometer+gyroscope), from 230 subjects undergoing a fixed protocol:
- standing still,
- walking 10 m,
- turning around,
- walking back,
- stopping.

In this assignment, we only consider the vertical acceleration of the left foot, and all signals are truncated to 20 seconds (as a result, they all have the same length). Signals are sampled at 100 Hz.

The measured population is composed of healthy subjects as well as patients with neurological or orthopedic disorders.

The start and end time stamps of thousands of footsteps are available.

The data are part of a larger data set described in [1].

[1] Truong, C., Barrois-Müller, R., Moreau, T., Provost, C., Vienne-Jumeau, A., Moreau, A., Vidal, P.-P., Vayatis, N., Buffat, S., Yelnik, A., Ricard, D., & Oudre, L. (2019). A data set for the study of human locomotion with inertial measurements units. Image Processing On Line (IPOL), 9.

The following cell defines the training set `(X_train, y_train)` and testing set `(X_test, y_test)`.

In [None]:
# Positions (in the list of trial codes) of the trials used for training/testing.
subset_indexes_train = [
    851, 428, 739, 621, 147, 281, 95, 619, 441, 149,
    951, 803, 214, 754, 34, 516, 684, 514, 465, 675,
    654, 665, 297, 217, 618, 37, 954, 888, 630, 839,
    897, 146, 559, 896, 941, 93, 658, 674, 78, 498,
    575, 525, 36, 313, 300, 710, 56, 460, 397, 943,
]
subset_indexes_test = [
    683, 259, 59, 387, 634, 611, 87, 201,
    86, 849, 538, 962, 205, 15, 883, 42,
]

code_list = get_code_list()

# For each split: signals, list of list of steps (the "labels"), pathology groups.
X_train, y_train, list_of_pathologies_train = [], [], []
X_test, y_test, list_of_pathologies_test = [], [], []

# Fill the train containers, then the test containers, with the same procedure.
for (X, y, list_of_pathologies, subset_indexes) in (
    (X_train, y_train, list_of_pathologies_train, subset_indexes_train),
    (X_test, y_test, list_of_pathologies_test, subset_indexes_test),
):
    for code in np.take(code_list, subset_indexes):
        single_trial = load_human_locomotion_dataset(code)
        # keeping only one dimension (from the left sensor)
        signal = single_trial.signal.LAZ.to_numpy()
        steps = single_trial.left_steps
        # truncate signals to have the same length
        X.append(signal[:2000])
        # keep only the steps whose start and end are both below sample 2000
        y.append(steps[(steps < 2000).all(axis=1)])
        list_of_pathologies.append(single_trial.metadata["PathologyGroup"])

# All signals now share the same length and can be stacked into 2D arrays.
X_train = np.asarray(X_train)
X_test = np.asarray(X_test)

Display one signal. Notice the repetitive patterns: those are the footsteps to detect.

In [None]:
ind = 45  # choose a signal
signal = X_train[ind]
steps = y_train[ind]
pathology = list_of_pathologies_train[ind]

# plot the chosen signal, titled with the pathology group of the subject
fig, ax = fig_ax()
ax.plot(signal)
_ = ax.set_title(f"Pathology group: {pathology}")

## Question 3

In [None]:
from alphacsc import learn_d_z
from alphacsc.utils import construct_X

In [None]:
ind = 45  # choose a signal
signal = X_train[ind]
steps = y_train[ind]
pathology = list_of_pathologies_train[ind]

### Change this part (at least)
# placeholders: replace with your own reconstruction and its mean squared error
reconstruction = np.zeros(signal.shape)
mse = 0
###

# plot the original signal and its reconstruction on the same axes
fig, ax = fig_ax()
ax.set_title(f"MSE: {mse:.2f}")
ax.plot(signal, label="Original")
ax.plot(reconstruction, label="Reconstruction")
plt.legend()
# saving the figure
plt.savefig(
    fname="figure-question-3.pdf",
    dpi=200,
    transparent=True,
    bbox_inches="tight",
    pad_inches=0,
)

## Question 4

Hints:
- For ease of use, code a scikit-learn estimator class `ConvDL` that implements 
    - `.fit()` (learn dictionary),
    - `.predict()` method (return a list of list of steps). Use the helper function `sparse_codes_to_list_of_steps`.
- In the cross-validation, use the `f1_score_step_detection` function to compute the F-score between prediction and label.

In [None]:
from alphacsc import update_z, learn_d_z
from sklearn.base import BaseEstimator

class ConvDL(BaseEstimator):
    """Skeleton of a scikit-learn estimator for convolutional dictionary learning.

    Parameters:
        reg -- sparsity penalty
        n_atoms -- number of atoms in the dictionary
        atom_length -- length of each atom
        n_iter -- number of iterations (default: 30)

    `fit` and `predict` are placeholders to be completed.
    """

    def __init__(self, reg, n_atoms, atom_length, n_iter=30):
        # scikit-learn convention: every constructor argument is stored,
        # unmodified, in an attribute of the same name. `get_params`,
        # `set_params` and `clone` (used by `GridSearchCV`) rely on it.
        self.reg = reg
        self.n_atoms = n_atoms
        self.atom_length = atom_length
        self.n_iter = n_iter

    def fit(self, X, y=None):
        """Learn the dictionary from the signals `X` (to be implemented).

        Returns `self`, as expected from a scikit-learn estimator.
        """
        ...
        return self

    def predict(self, X):
        """Return a list of list of steps for the signals `X` (to be implemented).

        Hint: use the helper function `sparse_codes_to_list_of_steps`.
        """
        ...

Use helper functions from scikit-learn to find the optimal combination of parameters.

In [None]:
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer

# Grid of values for the sparsity penalty lambda, expressed as a fraction of
# lambda_max (see Question 2).
# Use the argument `lmbd_max="scaled"` in alphacsc.learn_d_z.
penalty_list = [0.1, 0.2, 0.5, 0.8, 0.9, 0.95]

# Grid of values for K (the number of atoms).
n_atoms_list = [2, 4, 6, 8, 10, 20]

# Grid of values for L (the atom length).
atom_length_list = [20, 50, 80, 100, 150]

In [None]:
# use X_train and y_train


## Question 5

In [None]:
# adapt this code to your variables

dictionary = np.zeros((3, 100))  # add the learned dictionary

# one small figure per atom (one row of `dictionary`), each saved to its own PDF
for (k, atom) in enumerate(dictionary):
    fig, ax = fig_ax(figsize=(5, 3))
    ax.plot(atom)
    # saving the figure
    plt.savefig(
        fname=f"figure-question-5-atom-{k}.pdf",
        dpi=200,
        transparent=True,
        bbox_inches="tight",
        pad_inches=0,
    )

## Question 6

In [None]:
# use X_test and y_test



# Dynamic time warping (DTW)

For this section, the data remain the same but the task is different. We want to classify footsteps as healthy or non-healthy (instead of detecting them as before).

## Data

In [None]:
subset_indexes = [95, 619, 441, 149, 951, 803, 214, 34, 37, 630]
code_list = get_code_list()

X_train = []  # list of footstep signals
y_train = []  # list of pathologies (the "labels")

for code in np.take(code_list, subset_indexes):
    single_trial = load_human_locomotion_dataset(code)
    # keeping only one dimension (from the left sensor)
    signal = single_trial.signal.LAZ.to_numpy()
    steps = single_trial.left_steps
    pathology = single_trial.metadata["PathologyGroup"]
    # 0: healthy, 1: non-healthy
    label = int(pathology != "Healthy")
    # every annotated step becomes one sample carrying the label of its trial
    for (start, end) in steps:
        X_train.append(signal[start:end])
        y_train.append(label)

# overlay all footsteps: blue for healthy, red for non-healthy
fig, ax = fig_ax()
for (step_signal, label) in zip(X_train, y_train):
    color = "r" if label != 0 else "b"
    ax.plot(step_signal, color=color, alpha=0.1)

## Question 7

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import f1_score
from dtw import dtw

## Question 8