## Model Running

In [1]:
import pandas as pd 
import numpy as np 
import matplotlib.pyplot as plt 
import seaborn as sns 

import torch
import torch.nn as nn 
from torch.utils.data import DataLoader, TensorDataset

from tqdm import tqdm

In [2]:
data = torch.load('datasets/spec_datasets.pt')

  data = torch.load('datasets/spec_datasets.pt')


In [3]:
X_train = data['X_train']
y_train = data['y_train']
X_dev = data['X_dev']
y_dev = data['y_dev']
X_test = data['X_test']
y_test = data['y_test']

In [4]:
X_train.shape

torch.Size([2600, 1, 36, 2048])

In [5]:
normal_indices = [i for i, label in enumerate(y_train) if label == 1]

In [6]:
X_train = X_train[normal_indices]

In [7]:
y_train = y_train[normal_indices]

In [8]:
X_train.shape, y_train.shape

(torch.Size([1430, 1, 36, 2048]), torch.Size([1430]))

In [9]:
X_train_input = X_train.reshape(X_train.shape[0], -1)
X_train_input.shape

torch.Size([1430, 73728])

In [10]:
X_dev_input = X_dev.reshape(X_dev.shape[0], -1)
X_test_input = X_test.reshape(X_test.shape[0], -1)
X_dev_input.shape, X_test_input.shape

(torch.Size([370, 73728]), torch.Size([20, 73728]))

In [11]:
class AnomalyDetector(nn.Module):
    def __init__(self):
        super(AnomalyDetector, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(73728, 6144),
            nn.ReLU(),
            nn.Linear(6144, 2048),
            nn.ReLU(),
            nn.Linear(2048, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, 8),
            nn.ReLU(),
            nn.Linear(8, 4),
            nn.ReLU(),
            nn.Linear(4, 2),
            nn.ReLU()
        )

        self.decoder = nn.Sequential(
            nn.Linear(2, 4),
            nn.ReLU(),
            nn.Linear(4, 8),
            nn.ReLU(),
            nn.Linear(8, 16),
            nn.ReLU(),
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Linear(512, 1024),
            nn.ReLU(),
            nn.Linear(1024, 2048),
            nn.ReLU(),
            nn.Linear(2048, 6144),
            nn.ReLU(),
            nn.Linear(6144, 73728),
            nn.Sigmoid()
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

# Initialize model
autoencoder = AnomalyDetector()

In [12]:
criterion = nn.L1Loss()  # MAE in PyTorch
optimizer = torch.optim.Adam(autoencoder.parameters(), lr=0.001)

In [13]:
train_dataset = TensorDataset(X_train_input)
test_dataset = TensorDataset(X_test_input, y_test)
dev_dataset = TensorDataset(X_dev_input, y_dev)

In [14]:
dataloader_train = DataLoader(train_dataset, batch_size = 8, shuffle = True)
dataloader_test = DataLoader(test_dataset, batch_size = 8, shuffle = True)
dataloader_dev = DataLoader(dev_dataset, batch_size = 8, shuffle = True)

In [15]:
num_epochs = 20

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
autoencoder.to(device)

for epoch in range(num_epochs):
    autoencoder.train()
    running_loss = 0.0
    for x_batch in tqdm(dataloader_train):
        # print(x_batch[0].shape)
        x_batch = x_batch[0].to(device)
        outputs = autoencoder(x_batch)
        loss = criterion(outputs, x_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    # if (epoch+1) % 20 == 0 :
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(dataloader_train)}")

cuda


 40%|████████████████████████████████▏                                                | 71/179 [07:06<10:48,  6.00s/it]


KeyboardInterrupt: 

In [None]:
from sklearn.metrics import confusion_matrix

In [None]:
autoencoder.eval()  # Set the model to evaluation mode
total_loss = 0.0
correct = 0
# error_correct = 0
total = 0
correct_cnt = 0
neg_pred_cnt = 0

threshold =3.5

all_labels = []
all_predictions = []

with torch.no_grad():  # No gradients needed for evaluation
    for x_batch, labels in dataloader_test:  # Assuming labels are included in your test loader
        # print(x_batch.shape)
        outputs = autoencoder(x_batch[0])
        loss = criterion(outputs, x_batch[0])  # Calculate reconstruction loss
        total_loss += loss.item()
        # Calculate reconstruction error
        reconstruction_error = torch.mean((outputs - x_batch) ** 2, dim = 1 )
        print(reconstruction_error)
        # Identify anomalies based on the threshold
        predictions = (reconstruction_error > threshold).float() 

        # Compare predictions with actual labels
        # error_mask = (labels == 0)
        # error_correct += ((predictions[error_mask] == 0).sum()).item()  
        # correct += (predictions == labels).sum().item()
        total += labels.size(0)
        all_labels.extend(labels.cpu().numpy())  # Move to CPU and convert to numpy
        all_predictions.extend(predictions.cpu().numpy())

# Calculate average loss and accuracy
conf_matrix = confusion_matrix(all_labels, all_predictions)
average_loss = total_loss / len(dataloader_test)
accuracy = correct / total

TN = conf_matrix[0, 0]  
FP = conf_matrix[0, 1] 
FN = conf_matrix[1, 0]  
TP = conf_matrix[1, 1]  
Neg_precision = TN/(FN+TN) if (FN+TN) > 0 else 0;
Neg_recall =TN/(FP+TN) if (FP+TN) > 0 else 0;

for i in range(len(all_labels)) :
    if all_labels[i] == 0 and all_predictions[i] == 0:
        correct_cnt += 1
for pred in all_predictions:
    if pred == 0:
        neg_pred_cnt += 1

print(f"Test Loss: {average_loss:.4f}, Accuracy: {accuracy:.4f},")
print(f'Neg_Precision: {Neg_precision:.4f} = {TN}/{FN + TN}, Neg_Recall: {Neg_recall:.4f} = {TN}/{FP + TN}, Neg_F1: {(Neg_precision * Neg_recall)/(Neg_precision + Neg_recall):.4f}') 
print(f'correct_cnt {correct_cnt}, neg_pred_cnt {neg_pred_cnt}')

