## Dynamic Time Warping
* 1NN + bespoke distance metric

In [9]:
import pandas as pd
import numpy as np
import scipy
import matplotlib
from matplotlib import pyplot as plt
import os
import re
import time
import sklearn
from glob import glob
from tqdm import tqdm # show progress bar
from time import sleep
import numpy as np
import math
import sys # for sys.float_info.epsilon
from numba import njit # speed up numpy calculation
from sklearn.tree import DecisionTreeClassifier
os.getcwd()

'/Users/kangshuoli/Documents/VScode_workspace/GR5398/doc'

#### Calculate the optimal distance with dynamic programming (DTW recurrence)

In [5]:
@njit
def DTW(a, b, MTSC = True, method = "dependent"):
    '''
    Dynamic time warping distance between two multivariate time series.

    Input:
    a -> time series, m * d numpy array (m time steps, d dimensions),
    b -> time series, m * d numpy array,
    MTSC -> must be truthy, only multivariate time series are supported,
    method -> `dependent` warping (DTWd) by default: a single warping path over
              the squared Euclidean distance between the d-dimensional points;
              `independent` (DTWi): every dimension is warped on its own and
              the per-dimension distances are summed

    NOTE: `method` is the FOURTH parameter. Pass it by keyword
    (`method = "independent"`), otherwise the value lands in `MTSC`.

    Output:
    optimal accumulated distance (sum of squared differences along the path,
    no square root is taken)

    Raises:
    ValueError if the two series differ in length, are empty, MTSC is falsy,
    or `method` is not one of the two supported names.
    AssertionError if the two series differ in number of dimensions.
    '''
    if a.shape[0] != b.shape[0]:
        raise ValueError("Time length are not the same!")
    if not MTSC:
        raise ValueError("Input should be MTS!")
    m = a.shape[0] # number of time steps
    d = a.shape[1] # number of dimensions
    assert a.shape[1] == b.shape[1]
    if m == 0:
        # dp[m-1, m-1] would index outside an empty table
        raise ValueError("Time series are empty!")

    # dp[i,j] = cost of the cheapest warping path ending at (a[i], b[j]).
    # The local cost |a[i] - b[j]|^2 is NOT symmetric in (i, j), so it is
    # computed for every cell instead of being mirrored across the diagonal.
    dp = np.zeros((m, m))
    if method == "dependent":
        for i in range(m):
            for j in range(m):
                sq_dist = 0.0
                for k in range(d):
                    delta = a[i, k] - b[j, k]
                    sq_dist += delta * delta
                if i == 0 and j == 0: # base case
                    best_prev = 0.0
                elif i == 0:
                    best_prev = dp[0, j-1]
                elif j == 0:
                    best_prev = dp[i-1, 0]
                else:
                    best_prev = min(dp[i-1, j], dp[i, j-1], dp[i-1, j-1])
                dp[i, j] = sq_dist + best_prev
        return dp[m-1, m-1]
    elif method == "independent":
        total = 0.0
        for k in range(d): # one independent warping path per dimension
            # dp is reused: each cell is rewritten before it is read again
            for i in range(m):
                for j in range(m):
                    gap = a[i, k] - b[j, k]
                    if i == 0 and j == 0: # base case
                        best_prev_k = 0.0
                    elif i == 0:
                        best_prev_k = dp[0, j-1]
                    elif j == 0:
                        best_prev_k = dp[i-1, 0]
                    else:
                        best_prev_k = min(dp[i-1, j], dp[i, j-1], dp[i-1, j-1])
                    dp[i, j] = gap * gap + best_prev_k
            total += dp[m-1, m-1]
        return total
    else:
        raise ValueError("Invalid method!")


