In [None]:
!pip install pyod
!pip install scikit-learn
!pip install lightgbm xgboost

In [None]:
import json
import numpy as np
import ast

def return_data(path_file="/content/RATs-Uni-TSImage_Reason.json", key="TSAD_test"):
  """Load one split from a JSON file and bucket its samples by observation length.

  Args:
    path_file: Path of the UTF-8 JSON file to read.
    key: Top-level key of the split to load; a missing key yields empty results.

  Returns:
    A tuple ``(tsad_test, observations, labels)``: the raw split mapping, a dict
    mapping ``len(observation)`` to the list of observations of that length, and
    a dict with the same keys holding the matching labels (1 when the sample's
    "Label" is exactly "Anomaly", otherwise 0).
  """
  with open(path_file, "r", encoding="utf-8") as handle:
    tsad_test = json.load(handle).get(key, {})

  observations, labels = {}, {}
  for item in tsad_test.values():
    if item is None:
      continue  # null entries carry no sample

    observation = item.get("Observation", "")
    label = int(item.get("Label", "") == "Anomaly")

    # Samples are grouped by length so each group can later form a rectangular array.
    n_points = len(observation)
    observations.setdefault(n_points, []).append(observation)
    labels.setdefault(n_points, []).append(label)
  return tsad_test, observations, labels

def return_data2(path_file="/content/RATs-Uni-TSImage_Reason.json", key="TSAD_test"):
  """Load one multivariate split and bucket its flattened samples by total length.

  Each sample's "Observation" is a string of ``name: [values]`` pairs separated
  by ``'; '``. It is wrapped in braces and parsed as a Python dict literal, and
  the per-feature value lists are concatenated in dict order into one flat list.

  Args:
    path_file: Path of the UTF-8 JSON file to read.
    key: Top-level key of the split to load; a missing key yields empty results.

  Returns:
    A tuple ``(tsad_test, observations, labels)``: the raw split mapping, a dict
    mapping flattened length to the list of flattened observations, and a dict
    with the same keys holding the matching labels (1 when the sample's "Label"
    is exactly "Anomaly", otherwise 0).
  """
  with open(path_file, "r", encoding="utf-8") as handle:
    tsad_test = json.load(handle).get(key, {})

  observations, labels = {}, {}
  for item in tsad_test.values():
    if item is None:
      continue  # null entries carry no sample

    raw_text = item.get("Observation", "")
    label = int(item.get("Label", "") == "Anomaly")

    # Turn the '; '-separated pairs into a dict literal that literal_eval can parse.
    per_feature = ast.literal_eval('{' + raw_text.replace('; ', ', ') + '}')

    observation = []
    for series in per_feature.values():
      observation += series

    n_points = len(observation)
    observations.setdefault(n_points, []).append(observation)
    labels.setdefault(n_points, []).append(label)
  return tsad_test, observations, labels

# Load both splits; each result maps an observation length to the samples of that length.
tsad_test, obs_test, labels_test = return_data("your_test_data.json", "TSAD_test")
tsad_train, obs_train, labels_train = return_data("your_training_data.json", "TSAD_train")

# The detectors below are fitted per length bucket, so every test length needs a training
# bucket. Fail here with a readable message rather than a bare KeyError mid-conversion.
missing_lengths = sorted(set(obs_test) - set(obs_train))
if missing_lengths:
  raise KeyError(f"no training samples for test observation lengths: {missing_lengths}")

# Convert each dictionary over its own keys so train-only buckets become arrays as well.
obs_test = {length: np.array(rows) for length, rows in obs_test.items()}
obs_train = {length: np.array(rows) for length, rows in obs_train.items()}
labels_test = {length: np.array(values) for length, values in labels_test.items()}
labels_train = {length: np.array(values) for length, values in labels_train.items()}

In [None]:
from pyod.models.knn import KNN   # kNN detector
from pyod.utils.data import generate_data
from sklearn.metrics import precision_score, recall_score, f1_score

