# Attack Generation and Demonstration

This notebook demonstrates various attack generation techniques for power system security evaluation.

## Attack Types:
1. FDIA (False Data Injection Attack)
2. Temporal Stealth Attacks
3. Replay Attacks
4. ML Adversarial Attacks (FGSM, PGD)

In [None]:
import sys
sys.path.append('../src')

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import yaml

from attacks import AttackGenerator
from pandapower_utils import create_power_system_model
from data_loader import load_sample_data

plt.style.use('seaborn-v0_8')
%matplotlib inline

print("Libraries imported successfully")

## 1. Load Configuration and Data

In [None]:
# Load configuration
with open('../config.yaml', 'r') as f:
    config = yaml.safe_load(f)

# Load sample data
dataset_path = config['data']['dataset_path']
data = load_sample_data(dataset_path, small_data_mode=True)

print(f"Loaded {len(data)} data modalities")
for modality, df in data.items():
    print(f"  {modality}: {df.shape}")

## 2. Initialize Attack Generator

In [None]:
# Initialize attack generator
generator = AttackGenerator(config)
generator.initialize_power_model()

# Generate base measurements for attacks
if generator.power_model:
    measurements, timestamps = generator.power_model.generate_base_measurements(n_samples=200)
    print(f"Generated measurements: {measurements.shape}")
else:
    # Fallback synthetic measurements
    measurements = np.random.randn(200, 15) * 0.1 + 1.0
    timestamps = pd.date_range('2024-01-01', periods=200, freq='1S')
    print(f"Using synthetic measurements: {measurements.shape}")

print(f"Jacobian available: {generator.jacobian is not None}")
if generator.jacobian is not None:
    print(f"Jacobian shape: {generator.jacobian.shape}")

## 3. FDIA (False Data Injection Attack)

In [None]:
# Generate FDIA
print("Generating FDIA...")

# Select a measurement sample
z_clean = measurements[50]
print(f"Original measurement: {z_clean[:5]}")

# Generate attack with different magnitudes
attack_magnitudes = [0.05, 0.1, 0.2]
fdia_results = {}

for mag in attack_magnitudes:
    z_attacked = generator.make_fdia(z_clean, attack_magnitude=mag)
    attack_vector = z_attacked - z_clean
    fdia_results[mag] = {
        'attacked': z_attacked,
        'attack_vector': attack_vector,
        'attack_norm': np.linalg.norm(attack_vector)
    }
    print(f"Attack magnitude {mag}: norm = {fdia_results[mag]['attack_norm']:.4f}")

# Visualize FDIA
fig, axes = plt.subplots(1, 2, figsize=(15, 5))

# Plot original vs attacked measurements
indices = np.arange(len(z_clean))
axes[0].bar(indices - 0.2, z_clean, 0.4, label='Original', alpha=0.7)
axes[0].bar(indices + 0.2, fdia_results[0.1]['attacked'], 0.4, label='Attacked', alpha=0.7)
axes[0].set_title('FDIA: Original vs Attacked Measurements')
axes[0].set_xlabel('Measurement Index')
axes[0].set_ylabel('Value')
axes[0].legend()

# Plot attack vectors
for mag in attack_magnitudes:
    axes[1].plot(fdia_results[mag]['attack_vector'], label=f'Magnitude {mag}', marker='o')
axes[1].set_title('FDIA Attack Vectors')
axes[1].set_xlabel('Measurement Index')
axes[1].set_ylabel('Attack Value')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 4. Temporal Stealth Attack

In [None]:
# Generate temporal stealth attack
print("Generating Temporal Stealth Attack...")

# Use a subset of measurements for temporal attack
Z_window = measurements[100:150]  # 50 timesteps
print(f"Time window shape: {Z_window.shape}")

# Select measurements to attack (first 3 measurements)
attack_indices = [0, 1, 2]

# Generate stealth attack
Z_stealth = generator.make_temporal_stealth(
    Z_window, 
    indices=attack_indices,
    duration=20,
    max_step=0.02
)

# Visualize temporal stealth attack
fig, axes = plt.subplots(len(attack_indices), 1, figsize=(12, 8))

for i, idx in enumerate(attack_indices):
    ax = axes[i] if len(attack_indices) > 1 else axes
    
    time_indices = np.arange(len(Z_window))
    ax.plot(time_indices, Z_window[:, idx], label='Original', linewidth=2)
    ax.plot(time_indices, Z_stealth[:, idx], label='Stealth Attack', linewidth=2, linestyle='--')
    
    ax.set_title(f'Temporal Stealth Attack - Measurement {idx}')
    ax.set_xlabel('Time Step')
    ax.set_ylabel('Value')
    ax.legend()
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Calculate attack statistics
attack_diff = Z_stealth - Z_window
max_attack = np.max(np.abs(attack_diff))
mean_attack = np.mean(np.abs(attack_diff))
print(f"\nStealth Attack Statistics:")
print(f"  Maximum attack magnitude: {max_attack:.4f}")
print(f"  Mean attack magnitude: {mean_attack:.4f}")
print(f"  Attacked measurements: {len(attack_indices)}")

