### Reference script for dtw test creation

Since the dtw test creation contains hardcoded values and its implementation is including multiple design choices, this script here attempts to make its expected behaviour that is tested understandable

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import ehrdata as ed
import numpy as np
import scipy

import ehrapy as ep

To follow along the implementation: minimal single time-series example

In [6]:
x = [1, 3, 9, 2, 1]
y = [2, 0, 0, 8, 7, 2]
N = len(x)
M = len(y)

#### Minimal dtw

For dtw, a tiny implementation of cost matrix can look like this

In [8]:
def compute_cost_matrix(X, Y, metric="euclidean"):
    """Compute the cost matrix of two feature sequences

    Notebook: C3/C3S2_DTWbasic.ipynb

    Args:
        X (np.ndarray): Sequence 1
        Y (np.ndarray): Sequence 2
        metric (str): Cost metric, a valid strings for scipy.spatial.distance.cdist (Default value = 'euclidean')

    Returns:
        C (np.ndarray): Cost matrix
    """
    X, Y = np.atleast_2d(X, Y)
    # whether to square the distance or not is a design choice - tslearn does so
    C = scipy.spatial.distance.cdist(X.T, Y.T, metric=metric) ** 2  # the **2 is what tslearn does!
    return C


C = compute_cost_matrix(x, y, metric="euclidean")
print("Cost matrix C =", C, sep="\n")

Cost matrix C =
[[ 1.  1.  1. 49. 36.  1.]
 [ 1.  9.  9. 25. 16.  1.]
 [49. 81. 81.  1.  4. 49.]
 [ 0.  4.  4. 36. 25.  0.]
 [ 1.  1.  1. 49. 36.  1.]]


In [9]:
def compute_accumulated_cost_matrix(C):
    """Compute the accumulated cost matrix given the cost matrix.

    Notebook: C3/C3S2_DTWbasic.ipynb

    Args:
        C (np.ndarray): Cost matrix

    Returns:
        D (np.ndarray): Accumulated cost matrix
    """
    N = C.shape[0]
    M = C.shape[1]
    D = np.zeros((N, M))
    D[0, 0] = C[0, 0]
    for n in range(1, N):
        D[n, 0] = D[n - 1, 0] + C[n, 0]
    for m in range(1, M):
        D[0, m] = D[0, m - 1] + C[0, m]
    for n in range(1, N):
        for m in range(1, M):
            D[n, m] = C[n, m] + min(D[n - 1, m], D[n, m - 1], D[n - 1, m - 1])
    return D


D = compute_accumulated_cost_matrix(C)
print("Accumulated cost matrix D =", D, sep="\n")

# to sqrt the distance is a design choice - tslearn does so
print("DTW distance np.sqrt(DTW(X, Y)) =", np.sqrt(D[-1, -1]))

Accumulated cost matrix D =
[[ 1.  2.  3. 52. 88. 89.]
 [ 2. 10. 11. 28. 44. 45.]
 [51. 83. 91. 12. 16. 65.]
 [51. 55. 59. 48. 37. 16.]
 [52. 52. 53. 97. 73. 17.]]
DTW distance np.sqrt(DTW(X, Y)) = 4.123105625617661


#### Minimal data example to use for ehrapy's neighbors test

##### Data

An array of 5 observations, 2 variables, and 4 timesteps

In [10]:
data = np.random.default_rng(42).integers(0, 5, (5, 2, 4))
data

array([[[0, 3, 3, 2],
        [2, 4, 0, 3]],

       [[1, 0, 2, 4],
        [3, 3, 3, 3]],

       [[2, 0, 4, 2],
        [2, 1, 0, 4]],

       [[3, 3, 2, 4],
        [2, 2, 2, 1]],

       [[0, 2, 4, 0],
        [4, 4, 1, 3]]])

##### Reference implementation

We assemble the open-box reference implementation to yield pairwise distances of the example data

In [11]:
pairwise_distances = np.zeros((data.shape[0], data.shape[0]))

pairwise_distances_feature_wise = np.zeros((data.shape[0], data.shape[0], data.shape[1]))

for feature in range(data.shape[1]):
    for i in range(data.shape[0]):
        for j in range(data.shape[0]):
            if i != j:
                C = compute_cost_matrix(data[i, feature], data[j, feature], metric="euclidean")
                D = compute_accumulated_cost_matrix(C)
                pairwise_distances_feature_wise[i, j, feature] = np.sqrt(D[-1, -1])

pairwise_distances = np.mean(pairwise_distances_feature_wise, axis=2)

pairwise_distances

array([[0.        , 2.98118805, 2.44948974, 3.30277564, 2.34277886],
       [2.98118805, 0.        , 3.43649167, 3.05492646, 3.28629768],
       [2.44948974, 3.43649167, 0.        , 3.16227766, 3.31318964],
       [3.30277564, 3.05492646, 3.16227766, 0.        , 4.35228539],
       [2.34277886, 3.28629768, 3.31318964, 4.35228539, 0.        ]])

##### `ehrapy.tools.distances.timeseries.timeseries_distance`

This should be matched by ehrapy's timeseries_distance, and the array above is included in its test.

Below just a quick show that it works as it is expected, but this is evaluated in the test of timeseries_distance, too.

In [12]:
pairwise_distances = np.zeros((data.shape[0], data.shape[0]))
for i in range(data.shape[0]):
    for j in range(data.shape[0]):
        if i != j:
            pairwise_distances[i, j] = ep.tl.distances.timeseries.timeseries_distance(
                np.array([i]), np.array([j]), data
            )

pairwise_distances

array([[0.        , 2.98118805, 2.44948974, 3.30277564, 2.34277886],
       [2.98118805, 0.        , 3.43649167, 3.05492646, 3.28629768],
       [2.44948974, 3.43649167, 0.        , 3.16227766, 3.31318964],
       [3.30277564, 3.05492646, 3.16227766, 0.        , 4.35228539],
       [2.34277886, 3.28629768, 3.31318964, 4.35228539, 0.        ]])

##### `ehrapy.preprocessing.neighbors`

This should be matched by ehrapy's neighbors impelmentation, and the array above is included in its test.

Below just a quick show that it works as it is expected, but this is evaluated in the test of timeseries_distance, too.

In [13]:
edata = ed.EHRData(X=None, R=data)
ep.pp.neighbors(edata, n_neighbors=3, metric="dtw")

EHRData object with n_obs × n_vars × n_t = 5 × 2 × 4
    uns: 'neighbors'
    obsp: 'distances', 'connectivities'
    shape of .X: (5, 2)
    shape of .R: (5, 2, 4)

In [14]:
edata.obsp["distances"].toarray()

array([[0.        , 0.        , 2.44948974, 0.        , 2.34277886],
       [2.98118805, 0.        , 0.        , 3.05492646, 0.        ],
       [2.44948974, 0.        , 0.        , 3.16227766, 0.        ],
       [0.        , 3.05492646, 3.16227766, 0.        , 0.        ],
       [2.34277886, 3.28629768, 0.        , 0.        , 0.        ]])