def knn_clf(X_train, X_test):
  """Fit a default PyOD KNN detector on X_train and predict outlier labels for X_test.

  Args:
    X_train: Training samples; used unsupervised (no labels are passed to fit).
    X_test: Samples to classify.

  Returns:
    The detector's ``predict`` output for X_test (0: inlier, 1: outlier).
  """
  clf = KNN()
  clf.fit(X_train)
  return clf.predict(X_test)

# Fit one detector per observation-length bucket, then pool predictions and ground truth
# so the metrics are computed over the whole test set at once.
true_labels = []
pred_labels = []
for key in obs_test:
  y_test_scores = knn_clf(obs_train[key], obs_test[key])  # holds 0/1 predictions, despite the name
  pred_labels.extend(y_test_scores.tolist())
  true_labels.extend(labels_test[key].tolist())

label_precision = precision_score(true_labels, pred_labels, zero_division=0)
label_recall = recall_score(true_labels, pred_labels, zero_division=0)
label_f1 = f1_score(true_labels, pred_labels, zero_division=0)
(label_precision, label_recall, label_f1)

In [None]:
from pyod.models.lof import LOF   # LOF (Local Outlier Factor) detector
from pyod.utils.data import generate_data
from sklearn.metrics import precision_score, recall_score, f1_score

def lof_clf(X_train, X_test):
  """Fit a default PyOD LOF detector on X_train and predict outlier labels for X_test.

  Args:
    X_train: Training samples; used unsupervised (no labels are passed to fit).
    X_test: Samples to classify.

  Returns:
    The detector's ``predict`` output for X_test (0: inlier, 1: outlier).
  """
  clf = LOF()
  clf.fit(X_train)
  return clf.predict(X_test)

# Fit one LOF detector per observation-length bucket, then pool predictions and ground
# truth so the metrics are computed over the whole test set at once.
true_labels = []
pred_labels = []
for key in obs_test:
  y_test_scores = lof_clf(obs_train[key], obs_test[key])  # holds 0/1 predictions, despite the name
  pred_labels.extend(y_test_scores.tolist())
  true_labels.extend(labels_test[key].tolist())

label_precision = precision_score(true_labels, pred_labels, zero_division=0)
label_recall = recall_score(true_labels, pred_labels, zero_division=0)
label_f1 = f1_score(true_labels, pred_labels, zero_division=0)
(label_precision, label_recall, label_f1)

In [None]:
from pyod.models.ae1svm import AE1SVM
from pyod.utils.data import generate_data
from sklearn.metrics import precision_score, recall_score, f1_score

def data_clf(X_train, X_test):
  """Fit a default PyOD AE1SVM detector on X_train and predict outlier labels for X_test.

  Args:
    X_train: Training samples; used unsupervised (no labels are passed to fit).
    X_test: Samples to classify.

  Returns:
    The detector's ``predict`` output for X_test (0: inlier, 1: outlier).
  """
  clf = AE1SVM()
  clf.fit(X_train)
  return clf.predict(X_test)

# Fit one AE1SVM detector per observation-length bucket, then pool predictions and ground
# truth so the metrics are computed over the whole test set at once.
true_labels = []
pred_labels = []
for key in obs_test:
  y_test_scores = data_clf(obs_train[key], obs_test[key])  # holds 0/1 predictions, despite the name
  pred_labels.extend(y_test_scores.tolist())
  true_labels.extend(labels_test[key].tolist())

label_precision = precision_score(true_labels, pred_labels, zero_division=0)
label_recall = recall_score(true_labels, pred_labels, zero_division=0)
label_f1 = f1_score(true_labels, pred_labels, zero_division=0)
(label_precision, label_recall, label_f1)

In [None]:
from pyod.models.deep_svdd  import DeepSVDD
from pyod.utils.data import generate_data
from sklearn.metrics import precision_score, recall_score, f1_score

