In [None]:
# STEP 1: Setup Environment
!pip install numpy==1.26.4 --quiet
!pip install scipy==1.13.1 --quiet

# Install primary RL and simulation packages
!pip install stable-baselines3[extra] grid2op l2rpn-baselines --quiet
print("✓ Core RL/simulation libraries installed.")

# Install analysis and plotting packages
!pip install matplotlib shap pandas seaborn --quiet
print("✓ Analysis/plotting libraries installed.")

# Project Imports
import copy
import warnings
from collections import defaultdict

import grid2op
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import shap
from grid2op.Agent import DoNothingAgent, RandomAgent
from grid2op.MakeEnv import make
from grid2op.Parameters import Parameters
from grid2op.Reward import *
from grid2op.gym_compat import DiscreteActSpace, GymEnv
from scipy import stats
from stable_baselines3 import DQN
from stable_baselines3.common.env_util import DummyVecEnv
from tqdm.auto import tqdm

# Suppress common warnings for cleaner output
warnings.filterwarnings("ignore", category=UserWarning)

print("✓ All libraries installed and imports successful.")

In [None]:
# STEP 2: Experiment Configuration

# --- Primary Settings ---
ENV_NAME = "l2rpn_case14_sandbox"
TRAINING_TIMESTEPS = 150000
N_TEST_EPISODES = 20
N_STATISTICAL_RUNS = 5      # For future work: run the whole experiment 5 times

# --- Analysis Flags ---
PERFORM_ACTION_IMPACT_ANALYSIS = False # Set to True for a deep (but slow) action analysis

# Defines the different data degradation scenarios for evaluation.
ROBUSTNESS_SCENARIOS = {
    # Control group
    'baseline':          {'missing_rate': 0.0, 'corruption_rate': 0.0},

    # Missing data scenarios
    'light_missing':     {'missing_rate': 0.05, 'corruption_rate': 0.0},
    'moderate_missing':  {'missing_rate': 0.15, 'corruption_rate': 0.0},
    'heavy_missing':     {'missing_rate': 0.30, 'corruption_rate': 0.0},

    # Data corruption scenarios
    'light_corruption':  {'missing_rate': 0.0, 'corruption_rate': 0.05},
    'mixed_degradation': {'missing_rate': 0.10, 'corruption_rate': 0.10},
}

print(f"Configuration loaded:")
print(f"  - Training Timesteps: {TRAINING_TIMESTEPS}")
print(f"  - Evaluation Episodes: {N_TEST_EPISODES}")
print(f"  - Robustness Scenarios: {len(ROBUSTNESS_SCENARIOS)}")

In [None]:
# STEP 3: Environment Utilities

def create_environment(env_name, params=None):
    """A helper function to create a Grid2Op environment, optionally with custom parameters."""
    if params:
        return make(env_name, param=params)
    return make(env_name)

def get_reward_functions():
    """Returns a dictionary of reward functions to be used for multi-faceted evaluation."""
    return {
        'L2RPN': L2RPNReward(),
        'Economic': EconomicReward(),
        'FlatHazard': FlatReward(),      # Measures survival without other penalties
        'Gameplay': GameplayReward(),    # Balances survival and action costs
        'Redispatching': RedispReward()  # Focuses on generation stability
    }

# Define the set of reward functions for the analysis
reward_functions = get_reward_functions()
print(f"✓ Defined {len(reward_functions)} reward functions for evaluation.")

In [None]:
# STEP 4: Prepare Data & Environments

def get_available_chronics(env_name, max_test=100):
    """Robustly discovers available scenarios by attempting to load them."""
    available_ids = []
    print("Discovering available chronics...")
    for i in tqdm(range(max_test), desc="Probing Chronics"):
        try:
            p = Parameters()
            p.CHRONICS_ID_TO_USE = [i]
            make(env_name, param=p, test=True) # test=True loads faster
            available_ids.append(i)
        except Exception:
            break # Stop when a chronic can't be found
    print(f"✓ Found {len(available_ids)} available chronics.")
    return available_ids

