In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import nn, optim
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
from sklearn.decomposition import PCA

import pandas as pd
import numpy as np
from sklearn import svm
from sklearn import preprocessing
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, accuracy_score, recall_score

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

## Config

In [3]:
class config:
    TEST_SIZE = 0.30
    VAL_SIZE = 0.50
    N_COMPONENTS = 5
    SCALER = False
    SEED = 2022
    COLUMN_SELECT = False
    NTH_COLUMN = 10

## Basic data management

In [4]:
df = pd.read_csv('../data/covid_and_healthy_spectra.csv')
df.diagnostic = df.diagnostic.apply(lambda x: 1 if x == 'SARS-CoV-2' else 0)

In [525]:
y = df.diagnostic
X = df[df.columns.drop('diagnostic')]

In [526]:
def load_and_standardize_data(df, test_size, seed):
    df = df.values.reshape(-1, df.shape[1]).astype('float64')
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=seed)
    scaler = preprocessing.MinMaxScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)   
    return X_train, X_test, y_train, y_test, scaler

In [527]:
X_train, X_test, y_train, y_test, scaler = load_and_standardize_data(df, config.TEST_SIZE, config.SEED)

## Dataset

In [580]:
class DataBuilder(Dataset):
    def __init__(self, df, train=True):
        self.X_train, self.X_test, self.y_train, self.y_test, self.standardizer = load_and_standardize_data(df, config.TEST_SIZE, config.SEED)
        if train:
            self.x = torch.from_numpy(self.X_train)
            self.y = torch.from_numpy(np.array(self.y_train))
            self.len=self.x.shape[0]
        else:
            self.x = torch.from_numpy(self.X_test)
            self.y = torch.from_numpy(np.array(self.y_test))
            self.len=self.x.shape[0]
        del self.X_train, self.X_test, self.y_train, self.y_test
    def __getitem__(self,index):      
        return self.x[index], self.y[index]
    def __len__(self):
        return self.len

## DataLoader

In [529]:
traindata_set=DataBuilder(df, train=True)
testdata_set=DataBuilder(df, train=False)

trainloader=DataLoader(dataset=traindata_set,batch_size=16)
testloader=DataLoader(dataset=testdata_set,batch_size=16)

In [530]:
trainloader.dataset.x.shape

torch.Size([216, 900])

In [531]:
testloader.dataset.x.shape

torch.Size([93, 900])

## Define AE architecture

In [532]:
class AutoEncoder(nn.Module):
    def __init__(self):
        super(AutoEncoder, self).__init__()
        self.conv1 = nn.Conv1d(1, 8, 3)
        self.emb = nn.Conv1d(4, 1, 3)
        self.conv2 = nn.Conv1d(1, 8, 3)
        self.out = nn.Linear(888, 900)
    
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2) 
        x_emb = F.relu(self.emb(x))
        x = F.relu(self.conv2(x_emb))
        x = F.max_pool2d(x, 2)
        x = torch.flatten(x, 1)
        output = self.out(x)            
        return output, torch.flatten(x_emb,1)

In [533]:
model = AutoEncoder().to(device).double()
optimizer = optim.Adam(model.parameters(), lr=1e-2)
loss_mse = nn.MSELoss(reduction = 'sum')

In [534]:
epochs = 100
log_interval = 50
val_losses = []
train_losses = []
test_losses = []

## Define train and test loop

In [445]:
def train(epoch):
    model.train()
    train_loss = 0
    for batch_idx, (inp, y_true) in enumerate(trainloader):
        data = inp.to(device).double().unsqueeze(dim=0).permute(1, 0, 2)
        optimizer.zero_grad()
        out, _ = model(data)
        data = data.squeeze()
        loss = loss_mse(out, data)
        loss.backward()
        train_loss += loss.item()
        optimizer.step()
    if epoch % 5 == 0:        
        print('====> Epoch: {} Average training loss: {:.4f}'.format(
            epoch, train_loss / len(trainloader.dataset)))
        train_losses.append(train_loss / len(trainloader.dataset))

In [446]:
def test(epoch):
    with torch.no_grad():
        test_loss = 0
        for batch_idx, (inp, y_true) in enumerate(testloader):
            data = inp.to(device).double().unsqueeze(dim=0).permute(1, 0, 2)
            optimizer.zero_grad()
            out, _ = model(data)
            data = data.squeeze()
            loss = loss_mse(out, data)
            test_loss += loss.item()
            if epoch % 200 == 0:        
                print('====> Epoch: {} Average test loss: {:.4f}'.format(
                    epoch, test_loss / len(testloader.dataset)))
            test_losses.append(test_loss / len(testloader.dataset))