## 5. Replay Attack

In [None]:
# Generate replay attack
print("Generating Replay Attack...")

# Use larger measurement window
Z_replay_window = measurements[50:150]  # 100 timesteps
print(f"Replay window shape: {Z_replay_window.shape}")

# Generate replay attack
source_window = (10, 20)  # Copy timesteps 10-20
target_time = 70  # Paste at timestep 70

Z_replay = generator.make_replay(
    Z_replay_window,
    source_window=source_window,
    target_time=target_time
)

# Visualize replay attack
fig, axes = plt.subplots(2, 1, figsize=(15, 8))

# Plot first measurement channel
time_indices = np.arange(len(Z_replay_window))
axes[0].plot(time_indices, Z_replay_window[:, 0], label='Original', linewidth=2)
axes[0].plot(time_indices, Z_replay[:, 0], label='With Replay Attack', linewidth=2, linestyle='--')

# Highlight source and target regions
axes[0].axvspan(source_window[0], source_window[1], alpha=0.3, color='green', label='Source Window')
axes[0].axvspan(target_time, target_time + (source_window[1] - source_window[0]), 
                alpha=0.3, color='red', label='Target Window')

axes[0].set_title('Replay Attack - Measurement Channel 0')
axes[0].set_xlabel('Time Step')
axes[0].set_ylabel('Value')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Plot difference (attack signal)
replay_diff = Z_replay - Z_replay_window
axes[1].plot(time_indices, replay_diff[:, 0], color='red', linewidth=2)
axes[1].set_title('Replay Attack Signal (Difference)')
axes[1].set_xlabel('Time Step')
axes[1].set_ylabel('Attack Magnitude')
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Calculate replay statistics
replay_affected_samples = np.sum(np.any(replay_diff != 0, axis=1))
print(f"\nReplay Attack Statistics:")
print(f"  Source window: timesteps {source_window[0]}-{source_window[1]}")
print(f"  Target window: timesteps {target_time}-{target_time + (source_window[1] - source_window[0])}")
print(f"  Affected timesteps: {replay_affected_samples}")
print(f"  Window size: {source_window[1] - source_window[0]} timesteps")

## 6. ML Adversarial Attacks (ART Integration)

In [None]:
# Generate ML adversarial attacks using ART
print("Generating ML Adversarial Attacks...")

try:
    from sklearn.ensemble import RandomForestClassifier
    
    # Create a simple classifier for demonstration
    X_demo = measurements[:100]
    y_demo = np.random.choice([0, 1], 100, p=[0.8, 0.2])  # Synthetic labels
    
    # Train simple RF classifier
    rf_model = RandomForestClassifier(n_estimators=10, random_state=42)
    rf_model.fit(X_demo, y_demo)
    
    print(f"Trained RF classifier on {len(X_demo)} samples")
    
    # Generate adversarial examples
    X_test = measurements[100:120]  # 20 test samples
    
    adversarial_examples = generator.make_art_attacks(
        rf_model, 
        X_test,
        attack_params=config['attacks']['art_attacks']
    )
    
    if adversarial_examples:
        print(f"\nGenerated adversarial examples:")
        for attack_name, X_adv in adversarial_examples.items():
            # Calculate perturbation statistics
            perturbation = X_adv - X_test
            l2_norm = np.mean(np.linalg.norm(perturbation, axis=1))
            linf_norm = np.mean(np.max(np.abs(perturbation), axis=1))
            
            print(f"  {attack_name}:")
            print(f"    L2 norm: {l2_norm:.4f}")
            print(f"    L∞ norm: {linf_norm:.4f}")
            
            # Check if attack changed predictions
            pred_clean = rf_model.predict(X_test)
            pred_adv = rf_model.predict(X_adv)
            success_rate = np.mean(pred_clean != pred_adv)
            print(f"    Success rate: {success_rate:.2%}")
        
        # Visualize adversarial examples
        if len(adversarial_examples) > 0:
            attack_name = list(adversarial_examples.keys())[0]
            X_adv_demo = adversarial_examples[attack_name]
            
            fig, axes = plt.subplots(1, 2, figsize=(15, 5))
            
            # Plot original vs adversarial
            sample_idx = 0
            feature_indices = np.arange(min(10, X_test.shape[1]))
            
            axes[0].bar(feature_indices - 0.2, X_test[sample_idx, :len(feature_indices)], 
                       0.4, label='Original', alpha=0.7)
            axes[0].bar(feature_indices + 0.2, X_adv_demo[sample_idx, :len(feature_indices)], 
                       0.4, label='Adversarial', alpha=0.7)
            axes[0].set_title(f'Adversarial Example: {attack_name}')
            axes[0].set_xlabel('Feature Index')
            axes[0].set_ylabel('Value')
            axes[0].legend()
            
            # Plot perturbation
            perturbation_demo = X_adv_demo[sample_idx] - X_test[sample_idx]
            axes[1].bar(feature_indices, perturbation_demo[:len(feature_indices)], 
                       color='red', alpha=0.7)
            axes[1].set_title('Adversarial Perturbation')
            axes[1].set_xlabel('Feature Index')
            axes[1].set_ylabel('Perturbation')
            axes[1].grid(True, alpha=0.3)
            
            plt.tight_layout()
            plt.show()
    else:
        print("No adversarial examples generated (ART may not be available)")
        
