In [None]:
import numpy as np 
import random
import scipy
import torch
import torch.nn as nn
import torch.optim as optim

from numpy import genfromtxt

In [None]:
FORECAST_HORIZON = 16

In [None]:
file_name_prefix = "data/ECG200/ECG200"

In [None]:
# Load data
# In order to perform 10-fold cross-validation, we
# will merge the provided train and test splits and 
# we will split the data durign the cross-validation

train_data_with_class_labels = np.genfromtxt(file_name_prefix+'_TRAIN.txt')
test_data_with_class_labels = np.genfromtxt(file_name_prefix+'_TEST.txt')

data_with_class_labels = np.vstack( (train_data_with_class_labels, 
                                     test_data_with_class_labels))
data_without_class_labels = data_with_class_labels[:,1:]
input_data = data_without_class_labels[:,:-FORECAST_HORIZON]
target = data_without_class_labels[:,-FORECAST_HORIZON:]

# We make sure that the length of the time series is a multiple of 4 

NUM_INPUT_FEATURES = len(input_data[0]) 
values_to_cut = NUM_INPUT_FEATURES % 4
if values_to_cut != 0:
    input_data = input_data[:,values_to_cut:]
    NUM_INPUT_FEATURES = NUM_INPUT_FEATURES - values_to_cut

In [None]:
def dtw(ts1, ts2):
    LEN_TS1 = len(ts1)
    LEN_TS2 = len(ts2)
    dtw_matrix = np.zeros( (LEN_TS1, LEN_TS2), dtype=float )
    dtw_matrix[0,0] = abs(ts1[0]-ts2[0])
    
    for i in range(1, LEN_TS1):
        dtw_matrix[i,0] = dtw_matrix[i-1,0]+abs(ts1[i]-ts2[0])
        
    for j in range(1, LEN_TS2):
        dtw_matrix[0,j] = dtw_matrix[0,j-1]+abs(ts1[0]-ts2[j])

    for i in range(1, LEN_TS1):
        for j in range(1, LEN_TS2):
            dtw_matrix[i,j] = min(dtw_matrix[i-1,j-1], dtw_matrix[i-1,j], 
                            dtw_matrix[i, j-1]) + abs(ts1[i]-ts2[j])
            
    return dtw_matrix[ len(ts1)-1, len(ts2)-1 ]

In [None]:
def dc_activations(data, convolutional_filters):
    """
    Calculation of the activations of the distortion-aware convolutional layer.

    Inputs
    ------
    data : np.array 
      Two-dimensional array containing the input data, 
      each row of the array corresponds to one of the time series
    convolutional_filters : np.array
      Three-dimensional array containing the parameters of the dynamic 
      convolutional layer. The first index corresponds to the output channel
      of the convolution, the second index corresponds to the input channel 
      (the current implementation only works with 1 input channel, so the second
      index is always zero), the third index is the position within the local
      pattern corresponding to a convolutional filter
    """
    num_instances = len(data)
    length_of_time_series = len(data[0])
    num_conv_filters = len(convolutional_filters)
    conv_filter_size = len(convolutional_filters[0][0])

    activations = np.zeros( (num_instances, 2*num_conv_filters, 
                           length_of_time_series-conv_filter_size+1) )
    for i in range(num_instances):
        for j in range(length_of_time_series-conv_filter_size+1):
            for k in range(num_conv_filters):
                activations[i,k,j] = 1.0/(1.0 + dtw(convolutional_filters[k][0],
                                            data[i,j:j+conv_filter_size]))
    for i in range(num_instances):
        for j in range(length_of_time_series-conv_filter_size+1):
            for k in range(num_conv_filters):
                activations[i,num_conv_filters+k,j] = np.sum(convolutional_filters[k][0]*data[i,j:j+conv_filter_size])

    return activations

In [None]:
# Definition of the neural networks

CONV_FILTERS = 25
CONV_FILTER_SIZE = 9


class Net_CNN(nn.Module):
    def __init__(self):
        super(Net_CNN, self).__init__()
        num_units_fc = 100
        self.num_inputs_fc = int(CONV_FILTERS*(NUM_INPUT_FEATURES-CONV_FILTER_SIZE+1)/2)

        self.conv1 = nn.Conv1d(in_channels = 1, out_channels = CONV_FILTERS, 
                               kernel_size=CONV_FILTER_SIZE, padding = 0, stride = 1)
        self.max_pool = nn.MaxPool1d(2)
        self.fc = nn.Linear(self.num_inputs_fc, num_units_fc)
        self.out = nn.Linear(num_units_fc, FORECAST_HORIZON) 

    def forward(self, x):
        x = x.view(-1, 1, NUM_INPUT_FEATURES)
        x = self.conv1(x)
        x = self.max_pool(x)
        x = x.view(-1, self.num_inputs_fc)
        x = torch.relu(self.fc(x))
        x = self.out(x)
        return x