def create_preconfigured_env(env_name, ids_to_use):
    """Creates a gym-compatible environment pre-configured for a specific set of chronics."""
    params = Parameters()
    params.CHRONICS_ID_TO_USE = ids_to_use
    env = create_environment(env_name, params=params)
    gym_env = GymEnv(env)
    gym_env.action_space = DiscreteActSpace(
        env.action_space,
        attr_to_keep=['set_line_status', 'change_line_status']
    )
    return gym_env

# Discover and split available scenarios
all_chronics_ids = get_available_chronics(ENV_NAME)
if all_chronics_ids:
    np.random.shuffle(all_chronics_ids)
    train_end = int(len(all_chronics_ids) * 0.6)
    val_end = int(len(all_chronics_ids) * 0.8)
    train_ids = all_chronics_ids[:train_end]
    val_ids = all_chronics_ids[train_end:val_end]
    test_ids = all_chronics_ids[val_end:]
else:
    # Fallback if no chronics are found
    train_ids, val_ids, test_ids = [0], [0], [0]

# Ensure sets are not empty for stability
if not train_ids: train_ids = [0]
if not val_ids: val_ids = [0]
if not test_ids: test_ids = [0]
print(f"✓ Chronics split: {len(train_ids)} train, {len(val_ids)} validation, {len(test_ids)} test.")

# Create dedicated environments for each data split
print("Creating pre-configured environments...")
train_env = create_preconfigured_env(ENV_NAME, train_ids)
val_env = create_preconfigured_env(ENV_NAME, val_ids)
test_env = create_preconfigured_env(ENV_NAME, test_ids)
print("✓ Train, validation, and test environments created successfully.")

In [None]:
# STEP 5: Agent Training

def train_agent_with_validation(train_env, val_env, timesteps):
    """Trains a DQN agent with periodic validation checks."""
    print("\n--- Training DQN Agent ---")
    vec_env = DummyVecEnv([lambda: train_env])

    model = DQN("MultiInputPolicy", vec_env, verbose=0, # Set verbose=1 for detailed logs
                learning_rate=0.001,
                buffer_size=100000,
                learning_starts=2000,
                batch_size=128,
                train_freq=4,
                target_update_interval=1000,
                exploration_initial_eps=1.0,
                exploration_final_eps=0.05)

    validation_scores = []
    num_phases = 5
    validation_interval = timesteps // num_phases

    for i in range(num_phases):
        print(f"\nTraining phase {i+1}/{num_phases}...")
        model.learn(total_timesteps=validation_interval,
                    progress_bar=True,
                    reset_num_timesteps=False)

        val_score = quick_validation(model, val_env)
        validation_scores.append(val_score)
        print(f"  > Validation score after phase {i+1}: {val_score:.2f}")

    print("✓ Training complete.")
    return model, validation_scores

def quick_validation(model, val_env, n_episodes=5):
    """Helper function to evaluate model performance on the validation set."""
    scores = []
    for _ in range(n_episodes):
        obs, _ = val_env.reset()
        done = False
        total_reward = 0
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, done, truncated, _ = val_env.step(action)
            done = done or truncated
            total_reward += reward
        scores.append(total_reward)
    return np.mean(scores)

# Train the agent
dqn_model, training_validation_scores = train_agent_with_validation(
    train_env, val_env, TRAINING_TIMESTEPS
)
dqn_model.save("dqn_grid_agent_long_train.zip")

In [None]:
# STEP 6: Evaluation Framework

def apply_data_degradation(obs, scenario_config):
    """Applies missing data and noise corruption to an observation."""
    if not scenario_config['missing_rate'] and not scenario_config['corruption_rate']:
        return obs

    degraded_obs = copy.deepcopy(obs)

    # Apply missing data (impute with zero)
    if scenario_config['missing_rate'] > 0:
        for key, value in degraded_obs.items():
            if isinstance(value, np.ndarray) and value.size > 0:
                flat_value = value.flatten()
                n_missing = int(flat_value.size * scenario_config['missing_rate'])
                if n_missing > 0:
                    missing_indices = np.random.choice(flat_value.size, n_missing, replace=False)
                    flat_value[missing_indices] = 0.0
                    degraded_obs[key] = flat_value.reshape(value.shape)

    # Apply noise corruption
    if scenario_config['corruption_rate'] > 0:
        for key, value in degraded_obs.items():
            if isinstance(value, np.ndarray) and value.size > 0:
                flat_value = value.flatten().astype(np.float32)
                n_corrupt = int(flat_value.size * scenario_config['corruption_rate'])
                if n_corrupt > 0:
                    corrupt_indices = np.random.choice(flat_value.size, n_corrupt, replace=False)
                    std_dev = np.std(flat_value) if np.std(flat_value) > 1e-6 else 1.0
                    noise = np.random.normal(0, std_dev * 0.1, n_corrupt)
                    flat_value[corrupt_indices] += noise
                    degraded_obs[key] = flat_value.reshape(value.shape)

    return degraded_obs


