# DTW Time checking and Multiprocessing

First, let's import modules and define functions!

In [None]:
from joblib import Parallel, delayed
from tqdm.notebook import trange, tqdm
import multiprocessing
from fastdtw import fastdtw
from datetime import datetime
import random
import numpy as np
import h5py
import hdf5plugin
from typing import List
import matplotlib.pyplot as plt

ALL_LEADS = ['I', 'II', 'III', 'aVR', 'aVL', 'aVF', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6']
ecgno = np.load('./ecgno.npy')
print(len(ecgno))

dtw_ecgs = np.random.choice(ecgno, 100000)
train_id = dtw_ecgs[:60000]
val_id = dtw_ecgs[60000:80000]
test_id = dtw_ecgs[80000:]

manhattan_distance = lambda x, y: np.abs(x - y)

def load_ecg(
    hd5: h5py.File, date: str,
    target_length: int = 2500, 
    leads: List[str] = ALL_LEADS,
):
    out = np.empty((target_length, len(leads)))
    for i, lead in enumerate(leads):
        lead_array = hd5["ecg"][date][lead][()]
        out[:, i] = np.interp(
            np.linspace(0, 1, target_length),
            np.linspace(0, 1, lead_array.shape[0]),
            lead_array,
        )
    return out

# def fastdtw(ecg1, ecg2):
#     return fastdtw.fastdtw(ecg1, ecg2)[0]

def calc_dtw_each(ecg1_id, ecg2_id, dtw_matrix):
    if dtw_matrix[i][j] == 0:
        with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1_id.split('_')[0], ecg1_id.split('_')[1]), "r") as hd5:
            print('loaded hd5')
            ecg1 = load_ecg(hd5, ecg1_id.split('_')[-1])
        with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg2_id.split('_')[0], ecg2_id.split('_')[1]), "r") as hd5:
            ecg2 = load_ecg(hd5, ecg2_id.split('_')[-1])

        print ('loaded ecg')
        dtw_val = fastdtw(ecg1, ecg2)[0]
        dtw_matrix[i][j] = dtw_val
        dtw_matrix[j][i] = dtw_val
    else:
        print('multiple calculation'+ecg1_id+ecg2_id)
        
    return dtw_matrix

In [None]:
ecg1_id.split('_')

In [None]:
ecg1_id = ecgno[0]
ecg2_id = ecgno[1]
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1_id.split('_')[0], ecg1_id.split('_')[1]), "r") as hd5:
    print('loaded hd5')
    ecg1 = load_ecg(hd5, ecg1_id.split('_')[-1])
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg2_id.split('_')[0], ecg2_id.split('_')[1]), "r") as hd5:
    ecg2 = load_ecg(hd5, ecg2_id.split('_')[-1])
for i in range(12):
    plt.plot(ecg1[:,i].T)
    plt.show()

In [None]:
print(ecg1.shape, ecg2.shape)

# DTW surrogate model speed Check

In [None]:
def get_model(args, device=None, valid=False, dtw=False, finetuning=False, dml_model=None):
    if valid:
        model = Classifier(args=args) #two layer fc classifier (with BN, ReLU, Dropout)
        
    elif finetuning:        
        # Load model for finetuning
        model = Finetune_Classifier(args, dml_model)
        print('model for finetuning')
        
    else:
        if args.model == "cnn":
            if args.dtw or dtw:
                model = resnet18_dtw(pretrained=args.pretrain, args=args)
            else:
                model = resnet18(pretrained=args.pretrain, args=args)

        elif args.model == "cnn_prev":
            model = models.resnet18(pretrained=False).to(device)
            model.conv1 = nn.Conv2d(1, 64, (7, 7), (2, 2), (3, 3), bias=False)
            model.avgpool = nn.AdaptiveAvgPool2d(1)
            model.fc = nn.Linear(in_features=512, out_features=1, bias=True)

        else:
            model_module = importlib.import_module("model." + args.model)
            model_class = getattr(model_module, args.model.upper())
            model = model_class(args)

    model = model.to(device)
    # model = nn.DataParallel(model)

    return model

In [21]:
# def load_ecg(
#     hd5: h5py.File, date: str,
#     target_length: int = 2500, 
#     leads: List[str] = ALL_LEADS,
# ):
#     out = np.empty((target_length, len(leads)))
#     for i, lead in enumerate(leads):
#         lead_array = hd5["ecg"][date][lead][()]
#         out[:, i] = np.interp(
#             np.linspace(0, 1, target_length),
#             np.linspace(0, 1, lead_array.shape[0]),
#             lead_array,
#         )
#     return out

hd5 = h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1_id.split('_')[0], ecg1_id.split('_')[1]), "r")
lead_array = hd5["ecg"][ecg1_id.split('_')[-1]]['II'][()][:1024]