# Please note that the distortion-aware convolutional layer is initialized  
# using the parameters learned during the "initial stage" (in which a "usual" 
# convolutional network is trained). Once the initial stage is completed, 
# the parameters of the dynamic convoltuional layer are fixed, therefore,
# the activations of the dynamic convolutional layer will be pre-calculated 
# outside the network for efficient implementation.

class Net_DCNN(nn.Module):
    def __init__(self):
        super(Net_DCNN, self).__init__()
        num_units_fc = 100

        self.max_pool = nn.MaxPool1d(2)
        self.fc = nn.Linear(int(2*CONV_FILTERS*(NUM_INPUT_FEATURES-CONV_FILTER_SIZE+1)/2), num_units_fc)
        self.out = nn.Linear(num_units_fc, FORECAST_HORIZON) 

    def forward(self, x):
        x = x.view(-1, 2*CONV_FILTERS, NUM_INPUT_FEATURES-CONV_FILTER_SIZE+1)
        x = self.max_pool(x)
        x = x.view(-1,int(2*CONV_FILTERS*(NUM_INPUT_FEATURES-CONV_FILTER_SIZE+1)/2))
        x = torch.relu(self.fc(x))
        x = self.out(x)
        return x

In [None]:
# function used to evaluate the network 
def eval_net(net, test_input, test_target):
    test_dataset = torch.utils.data.TensorDataset( 
        torch.Tensor(test_input), 
        torch.Tensor(test_target)
    )
    testloader = torch.utils.data.DataLoader(test_dataset, batch_size=1)

    mae = 0
    mse = 0
    total = 0

    with torch.no_grad():
        for inputs, true_target in testloader:
            predicted_target = net(inputs).numpy()
            true_target = true_target.numpy()

            mae += float(np.mean(np.abs(predicted_target - true_target)))
            mse += float(np.mean((predicted_target - true_target)**2))
            total += 1

    return mse/total, mae/total

In [None]:
# functions used to distort time series

def atomic_elongational_noise(ts):
    n = len(ts)
    pos_del = random.randint(0,n-1)
    pos_elongate = random.randint(0,n-2)
    ts = list(ts)
    ts = ts[:pos_del] + ts[pos_del+1:] # delete a value
    ts = ts[:pos_elongate+1] + ts[pos_elongate:] # elongate a value
    return ts

def elongational_noise(timeseries_dataset, noise_level):
    timeseries_dataset_with_noise = []
    for i in range(len(timeseries_dataset)):
        ts = list(timeseries_dataset[i])
        for j in range(noise_level):
            ts = atomic_elongational_noise(ts)
        timeseries_dataset_with_noise.append(ts)
    return np.array(timeseries_dataset_with_noise)

In [None]:
# the main experimental loop

from sklearn.model_selection import KFold

mse_cnn  = [[],[],[],[]]
mse_dcnn = [[],[],[],[]]
mae_cnn  = [[],[],[],[]]
mae_dcnn = [[],[],[],[]]
kf = KFold(n_splits=10, random_state=42, shuffle=True)

fold = 0
for train_index, test_index in kf.split(input_data):
    fold = fold + 1

    train_data = input_data[train_index]
    train_target = target[train_index]
    test_data = input_data[test_index]
    test_target = target[test_index]

    # Training of CNN. This is simultaneously the initial stage of training of the DCNN.

    train_dataset = torch.utils.data.TensorDataset(
      torch.Tensor(train_data), 
      torch.Tensor(train_target) 
    )
    trainloader = torch.utils.data.DataLoader(
      train_dataset, shuffle=True, batch_size=16)

    cnn = Net_CNN()
    criterion = nn.MSELoss()
    optimizer = optim.Adam(cnn.parameters(), lr=1e-5)

    running_loss = 0.0
    running_n = 0

    print("Training CNN...")

    for epoch in range(500):  
        for input_batch, target_batch in trainloader:
            optimizer.zero_grad()

            prediction_batch = cnn(input_batch)

            loss = criterion(prediction_batch, target_batch)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            running_n = running_n + 1

        if epoch % 100 == 0:
            print("epoch: {:3d} loss: {:4.3f}".format(epoch, running_loss/running_n))
            running_loss = 0.0
            running_n = 0

    # Obtain the parameters of the dynamic convolutional layer, and 
    # precalculate its activations
    params = []
    for p in cnn.parameters():
        params.append(p)

    convolutional_filters = np.array(params[0].detach().numpy(), 
                                   dtype=float)

    dc_activations_train = dc_activations(train_data, convolutional_filters)

    # Train DCNN

    train_dataset = torch.utils.data.TensorDataset(
      torch.Tensor(dc_activations_train), 
      torch.Tensor(train_target) 
    )
    trainloader = torch.utils.data.DataLoader(
      train_dataset, shuffle=True, batch_size=16)


    dcnn = Net_DCNN()
    criterion = nn.MSELoss()
    optimizer = optim.Adam(dcnn.parameters(), lr=1e-5)

    running_loss = 0.0
    running_n = 0

    print("Training DCNN...")

    for epoch in range(500):  
        for input_batch, target_batch in trainloader:
            optimizer.zero_grad()

            prediction_batch = dcnn(input_batch) 

            loss = criterion(prediction_batch, target_batch)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            running_n = running_n + 1

        if epoch % 100 == 0:
            print("epoch: {:3d} loss: {:4.3f}".format(epoch, running_loss/running_n))
            running_loss = 0.0
            running_n = 0
  

    # Evaluation at different levels of distortion
    
    test_data1 = test_data_with_noise = elongational_noise(test_data, 1)
    test_data5 = test_data_with_noise = elongational_noise(test_data, 5)
    test_data10 = test_data_with_noise = elongational_noise(test_data, 10)
    
    testsets = [test_data, test_data1, test_data5, test_data10]
    noise_level = [0, 1, 5, 10]
    
    for i in range(4):
        a_mse, a_mae = eval_net(cnn, testsets[i], test_target)
        mse_cnn[i].append(a_mse)
        mae_cnn[i].append(a_mae)

        dc_activations_test = dc_activations(testsets[i], convolutional_filters)
        a_mse_dc, a_mae_dc = eval_net(dcnn, dc_activations_test, test_target)

        mse_dcnn[i].append(a_mse_dc)
        mae_dcnn[i].append(a_mae_dc)

        print(f"Fold: {fold:2d}, Distortion: {noise_level[i]}")
        print(f"  aMSE of CNN:  {a_mse:6.4f}")
        print(f"  aMSE of DCNN: {a_mse_dc:6.4f}")
        print(f"  aMAE of CNN:  {a_mae:6.4f}")
        print(f"  aMAE of DCNN: {a_mae_dc:6.4f}")

