In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import torch.nn.functional as F

class Attn(nn.Module):
    """Scores encoder states with a small MLP and normalises the scores.

    Each hidden vector is mapped ``h_dim -> 24 -> 1`` and the resulting
    scores are passed through a softmax over dimension 1.

    Args:
        h_dim: Size of the last (feature) dimension of the encoder outputs.
    """

    def __init__(self, h_dim):
        super().__init__()
        self.h_dim = h_dim
        self.main = nn.Sequential(
            nn.Linear(h_dim, 24),
            nn.ReLU(True),
            nn.Linear(24, 1)
        )

    def forward(self, encoder_outputs):
        """Return attention weights for ``encoder_outputs``.

        Args:
            encoder_outputs: Tensor whose last dimension is ``h_dim``.
                The code is written for a (b, s, h) layout. For a 2-D
                (n, h) input each row gets exactly one score, so the
                softmax over that single entry yields weights of 1.

        Returns:
            Tensor of shape (b, s, 1) whose entries sum to 1 along dim 1.
        """
        b_size = encoder_outputs.size(0)
        # reshape (rather than view) also accepts non-contiguous inputs,
        # e.g. transposed tensors, on which view raises a RuntimeError.
        flat_states = encoder_outputs.reshape(-1, self.h_dim)  # (b, s, h) -> (b * s, h)
        attn_ene = self.main(flat_states)  # (b * s, h) -> (b * s, 1)
        return F.softmax(attn_ene.view(b_size, -1), dim=1).unsqueeze(2)  # (b * s, 1) -> (b, s, 1)

class AttnClassifier(nn.Module):
    """Attention-pooled linear classifier over encoder outputs.

    Args:
        h_dim: Feature size of the encoder outputs.
        c_num: Number of output classes (width of the final linear layer).
    """

    def __init__(self, h_dim, c_num):
        super().__init__()
        self.attn = Attn(h_dim)
        self.main = nn.Linear(h_dim, c_num)

    def forward(self, encoder_outputs):
        """Pool ``encoder_outputs`` with attention and classify the result.

        Args:
            encoder_outputs: Tensor whose last dimension is ``h_dim``;
                the pooling below is written for a (b, s, h) layout.

        Returns:
            Tuple ``(log_probs, attns)``: log-softmax of the class scores
            over the last dimension, and the attention weights produced
            by ``self.attn``.

        NOTE(review): with a 2-D (n, h) input the (n, 1, 1) weights
        broadcast against (n, h) to (n, n, h) before the sum — confirm
        that callers pass batched sequences.
        """
        attns = self.attn(encoder_outputs)  # (b, s, 1)
        feats = (encoder_outputs * attns).sum(dim=1)  # (b, s, h) -> (b, h)
        # An explicit dim avoids PyTorch's "implicit dimension choice"
        # deprecation warning; for 2-D scores the last dim is the same
        # dimension the implicit rule selected (dim=1).
        return F.log_softmax(self.main(feats), dim=-1), attns

