# Auto-Encoder Proof-Of-Concept

In [None]:
import os
import sys

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.preprocessing import MinMaxScaler

import torch
from torch import nn
from torch.autograd import Variable
from torch.utils.data import TensorDataset, DataLoader

sys.path.append(os.path.join(os.pardir, os.pardir))
from amlutils.task2.loading import load_train_set

from IPython.display import display

In [None]:
# Training hyper-parameters for the auto-encoder.
EPOCHS = 20  # number of passes over the training loader
BATCH_SIZE = 64  # batch size handed to the DataLoader
LEARNING_RATE = 0.001  # learning rate handed to the AdamW optimizer

In [None]:
X_train, y_train = load_train_set(os.path.join(os.pardir, 'data'))

display(X_train)

# MinMaxScaler scales per column (feature), but each signal (row) should be
# scaled to [-1, 1] on its own. So transpose to make the signals the columns,
# scale, and transpose back.
#
# The result goes into a fresh DataFrame instead of being written through
# `.values` of the transposed frame: that in-place write can alias (and
# silently rescale) X_train, truncates if the frame holds integers, and
# fails when `.values` is read-only under pandas copy-on-write.
#
# NaNs are expected to pass through the scaler untouched (TODO confirm for the
# installed scikit-learn version); they are filled with zeros afterwards.
X_train_scaled = pd.DataFrame(
    MinMaxScaler(feature_range=(-1, 1)).fit_transform(X_train.T).T,
    index=X_train.index,
    columns=X_train.columns,
)

X_train = X_train.fillna(0.0)
X_train_scaled = X_train_scaled.fillna(0.0)

display(X_train_scaled)

In [None]:
# Plot the first scaled training signal as a quick sanity check.
num_rows = 1
num_cols = 1
fig, axs = plt.subplots(num_rows, num_cols, figsize=(12 * num_cols, 6 * num_rows))

fig.set_facecolor('white')

# Derive the x-axis from the data rather than hard-coding the signal length,
# so the plot keeps working if the number of time steps changes.
first_signal = X_train_scaled.iloc[0]
axs.plot(np.arange(len(first_signal)), first_signal)
axs.set(xlabel='Time', ylabel='Signal', title='Class 0 Sample')

plt.show()

In [None]:
# Turn the scaled frame into a float tensor and insert a singleton channel
# axis at position 1, which the 1-d convolution layers need.
X_train_tensor = torch.tensor(X_train_scaled.values, dtype=torch.float).unsqueeze(1)
y_train_tensor = torch.tensor(y_train.values)

# Pair signals with labels and serve them in shuffled mini-batches.
train_tensor = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_tensor, batch_size=BATCH_SIZE, shuffle=True)

In [None]:
from math import floor

def conv1d_out_dim(in_dim, kernel_size, stride, padding=0, dilation=1):
    """Return the output length of a 1-d convolution.

    Evaluates
    ``floor((in_dim + 2*padding - dilation*(kernel_size - 1) - 1) / stride + 1)``.
    """
    padded_length = in_dim + 2 * padding
    # Number of input positions one (dilated) kernel application spans.
    kernel_extent = dilation * (kernel_size - 1) + 1
    return floor(float(padded_length - kernel_extent) / stride + 1)

def maxpool1d_out_dim(in_dim, kernel_size, stride, padding=0, dilation=1):
    """Return the output length of a 1-d pooling layer.

    Evaluates
    ``floor((in_dim + 2*padding - dilation*(kernel_size - 1) - 1) / stride + 1)``.
    """
    window_span = dilation * (kernel_size - 1)
    # Index of the last valid window start in the padded input.
    last_start = float(in_dim + 2 * padding - window_span - 1)
    return floor(last_start / stride + 1)

def convtranspose1d_out_dim(in_dim, kernel_size, stride, padding=0, output_padding=0, dilation=1):
    """Return the output length of a 1-d transposed convolution.

    Evaluates ``(in_dim - 1)*stride - 2*padding + dilation*(kernel_size - 1)
    + output_padding + 1``.
    """
    stretched = (in_dim - 1) * stride
    kernel_reach = dilation * (kernel_size - 1)
    return stretched - 2 * padding + kernel_reach + output_padding + 1


In [None]:
# Input length fed into the first convolution.
feature_dim = 17842

# NOTE(review): the kernel sizes 50 and 25 used below differ from the
# kernel_size=10 layers in ConvAutoEncoder -- confirm which configuration
# this bookkeeping is meant to describe.

# Encoder side: two conv + pool stages.
conv_1_out_dim = conv1d_out_dim(feature_dim, kernel_size=10, stride=3)
maxpool_1_out_dim = maxpool1d_out_dim(conv_1_out_dim, kernel_size=10, stride=2)
conv_2_out_dim = conv1d_out_dim(maxpool_1_out_dim, kernel_size=50, stride=3)
maxpool_2_out_dim = maxpool1d_out_dim(conv_2_out_dim, kernel_size=10, stride=2)

# Decoder side: each upsample step restores the length of the matching
# encoder stage.
convtranspose_1_out_dim = convtranspose1d_out_dim(maxpool_2_out_dim, kernel_size=25, stride=3)
upsample_1_out_dim = maxpool_1_out_dim
convtranspose_2_out_dim = convtranspose1d_out_dim(upsample_1_out_dim, kernel_size=50, stride=3, padding=0)
upsample_2_out_dim = feature_dim

# Print every intermediate length in pipeline order.
for dim in (
    feature_dim,
    conv_1_out_dim,
    maxpool_1_out_dim,
    conv_2_out_dim,
    maxpool_2_out_dim,
    convtranspose_1_out_dim,
    upsample_1_out_dim,
    convtranspose_2_out_dim,
    upsample_2_out_dim,
):
    print(dim)