In [None]:
# STEP 7: COMPREHENSIVE EVALUATION FRAMEWORK

class ComprehensiveEvaluator:
    """A class to handle comprehensive agent evaluation across scenarios."""

    def __init__(self, reward_functions):
        self.reward_functions = reward_functions

    def evaluate(self, agent, eval_env, n_episodes, scenario_name, scenario_config):
        print(f"\nRunning evaluation: Agent='{agent.__class__.__name__}', Scenario='{scenario_name}'")

        survival_steps_raw = []
        episode_rewards = {name: [] for name in self.reward_functions.keys()}
        action_counts = defaultdict(int)

        for _ in tqdm(range(n_episodes), desc=f"  - Episodes"):
            obs, _ = eval_env.reset()
            done = False
            step_count = 0
            while not done:
                degraded_obs = apply_data_degradation(obs, scenario_config)
                action, _ = agent.predict(degraded_obs, deterministic=True)
                action = int(action)

                action_counts[action] += 1
                obs, reward, done, truncated, _ = eval_env.step(action)
                done = done or truncated

                for name in self.reward_functions.keys():
                    episode_rewards[name].append(reward) # Simplified: use default reward for all

                step_count += 1
            survival_steps_raw.append(step_count)

        # Aggregate and return results
        results = {
            'survival_steps_raw': survival_steps_raw,
            'survival_mean': np.mean(survival_steps_raw) if survival_steps_raw else 0,
            'survival_std': np.std(survival_steps_raw) if survival_steps_raw else 0,
            'action_diversity': len(action_counts)
        }
        for name, rewards in episode_rewards.items():
             results[f'{name}_mean'] = np.mean(rewards) if rewards else 0

        return results

In [None]:
# STEP 8: BASELINE AGENTS AND COMPREHENSIVE COMPARISON

print("\n--- Comprehensive Agent Evaluation ---")

evaluator = ComprehensiveEvaluator(reward_functions)

# Define simple, gym-compatible baseline agents
class SimpleDoNothingAgent:
    def predict(self, obs, deterministic=True):
        return 0, {} # Action 0 is always do_nothing

class SimpleRandomAgent:
    def __init__(self, action_space):
        self.action_space = action_space
    def predict(self, obs, deterministic=True):
        return self.action_space.sample(), {}

agents_to_test = {
    'DQN': dqn_model,
    'DoNothing': SimpleDoNothingAgent(),
    'Random': SimpleRandomAgent(test_env.action_space)
}

# Run evaluation for all agents across all robustness scenarios
all_results = {}
for agent_name, agent in agents_to_test.items():
    agent_results = {}
    for scenario_name, scenario_config in ROBUSTNESS_SCENARIOS.items():
        current_test_env = copy.deepcopy(test_env)
        results = evaluator.evaluate(
            agent, current_test_env,
            N_TEST_EPISODES, scenario_name, scenario_config
        )
        agent_results[scenario_name] = results
    all_results[agent_name] = agent_results

print("\n✓ Evaluation finished.")

In [None]:
# STEP 9: Statistical Analysis