def find_scores(mts_train, label_train):
    '''
    Leave-one-out 1NN scoring of the training set under both DTW variants.

    Input:
    mts_train: training set of multivariate time series, indexed mts_train[i,:,:]
    label_train: labels for training set, indexed label_train[i]

    For every training series i the nearest OTHER training series is searched
    once with dependent DTW and once with independent DTW. The score of i is
    (smallest dependent distance) / (smallest independent distance + epsilon).

    Output:
    S: numpy array with the score of every training series
    S_iSuccess: scores of the series whose nearest neighbour has the right
                label under independent DTW but not under dependent DTW
    S_dSuccess: scores of the series whose nearest neighbour has the right
                label under dependent DTW but not under independent DTW
    '''
    n_train = mts_train.shape[0]
    S_dSuccess, S_iSuccess = [], []
    S = np.empty(n_train)
    print("Finding Scores:")
    for i in tqdm(range(n_train)):
        curr_label = label_train[i]
        curr_min_distance_I, curr_min_distance_D = math.inf, math.inf
        # label of the nearest neighbour of series i under each variant;
        # plain variables so that labels of any dtype can be stored
        nn_label_D, nn_label_I = None, None
        for j in range(n_train):
            if j == i: # leave the series itself out
                continue
            # `method` must go by keyword: the third positional slot is MTSC
            curr_distance_D = DTW(mts_train[i,:,:], mts_train[j,:,:], method = "dependent")
            curr_distance_I = DTW(mts_train[i,:,:], mts_train[j,:,:], method = "independent")
            if curr_distance_D <= curr_min_distance_D:
                curr_min_distance_D = curr_distance_D
                nn_label_D = label_train[j]
            if curr_distance_I <= curr_min_distance_I:
                curr_min_distance_I = curr_distance_I
                nn_label_I = label_train[j]
        # epsilon keeps the ratio finite when the independent distance is 0
        curr_score = curr_min_distance_D / (curr_min_distance_I + sys.float_info.epsilon)
        S[i] = curr_score
        if nn_label_D == curr_label and nn_label_I != curr_label: # dSuccess
            S_dSuccess.append(curr_score)
        if nn_label_D != curr_label and nn_label_I == curr_label: # iSuccess
            S_iSuccess.append(curr_score)
        sleep(0.01) # presumably lets the progress bar refresh
    print("Done!")
    sleep(0.3)
    return S, S_iSuccess, S_dSuccess


def learn_threshold(mts_train, label_train):
    '''
    Learn the score threshold used to choose between DTW_I and DTW_D.

    Input:
    mts_train: training set of multivariate time series
    label_train: labels for training set

    The scores come from find_scores (leave-one-out over the training set).
    The threshold is meant for the rule "score > threshold -> use independent
    DTW, otherwise dependent DTW":
    * no iSuccess and no dSuccess scores -> 1
    * only dSuccess scores -> their maximum (all of them stay <= threshold)
    * only iSuccess scores -> their minimum
    * both -> split point of a depth-1 decision tree fitted on the scores

    Output:
    threshold: adjusted threshold
    '''
    S, S_iSuccess, S_dSuccess = find_scores(mts_train, label_train)
    print("Learning Threshold")
    if len(S_iSuccess) == 0 and len(S_dSuccess) == 0:
        threshold = 1
    elif len(S_iSuccess) == 0:
        threshold = max(S_dSuccess)
    elif len(S_dSuccess) == 0:
        # iSuccess scores have to end up on the "independent" side of the
        # rule, so the threshold sits at their lower end, not their upper end
        threshold = min(S_iSuccess)
    else:
        # reconstruct the data: label iSuccess -> 1, dSuccess -> 0
        scores = np.concatenate((np.asarray(S_iSuccess), np.asarray(S_dSuccess)))
        # sklearn expects a 2-D feature matrix: one column, one row per score
        data = scores.reshape(-1, 1)
        label = np.concatenate((np.ones(len(S_iSuccess)), np.zeros(len(S_dSuccess))))
        dec_tree = DecisionTreeClassifier(
            max_depth = 1, 
            max_leaf_nodes = 2, 
            random_state = 42, 
            max_features = 1, 
            criterion = "entropy", 
            splitter = "best"
        )
        dec_tree.fit(data, label)
        # split value of the root node
        # NOTE(review): if the tree cannot split (e.g. all scores equal) the
        # root is a leaf and sklearn presumably reports -2 here -- confirm
        threshold = dec_tree.tree_.threshold[0]
    print("Done!")
    return threshold


def DTW_adaptive(a, b, threshold):
    '''
    Adaptive DTW: pick the independent or the dependent distance per pair.

    Input:
    a -> time series, m * d numpy array,
    b -> time series, m * d numpy array,
    threshold -> adjusted threshold the score is compared against

    The score is DTW_D / (DTW_I + epsilon). A score above the threshold
    selects the independent distance, anything else the dependent one.

    Output:
    adaptive distance

    Raises:
    ValueError if the two series differ in length.
    AssertionError if the two series differ in number of dimensions.
    '''
    same_length = a.shape[0] == b.shape[0]
    if not same_length:
        raise ValueError("Time length are not the same!")
    n_dims = a.shape[1]
    assert n_dims == b.shape[1]

    dependent_dist = DTW(a, b, MTSC = True, method = "dependent")
    independent_dist = DTW(a, b, MTSC = True, method = "independent")

    # epsilon guards against a zero independent distance
    ratio = dependent_dist / (independent_dist + sys.float_info.epsilon)

    # the original author notes that the thesis states this comparison
    # incorrectly and that the direction used here is the intended one
    return independent_dist if ratio > threshold else dependent_dist