In [None]:
# Architecture based on: https://pythonwife.com/convolutional-autoencoders-opencv/

class ConvAutoEncoder(nn.Module):
    """1-d convolutional auto-encoder for single-channel signals.

    The encoder is Conv1d -> BatchNorm1d -> Tanh -> AvgPool1d -> Dropout.
    The decoder is ConvTranspose1d -> BatchNorm1d -> Tanh -> Upsample ->
    Dropout, where the upsample step resizes the output to ``signal_length``.

    Args:
        signal_length: Length the decoder output is upsampled to. Defaults to
            17842, the value previously hard-coded in the decoder.
    """

    def __init__(self, signal_length: int = 17842) -> None:
        super().__init__()

        self.encoder = nn.Sequential(
            nn.Conv1d(in_channels=1, out_channels=1, kernel_size=10, stride=3),
            nn.BatchNorm1d(num_features=1),
            nn.Tanh(),
            nn.AvgPool1d(kernel_size=10, stride=2),
            nn.Dropout(p=0.3),
        )

        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(in_channels=1, out_channels=1, kernel_size=10, stride=3),
            nn.BatchNorm1d(num_features=1),
            nn.Tanh(),
            # Resize to the requested length so the output can be compared
            # element-wise with the input signal.
            nn.Upsample(size=signal_length),
            # NOTE(review): in training mode this dropout acts directly on the
            # reconstruction that is fed to the loss -- confirm it is intended.
            nn.Dropout(p=0.3),
        )

    def forward(self, X):
        """Encode ``X`` and decode it again, returning the reconstruction."""
        return self.decoder(self.encoder(X))

In [None]:
ecg_auto_encoder = ConvAutoEncoder()

criterion = nn.MSELoss()
optimizer = torch.optim.AdamW(ecg_auto_encoder.parameters(), lr=LEARNING_RATE, weight_decay=1e-5)

# Summed batch loss of every epoch, stored as plain Python floats.
losses = []

# Make sure dropout and batch-norm run in training mode.
ecg_auto_encoder.train()

for epoch in range(EPOCHS):
    total_loss = 0.0

    # Labels are not needed: the auto-encoder is trained to reproduce its input.
    for signal, _ in train_loader:
        output = ecg_auto_encoder(signal)
        loss = criterion(output, signal)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # .item() extracts a Python float; accumulating the tensor itself
        # would keep autograd history alive and store tensors in `losses`.
        total_loss += loss.item()

    print(f'Epoch {epoch + 1}/{EPOCHS}: loss={total_loss}')
    losses.append(total_loss)

In [None]:
# Overlay one scaled training signal with its auto-encoder reconstruction.
num_rows = 1
num_cols = 1
fig, axs = plt.subplots(num_rows, num_cols, figsize=(12 * num_cols, 6 * num_rows))

fig.set_facecolor('white')

sample = 30
time_limit = 5000
# Only the first `time_limit` time steps are drawn.
time_axis = list(range(time_limit))

original_window = X_train_scaled.iloc[sample, np.arange(time_limit)]
axs.plot(time_axis, original_window, alpha=0.5, label='Original')

with torch.no_grad():
    ecg_auto_encoder.eval()
    # Add leading batch and channel axes so the model sees a (1, 1, time) input.
    signal_tensor = torch.tensor(X_train_scaled.iloc[sample].values, dtype=torch.float)[None, None, :]
    enc_dec_signal = ecg_auto_encoder(signal_tensor).squeeze()
    axs.plot(time_axis, enc_dec_signal[:time_limit], label='Reconstructed')
axs.set(xlabel='Time', ylabel='Signal', title=f'Original vs. Reconstructed Signal No. {sample}')
axs.legend(loc='upper right')

plt.show()

## Classification with Encoded Signals

In [None]:
from sklearn.svm import LinearSVC
from sklearn.model_selection import cross_val_score

In [None]:
# Compute encoded version of training set.
with torch.no_grad():
    ecg_auto_encoder.eval()
    X_train_tensor = torch.tensor(X_train_scaled.values, dtype=torch.float)
    X_train_tensor = torch.unsqueeze(X_train_tensor, 1)
    # Encode the full training tensor; the previous code passed the single
    # sample `signal_tensor` left over from the plotting cell.
    X_train_enc = ecg_auto_encoder.encoder(X_train_tensor)
    # Drop only the channel axis so the result stays 2-d (samples x features)
    # even when the set contains a single sample.
    X_train_enc = torch.squeeze(X_train_enc, 1)
    X_train_enc = pd.DataFrame(X_train_enc.numpy(), index=X_train.index)
    display(X_train_enc)

In [None]:
# Mean 5-fold cross-validated micro-F1 of a linear SVM on the encoded signals.
enc_valid_score = cross_val_score(
    LinearSVC(tol=1e-2),
    X_train_enc,
    # Flatten the labels to 1-d; np.ravel replaces the unbound call to
    # pd.Series.ravel, which is deprecated in recent pandas.
    np.ravel(y_train),
    cv=5,
    scoring='f1_micro',
    verbose=4,
    n_jobs=-1).mean()
print(enc_valid_score)

In [None]:
# Baseline: the same linear SVM and 5-fold cross-validation, run on X_train
# instead of the encoded signals.
raw_valid_score = cross_val_score(
    LinearSVC(tol=1e-2),
    X_train,
    # Flatten the labels to 1-d; np.ravel replaces the unbound call to
    # pd.Series.ravel, which is deprecated in recent pandas.
    np.ravel(y_train),
    cv=5,
    scoring='f1_micro',
    verbose=4,
    n_jobs=-1).mean()
print(raw_valid_score)