# Solar Power Grid Sensor Anomaly Detection

This notebook demonstrates the implementation of an LSTM-based autoencoder for anomaly detection in solar power grid sensor data. The system is designed to identify unusual patterns in time-series data from solar grid sensors that may indicate faults, failures, or other issues requiring attention.

## 1. Setup and Imports

In [None]:
import os
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import precision_recall_curve, f1_score, precision_score, recall_score

# Add parent directory to path for imports
sys.path.append('..')

# Import project modules
from utils.data_generator import generate_solar_sensor_data, create_sequences, train_test_split
from utils.visualization import (
    plot_time_series, 
    plot_reconstruction, 
    plot_reconstruction_error,
    plot_evaluation_metrics
)
from models.lstm_autoencoder import LSTMAutoencoder

# Seed the PyTorch and NumPy random number generators so runs are repeatable
torch.manual_seed(42)
np.random.seed(42)

# Prefer the GPU when CUDA is available; otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

## 2. Generate Synthetic Solar Grid Sensor Data

In [None]:
# Generate the synthetic sensor data with the project helper and keep it in `df`.
# NOTE(review): the meaning of each keyword below is inferred from its name only —
# confirm against utils.data_generator.generate_solar_sensor_data.
df = generate_solar_sensor_data(
    num_days=30,
    num_sensors=5,
    sampling_interval_minutes=15,
    noise_level=0.05,
    anomaly_prob=0.02,
    anomaly_scale=3.0
)

# Print the shape, then preview the first rows (head() is the cell's displayed value)
print(f"Data shape: {df.shape}")
df.head()

In [None]:
# Plot the five sensor columns (sensor_1 .. sensor_5), passing the 'anomaly'
# column to the plotting helper as the anomaly indicator
sensor_cols = [f'sensor_{n}' for n in range(1, 6)]
plot_time_series(df, sensor_cols, figsize=(14, 10), anomaly_col='anomaly')

## 3. Data Preprocessing

Create sequences for the LSTM autoencoder and prepare the train/test split.

In [None]:
# Window length: 24 samples, i.e. 6 hours at the 15-minute sampling interval
seq_length = 24
# Columns handed to the sequence builder: sensor_1 .. sensor_5
target_cols = [f'sensor_{n}' for n in range(1, 6)]

# Build the (X, y) arrays with the project helper and report their shapes
X, y = create_sequences(df, target_cols, seq_length)
print(f"X shape: {X.shape}, y shape: {y.shape}")

In [None]:
# Split the sequences into train and test sets, requesting a 20% test share.
# NOTE(review): train_test_split here is the project helper imported from
# utils.data_generator, not scikit-learn's function of the same name; whether it
# shuffles or splits chronologically is not visible from this notebook — confirm.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
print(f"Train shapes: X_train {X_train.shape}, y_train {y_train.shape}")
print(f"Test shapes: X_test {X_test.shape}, y_test {y_test.shape}")

In [None]:
# Convert the train/test splits to float32 tensors on the selected device.
# torch.tensor with an explicit dtype/device replaces the legacy
# torch.FloatTensor(...) constructor; like the original, it copies the data.
# (These are plain tensors, not torch.utils.data.Dataset objects.)
# NOTE(review): assumes X_train / X_test are NumPy arrays or nested sequences — confirm.
X_train_tensor = torch.tensor(X_train, dtype=torch.float32, device=device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32, device=device)

## 4. LSTM Autoencoder Model

Initialize and train the LSTM autoencoder model.

In [None]:
# Architecture hyperparameters
input_dim = len(target_cols)  # one input feature per target column
hidden_dim, latent_dim, num_layers = 64, 32, 2

# Build the autoencoder, then move it to the selected device
model = LSTMAutoencoder(
    input_dim=input_dim,
    hidden_dim=hidden_dim,
    latent_dim=latent_dim,
    seq_len=seq_length,
    num_layers=num_layers,
)
model = model.to(device)

print(model)

In [None]:
# Training parameters
num_epochs = 50
batch_size = 32
learning_rate = 0.001

# Loss function and optimizer: mean squared reconstruction error, optimised with Adam
criterion = nn.MSELoss(reduction='mean')
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Fail early with a clear message instead of dividing by zero further down
num_samples = len(X_train_tensor)
if num_samples == 0:
    raise ValueError("X_train_tensor is empty; cannot train the autoencoder")

# Mean training loss per sample-batch, one entry per epoch
train_losses = []

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0

    # Walk the training set in stored order (no shuffling, as before).
    # Striding by batch_size keeps the trailing partial batch, which the
    # previous `len(X_train) // batch_size` batch count silently dropped.
    for start_idx in range(0, num_samples, batch_size):
        batch_X = X_train_tensor[start_idx:start_idx + batch_size]

        # Forward pass: the autoencoder is trained to reproduce its own input
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_X)

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

        # Weight by batch size so a short final batch does not skew the epoch mean
        epoch_loss += loss.item() * len(batch_X)

    # Sample-weighted average loss for this epoch
    train_loss = epoch_loss / num_samples
    train_losses.append(train_loss)

    # Print progress every fifth epoch
    if (epoch + 1) % 5 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}")

In [None]:
# Draw the per-epoch training loss as a solid blue line
plt.figure(figsize=(10, 4))
plt.plot(train_losses, color='b', linestyle='-')