In [None]:
# print results, calculate p-values 

print(file_name_prefix.split('/')[-1])
print(f"Mean avgMSE CNN:   {np.mean(mse_cnn[0]):6.5f} {np.mean(mse_cnn[1]):6.5f} {np.mean(mse_cnn[2]):6.5f} {np.mean(mse_cnn[3]):6.5f}")
print(f"Mean avgMSE DCNN:  {np.mean(mse_dcnn[0]):6.5f} {np.mean(mse_dcnn[1]):6.5f} {np.mean(mse_dcnn[2]):6.5f} {np.mean(mse_dcnn[3]):6.5f}")
print(f"Std. avgMSE CNN:   {np.std(mse_cnn[0]):6.5f} {np.std(mse_cnn[1]):6.5f} {np.std(mse_cnn[2]):6.5f} {np.std(mse_cnn[3]):6.5f}")
print(f"Std. avgMSE DCNN:  {np.std(mse_dcnn[0]):6.5f} {np.std(mse_dcnn[1]):6.5f} {np.std(mse_dcnn[2]):6.5f} {np.std(mse_dcnn[3]):6.5f}")
print(f"p-value:           {scipy.stats.ttest_rel(mse_cnn[0], mse_dcnn[0])[1]:6.5f} "+
                        f" {scipy.stats.ttest_rel(mse_cnn[1], mse_dcnn[1])[1]:6.5f} "+
                        f" {scipy.stats.ttest_rel(mse_cnn[2], mse_dcnn[2])[1]:6.5f} "+
                        f" {scipy.stats.ttest_rel(mse_cnn[3], mse_dcnn[3])[1]:6.5f}")

print(f"Mean avgMAE CNN:   {np.mean(mae_cnn[0]):6.5f} {np.mean(mae_cnn[1]):6.5f} {np.mean(mae_cnn[2]):6.5f} {np.mean(mae_cnn[3]):6.5f}")
print(f"Mean avgMAE DCNN:  {np.mean(mae_dcnn[0]):6.5f} {np.mean(mae_dcnn[1]):6.5f} {np.mean(mae_dcnn[2]):6.5f} {np.mean(mae_dcnn[3]):6.5f}")
print(f"Std. avgMAE CNN:   {np.std(mae_cnn[0]):6.5f} {np.std(mae_cnn[1]):6.5f} {np.std(mae_cnn[2]):6.5f} {np.std(mae_cnn[3]):6.5f}")
print(f"Std. avgMAE DCNN:  {np.std(mae_dcnn[0]):6.5f} {np.std(mae_dcnn[1]):6.5f} {np.std(mae_dcnn[2]):6.5f} {np.std(mae_dcnn[3]):6.5f}")
print(f"p-value:           {scipy.stats.ttest_rel(mae_cnn[0], mae_dcnn[0])[1]:6.5f}"+
                        f" {scipy.stats.ttest_rel(mae_cnn[1], mae_dcnn[1])[1]:6.5f}"+
                        f" {scipy.stats.ttest_rel(mae_cnn[2], mae_dcnn[2])[1]:6.5f}"+
                        f" {scipy.stats.ttest_rel(mae_cnn[3], mae_dcnn[3])[1]:6.5f}")