# Dynamic Time Warping

Dynamic time warping (DTW) is an algorithm for comparing two sequences, which may be of different lengths. It reports two sequences as similar when the elements of one can be aligned, in order, with the elements of the other, even if some parts have to be stretched or compressed a lot to make them line up.

I will implement a simple version of DTW that performs $n \times m$ distance calculations, where $n$ and $m$ are the lengths of the compared sequences (let's call the sequences $N$ and $M$, with elements $N_i$ and $M_j$ indexed from $0$). The algorithm uses dynamic programming. I will keep the whole array to visualize the results better, but you can keep only the last two rows and compute the distances row by row. That keeps memory usage linear in $m$, and the algorithm remains simple. The final distance between the sequences is in the bottom-right corner.

The algorithm requires choosing a metric $d$ that measures the distance between two single elements.
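As a small illustration of what $d$ can look like, here is a sketch of two common choices: the absolute difference for scalar elements and the Euclidean distance for vector elements. The names `abs_diff` and `euclidean` are made up for this example and are not used elsewhere in the notebook.

```python
import math


def abs_diff(x, y):
    """Distance between two scalar elements."""
    return abs(x - y)


def euclidean(x, y):
    """Distance between two equal-length feature vectors."""
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(x, y)))


print(abs_diff(1, 10))                     # 9
print(euclidean([0.0, 3.0], [4.0, 0.0]))   # 5.0
```

Any function of two elements that returns a non-negative number can be plugged in the same way.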

The algorithm looks like this: we create a distance matrix $D$ of size $(n + 1) \times (m + 1)$ and set the first column and the first row to $\infty$, except for the corner $D_{0, 0}$, which is set to $0$.
Then we fill the remaining $n \times m$ cells row by row according to this rule:

$$D_{i+1, j+1} = \min(D_{i+1, j}, D_{i, j+1}, D_{i, j}) + d(N_i, M_j)$$

* Taking $D_{i+1, j}$ means that the current result contains both $d(N_i, M_{j-1})$ and $d(N_i, M_j)$, so the same element of sequence $N$ is compared with two elements of pattern $M$. E.g. $[1, 20, 20, 20, 20]$ and $[1, 1, 1, 10]$ take this case for the repeated $1$s in the pattern.
* Taking $D_{i, j+1}$ means that the result contains both $d(N_{i-1}, M_j)$ and $d(N_i, M_j)$, so two consecutive elements of sequence $N$ are matched with the same element of pattern $M$. E.g. $[1, 1, 1, 1, 1, 20]$ and $[1, 10]$ use this case to minimize the distance.
* Taking $D_{i, j}$ means that you advance to the next element in both sequence $N$ and pattern $M$. E.g. $[1, 2, 3, 4]$ and $[1, 2, 3, 4]$.
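To see which of the three cases is taken at each step, one option is to walk back from the bottom-right corner of $D$ and record the visited cells. The sketch below is not part of the original notebook: `dtw_matrix` and `warping_path` are hypothetical helper names, the matrix is a plain list of lists, and ties are broken in favour of the diagonal move.

```python
import math


def dtw_matrix(metric, pattern, sequence):
    """Fill the (n + 1) x (m + 1) cost matrix using the recurrence above."""
    n, m = len(sequence), len(pattern)
    D = [[math.inf] * (m + 1) for _ in range(n + 1)]
    D[0][0] = 0.0
    for i in range(n):
        for j in range(m):
            best = min(D[i][j], D[i][j + 1], D[i + 1][j])
            D[i + 1][j + 1] = best + metric(sequence[i], pattern[j])
    return D


def warping_path(D):
    """Backtrack from the corner; returns (sequence index, pattern index) pairs."""
    i, j = len(D) - 1, len(D[0]) - 1
    path = []
    while i > 0 and j > 0:
        path.append((i - 1, j - 1))
        # Predecessors: diagonal, previous sequence element, previous pattern element.
        candidates = [
            (D[i - 1][j - 1], i - 1, j - 1),
            (D[i - 1][j], i - 1, j),
            (D[i][j - 1], i, j - 1),
        ]
        _, i, j = min(candidates, key=lambda c: c[0])
    path.reverse()
    return path


D = dtw_matrix(lambda x, y: abs(x - y), [1, 10], [1, 1, 1, 1, 1, 20])
print(warping_path(D))
# [(0, 0), (1, 0), (2, 0), (3, 0), (4, 0), (5, 1)]
```

In the printed path the pattern index stays at $0$ while the sequence index advances, which is the second case from the list: several sequence elements matched with the same pattern element.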

In [34]:
import functools as ft
import itertools as it
import json
import math
import operator as op
import os

from IPython.display import display
from ipywidgets import interact, interact_manual, widgets
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import misc, spatial, stats
from sklearn import metrics

In [10]:
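def calculate_dtw_full(metric, pattern, sequence):
    """Return the full DTW cost matrix; rows follow `sequence`, columns follow `pattern`."""
    pattern_size = len(pattern)
    sequence_size = len(sequence)

    # Extra first row and column hold the boundary conditions.
    distances = np.zeros((sequence_size + 1, pattern_size + 1), dtype=np.float64)
    distances[0, :] = math.inf
    distances[:, 0] = math.inf
    distances[0, 0] = 0

    for i, sequence_window in enumerate(sequence):
        for j, pattern_window in enumerate(pattern):
            distance = metric(pattern_window, sequence_window)
            # Cheapest way to reach this cell: diagonal, from above, or from the left.
            prev_distance = min(distances[i, j], distances[i, j + 1], distances[i + 1, j])
            distances[i + 1, j + 1] = prev_distance + distance

    # The DTW distance between the two inputs is distances[-1, -1].
    return distances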

dtw_full_norm = lambda pat, seq: calculate_dtw_full(lambda x, y: np.linalg.norm(x - y), pat, seq)
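
The two-row variant mentioned earlier can be sketched as follows. This is a minimal sketch in plain Python, not code from the notebook; the name `dtw_distance_two_rows` is made up, and the function returns only the final distance instead of the whole matrix.

```python
import math


def dtw_distance_two_rows(metric, pattern, sequence):
    """DTW distance using O(m) memory, where m = len(pattern)."""
    m = len(pattern)
    # Row 0 of the full matrix: 0 in the corner, infinity elsewhere.
    prev = [0.0] + [math.inf] * m
    for s in sequence:
        # First column of every later row is infinity.
        curr = [math.inf] * (m + 1)
        for j, p in enumerate(pattern):
            best = min(prev[j], prev[j + 1], curr[j])
            curr[j + 1] = best + metric(p, s)
        prev = curr
    return prev[m]


def abs_diff(x, y):
    return abs(x - y)


print(dtw_distance_two_rows(abs_diff, [1, 10], [1, 1, 1, 1, 1, 20]))
print(dtw_distance_two_rows(abs_diff, [1, 2, 2, 2, 3, 4], [1, 1, 2, 3, 3, 3, 4, 4]))
```

The results should agree with the bottom-right corner of the matrices produced by `calculate_dtw_full` for the same inputs.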

In [27]:
print('Matches same element in sequence with multiple elements in pattern')
print(dtw_full_norm([1,1,1,10], [1,20,20,20,20]))

Matches same element in sequence with multiple elements in pattern
[[  0.  inf  inf  inf  inf]
 [ inf   0.   0.   0.   9.]
 [ inf  19.  19.  19.  10.]
 [ inf  38.  38.  38.  20.]
 [ inf  57.  57.  57.  30.]
 [ inf  76.  76.  76.  40.]]


In [28]:
print('Matches multiple elements from sequence with the same element in pattern')
print(dtw_full_norm([1,10], [1,1,1,1,1,20]))

Matches multiple elements from sequence with the same element in pattern
[[  0.  inf  inf]
 [ inf   0.   9.]
 [ inf   0.   9.]
 [ inf   0.   9.]
 [ inf   0.   9.]
 [ inf   0.   9.]
 [ inf  19.  10.]]


In [29]:
print('Matches elements one to one')
print(dtw_full_norm([1, 2, 3, 4], [1, 2, 3, 4]))

Matches elements one to one
[[  0.  inf  inf  inf  inf]
 [ inf   0.   1.   3.   6.]
 [ inf   1.   0.   1.   3.]
 [ inf   3.   1.   0.   1.]
 [ inf   6.   3.   1.   0.]]


In [30]:
print('If the sequence equals the pattern except that some elements are repeated more or fewer times, the distance is still 0')
print(dtw_full_norm([1, 2, 2, 2, 3, 4], [1, 1, 2, 3, 3, 3, 4, 4]))

If the sequence equals the pattern except that some elements are repeated more or fewer times, the distance is still 0
[[  0.  inf  inf  inf  inf  inf  inf]
 [ inf   0.   1.   2.   3.   5.   8.]
 [ inf   0.   1.   2.   3.   5.   8.]
 [ inf   1.   0.   0.   0.   1.   3.]
 [ inf   3.   1.   1.   1.   0.   1.]
 [ inf   5.   2.   2.   2.   0.   1.]
 [ inf   7.   3.   3.   3.   0.   1.]
 [ inf  10.   5.   5.   5.   1.   0.]
 [ inf  13.   7.   7.   7.   2.   0.]]


In [31]:
print(dtw_full_norm([1, 10], [1, 1, 1, 1, 1]))
print(dtw_full_norm([1, 10], [10, 10, 10, 10, 10]))

[[  0.  inf  inf]
 [ inf   0.   9.]
 [ inf   0.   9.]
 [ inf   0.   9.]
 [ inf   0.   9.]
 [ inf   0.   9.]]
[[  0.  inf  inf]
 [ inf   9.   9.]
 [ inf  18.   9.]
 [ inf  27.   9.]
 [ inf  36.   9.]
 [ inf  45.   9.]]


In [33]:
print(dtw_full_norm([1, 2, 2, 2, 3, 4], [2, 3, 4, 4, 4, 5]))

[[  0.  inf  inf  inf  inf  inf  inf]
 [ inf   1.   1.   1.   1.   2.   4.]
 [ inf   3.   2.   2.   2.   1.   2.]
 [ inf   6.   4.   4.   4.   2.   1.]
 [ inf   9.   6.   6.   6.   3.   1.]
 [ inf  12.   8.   8.   8.   4.   1.]
 [ inf  16.  11.  11.  11.   6.   2.]]


# Using DTW on RedDots

In [None]:
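import fastdtw  # third-party package; it is not imported in the cell at the top


class DynamicTimeWarpingClassifier:
    """1-nearest-neighbour classifier that uses the DTW distance."""

    def __init__(self, metric=spatial.distance.euclidean):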
        self.metric = metric
        self.patterns = None
        self.labels = None

    def fit(self, features, labels):
        self.patterns = features
        self.labels = labels

    def predict(self, features):
        sequence_num = len(features)

        results = np.zeros(sequence_num, dtype=self.labels.dtype)
        for i, sequence in enumerate(features):
            min_distance = None
            min_label = None
            for j, pattern in enumerate(self.patterns):
                distance, _ = fastdtw.fastdtw(pattern, sequence, dist=self.metric)
                if min_distance is None or distance < min_distance:
                    min_distance = distance
                    min_label = self.labels[j]
            results[i] = min_label

        return results
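
To illustrate the nearest-pattern idea behind the classifier without depending on `fastdtw`, here is a hedged sketch that swaps in the exact dynamic-programming distance from the first part of this notebook. The helper names (`euclidean`, `dtw_distance`, `predict_one`) and the toy data are invented for this example; they are not RedDots data and not part of the class above.

```python
import math


def euclidean(x, y):
    """Euclidean distance between two equal-length frames."""
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(x, y)))


def dtw_distance(metric, pattern, sequence):
    """Exact DTW distance, keeping only the previous row of the cost matrix."""
    m = len(pattern)
    prev = [0.0] + [math.inf] * m
    for s in sequence:
        curr = [math.inf] * (m + 1)
        for j, p in enumerate(pattern):
            curr[j + 1] = min(prev[j], prev[j + 1], curr[j]) + metric(p, s)
        prev = curr
    return prev[m]


def predict_one(patterns, labels, sequence, metric=euclidean):
    """Return the label of the pattern with the smallest DTW distance."""
    distances = [dtw_distance(metric, p, sequence) for p in patterns]
    return labels[distances.index(min(distances))]


# Toy data, made up for illustration: each sequence is a list of 2-D frames.
patterns = [
    [(0.0, 0.0), (1.0, 1.0), (2.0, 2.0)],
    [(5.0, 5.0), (4.0, 4.0), (3.0, 3.0)],
]
labels = ["rising", "falling"]
query = [(0.0, 0.0), (0.0, 0.0), (1.0, 1.0), (2.0, 2.0), (2.0, 2.0)]

print(predict_one(patterns, labels, query))  # rising
```

The query is the first pattern with some frames repeated, so its DTW distance to that pattern is $0$ and the first label wins.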