In [None]:
len(lead_array)

In [None]:
plt.plot(lead_array.T)
plt.show()

In [None]:
for i in range(12):
    plt.plot(ecg2[:,i].T)
    plt.show()

# Dynamic Time Warping

In this step I tried multiple versions of DTW 

Todo: Check whether each dtw method provides multidimensional distance calculation (for 12 lead ECGs)

In [None]:
# FastDTW: https://github.com/DynamicTimeWarping/dtw-python
ecg1_id = ecgno[0]
ecg2_id = ecgno[1]

now = datetime.now()
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1_id.split('_')[0], ecg1_id.split('_')[1]), "r") as hd5:
    print('loaded hd5')
    ecg1 = load_ecg(hd5, ecg1_id.split('_')[-1])
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg2_id.split('_')[0], ecg2_id.split('_')[1]), "r") as hd5:
    ecg2 = load_ecg(hd5, ecg2_id.split('_')[-1])

print ('loaded ecg')
dtw_val, route = fastdtw(ecg1, ecg2)
print(route)
later = datetime.now()
difference = (later - now).total_seconds()

print(difference)
print(dtw_val)

In [None]:
# fast dtw channelwise
now = datetime.now()
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1_id.split('_')[0], ecg1_id.split('_')[1]), "r") as hd5:
    print('loaded hd5')
    ecg1 = load_ecg(hd5, ecg1_id.split('_')[-1])
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg2_id.split('_')[0], ecg2_id.split('_')[1]), "r") as hd5:
    ecg2 = load_ecg(hd5, ecg2_id.split('_')[-1])

for lead in tqdm(range(12)):
    x = ecg1[:, lead]
    y = ecg2[:, lead]
    
    d, route = fastdtw(x, y)
    dtw_d.append(d)
    print (d)

later = datetime.now()
difference = (later - now).total_seconds()
print(np.mean(dtw_d))
print(difference)

In [None]:
# FastDTW: https://github.com/DynamicTimeWarping/dtw-python
ecg1_id = ecgno[0]
ecg2_id = ecgno[1]
dtw_d = []

now = datetime.now()
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1_id.split('_')[0], ecg1_id.split('_')[1]), "r") as hd5:
    print('loaded hd5')
    ecg1 = load_ecg(hd5, ecg1_id.split('_')[-1])
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg2_id.split('_')[0], ecg2_id.split('_')[1]), "r") as hd5:
    ecg2 = load_ecg(hd5, ecg2_id.split('_')[-1])

print ('loaded ecg')
dtw_val, route = fastdtw(ecg1, ecg2)
print(route)
later = datetime.now()
difference = (later - now).total_seconds()

for lead in tqdm(range(12)):
    x = ecg1[:, lead]
    y = ecg2[:, lead]
    
    d, cost_matrix, acc_cost_matrix, path = dtw(x, y, dist=manhattan_distance)
    dtw_d.append(d)
    print (d)

later = datetime.now()
difference = (later - now).total_seconds()
print(np.mean(dtw_d))

print(difference)
print(dtw_val)

In [None]:
# DTW: https://github.com/markdregan/K-Nearest-Neighbors-with-Dynamic-Time-Warping
from dtw import dtw
manhattan_distance = lambda x, y: np.abs(x - y)
dtw_d = []

now = datetime.now()

with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1_id.split('_')[0], ecg1_id.split('_')[1]), "r") as hd5:
    print('loaded hd5')
    ecg1 = load_ecg(hd5, ecg1_id.split('_')[-1])
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg2_id.split('_')[0], ecg2_id.split('_')[1]), "r") as hd5:
    ecg2 = load_ecg(hd5, ecg2_id.split('_')[-1])

for lead in tqdm(range(12)):
    x = ecg1[:, lead]
    y = ecg2[:, lead]
    
    d, cost_matrix, acc_cost_matrix, path = dtw(x, y, dist=manhattan_distance)
    dtw_d.append(d)
    print (d)

later = datetime.now()
difference = (later - now).total_seconds()
print(np.mean(dtw_d))

In [None]:
# DTW-python: https://github.com/DynamicTimeWarping/dtw-python
import numpy as np

## A noisy sine wave as query
idx = np.linspace(0,6.28,num=100)
query = np.sin(idx) + np.random.uniform(size=100)/10.0

## A cosine is for template; sin and cos are offset by 25 samples
template = np.cos(idx)

## Find the best match with the canonical recursion formula
from dtw import *

now = datetime.now()
ecg1_id = ecgno[0]
ecg2_id = ecgno[1]
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1_id.split('_')[0], ecg1_id.split('_')[1]), "r") as hd5:
    print('loaded hd5')
    ecg1 = load_ecg(hd5, ecg1_id.split('_')[-1])
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg2_id.split('_')[0], ecg2_id.split('_')[1]), "r") as hd5:
    ecg2 = load_ecg(hd5, ecg2_id.split('_')[-1])
    
