In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
# from sklearn.preprocessing import StandardScaler
import joblib

from sklearn.preprocessing import (
    StandardScaler,        # z-score (mean 0, std 1)
    MinMaxScaler,          # linear to [0,1]
    RobustScaler,          # median / IQR
    QuantileTransformer,   # empirical CDF → normal/uniform
    PowerTransformer       # Box-Cox / Yeo-Johnson
)

import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F
import torch.optim as optim
from torch import nn, Tensor
import math
import numpy as np

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a sequence-first tensor.

    A table of shape ``[max_len, 1, d_model]`` is built once at construction:
    even feature columns hold ``sin(position * frequency)`` and odd feature
    columns hold ``cos(position * frequency)``. The table is registered as the
    buffer ``pe``, so it is part of the module state but is not a trainable
    parameter.

    Args:
        d_model: Size of the last (feature) dimension of the input. Both even
            and odd values are supported.
        dropout: Dropout probability applied after the table is added.
        max_len: Number of positions the table covers.
        device: Device the table is placed on at construction time.
    """

    def __init__(self, d_model, dropout=0.1, max_len=256, device='cpu'):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.d_model = d_model  # number of features in the embeddings

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For an odd d_model there is one fewer odd column than even column,
        # so only the first d_model // 2 frequencies are used for the cosines.
        # For an even d_model the slice is the whole of div_term.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Insert a singleton batch axis: [max_len, d_model] -> [max_len, 1, d_model].
        pe = pe.unsqueeze(1)
        # Registered as a buffer: stored with the module state, never trained.
        self.register_buffer('pe', pe.to(device))

    def forward(self, x):
        """Return ``x`` plus the positional table, followed by dropout.

        Args:
            x: Tensor indexed as ``[sequence_length, batch, d_model]``; the
                first ``x.size(0)`` rows of the table are broadcast over the
                batch axis.

        Returns:
            Tensor with the same shape as ``x``.
        """
        x = x + self.pe[:x.size(0), :, :]  # add the encoding along the sequence axis
        return self.dropout(x)


class LSTM_Transformer_BinaryClassifier(nn.Module):
    """Transformer encoder followed by a bidirectional LSTM and a one-logit head.

    Data flow in ``forward``: linear projection of the input features to
    ``d_model`` -> scaling by ``sqrt(d_model)`` -> positional encoding ->
    Transformer encoder -> bidirectional LSTM -> linear map of the last LSTM
    timestep back to ``d_model`` -> linear classifier with a single output.

    Args:
        input_dim: Number of features per timestep in the raw input.
        d_model: Width used by the Transformer and fed to the LSTM.
        nhead: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        dim_feedforward: Hidden width of each encoder layer's feed-forward part.
        max_len: Number of positions covered by the positional encoding.
        dropout: Dropout probability for the positional encoding and encoder.
        lstm_hidden_size: Hidden size of the LSTM, per direction.
        lstm_n_layers: Number of stacked LSTM layers.
        device: Forwarded to ``PositionalEncoding`` only.
    """

    def __init__(self, input_dim, d_model, nhead, num_layers, dim_feedforward, max_len, dropout, lstm_hidden_size, lstm_n_layers, device='cpu'):
        super().__init__()
        # No embedding layer: the input is projected by a linear layer instead.
        self.d_model = d_model

        # Sub-modules are created in a fixed order so that parameter
        # initialisation draws from the random generator in the same sequence.
        self.lstm = nn.LSTM(
            d_model,
            lstm_hidden_size,
            lstm_n_layers,
            batch_first=True,
            bidirectional=True,
        )
        # Both directions are concatenated, hence twice the hidden size.
        self.fc_lstm = nn.Linear(lstm_hidden_size * 2, d_model)

        self.input_transform = nn.Linear(input_dim, d_model)

        self.pos_encoder = PositionalEncoding(d_model, dropout, max_len, device=device)
        single_encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout)
        self.transformer_encoder = nn.TransformerEncoder(single_encoder_layer, num_layers)

        # One output unit for binary classification.
        self.classifier = nn.Linear(d_model, 1)

        self.init_weights()

    def init_weights(self):
        """Zero the classifier bias and draw its weights uniformly from [-0.1, 0.1]."""
        initrange = 0.1
        head = self.classifier
        head.bias.data.zero_()
        head.weight.data.uniform_(-initrange, initrange)

    def forward(self, src, src_mask=None):
        """Run the Transformer first, then the LSTM, and return one logit per sample.

        Args:
            src: Tensor indexed as ``[batch, sequence_length, input_dim]``.
            src_mask: Optional mask handed to the encoder as its
                ``src_key_padding_mask`` argument.

        Returns:
            Tensor of raw scores with the trailing singleton axis squeezed
            away; no sigmoid is applied here.
        """
        projected = self.input_transform(src)
        # The encoder stack is sequence-first: [batch, seq, d_model] -> [seq, batch, d_model].
        seq_first = projected.permute(1, 0, 2)
        seq_first = seq_first * math.sqrt(self.d_model)
        encoded = self.transformer_encoder(
            self.pos_encoder(seq_first),
            src_key_padding_mask=src_mask,
        )

        # The LSTM is batch-first, so swap the first two axes back.
        batch_first = encoded.permute(1, 0, 2)
        lstm_out, _ = self.lstm(batch_first)
        # Only the final timestep of the LSTM output feeds the classifier.
        final_step = lstm_out[:, -1, :]
        logits = self.classifier(self.fc_lstm(final_step))

        return logits.squeeze(-1)
    

# Candidate datasets, each annotated with the rates noted for it.
# Exactly one `dataset_path` assignment is left active.
# dataset_path = "F+R-AES.csv" #TPR: 99.59% FPR: 0.57%
# dataset_path = "F+F-AES.csv" # TPR: 100% FPR: 0.19%

# dataset_path = "F+R-RSA.csv" # TPR: 99.9% FPR: 0.25%
# dataset_path = "F+F-RSA.csv" # TPR: 99.78% FPR: 0.23%

# dataset_path = "F+R-LibGDK.csv" # TPR: 98.94% FPR: 0.83%

# dataset_path = "Spectre-PHT.csv" # TPR: 100% FPR: 0.2%
# dataset_path = "Spectre-BTB.csv" # TPR: 98.63% FPR: 0.33% 
# dataset_path = "Spectre-RSB.csv"  # TPR: 99.13% FPR: 0.26%
dataset_path = "Spectre-STL.csv" # TPR: 99.64% FPR: 0.54%

# Shared file prefix: "<prefix>.pkl" holds the fitted scaler and
# "<prefix>.pth" holds the model weights.
model_path = "onnxmain"


# Parameters (must match those used in training)
# -- windowing and batching
sequence_length = 1
batch_size = 32
# -- network shape
input_dim = 8
d_model = 512
nhead = 8
num_layers = 4
dim_feedforward = 512
lstm_n_layers = 2
lstm_hidden_size = 64
max_len = 256
dropout = 0.5
# Use the GPU when one is visible to PyTorch, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

df_inference = pd.read_csv(dataset_path)

# Ground-truth labels are read from the last column and cast to int.
y_true = df_inference.iloc[:, -1].values.astype(int)

# Columns 1..8 (by position) are the eight model inputs; column 0 is skipped.
x1, x2, x3, x4, x5, x6, x7, x8 = (
    df_inference.iloc[:, position] for position in range(1, 9)
)

# Combine features
features = pd.concat([x1, x2, x3, x4, x5, x6, x7, x8], axis=1)


# Apply the scaler that was saved next to the model weights.
scaler = joblib.load(model_path + ".pkl")
X_scaled = scaler.transform(features)


def create_sequences_inference(data, seq_length):
    """Return every contiguous window of ``seq_length`` rows from ``data``.

    Window ``i`` holds rows ``i`` to ``i + seq_length - 1``, so an input with
    ``n`` rows yields ``n - seq_length + 1`` windows.

    Args:
        data: Array-like whose first axis is the row (time) axis.
        seq_length: Number of consecutive rows per window; must be >= 1.

    Returns:
        ``numpy.ndarray`` of shape ``(n_windows, seq_length, *data.shape[1:])``.
        When ``data`` has fewer than ``seq_length`` rows the result has zero
        windows but keeps the window and feature dimensions.

    Raises:
        ValueError: If ``seq_length`` is smaller than 1.
    """
    if seq_length < 1:
        raise ValueError(f"seq_length must be >= 1, got {seq_length}")

    data = np.asarray(data)
    # Clamp at zero so inputs shorter than one window give an empty result
    # that still carries the (seq_length, features) trailing shape.
    n_windows = max(len(data) - seq_length + 1, 0)
    # Row i of the index matrix is [i, i + 1, ..., i + seq_length - 1].
    window_rows = np.arange(n_windows)[:, None] + np.arange(seq_length)
    return data[window_rows]

# Window the scaled rows and wrap them for batched, in-order inference.
X_seq = torch.tensor(
    create_sequences_inference(X_scaled, sequence_length),
    dtype=torch.float32,
)

inference_dataset = TensorDataset(X_seq)
# shuffle=False keeps the predictions in the same order as the windows.
inference_loader = DataLoader(inference_dataset, batch_size=batch_size, shuffle=False)


# Rebuild the network with the hyper-parameters configured above.
model = LSTM_Transformer_BinaryClassifier(
    input_dim=input_dim,
    d_model=d_model,
    nhead=nhead,
    num_layers=num_layers,
    dim_feedforward=dim_feedforward,
    max_len=max_len,
    dropout=dropout,
    lstm_hidden_size=lstm_hidden_size,
    lstm_n_layers=lstm_n_layers,
    device=device,
)

# Restore the saved weights, then switch to evaluation mode.
model.load_state_dict(torch.load(model_path + ".pth", map_location=device))
model.to(device)
model.eval()

# Score every batch without tracking gradients.
all_predictions = []
all_probs       = []
with torch.no_grad():
    for batch in inference_loader:
        inputs = batch[0].to(device)
        outputs = model(inputs)
        # The model returns raw scores; sigmoid turns them into probabilities,
        # and a sample is predicted as class 1 when its probability is >= 0.7.
        probs = torch.sigmoid(outputs)
        predictions = (probs >= 0.7).float()
        all_probs.extend(probs.cpu().reshape(-1).tolist())
        all_predictions.extend(predictions.cpu().reshape(-1).tolist())

all_predictions = np.asarray(all_predictions, dtype=int)
all_probs       = np.asarray(all_probs,       dtype=float)

# Keep only as many labels as there are predictions.
# NOTE(review): window i covers rows i .. i + sequence_length - 1, while this
# keeps the labels of the first rows; for sequence_length > 1, confirm this
# matches how each window was labelled during training.
y_true = y_true[:len(all_predictions)]

# Confusion-matrix counts from element-wise masks.
pred_is_one, pred_is_zero = all_predictions == 1, all_predictions == 0
label_is_one, label_is_zero = y_true == 1, y_true == 0

TP = np.sum(pred_is_one & label_is_one)
FP = np.sum(pred_is_one & label_is_zero)
FN = np.sum(pred_is_zero & label_is_one)
TN = np.sum(pred_is_zero & label_is_zero)

# Each rate falls back to 0 when its denominator is empty.
positives = TP + FN
negatives = FP + TN
TPR = TP / positives if positives > 0 else 0
FPR = FP / negatives if negatives > 0 else 0
FNR = FN / positives if positives > 0 else 0
TNR = TN / negatives if negatives > 0 else 0

print("Confusion Matrix:")
print(f"TP: {TP}, FP: {FP}, FN: {FN}, TN: {TN}")
print("Evaluation Metrics:")
print(f"TPR: {TPR*100:.2f}%")
print(f"FPR: {FPR*100:.2f}%")
print(f"FNR: {FNR*100:.2f}%")
print(f"TNR: {TNR*100:.2f}%")

def get_classification_type(gt, pred):
    """Name the confusion-matrix cell for one (ground truth, prediction) pair.

    Args:
        gt: Ground-truth label, compared against 0 and 1.
        pred: Predicted label, compared against 0 and 1.

    Returns:
        "TP", "TN", "FP" or "FN" for the four 0/1 combinations, and
        "Undefined" for any other pair of values.
    """
    # (expected gt, expected pred, label), checked in this order.
    outcomes = (
        (1, 1, "TP"),
        (0, 0, "TN"),
        (0, 1, "FP"),
        (1, 0, "FN"),
    )
    for expected_gt, expected_pred, label in outcomes:
        if gt == expected_gt and pred == expected_pred:
            return label
    return "Undefined"
    
# Save results
# The label column is looked up before any output column is appended.
gt_col = df_inference.columns[-1]

# With sequence_length > 1 there are fewer predictions than rows, and assigning
# a shorter array as a column raises ValueError. Keep only the rows that were
# scored, using the same front alignment already applied to y_true.
# NOTE(review): confirm front alignment is the intended row-to-window mapping.
n_scored = min(len(all_predictions), len(df_inference))
if n_scored < len(df_inference):
    df_inference = df_inference.iloc[:n_scored].copy()

df_inference['Predicted'] = all_predictions[:n_scored]
# A plain comprehension avoids the per-row Series built by apply(axis=1) and
# also behaves sensibly when the frame has no rows.
df_inference['Result'] = [
    get_classification_type(gt, pred)
    for gt, pred in zip(df_inference[gt_col], df_inference['Predicted'])
]
df_inference['Prob']      = all_probs[:n_scored]

df_inference.to_csv('inference_output_with_metrics.csv', index=False)
print("Inference complete. Results saved to 'inference_output_with_metrics.csv'.")