## Train the model

In [447]:
for epoch in range(1, epochs + 1):
    train(epoch)
    test(epoch)

====> Epoch: 5 Average training loss: 29.4436
====> Epoch: 10 Average training loss: 26.2881
====> Epoch: 15 Average training loss: 5.0575
====> Epoch: 20 Average training loss: 3.5390
====> Epoch: 25 Average training loss: 2.9539
====> Epoch: 30 Average training loss: 2.8743
====> Epoch: 35 Average training loss: 2.3016
====> Epoch: 40 Average training loss: 2.0881
====> Epoch: 45 Average training loss: 1.9538
====> Epoch: 50 Average training loss: 1.8435
====> Epoch: 55 Average training loss: 1.7973
====> Epoch: 60 Average training loss: 2.4968
====> Epoch: 65 Average training loss: 1.7209
====> Epoch: 70 Average training loss: 1.5796
====> Epoch: 75 Average training loss: 1.4676
====> Epoch: 80 Average training loss: 1.3753
====> Epoch: 85 Average training loss: 1.4206
====> Epoch: 90 Average training loss: 1.4081
====> Epoch: 95 Average training loss: 1.4646
====> Epoch: 100 Average training loss: 1.3729


## Save model weights

In [451]:
torch.save(model.state_dict(), '../model/ae_model.pth')

## Load the model

In [562]:
%%capture
model = AutoEncoder().double()
model.load_state_dict(torch.load('../model/ae_model.pth'))
model.eval()

In [563]:
traindata_set=DataBuilder(df, train=True)
testdata_set=DataBuilder(df, train=False)

trainloader=DataLoader(dataset=traindata_set,batch_size=1000)
testloader=DataLoader(dataset=testdata_set,batch_size=1000)

## Generate embeddings for training and test data

In [564]:
for batch_idx, (inp, y_train) in enumerate(trainloader):
    data = inp.to(device).double().unsqueeze(dim=0).permute(1, 0, 2)
    out, train_emb = model(data)
for batch_idx, (inp, y_test) in enumerate(testloader):
    data = inp.to(device).double().unsqueeze(dim=0).permute(1, 0, 2)
    out, test_emb = model(data)

## Concatenate embeddings and labels

In [565]:
train_emb, test_emb = train_emb.cpu().detach().numpy(), test_emb.cpu().detach().numpy()
y_train, y_test = y_train.cpu().detach().numpy(), y_test.cpu().detach().numpy()

In [566]:
embeddings = np.concatenate([train_emb, test_emb])
y_emb = np.concatenate([y_train, y_test])

## Prepare data for training

In [567]:
X_train, X_test, y_train, y_test = train_test_split(embeddings, y_emb, test_size=config.TEST_SIZE, random_state=config.SEED)
X_test, X_val, y_test, y_val = train_test_split(X_test, y_test, test_size=config.VAL_SIZE, random_state=config.SEED)

In [568]:
if config.SCALER:
    scaler = MinMaxScaler()
    scaler.fit(X_train)
    X_train = scaler.transform(X_train)
    X_test = scaler.transform(X_test)
    X_val = scaler.transform(X_val)

In [574]:
clf = svm.SVC()
clf.fit(X_train, y_train)

SVC()

In [575]:
cross_val_score(clf, X_train, y_train, cv=5, scoring='recall_macro')

array([0.91304348, 0.95326087, 0.90909091, 0.97619048, 1.        ])

In [576]:
y_test_pred = clf.predict(X_test)
y_val_pred = clf.predict(X_val)

## Generate precision, recall and accuracy report

In [577]:
precision, acc, recall = precision_score(y_test, y_test_pred), accuracy_score(y_test, y_test_pred), recall_score(y_test, y_test_pred)
print('Precision, recall and accuracy score for the test set: ', round(precision, 2), round(recall, 2), round(acc, 2))

Precision, recall and accuracy score for the test set:  0.95 1.0 0.98


In [578]:
precision, acc, recall = precision_score(y_val, y_val_pred), accuracy_score(y_val, y_val_pred), recall_score(y_val, y_val_pred)
print('Precision, recall and accuracy score for the val set: ', round(precision, 2), round(recall, 2), round(acc, 2))

Precision, recall and accuracy score for the val set:  1.0 0.79 0.87
