# This notebook used generated random samples to simulate time series data, and use LSTM model to do the binary classification.

In [9]:
import random
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset

In [3]:
# imbalanced data set generation
def generate_imbalanced_data(num_samples, ratio):
    random.seed(32)
    data = []
    positive_samples = int(num_samples * ratio)
    positive_count = 0

    for _ in range(num_samples):
        feature1 = random.random()
        feature2 = random.random()

        # generate positive samples first
        if positive_count < positive_samples:
            label = 1
            positive_count += 1
        else:
            label = 0

        data.append([feature1, feature2, label])

    random.shuffle(data)

    return data

In [4]:
# LSTM model definition
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])

        return out

In [10]:
# data preparation
ratio = 0.0001
data = generate_imbalanced_data(500000, ratio)

X = [d[:-1] for d in data]
y = [d[-1] for d in data]

X_tensor = torch.tensor(X, dtype=torch.float)
y_tensor = torch.tensor(y, dtype=torch.long)

X_train, X_test, y_train, y_test = train_test_split(X_tensor, y_tensor, test_size=0.2, random_state=32)

train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [5]:
# Model parameters
input_size = 2
hidden_size = 50
num_layers = 1
num_classes = 2

In [7]:
# Instantize model, loss function and optimizer
model = LSTMModel(input_size, hidden_size, num_layers, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [14]:
# Model training
num_epochs = 10
for epoch in range(num_epochs):
    for inputs, labels in train_loader:
        # adjust the input shape to be (batch_size, seq_len, input_size), namely [64, 1, 2),
        # which is the expected input shape for LSTM model.
        # In this case, LSTM is applied to a simulated time series data, with time step of 1.
        inputs = inputs.unsqueeze(1)

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

Epoch [1/10], Loss: 0.0001
Epoch [2/10], Loss: 0.0001
Epoch [3/10], Loss: 0.0001
Epoch [4/10], Loss: 0.0001
Epoch [5/10], Loss: 0.0001
Epoch [6/10], Loss: 0.0001
Epoch [7/10], Loss: 0.0001
Epoch [8/10], Loss: 0.0001
Epoch [9/10], Loss: 0.0001
Epoch [10/10], Loss: 0.0001


In [18]:
# Model evaluation
model.eval()

with torch.no_grad():
    y_true = []
    y_pred = []
    fn_records = []
    correct = 0

    for inputs, labels in test_loader:
        inputs = inputs.unsqueeze(1)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        y_true.extend(labels.tolist())
        y_pred.extend(predicted.tolist())

        # Find false negative records
        for i, (label, pred) in enumerate(zip(labels, predicted)):
            if label == 1 and pred == 0:
                fn_records.append((inputs[i].tolist(), label.item(), pred.item()))

        # count correct predictions
        correct += (predicted == labels).sum().item()

    total = len(y_true)
    print(f'Accuracy on test set: {100 * correct / total}%')

    # confusion matrix
    conf_matrix = confusion_matrix(y_true, y_pred)
    print("confusion matrix:")
    print(conf_matrix)

    print("\n false negative records:")
    for record in fn_records:
        print(f"Input: {record[0]}, True Label: {record[1]}, predicted label: {record[2]}")

Accuracy on test set: 99.992%
confusion matrix:
[[99992     0]
 [    8     0]]

 false negative records:
Input: [[0.9534358978271484, 0.20167024433612823]], True Label: 1, predicted label: 0
Input: [[0.15954242646694183, 0.020099591463804245]], True Label: 1, predicted label: 0
Input: [[0.6887012124061584, 0.9517923593521118]], True Label: 1, predicted label: 0
Input: [[0.49170637130737305, 0.954414427280426]], True Label: 1, predicted label: 0
Input: [[0.04933225363492966, 0.038266588002443314]], True Label: 1, predicted label: 0
Input: [[0.9671679139137268, 0.9225612878799438]], True Label: 1, predicted label: 0
Input: [[0.7805687189102173, 0.3158128559589386]], True Label: 1, predicted label: 0
Input: [[0.09090518206357956, 0.06240416318178177]], True Label: 1, predicted label: 0