class Predictor(nn.Module):
    """Embedding -> LSTM encoder -> attention classifier -> LSTM decoder -> linear head.

    Args:
        input_size: Number of features in the last dimension of the input.
        hidden_size: Hidden size of both LSTMs.
        output_size: Width of the attention classifier output and of the
            final prediction.
        num_layers: Number of stacked layers in each LSTM.
        seq_length: Stored as ``self.seq_length``; not read by ``forward``.

    NOTE(review): the notebook feeds 2-D (batch, features) tensors.
    ``nn.LSTM`` interprets a 2-D input as one unbatched sequence, so the
    rows of a batch are processed as consecutive time steps — confirm this
    is intended.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers, seq_length):
        super().__init__()
        self.embedding_layer = nn.Linear(input_size, 128)
        self.encoder = nn.LSTM(128, hidden_size, num_layers, batch_first=True)
        self.attn_classifier = AttnClassifier(hidden_size, output_size)
        # The decoder consumes the attention classifier's output, whose last
        # dimension is output_size. Sizing it for the 128-wide embedding made
        # the forward pass fail whenever output_size != 128.
        self.decoder = nn.LSTM(output_size, hidden_size, num_layers, batch_first=True)
        self.output_layer = nn.Linear(hidden_size, output_size)
        self.seq_length = seq_length

    def forward(self, x):
        """Run the full pipeline on ``x`` and return the prediction.

        Args:
            x: Tensor whose last dimension is ``input_size``.

        Returns:
            Tensor whose last dimension is ``output_size``.
        """
        embedded = self.embedding_layer(x)
        # Only the per-step encoder outputs are used; the final states are dropped.
        encoder_output, _ = self.encoder(embedded)
        # The attention weights are returned by the classifier but not used here.
        attention_output, _ = self.attn_classifier(encoder_output)
        decoder_output, _ = self.decoder(attention_output)
        return self.output_layer(decoder_output)

# Read the training CSV; missing values are replaced with 0.
df = pd.read_csv('train_part.csv').fillna(0)

# Features are the four signal columns; the target vector is the 'Label' column.
signal_columns = ['Signal1', 'Signal2', 'Signal3', 'Signal4']
X = df[signal_columns].values
y = df['Label'].values

# Hold out 20% of the rows; the fixed random_state keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Convert every split to a float32 tensor.
X_train_tensor, y_train_tensor, X_test_tensor, y_test_tensor = (
    torch.tensor(split, dtype=torch.float32)
    for split in (X_train, y_train, X_test, y_test)
)

# Wrap the tensors in datasets and batch them.
batch_size = 64
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# drop_last=True discards an incomplete final batch, so every batch holds
# exactly batch_size rows. Only the training loader is shuffled.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=True)

# Model hyper-parameters
input_size = 4   # one input feature per signal column
hidden_size = 64
output_size = 4  # the loss compares the model output with the 4 input signals, so the widths must match
num_layers = 2
seq_length = 10  # stored on the model; its forward pass does not read it

# Seed PyTorch's global RNG so that weight initialisation and the
# training loader's shuffling are repeatable across kernel restarts.
torch.manual_seed(42)

# Instantiate the Predictor model
predictor_model = Predictor(input_size, hidden_size, output_size, num_layers, seq_length)

# Mean-squared-error loss, optimised with Adam
criterion = nn.MSELoss()
optimizer = optim.Adam(predictor_model.parameters(), lr=0.001)

# Training: the network is optimised to reproduce its own input batch
# (the loss compares the output with the input; the labels are unused).
num_epochs = 10
for epoch in range(num_epochs):
    predictor_model.train()
    total_loss = 0

    for batch_x, _batch_y in train_loader:
        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # Forward pass and reconstruction loss
        reconstruction = predictor_model(batch_x)
        loss = criterion(reconstruction, batch_x)

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Mean per-batch loss for this epoch
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_loader)}')

# Evaluation: mean reconstruction loss over the held-out batches.
predictor_model.eval()

# Gradients are not needed for evaluation.
with torch.no_grad():
    batch_losses = [
        criterion(predictor_model(batch_x), batch_x).item()
        for batch_x, _batch_y in test_loader
    ]

test_loss = sum(batch_losses)
average_test_loss = test_loss / len(test_loader)
print(f'Average Test Loss: {average_test_loss}')


  return F.log_softmax(self.main(feats)), attns


Epoch 1/10, Loss: 0.16398479044437408
Epoch 2/10, Loss: 0.09632661007344723
Epoch 3/10, Loss: 0.08484400622546673
Epoch 4/10, Loss: 0.08514694310724735
Epoch 5/10, Loss: 0.08286971226334572
Epoch 6/10, Loss: 0.08160734362900257
Epoch 7/10, Loss: 0.08301303721964359
Epoch 8/10, Loss: 0.08213127963244915
Epoch 9/10, Loss: 0.08184721320867538
Epoch 10/10, Loss: 0.08211404457688332
Average Test Loss: 0.07942967116832733


In [None]:
import numpy as np
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler

# Function to compute deviations using the trained predictor_model
def compute_deviations(model, data_loader):
    """Collect absolute differences between the model output and its input.

    The model is switched to eval mode and run without gradient tracking.

    Args:
        model: Callable module; its output must be subtractable from its input.
        data_loader: Iterable yielding ``(inputs, labels)`` pairs; the
            labels are ignored.

    Returns:
        A flat list holding, for every batch in order, one entry per
        element along the first dimension of ``|model(inputs) - inputs|``
        (converted to NumPy).
    """
    model.eval()

    def _batch_deviation(batch_inputs):
        # Element-wise absolute deviation for one batch, as a NumPy array.
        reconstruction = model(batch_inputs)
        return (reconstruction - batch_inputs).abs().numpy()

    with torch.no_grad():
        return [
            row
            for batch_inputs, _batch_labels in data_loader
            for row in _batch_deviation(batch_inputs)
        ]

# Reconstruction deviations for the training and hold-out batches
train_deviations = compute_deviations(predictor_model, train_loader)
test_deviations = compute_deviations(predictor_model, test_loader)

# Standardise the deviations; the scaler is fitted on the training deviations only
scaler = StandardScaler()
scaled_train_deviations = scaler.fit_transform(train_deviations)
scaled_test_deviations = scaler.transform(test_deviations)

# Train the one-class SVM on the scaled training deviations
ocsvm_model = OneClassSVM(kernel='rbf', nu=0.1)  # You might need to adjust the hyperparameters
ocsvm_model.fit(scaled_train_deviations)

# OneClassSVM.predict labels inliers +1 and outliers -1
anomaly_predictions = ocsvm_model.predict(scaled_test_deviations)

# Every hold-out sample is treated as normal (+1); the 'Label' column is not
# consulted here. Assumes train_part.csv holds only normal data — TODO confirm.
# One label per prediction: a hard-coded length of 64 only matched when
# exactly one hold-out batch survived drop_last.
true_labels = np.array([1] * len(anomaly_predictions))

# Share of hold-out samples that the OCSVM classifies as inliers
accuracy = np.mean(anomaly_predictions == true_labels)
print(f'Anomaly Detection Accuracy: {accuracy * 100:.2f}%')


Anomaly Detection Accuracy: 85.94%


  return F.log_softmax(self.main(feats)), attns


#### Testing

In [None]:
# Read the separate evaluation CSV; missing values are replaced with 0.
test_df = pd.read_csv('test_part.csv').fillna(0)

# In this file the four signal columns carry an '_of_ID' suffix.
new_signal_columns = ['Signal1_of_ID', 'Signal2_of_ID', 'Signal3_of_ID', 'Signal4_of_ID']
X_test_new = test_df[new_signal_columns].values
y_test_new = test_df['Label'].values

# float32 tensors for the features and the labels
X_test_new_tensor, y_test_new_tensor = (
    torch.tensor(values, dtype=torch.float32)
    for values in (X_test_new, y_test_new)
)

# Batched in file order; unlike the earlier loaders, the final partial batch is kept.
new_test_dataset = TensorDataset(X_test_new_tensor, y_test_new_tensor)
new_test_loader = DataLoader(new_test_dataset, batch_size=batch_size, shuffle=False)


In [None]:
# Reconstruction deviations for the new test set, scaled with the scaler
# that was fitted on the training deviations.
new_test_deviations = compute_deviations(predictor_model, new_test_loader)
scaled_new_test_deviations = scaler.transform(new_test_deviations)

# OneClassSVM.predict labels inliers +1 and outliers -1.
new_anomaly_predictions = ocsvm_model.predict(scaled_new_test_deviations)

# Map the file's labels onto that convention: 0 -> +1 and 1 -> -1.
# Assumes 'Label' is binary with 0 = normal and 1 = anomaly — TODO confirm.
true_labels_new = 1 - 2 * y_test_new_tensor.numpy()

# Fraction of samples whose prediction equals the mapped label
matches_new = new_anomaly_predictions == true_labels_new
accuracy_new = np.mean(matches_new)
print(f'Anomaly Detection Accuracy on New Test Data: {accuracy_new * 100:.2f}%')



Anomaly Detection Accuracy on New Test Data: 61.63%


  return F.log_softmax(self.main(feats)), attns