def perform_statistical_analysis(all_results):
    """Processes raw results into a DataFrame and performs significance testing."""
    print("\n--- Statistical Analysis ---")

    # Flatten the nested results dictionary into a list for DataFrame creation
    flat_results = []
    for agent, agent_data in all_results.items():
        for scenario, scenario_data in agent_data.items():
            entry = {
                'Agent': agent,
                'Scenario': scenario,
                'Survival_Mean': scenario_data['survival_mean'],
                'Survival_Std': scenario_data['survival_std'],
                'L2RPN_Reward': scenario_data.get('L2RPN_mean', 0),
                'Action_Diversity': scenario_data['action_diversity']
            }
            flat_results.append(entry)

    df = pd.DataFrame(flat_results)

    print("\nSignificance Testing (Welch's t-test on baseline survival):")

    # Compare the distributions of survival times, not just the means
    dqn_dist = all_results['DQN']['baseline']['survival_steps_raw']
    donothing_dist = all_results['DoNothing']['baseline']['survival_steps_raw']
    random_dist = all_results['Random']['baseline']['survival_steps_raw']

    # Welch's t-test is appropriate here as we don't assume equal variance
    _, p_donothing = stats.ttest_ind(dqn_dist, donothing_dist, equal_var=False)
    print(f"  - DQN vs. DoNothing: p-value = {p_donothing:.4f} {'(Difference is significant)' if p_donothing < 0.05 else '(Not significant)'}")

    _, p_random = stats.ttest_ind(dqn_dist, random_dist, equal_var=False)
    print(f"  - DQN vs. Random:    p-value = {p_random:.4f} {'(Difference is significant)' if p_random < 0.05 else '(Not significant)'}")

    return df

# Process the results
statistical_df = perform_statistical_analysis(all_results)

In [None]:
# STEP 10: VISUALIZATION AND REPORTING
# ==============================================================================
def create_comprehensive_visualizations(statistical_df, all_results):
    """Create comprehensive visualizations."""
    print("\n--- CREATING VISUALIZATIONS ---")

    # Figure 1: Agent Performance Comparison
    plt.figure(figsize=(15, 10))

    # Subplot 1: Survival time comparison
    plt.subplot(2, 3, 1)
    baseline_data = statistical_df[statistical_df['Scenario'] == 'baseline']
    sns.barplot(data=baseline_data, x='Agent', y='Survival_Mean')
    plt.title('Agent Performance: Average Survival Time')
    plt.ylabel('Timesteps Survived')

    # Subplot 2: Robustness across scenarios (DQN only)
    plt.subplot(2, 3, 2)
    dqn_data = statistical_df[statistical_df['Agent'] == 'DQN']
    sns.barplot(data=dqn_data, x='Scenario', y='Survival_Mean')
    plt.title('DQN Robustness Across Scenarios')
    plt.xticks(rotation=45)
    plt.ylabel('Timesteps Survived')

    # Subplot 3: Action diversity
    plt.subplot(2, 3, 3)
    sns.barplot(data=baseline_data, x='Agent', y='Action_Diversity')
    plt.title('Action Diversity by Agent')
    plt.ylabel('Number of Different Actions Used')

    # Subplot 4: L2RPN Reward comparison
    plt.subplot(2, 3, 4)
    sns.barplot(data=baseline_data, x='Agent', y='L2RPN_Reward')
    plt.title('L2RPN Reward Performance')
    plt.ylabel('Average L2RPN Reward')

    # Subplot 5: Robustness heatmap
    plt.subplot(2, 3, 5)
    pivot_data = statistical_df.pivot(index='Agent', columns='Scenario', values='Survival_Mean')
    sns.heatmap(pivot_data, annot=True, fmt='.1f', cmap='viridis')
    plt.title('Performance Heatmap: Survival Time')

    # Subplot 6: Training validation curve
    plt.subplot(2, 3, 6)
    plt.plot(range(1, len(training_validation_scores) + 1), training_validation_scores, 'o-')
    plt.title('Training Validation Scores')
    plt.xlabel('Training Phase')
    plt.ylabel('Validation Score')
    plt.grid(True)

    plt.tight_layout()
    plt.show()

    # Figure 2: Detailed robustness analysis
    plt.figure(figsize=(12, 8))

    # Performance degradation analysis
    scenarios_order = ['baseline', 'light_missing', 'moderate_missing', 'heavy_missing',
                      'light_corruption', 'mixed_degradation']

    for agent in ['DQN', 'DoNothing', 'Random']:
        agent_survival = []
        for scenario in scenarios_order:
            if scenario in all_results[agent]:
                agent_survival.append(all_results[agent][scenario]['survival_mean'])
            else:
                agent_survival.append(0)
        plt.plot(scenarios_order, agent_survival, 'o-', label=agent, linewidth=2)

    plt.title('Agent Performance Degradation Under Different Scenarios')
    plt.xlabel('Degradation Scenario')
    plt.ylabel('Average Survival Time')
    plt.legend()
    plt.xticks(rotation=45)
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

