In [1]:
import seaborn as sns
import numpy as np
import torch
import torch.nn as nn
import sys
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_curve
import matplotlib.pyplot as plt


In [2]:
# # Generate synthetic sequential data
n_samples = 100
n_features = 3
sequence_length = 20

# Generate random features
features = np.random.rand(n_samples, sequence_length, n_features)
labels = np.random.randint(0, 2, size=n_samples)

In [3]:
# Split data into train, validation, and test sets
X_train, X_temp, y_train, y_temp = train_test_split(features, labels, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

In [4]:
# Normalize features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train.reshape(-1, n_features)).reshape(-1, sequence_length, n_features)
X_val_scaled = scaler.transform(X_val.reshape(-1, n_features)).reshape(-1, sequence_length, n_features)
X_test_scaled = scaler.transform(X_test.reshape(-1, n_features)).reshape(-1, sequence_length, n_features)

In [5]:
# Convert data to PyTorch tensors
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_val_tensor = torch.tensor(X_val_scaled, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)


In [6]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(LSTMModel, self).__init__()
        self.lstm1 = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_size, hidden_size, batch_first=True)  # Additional LSTM layer
        self.fc1 = nn.Linear(hidden_size, 64)  # First dense layer
        self.fc2 = nn.Linear(64, 32)  # Second dense layer
        self.fc3 = nn.Linear(32, 1)  # Single output neuron
        self.fc4 = nn.Sigmoid()  # Sigmoid activation for binary classification

    def forward(self, x):
        out, (h_n, c_n) = self.lstm1(x)
        out, _ = self.lstm2(out, (h_n, c_n))  # Pass through the second LSTM layer
        out = self.fc1(out[:, -1, :])
        out = self.fc2(out)
        out = self.fc3(out)
        out = self.fc4(out)
        return out

In [7]:
# Instantiate the model
input_size = n_features
hidden_size = 64
output_size = 1
model = LSTMModel(input_size, hidden_size)

# Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [8]:
# Initialize empty lists to store training and validation losses
train_losses = []
val_losses = []

# Training loop
n_epochs = 100
for epoch in range(n_epochs):
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor.unsqueeze(1))
    loss.backward()
    optimizer.step()

    # Validation
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val_tensor)
        val_loss = criterion(val_outputs, y_val_tensor.unsqueeze(1))

    # Append losses to lists
    train_losses.append(loss.item())
    val_losses.append(val_loss.item())

    # print(f"Epoch [{epoch+1}/{n_epochs}], Loss: {loss.item():.4f}, Val Loss: {val_loss.item():.4f}")
    # Training progress bar
    label = f"{epoch+1}/{n_epochs} epochs"
    progress = (epoch+1)/ n_epochs
    progress_bar_len = 50
    filled_len = int(progress_bar_len * progress)
    bar = '=' * filled_len + '-' * (progress_bar_len - filled_len)
    sys.stdout.write(f'\rTraining in progress: [{bar}] {progress * 100:.1f}% [{label}]')
    sys.stdout.flush()


# Plot the losses
# plt.figure(figsize=(8, 6))
# plt.plot(range(1, len(train_losses) + 1), train_losses, label="Training Loss")
# plt.plot(range(1, len(val_losses) + 1), val_losses, label="Validation Loss")
# plt.xlabel("Epoch")
# plt.ylabel("Loss")
# plt.title("Training and Validation Losses")
# plt.grid(True)
# plt.legend()
# plt.show()

Epoch [1/100], Loss: 0.2483, Val Loss: 0.2541
Training in progress: [--------------------------------------------------] 1.0% [1/100 epochs]Epoch [2/100], Loss: 0.2481, Val Loss: 0.2548
Training in progress: [=-------------------------------------------------] 2.0% [2/100 epochs]Epoch [3/100], Loss: 0.2479, Val Loss: 0.2552
Training in progress: [=-------------------------------------------------] 3.0% [3/100 epochs]Epoch [4/100], Loss: 0.2478, Val Loss: 0.2555
Training in progress: [==------------------------------------------------] 4.0% [4/100 epochs]Epoch [5/100], Loss: 0.2477, Val Loss: 0.2555
Training in progress: [==------------------------------------------------] 5.0% [5/100 epochs]Epoch [6/100], Loss: 0.2475, Val Loss: 0.2553
Training in progress: [===-----------------------------------------------] 6.0% [6/100 epochs]Epoch [7/100], Loss: 0.2472, Val Loss: 0.2550
Training in progress: [===-----------------------------------------------] 7.0% [7/100 epochs]Epoch [8/100], Loss:

In [9]:
# Evaluate on test set
model.eval()
with torch.no_grad():
    test_outputs = model(X_test_tensor)

    test_loss = criterion(test_outputs, y_test_tensor.unsqueeze(1))
    print(f"Test Loss: {test_loss.item():.4f}")