def NN_DTW(mts_train, mts_test, label_train, index = label_test.index, method = "dependent"):
    '''
    Classify every test series with a 1NN classifier under a DTW distance.

    Input:
    mts_train: training set of multivariate time series, indexed mts_train[j,:,:]
    mts_test: test set of multivariate time series, indexed mts_test[i,:,:]
    label_train: labels for training set
    index: index of the returned Series. The default `label_test.index` is
           evaluated once, when this function is defined, so `label_test`
           must already exist at that point.
    method: "dependent", "independent", or "adaptive". For "adaptive" a
            threshold is first learned from the training set and DTW_adaptive
            is used as the distance.

    Use 1NN classifier (ties go to the later training series)

    NOTE: a second definition of NN_DTW appears later in this file and takes
    precedence over this one.

    Output:
    label_pred: pandas Series of the labels predicted by the 1NN
    '''
    use_adaptive = method == "adaptive"
    if use_adaptive:
        # determine the threshold
        threshold = learn_threshold(mts_train, label_train)
        print("Classifying:")

    label_pred = []
    for i in tqdm(range(mts_test.shape[0])): # for each test data
        test_series = mts_test[i,:,:]
        curr_min_distance = math.inf
        min_distance_label = None
        for j in range(mts_train.shape[0]): # for each training data
            if use_adaptive:
                curr_distance = DTW_adaptive(test_series, mts_train[j,:,:], threshold)
            else:
                # `method` must go by keyword: DTW's third positional slot is MTSC
                curr_distance = DTW(test_series, mts_train[j,:,:], method = method)
            if curr_distance <= curr_min_distance:
                curr_min_distance = curr_distance
                min_distance_label = label_train[j]
        label_pred.append(min_distance_label)
        sleep(0.01) # presumably lets the progress bar refresh
    sleep(0.3)

    return pd.Series(label_pred, index = index)


def NN_DTW(mts_train, mts_test, label_train, index = label_test.index, method = "dependent"):
    '''
    Classify every test series with a 1NN classifier under a DTW distance.

    Input:
    mts_train: training set of multivariate time series, indexed mts_train[j,:,:]
    mts_test: test set of multivariate time series, indexed mts_test[i,:,:]
    label_train: labels for training set
    index: index of the returned Series. The default `label_test.index` is
           evaluated once, when this function is defined, so `label_test`
           must already exist at that point.
    method: "dependent", "independent", or "adaptive". For "adaptive" a
            threshold is first learned from the training set and DTW_adaptive
            is used as the distance.

    Use 1NN classifier (ties go to the later training series)

    NOTE: NN_DTW is also defined earlier in this file; this later definition
    is the one in effect.

    Output:
    label_pred: pandas Series of the labels predicted by the 1NN
    '''
    use_adaptive = method == "adaptive"
    if use_adaptive:
        # determine the threshold
        threshold = learn_threshold(mts_train, label_train)
        print("Classifying:")

    label_pred = []
    for i in tqdm(range(mts_test.shape[0])): # for each test data
        test_series = mts_test[i,:,:]
        curr_min_distance = math.inf
        min_distance_label = None
        for j in range(mts_train.shape[0]): # for each training data
            if use_adaptive:
                curr_distance = DTW_adaptive(test_series, mts_train[j,:,:], threshold)
            else:
                # `method` must go by keyword: DTW's third positional slot is MTSC
                curr_distance = DTW(test_series, mts_train[j,:,:], method = method)
            if curr_distance <= curr_min_distance:
                curr_min_distance = curr_distance
                min_distance_label = label_train[j]
        label_pred.append(min_distance_label)
        sleep(0.01) # presumably lets the progress bar refresh
    sleep(0.3)

    return pd.Series(label_pred, index = index)

In [None]:
# Usage notes kept as an inert string literal (nothing below is executed).
# NN_DTW's default `index` argument reads `label_test.index`, so `label_test`
# (presumably a pandas object -- confirm) has to exist before NN_DTW is defined.
'''
Usage:
mts_train_np = np.asarray(mts_train)
mts_test_np = np.asarray(mts_test)
label_train_np = np.asarray(label_train)
label_test_np = np.asarray(label_test)

label_pred = NN_DTW(mts_train_np, mts_test_np, label_train_np, method = "dependent")
label_pred = NN_DTW(mts_train_np, mts_test_np, label_train_np, method = "adaptive")
'''