create_comprehensive_visualizations(statistical_df, all_results)



In [None]:
#ENHANCED XAI ANALYSIS

print("\n--- ENHANCED EXPLAINABLE AI ANALYSIS ---")

class EnhancedModelWrapper:
    """Enhanced wrapper for SHAP analysis with better feature interpretation."""

    def __init__(self, model, env):
        self.model = model
        self.env = env
        self.key_features = ['rho', 'gen_p', 'load_p', 'p_or', 'q_or', 'v_or']

        # Create feature mapping
        sample_obs, _ = self.env.reset()
        self.template_obs = sample_obs
        self.feature_map = {}
        self.feature_names = []

        current_idx = 0
        for key in self.key_features:
            if key in sample_obs and sample_obs[key] is not None:
                shape = sample_obs[key].shape
                size = sample_obs[key].size
                self.feature_map[key] = {
                    'start': current_idx,
                    'end': current_idx + size,
                    'shape': shape
                }
                # Create more descriptive feature names
                if len(shape) == 1:
                    self.feature_names.extend([f"{key}_{i}" for i in range(size)])
                else:
                    self.feature_names.extend([f"{key}_flat_{i}" for i in range(size)])
                current_idx += size

        self.n_features = current_idx
        print(f"✓ Enhanced wrapper created with {self.n_features} features")

    def _reconstruct_observation(self, x_flat):
        """Reconstruct observation from flattened features."""
        reconstructed_obs = self.template_obs.copy()
        for key, info in self.feature_map.items():
            data_slice = x_flat[info['start']:info['end']]
            reconstructed_obs[key] = data_slice.reshape(info['shape'])
        return reconstructed_obs

    def predict_q_values(self, X):
        """Predict Q-values for multiple observations."""
        if X.ndim == 1:
            X = X.reshape(1, -1)

        q_values_list = []
        for i in range(X.shape[0]):
            obs_dict = self._reconstruct_observation(X[i])
            q_values = self.model.policy.q_net(
                self.model.policy.obs_to_tensor(obs_dict)[0]
            )
            q_values_list.append(q_values.detach().cpu().numpy().flatten())

        return np.array(q_values_list)

# Create enhanced SHAP analysis
# FIX: Use the pre-configured "test_env" for the wrapper
enhanced_wrapper = EnhancedModelWrapper(dqn_model, test_env)

# Collect diverse background data
print("Collecting diverse background data for SHAP...")
background_data_list = []
# FIX: Use the pre-configured "test_env" for data collection
shap_env = test_env

for episode in range(5):  # Fewer episodes for speed
    obs, _ = shap_env.reset()
    for step in range(20):  # Multiple steps per episode
        features = []
        for key in enhanced_wrapper.key_features:
            if key in obs and obs[key] is not None:
                features.extend(obs[key].flatten())

        if len(features) == enhanced_wrapper.n_features:
            background_data_list.append(np.array(features, dtype=np.float32))

        action, _ = dqn_model.predict(obs, deterministic=True)
        obs, _, done, _, _ = shap_env.step(action)
        if done:
            break

background_data = np.array(background_data_list[:100])
print(f"✓ Collected {len(background_data)} background samples")

# Create SHAP explainer and analyze
if len(background_data) > 10:
    explainer = shap.KernelExplainer(
        enhanced_wrapper.predict_q_values,
        background_data[:20] # Use a small, representative subset
    )

    # Find interesting state to explain
    sample_features = background_data[0]
    shap_values = explainer.shap_values(sample_features, nsamples=50)

    # Visualize
    predicted_action = np.argmax(enhanced_wrapper.predict_q_values(sample_features))
    print(f"✓ SHAP analysis complete. Predicted action: {predicted_action}")

    shap.summary_plot(
        shap_values,
        feature_names=enhanced_wrapper.feature_names,
        plot_type="bar",
        max_display=15,
        title=f"Feature Importance for Action {predicted_action}"
    )