alignment = dtw(ecg1, ecg2, keep_internals=True)

## Display the warping curve, i.e. the alignment curve
alignment.plot(type="threeway")

## Align and plot with the Rabiner-Juang type VI-c unsmoothed recursion
dtw(query, template, keep_internals=True, 
    step_pattern=rabinerJuangStepPattern(6, "c"))\
    .plot(type="twoway",offset=-2)

## See the recursion relation, as formula and diagram
print(rabinerJuangStepPattern(6,"c"))
rabinerJuangStepPattern(6,"c").plot()

## And much more!

In [None]:
# Dtaidistance: faster per some report from others
from dtaidistance import dtw
dtw_d = []

now = datetime.now()

with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1_id.split('_')[0], ecg1_id.split('_')[1]), "r") as hd5:
    ecg1 = load_ecg(hd5, ecg1_id.split('_')[-1])
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg2_id.split('_')[0], ecg2_id.split('_')[1]), "r") as hd5:
    ecg2 = load_ecg(hd5, ecg2_id.split('_')[-1])
print('loaded hd5')

for lead in tqdm(range(12)):
    x = ecg1[:, lead]
    y = ecg2[:, lead]
    
    d = dtw(x, y, use_mp=True)
    dtw_d.append(d)
    print (d)

later = datetime.now()
difference = (later - now).total_seconds()
print(np.mean(dtw_d))
print(difference)

In [None]:
# Dtaidistance: faster per some report from others
from dtaidistance import dtw_ndim
dtw_d = []

now = datetime.now()

with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1_id.split('_')[0], ecg1_id.split('_')[1]), "r") as hd5:
    ecg1 = load_ecg(hd5, ecg1_id.split('_')[-1])
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg2_id.split('_')[0], ecg2_id.split('_')[1]), "r") as hd5:
    ecg2 = load_ecg(hd5, ecg2_id.split('_')[-1])
print('loaded hd5')

print(dtw_ndim.distance(ecg1, ecg2))
later = datetime.now()
difference = (later - now).total_seconds()
print(difference)

# MultiProcessing

Now let's try multiprocessing with dtaidistance

In [None]:
import multiprocessing
import itertools
import tqdm

num_cores = multiprocessing.cpu_count() - 1 # total 80
print(num_cores)

DTW_MATRIX = np.zeros((len(train_id), len(train_id)))
prod = itertools.product(train_id, train_id)
print('prepared ids and pairs')

def calc_dtw_each(ecg1_id, ecg2_id):
    now = datetime.now()
    if DTW_MATRIX[i][j] == 0:
        with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1_id.split('_')[0], ecg1_id.split('_')[1]), "r") as e1:
            print('loaded hd5')
            ecg1 = load_ecg(e1, ecg1_id.split('_')[-1])
        with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg2_id.split('_')[0], ecg2_id.split('_')[1]), "r") as e2:
            ecg2 = load_ecg(e2, ecg2_id.split('_')[-1])

        print ('loaded ecg')
        dtw_d = np.mean([dtw_ndim.distance(ecg1[:, lead], ecg2[:, lead])[0] for lead in range(12)])
        DTW_MATRIX[i][j] = dtw_val
        DTW_MATRIX[j][i] = dtw_val 
    later = datetime.now()
    difference = (later - now).total_seconds()
    print(difference)
    return dtw_matrix

# Parallel(n_jobs=num_cores)(delayed(calculate_dtw)(train_id) for j in range(len(train_id)))
# print (time.time() - init)

print('preparing multiprocesing')
pool = multiprocessing.Pool(num_cores)
for _ in tqdm.tqdm(pool.imap_unordered(calc_dtw_each, prod), total=len(prod)):
    pass
pool.close()
# pool.map(calc_dtw_each, prod)

print('saving files...')
np.save('./dtw_train_dtai.npy', DTW_MATRIX)
# np.save('./dtw_val.npy', dtw_val)
# np.save('./dtw_test.npy', dtw_test)

In [None]:
# Dtaidistance: faster per some report from others
from dtaidistance import dtw_ndim
dtw_d = []

now = datetime.now()

with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1_id.split('_')[0], ecg1_id.split('_')[1]), "r") as hd5:
    ecg1 = load_ecg(hd5, ecg1_id.split('_')[-1])
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg2_id.split('_')[0], ecg2_id.split('_')[1]), "r") as hd5:
    ecg2 = load_ecg(hd5, ecg2_id.split('_')[-1])
print('loaded hd5')

print(dtw_ndim.distance_matrix(ecg1, ecg2, use_mp=True))
later = datetime.now()
difference = (later - now).total_seconds()
print(difference)