def data_clf(X_train, X_test):
  """Fit a PyOD DeepSVDD detector (5 epochs) on X_train and predict outlier labels for X_test.

  The length of the first training row is passed as the constructor's first
  positional argument (the ``n_features`` parameter in recent PyOD releases;
  confirm against the installed version).

  Args:
    X_train: Non-empty 2-D training samples; used unsupervised.
    X_test: Samples to classify.

  Returns:
    The detector's ``predict`` output for X_test (0: inlier, 1: outlier).
  """
  clf = DeepSVDD(len(X_train[0]), epochs=5)
  clf.fit(X_train)
  return clf.predict(X_test)

# Fit one DeepSVDD detector per observation-length bucket, then pool predictions and
# ground truth so the metrics are computed over the whole test set at once.
true_labels = []
pred_labels = []
for key in obs_test:
  y_test_scores = data_clf(obs_train[key], obs_test[key])  # holds 0/1 predictions, despite the name
  pred_labels.extend(y_test_scores.tolist())
  true_labels.extend(labels_test[key].tolist())

label_precision = precision_score(true_labels, pred_labels, zero_division=0)
label_recall = recall_score(true_labels, pred_labels, zero_division=0)
label_f1 = f1_score(true_labels, pred_labels, zero_division=0)
(label_precision, label_recall, label_f1)

In [None]:
from lightgbm import LGBMClassifier
from pyod.utils.data import generate_data
from sklearn.metrics import precision_score, recall_score, f1_score

def data_clf(X_train, y_train, X_test):
  """Train a default LGBMClassifier on labelled data and predict labels for X_test.

  Args:
    X_train: Training samples.
    y_train: Training labels aligned with X_train.
    X_test: Samples to classify.

  Returns:
    The classifier's ``predict`` output for X_test.
  """
  model = LGBMClassifier()
  model.fit(X_train, y_train)  # supervised, unlike the PyOD detectors in this notebook
  predictions = model.predict(X_test)
  return predictions

# Train one LightGBM classifier per observation-length bucket, then pool predictions and
# ground truth so the metrics are computed over the whole test set at once.
true_labels = []
pred_labels = []
for key in obs_test:
  y_test_scores = data_clf(obs_train[key], labels_train[key], obs_test[key])  # predicted labels
  pred_labels.extend(y_test_scores.tolist())
  true_labels.extend(labels_test[key].tolist())

label_precision = precision_score(true_labels, pred_labels, zero_division=0)
label_recall = recall_score(true_labels, pred_labels, zero_division=0)
label_f1 = f1_score(true_labels, pred_labels, zero_division=0)
(label_precision, label_recall, label_f1)

In [None]:
import xgboost as xgb
from pyod.utils.data import generate_data
from sklearn.metrics import precision_score, recall_score, f1_score

def data_clf(X_train, y_train, X_test):
  """Train a default XGBClassifier on labelled data and predict labels for X_test.

  Args:
    X_train: Training samples.
    y_train: Training labels aligned with X_train.
    X_test: Samples to classify.

  Returns:
    The classifier's ``predict`` output for X_test.
  """
  model = xgb.XGBClassifier()
  model.fit(X_train, y_train)  # supervised, unlike the PyOD detectors in this notebook
  predictions = model.predict(X_test)
  return predictions

# Train one XGBoost classifier per observation-length bucket, then pool predictions and
# ground truth so the metrics are computed over the whole test set at once.
true_labels = []
pred_labels = []
for key in obs_test:
  y_test_scores = data_clf(obs_train[key], labels_train[key], obs_test[key])  # predicted labels
  pred_labels.extend(y_test_scores.tolist())
  true_labels.extend(labels_test[key].tolist())

label_precision = precision_score(true_labels, pred_labels, zero_division=0)
label_recall = recall_score(true_labels, pred_labels, zero_division=0)
label_f1 = f1_score(true_labels, pred_labels, zero_division=0)
(label_precision, label_recall, label_f1)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import precision_score, recall_score, f1_score
import numpy as np