except ImportError as e:
    print(f"ART not available: {e}")
    print("Skipping ML adversarial attacks demonstration")
except Exception as e:
    print(f"Error generating adversarial examples: {e}")

## 7. Generate Comprehensive Attack Dataset

In [None]:
# Generate comprehensive attack dataset
print("Generating comprehensive attack dataset...")

attack_data = generator.generate_attack_dataset(
    measurements=measurements,
    timestamps=timestamps,
    attack_types=['fdia', 'temporal_stealth', 'replay'],
    attack_ratio=0.3
)

print(f"\nAttack Dataset Summary:")
print(f"  Total samples: {attack_data['metadata']['total_samples']}")
print(f"  Attack samples: {attack_data['metadata']['attack_samples']}")
print(f"  Attack ratio: {attack_data['metadata']['attack_ratio']:.2%}")

# Analyze attack distribution
attack_types_count = pd.Series(attack_data['attack_types']).value_counts()
print(f"\nAttack Type Distribution:")
for attack_type, count in attack_types_count.items():
    print(f"  {attack_type}: {count} samples")

# Visualize attack dataset
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Plot 1: Attack labels over time
axes[0, 0].plot(attack_data['labels'], linewidth=1)
axes[0, 0].set_title('Attack Labels Over Time')
axes[0, 0].set_xlabel('Time Step')
axes[0, 0].set_ylabel('Attack Label (0=Clean, 1=Attack)')
axes[0, 0].grid(True, alpha=0.3)

# Plot 2: Attack type distribution
attack_types_count.plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('Attack Type Distribution')
axes[0, 1].set_xlabel('Attack Type')
axes[0, 1].set_ylabel('Count')
axes[0, 1].tick_params(axis='x', rotation=45)

# Plot 3: Sample measurements (clean vs attacked)
clean_indices = np.where(attack_data['labels'] == 0)[0][:50]
attack_indices = np.where(attack_data['labels'] == 1)[0][:50]

axes[1, 0].plot(attack_data['clean'][clean_indices, 0], label='Clean', alpha=0.7)
axes[1, 0].plot(attack_data['clean'][attack_indices, 0], label='Attacked', alpha=0.7)
axes[1, 0].set_title('Sample Measurements: Clean vs Attacked')
axes[1, 0].set_xlabel('Sample Index')
axes[1, 0].set_ylabel('Measurement Value')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# Plot 4: Attack magnitude distribution
if len(attack_indices) > 0:
    attack_magnitudes = np.linalg.norm(
        attack_data['clean'][attack_indices] - measurements[attack_indices], axis=1
    )
    axes[1, 1].hist(attack_magnitudes, bins=20, alpha=0.7, edgecolor='black')
    axes[1, 1].set_title('Attack Magnitude Distribution')
    axes[1, 1].set_xlabel('L2 Norm of Attack Vector')
    axes[1, 1].set_ylabel('Frequency')
    axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Save attack dataset
generator.save_attack_dataset(
    attack_data, 
    config['data']['output_path'], 
    'notebook_demo_attacks'
)

print(f"\nAttack dataset saved to: {config['data']['output_path']}/experiments/")

## 8. Summary and Next Steps

In [None]:
print("ATTACK GENERATION SUMMARY")
print("=" * 50)

print("Generated Attack Types:")
print("  ✓ FDIA (False Data Injection Attack)")
print("  ✓ Temporal Stealth Attack")
print("  ✓ Replay Attack")
print("  ✓ ML Adversarial Attacks (if ART available)")

print(f"\nDataset Statistics:")
print(f"  Total samples: {len(measurements)}")
print(f"  Attack samples: {attack_data['metadata']['attack_samples']}")
print(f"  Attack ratio: {attack_data['metadata']['attack_ratio']:.2%}")

print(f"\nKey Findings:")
print(f"  • FDIA attacks successfully generated using power system Jacobian")
print(f"  • Temporal stealth attacks show gradual attack progression")
print(f"  • Replay attacks demonstrate data copying between time windows")
print(f"  • ML adversarial attacks test classifier robustness")

print(f"\nNext Steps:")
print(f"  1. Use generated attacks to evaluate AIDM detection performance")
print(f"  2. Run notebook_3_train_evaluate.ipynb for model training")
print(f"  3. Analyze detection rates for different attack types")
print(f"  4. Optimize detection thresholds based on attack characteristics")

print("\n" + "=" * 50)
print("ATTACK GENERATION COMPLETED SUCCESSFULLY")
print("=" * 50)