Test Loss: 0.3753


In [None]:
# Evaluation
precision, recall, thresholds = precision_recall_curve(y_test_tensor, test_outputs)

# Calculate F1 scores for each threshold
f1_scores = 2 * (precision * recall) / (precision + recall)

# Find the optimal threshold that maximizes the F1 score
optimal_threshold = thresholds[f1_scores.argmax()]
max_f1_score = f1_scores.max()

print(f"Optimal Threshold: {optimal_threshold:.4f}")
print(f"Max F1 Score: {max_f1_score:.4f}")

# # Plot the precision-recall curve
# plt.figure(figsize=(8, 6))
# plt.plot(recall, precision, label='Precision-Recall Curve')
# plt.xlabel('Recall')
# plt.ylabel('Precision')
# plt.title('Precision-Recall Curve')
# plt.grid(True)
# plt.legend()
# plt.show()


In [None]:
# Plot the F1 curve
# plt.figure(figsize=(8, 6))
# plt.plot(thresholds, f1_scores[:-1], label='F1 Score')
# plt.axvline(x=optimal_threshold, color='r', linestyle='--', label='Optimal Threshold')
# plt.xlabel('Threshold')
# plt.ylabel('F1 Score')
# plt.title('F1 Curve')
# plt.grid(True)
# plt.legend()
# plt.show()

In [None]:
# Plot the Confusion matrix

# test_outputs = (test_outputs >= optimal_threshold).float()  # Convert probabilities to 0 or 1
# # test_outputs.squeeze(-1)

# cm = confusion_matrix(y_test_tensor, test_outputs)
# labels = ['No Robbery', 'Robbery']

# # Transpose the confusion matrix and swap axis labels
# cm_transposed = cm.T

# plt.figure(figsize=(8, 6))
# sns.heatmap(cm_transposed, annot=True, fmt='d', cmap='Blues', cbar=False,
#             xticklabels=labels, yticklabels=labels)  # Set labels for x and y axes
# plt.title('Confusion Matrix')
# plt.xlabel('True Labels')
# plt.ylabel('Predicted Labels')
# plt.show()

In [10]:
# from datetime import datetime

# timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# Save the model with the time stamp in the file name
torch.save(model.state_dict(), f'model.pt')


In [11]:
# Create an instance of your LSTM model
model = LSTMModel(input_size, hidden_size)

# Load the saved weights
model.load_state_dict(torch.load('model.pt'))
model.eval()  # Set the model to evaluation mode

LSTMModel(
  (lstm1): LSTM(3, 64, batch_first=True)
  (lstm2): LSTM(64, 64, batch_first=True)
  (fc1): Linear(in_features=64, out_features=64, bias=True)
  (fc2): Linear(in_features=64, out_features=32, bias=True)
  (fc3): Linear(in_features=32, out_features=1, bias=True)
  (fc4): Sigmoid()
)

---------------------**Activate API Endpoint** before proceeding-------------------------

In [12]:
import requests


In [20]:
for i in range(int(n_samples*0.15)):
    input_data = {
    "input":  X_train_scaled.tolist()[i] 
}
    response = requests.post('http://localhost:5000/predict', json=input_data).json()

    threshold = 0.5
    RBP = float(f"{response['output']:.2f}")
    # RBP = 1

    actualRBP = y_train.tolist()[i]
    print("\n", i, "RBP =", RBP, "| actual RBP =", actualRBP, end=" ")
    # if RBP >= threshold:
    #     print("| WARNING! Impending robbery.", end="")
    if ((RBP >= threshold) & (actualRBP == 1)) | ((RBP < threshold) & (actualRBP == 0)):
        print("| CORRECT", end="")



 0 RBP = 1.0 | actual RBP = 1 | CORRECT
 1 RBP = 0.0 | actual RBP = 0 | CORRECT
 2 RBP = 0.88 | actual RBP = 1 | CORRECT
 3 RBP = 1.0 | actual RBP = 1 | CORRECT
 4 RBP = 0.12 | actual RBP = 0 | CORRECT
 5 RBP = 0.0 | actual RBP = 0 | CORRECT
 6 RBP = 0.86 | actual RBP = 1 | CORRECT
 7 RBP = 1.0 | actual RBP = 1 | CORRECT
 8 RBP = 0.15 | actual RBP = 0 | CORRECT
 9 RBP = 0.02 | actual RBP = 0 | CORRECT
 10 RBP = 0.98 | actual RBP = 1 | CORRECT
 11 RBP = 1.0 | actual RBP = 1 | CORRECT
 12 RBP = 0.0 | actual RBP = 0 | CORRECT
 13 RBP = 0.01 | actual RBP = 0 | CORRECT
 14 RBP = 1.0 | actual RBP = 1 | CORRECT