# --- 1. Define the LSTM Model ---
class LSTMClassifier(nn.Module):
    """Stacked LSTM followed by a linear layer and a sigmoid.

    The final hidden state of the top LSTM layer is projected to ``output_dim``
    values and squashed to (0, 1), so the output can be fed to ``nn.BCELoss``.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the LSTM hidden state.
        output_dim: Number of output units.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout between LSTM layers; ignored when ``num_layers`` is 1.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, dropout=0.5):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # nn.LSTM applies dropout only between layers; a non-zero value with a single
        # layer has no effect and only triggers a warning, so it is zeroed in that case.
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0,
        )
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return probabilities of shape (batch_size, output_dim).

        Args:
            x: Tensor of shape (batch_size, sequence_length, input_dim).
        """
        # With no initial state given, nn.LSTM starts from zeros that match the input's
        # device and dtype, so the module also works after .double() or .half().
        _, (hn, _) = self.lstm(x)

        # hn[-1] is the last layer's final hidden state: (batch_size, hidden_dim).
        return self.sigmoid(self.fc(hn[-1]))

# --- 2. Training and Prediction Function for LSTM ---
def data_clf_lstm(X_train, y_train, X_test, hidden_dim=64, num_layers=2, learning_rate=0.01, num_epochs=5,
                  batch_size=32, threshold=0.5):
    """Train an LSTMClassifier on univariate sequences and predict binary labels.

    Args:
        X_train: 2-D array-like (samples, sequence_length) of training sequences.
        y_train: 1-D array-like of 0/1 training labels.
        X_test: 2-D array-like (samples, sequence_length) of sequences to classify.
        hidden_dim: LSTM hidden size.
        num_layers: Number of stacked LSTM layers.
        learning_rate: Adam learning rate.
        num_epochs: Number of passes over the training data.
        batch_size: Mini-batch size used during training.
        threshold: Probability above which a sample is labelled 1.

    Returns:
        1-D integer numpy array with one 0/1 prediction per test sample. It stays
        1-D even when X_test holds a single sample.
    """
    # unsqueeze(2) turns (samples, sequence_length) into (samples, sequence_length, 1):
    # every time step carries one scalar feature, matching input_dim=1 below.
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32).unsqueeze(2)
    # Targets become (samples, 1) so they line up with the model output for BCELoss.
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32).unsqueeze(2)

    # Move to GPU if available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = LSTMClassifier(1, hidden_dim, output_dim=1, num_layers=num_layers).to(device)

    # The model already applies a sigmoid, so plain BCELoss is the matching criterion.
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    train_loader = DataLoader(
        TensorDataset(X_train_tensor, y_train_tensor),
        batch_size=batch_size,
        shuffle=True,
    )

    # --- Training loop ---
    model.train()
    for _ in range(num_epochs):
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)

            optimizer.zero_grad()
            loss = criterion(model(batch_X), batch_y)
            loss.backward()
            optimizer.step()

    # --- Prediction ---
    model.eval()
    with torch.no_grad():
        # reshape(-1) instead of squeeze(): a bare squeeze() would collapse a single-sample
        # batch to a 0-d value, whose .tolist() is a scalar rather than a list.
        y_test_pred_prob = model(X_test_tensor.to(device)).reshape(-1).cpu().numpy()

    return (y_test_pred_prob > threshold).astype(int)

# Train one LSTM per observation-length bucket (all sequences in a bucket share a length),
# then pool predictions and ground truth to compute metrics over the whole test set.
true_labels = []
pred_labels = []
for key in obs_test:
    y_test_pred = data_clf_lstm(obs_train[key], labels_train[key], obs_test[key])
    pred_labels.extend(y_test_pred.tolist())
    true_labels.extend(labels_test[key].tolist())

label_precision = precision_score(true_labels, pred_labels, zero_division=0)
label_recall = recall_score(true_labels, pred_labels, zero_division=0)
label_f1 = f1_score(true_labels, pred_labels, zero_division=0)

print("LSTM Classification Results:")
print(f"Precision: {label_precision:.4f}")
print(f"Recall: {label_recall:.4f}")
print(f"F1 Score: {label_f1:.4f}")