# Title, axis labels and a light dashed grid
plt.title('Training Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.grid(True, alpha=0.5, linestyle='--')
plt.show()

## 5. Anomaly Detection and Evaluation

Use the trained autoencoder to detect anomalies and evaluate its performance.

In [None]:
# Compute reconstructions and reconstruction errors on the test set
model.eval()  # switch to evaluation mode before inference
with torch.no_grad():
    # Reconstructions of the test sequences (used for the example plots below)
    test_reconstructions = model(X_test_tensor)

    # Reconstruction error as computed by the model's own helper
    test_errors = model.get_reconstruction_error(X_test_tensor).cpu().numpy()

# Move inputs and reconstructions to NumPy for plotting
X_test_np = X_test_tensor.cpu().numpy()
reconstructions_np = test_reconstructions.cpu().numpy()

# Show the first sequence labelled 0 (normal). Guard the lookup: indexing [0]
# on an empty match array would raise IndexError if the split has no such sequence.
normal_matches = np.where(y_test == 0)[0]
if normal_matches.size > 0:
    normal_idx = normal_matches[0]
    plot_reconstruction(X_test_np, reconstructions_np, idx=normal_idx, figsize=(12, 6))
    plt.suptitle('Normal Sequence: Original vs Reconstructed', y=1.02)
    plt.show()
else:
    print("No normal sequences in the test set; skipping the normal example plot")

# Show the first sequence labelled 1 (anomalous), with the same guard
anomaly_matches = np.where(y_test == 1)[0]
if anomaly_matches.size > 0:
    anomaly_idx = anomaly_matches[0]
    plot_reconstruction(X_test_np, reconstructions_np, idx=anomaly_idx, figsize=(12, 6))
    plt.suptitle('Anomalous Sequence: Original vs Reconstructed', y=1.02)
    plt.show()
else:
    print("No anomalous sequences in the test set; skipping the anomalous example plot")

In [None]:
# Plot reconstruction error with true anomalies.
# anomaly_indices holds the first-axis positions in the test set where the label equals 1;
# they are handed to the plotting helper alongside the per-sequence errors.
anomaly_indices = np.where(y_test == 1)[0]
plot_reconstruction_error(test_errors, anomaly_indices=anomaly_indices, figsize=(14, 5))
plt.show()

In [None]:
# Find optimal threshold for anomaly detection.
# The project helper returns a figure and the threshold it selected from the
# test labels and test reconstruction errors.
# NOTE(review): the threshold is chosen from y_test / test_errors, the same arrays
# the metrics cell below scores against, so the reported precision/recall/F1 are
# likely optimistic — consider selecting it on a separate validation split.
fig, optimal_threshold = plot_evaluation_metrics(y_test, test_errors)
plt.show()
print(f"Optimal threshold: {optimal_threshold:.4f}")

In [None]:
# Flag every test sequence whose reconstruction error reaches the threshold
y_pred = (test_errors >= optimal_threshold).astype(int)

# Score the predictions against the ground-truth labels
precision, recall, f1 = (
    scorer(y_test, y_pred)
    for scorer in (precision_score, recall_score, f1_score)
)

# Report the three metrics to four decimal places
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

## 6. Model Interpretation and Visualization

In [None]:
# Visualize latent space
# Ensure evaluation mode here too, so the cell gives inference-time encodings
# even when it is run right after (re)training rather than after the evaluation cell.
model.eval()
with torch.no_grad():
    # Encode the test sequences and move the result to NumPy
    # NOTE(review): assumes model.encoder returns a single 2-D tensor — confirm.
    latent_vectors = model.encoder(X_test_tensor).cpu().numpy()

# Plot latent space with PCA if dimensions are high
from sklearn.decomposition import PCA

if latent_dim > 2:
    # Seed PCA so the 2-D projection is reproducible; the randomized SVD solver
    # otherwise draws from the global NumPy RNG, whose state depends on which
    # cells ran before this one.
    pca = PCA(n_components=2, random_state=42)
    latent_2d = pca.fit_transform(latent_vectors)
else:
    latent_2d = latent_vectors

# Scatter the two components, coloured by the test label
plt.figure(figsize=(10, 8))
scatter = plt.scatter(latent_2d[:, 0], latent_2d[:, 1], c=y_test, cmap='viridis', alpha=0.7)
plt.colorbar(scatter, label='Anomaly')
plt.title('Latent Space Visualization')
plt.xlabel('Component 1')
plt.ylabel('Component 2')
plt.grid(True, linestyle='--', alpha=0.5)
plt.show()

## 7. Save Model

In [None]:
# Save the trained weights, optimizer state, detection threshold and the
# hyperparameters needed to rebuild the model
model_path = '../models/lstm_autoencoder.pth'

# Make sure the target directory exists so torch.save does not fail on a fresh checkout
os.makedirs(os.path.dirname(model_path), exist_ok=True)

torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    # Store a plain Python float: if the helper returned a NumPy scalar, the
    # checkpoint would be rejected by torch.load(weights_only=True), which is
    # the default from PyTorch 2.6 onwards.
    'threshold': float(optimal_threshold),
    'hyperparameters': {
        'input_dim': input_dim,
        'hidden_dim': hidden_dim,
        'latent_dim': latent_dim,
        'seq_len': seq_length,
        'num_layers': num_layers
    }
}, model_path)

print(f"Model saved to {model_path}")

## 8. Conclusion

In this notebook, we've implemented an LSTM autoencoder for anomaly detection in solar power grid sensor data. The key components include:

1. Synthetic data generation with realistic patterns and anomalies
2. LSTM-based autoencoder architecture for time series data
3. Training and evaluation of the model
4. Anomaly detection using reconstruction error
5. Visualization and interpretation of results

Future enhancements could include:
- Testing with real-world solar grid data
- Adding weather data as additional features
- Implementing online learning for continuous model updates
- Developing an alerting system based on detected anomalies