In [None]:
results = [[] for i in range(N)]

def calc_dtw_each(ecg1_id, ecg2_id, dtw_matrix):
    if dtw_matrix[i][j] == 0:
        with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1.split('_')[0], ecg1_id.split('_')[1]), "r") as hd5:
            print('loaded hd5')
            ecg1 = load_ecg(hd5, ecg1_id.split('_')[-1])
        with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(batch[j].split('_')[0], ecg2_id.split('_')[1]), "r") as hd5:
            ecg2 = load_ecg(hd5, ecg2_id.split('_')[-1])

        print ('loaded ecg')
        dtw_val = fastdtw(ecg1, ecg2)
        dtw_matrix[i][j] = dtw_val
        dtw_matrix[j][i] = dtw_val
    else:
        print('multiple calculation'+ecg1_id+ecg2_id)
        
    return dtw_matrix

dtw_matrix = np.zeros((len(train_id), len(train_id)))
for i in tqdm(range(len(train_id))):
    dtw_matrix = Parallel(n_jobs=num_cores)(delayed(calc_dtw_each)(train_id[i],train_id[j], dtw_matrix) for j in range(len(train_id)) )

dtw_val = pool.map(calculate_dtw, val_id, val_id)
dtw_test = pool.map(calculate_dtw, test_id)
pool.close()
pool.join()

# dtw_train = calculate_dtw(train_id)
# dtw_val = calculate_dtw(val_id)
# dtw_test = calculate_dtw(test_id)

np.save('./dtw_train.npy', dtw_train)
np.save('./dtw_val.npy', dtw_val)
np.save('./dtw_test.npy', dtw_test)

In [None]:
# CUDA dtw channelwise
import torch
from utils.soft_dtw_cuda import SoftDTW

ecg1_id = ecgno[0]
ecg2_id = ecgno[1]

device = torch.device('cuda')
sdtw = SoftDTW(use_cuda=True, gamma=0.1)
now = datetime.now()
print('start loading files')
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg1_id.split('_')[0], ecg1_id.split('_')[1]), "r") as hd5:
    print('loaded hd5')
    ecg1 = torch.from_numpy(load_ecg(hd5, ecg1_id.split('_')[-1])).to(device)
with h5py.File("/storage/shared/ecg/{}/{}.hd5".format(ecg2_id.split('_')[0], ecg2_id.split('_')[1]), "r") as hd5:
    ecg2 = torch.from_numpy(load_ecg(hd5, ecg2_id.split('_')[-1])).to(device)

for lead in tqdm(range(12)):
    x = ecg1[:, lead]
    y = ecg2[:, lead]

    d = sdtw(x, y)
    dtw_d.append(d)
    print (d)

later = datetime.now()
difference = (later - now).total_seconds()
print(np.mean(dtw_d))
print(difference)

In [None]:
torch.cuda.is_available()

# Define DTW ID

In [None]:
import numpy as np

ecgno = np.load('./ecgno.npy')
print(len(ecgno))

dtw_ecgs = np.random.choice(ecgno, 10000)
train_id = dtw_ecgs[:6000]
val_id = dtw_ecgs[6000:8000]
test_id = dtw_ecgs[8000:]

np.save('./stores/dtw_trainid.npy', train_id) #6000
np.save('./stores/dtw_validid.npy', val_id) #2000
np.save('./stores/dtw_testid.npy', test_id) #2000

# Plotting DTW

Let's find out the distribution of DTW and get the sense on what's high and low value of DTW on ECG
--> How is this different from literature reports? 

In [8]:
DTW_MATRIX_TRAIN = np.load('./stores/dtw_matrix_train.npy').flatten()
DTW_MATRIX_VALID = np.load('./stores/dtw_matrix_valid.npy').flatten()

In [None]:
plt.plot(DTW_MATRIX_TRAIN)
plt.show()

In [None]:
plt.plot(DTW_MATRIX_VALID)
plt.show()

In [None]:
plt.hist(DTW_MATRIX_TRAIN*1000000)
plt.show()

In [None]:
DTW_MATRIX_TRAIN.()

In [None]:
len(DTW_MATRIX_TRAIN)

In [None]:
import seaborn as sns
sns.histplot(DTW_MATRIX_TRAIN)

In [None]:
sns.histplot(
    data=DTW_MATRIX_TRAIN, x="distance", hue="method",
    log_scale=True, element="step", fill=False,
    cumulative=True, stat="density", common_norm=False,
)

In [32]:
DTW_MATRIX_TRAIN = np.load('./stores/dtw_matrix_train.npy')

In [None]:
DTW_MATRIX_TRAIN.shape

In [None]:
DTW_MATRIX_TRAIN[105:140]