In [7]:

import grid2op
from grid2op.Parameters import Parameters
from grid2op.Agent import DoNothingAgent
from grid2op.gym_compat import GymEnv
from lightsim2grid import LightSimBackend
from grid2op.Reward import CombinedReward, IncreasingFlatReward, DistanceReward
from common.reward import LineMarginReward, RedispRewardv1

# Create the environment (using bus14 as a simple example)
print("Creating environment...")
g2op_env = grid2op.make(
    "l2rpn_case14_sandbox",
    reward_class=CombinedReward,
    backend=LightSimBackend()
)

# Setup rewards
cr = g2op_env.get_reward_instance() 
cr.addReward("IncreasingFlatReward", 
            IncreasingFlatReward(per_timestep=1/g2op_env.chronics_handler.max_episode_duration()),
            0.1)
cr.addReward("TopologyReward", DistanceReward(), 0.3)
cr.addReward("redispatchReward", RedispRewardv1(), 0.3)
cr.addReward("LineMarginReward", LineMarginReward(), 0.3)
cr.initialize(g2op_env)

# Wrap in GymEnv
gym_env = GymEnv(g2op_env, shuffle_chronics=True)

# Set parameters directly from the environment's parameters object (level 1 settings)
p = gym_env.init_env.parameters
p.HARD_OVERFLOW_THRESHOLD = 999
p.NB_TIMESTEP_OVERFLOW_ALLOWED = 9999999
p.SOFT_OVERFLOW_THRESHOLD = 1.0
p.NO_OVERFLOW_DISCONNECTION = False
p.MAX_LINE_STATUS_CHANGED = 1
p.MAX_SUB_CHANGED = 1

# Apply the modified parameters
gym_env.init_env.change_parameters(p)

# Print parameters being tested
print("Testing with parameters:")
print(f"  HARD_OVERFLOW_THRESHOLD: {p.HARD_OVERFLOW_THRESHOLD}")
print(f"  NB_TIMESTEP_OVERFLOW_ALLOWED: {p.NB_TIMESTEP_OVERFLOW_ALLOWED}")
print(f"  SOFT_OVERFLOW_THRESHOLD: {p.SOFT_OVERFLOW_THRESHOLD}")
print(f"  NO_OVERFLOW_DISCONNECTION: {p.NO_OVERFLOW_DISCONNECTION}")

# The reset is required after changing parameters
gym_env.reset()

# For testing, we'll use the underlying Grid2Op environment
agent = DoNothingAgent(gym_env.init_env.action_space)

# Play episodes
n_episodes = 7
max_steps_per_episode = gym_env.init_env.chronics_handler.max_episode_duration()
print(f"Playing {n_episodes} episodes with max {max_steps_per_episode} steps each")

survival_rates = []

for ep in range(n_episodes):
    obs = gym_env.reset()
    done = False
    step = 0
    
    # Get the underlying grid2op observation
    grid2op_obs = gym_env.init_env.get_obs()
    
    while not done and step < max_steps_per_episode:
        # Generate action using the agent
        action = agent.act(grid2op_obs, None, done)
        
        # Apply action to the Grid2Op environment
        grid2op_obs, reward, done, info = gym_env.init_env.step(action)
        step += 1
    
    survival_rate = step / max_steps_per_episode * 100
    survival_rates.append(survival_rate)
    print(f"Episode {ep+1}: Survived {step}/{max_steps_per_episode} steps ({survival_rate:.2f}%)")

# Print overall results
avg_survival = sum(survival_rates) / len(survival_rates)
print(f"\nAverage survival rate over {n_episodes} episodes: {avg_survival:.2f}%")



Creating environment...
Testing with parameters:
  HARD_OVERFLOW_THRESHOLD: 999.0
  NB_TIMESTEP_OVERFLOW_ALLOWED: 9999999
  SOFT_OVERFLOW_THRESHOLD: 1.0
  NO_OVERFLOW_DISCONNECTION: False
Playing 7 episodes with max 8064 steps each
Episode 1: Survived 8064/8064 steps (100.00%)
Episode 2: Survived 8064/8064 steps (100.00%)
Episode 3: Survived 8064/8064 steps (100.00%)
Episode 4: Survived 8064/8064 steps (100.00%)
Episode 5: Survived 8064/8064 steps (100.00%)
Episode 6: Survived 8064/8064 steps (100.00%)
Episode 7: Survived 8064/8064 steps (100.00%)

Average survival rate over 7 episodes: 100.00%


## Visualization of TOPO

In [1]:
import grid2op
from grid2op.Parameters import Parameters
from lightsim2grid import LightSimBackend
from grid2op.gym_compat import DiscreteActSpace
from grid2op.PlotGrid import PlotMatplot
import numpy as np
import matplotlib.pyplot as plt
import os
from matplotlib.backends.backend_pdf import PdfPages
import time
import argparse

def visualize_topology_actions(n_actions=5, save_dir="topo_visualizations", plot_backend="matplotlib"):
    """
    Visualize the topology structure after applying each action from the action space.
    
    Parameters:
    -----------
    n_actions: int
        Number of actions to visualize
    save_dir: str
        Directory to save visualizations
    plot_backend: str
        Either "matplotlib" or "plotly"
    """
    print("Creating environment...")
    # Create parameters based on level 1 (easiest)
    params = Parameters()
    params.HARD_OVERFLOW_THRESHOLD = 999
    params.NB_TIMESTEP_OVERFLOW_ALLOWED = 9999999
    params.SOFT_OVERFLOW_THRESHOLD = 1.0
    params.NO_OVERFLOW_DISCONNECTION = True
    params.MAX_LINE_STATUS_CHANGED = 1
    params.MAX_SUB_CHANGED = 1
    
    env = grid2op.make(
        "l2rpn_case14_sandbox",
        backend=LightSimBackend(),
        param=params
    )
    
    # Load the precalculated action space from numpy file
    action_space_path = os.path.join("env", "action_spaces/bus14_action_space.npy")
    print(f"Loading action space from: {action_space_path}")
    loaded_action_space = np.load(action_space_path, allow_pickle=True)
    print(f"Loaded {len(loaded_action_space)} predefined actions")
    
    # Limit the number of actions to visualize
    n_actions = min(n_actions, len(loaded_action_space))
    print(f"Will visualize {n_actions} actions")
    
    # Create DiscreteActSpace
    discrete_action_space = DiscreteActSpace(
        env.action_space,
        action_list=loaded_action_space[:n_actions]
    )
    
    # Create the plot object
    if plot_backend == "matplotlib":
        plot_helper = PlotMatplot(env.observation_space)
    else:
        # Importing here to avoid dependency issues if not installed
        from grid2op.PlotGrid import PlotPlotly
        plot_helper = PlotPlotly(env.observation_space)
    
    # Create directory for saving visualizations if it doesn't exist
    os.makedirs(save_dir, exist_ok=True)
    
    # Create a PDF file to save all visualizations together
    pdf_path = os.path.join(save_dir, "all_topologies.pdf")
    pdf = PdfPages(pdf_path)
    
    # Dictionary to store information about each action for summary
    action_info = {}
    
    # Visualize each action
    for action_idx in range(n_actions):
        # Get the action
        action = discrete_action_space.from_gym(action_idx)
        action_dict = action.as_dict()
        
        # Reset the environment
        obs = env.reset()
        
        # Print action details
        print(f"\nAction {action_idx} details:")
        
        # Classify the action type
        action_type = "Do Nothing"
        impacted_subs = []
        affected_lines = []
        
        if 'change_bus_vect' in action_dict:
            action_type = "Change Bus"
            impacted_subs = action_dict['change_bus_vect']['modif_subs_id']
            print(f"  Type: Change Bus - Impacts substations: {impacted_subs}")
            
            # Get affected elements
            for sub_id in impacted_subs:
                elements = action_dict['change_bus_vect']['modif_objects_id']
                print(f"  Substation {sub_id}: Changing bus for elements {elements}")
                
        elif 'set_bus_vect' in action_dict:
            action_type = "Set Bus"
            if 'modif_subs_id' in action_dict['set_bus_vect']:
                impacted_subs = action_dict['set_bus_vect']['modif_subs_id']
                print(f"  Type: Set Bus - Impacts substations: {impacted_subs}")
            
        elif 'change_line_status' in action_dict:
            action_type = "Change Line Status"
            affected_lines = action_dict['change_line_status']['changed_id']
            line_states = ["Disconnected->Connected" if env.get_line_status()[line_id] == False else "Connected->Disconnected" 
                           for line_id in affected_lines]
            print(f"  Type: Change Line Status - Lines: {affected_lines} - States: {line_states}")
            
        elif 'set_line_status' in action_dict:
            action_type = "Set Line Status"
            lines_and_status = []
            for i, status in enumerate(action_dict['set_line_status']):
                if status != 0:  # 0 means don't change
                    lines_and_status.append((i, "Connected" if status == 1 else "Disconnected"))
            affected_lines = [l[0] for l in lines_and_status]
            print(f"  Type: Set Line Status - Lines and states: {lines_and_status}")
        
        # Store action info
        action_info[action_idx] = {
            'type': action_type,
            'impacted_subs': impacted_subs,
            'affected_lines': affected_lines
        }
        
        # Apply the action
        obs, reward, done, info = env.step(action)
        
        if done:
            print(f"  WARNING: Action {action_idx} led to game over!")
            if 'exception' in info:
                print(f"  Reason: {info['exception'].__class__.__name__}")
            continue
        
        # Plot the resulting topology
        fig = plt.figure(figsize=(15, 10))
        plot_helper.plot_obs(obs, fig=fig, line_info="rho", load_info=None, gen_info=None)
        plt.title(f"Action {action_idx} - {action_type}")
        
        # Add subtitle with details
        subtitle = ""
        if impacted_subs:
            subtitle += f"Affected Substations: {impacted_subs}"
        if affected_lines:
            subtitle += f"\nAffected Lines: {affected_lines}"
        
        plt.figtext(0.5, 0.01, subtitle, ha='center', fontsize=10)
        
        # Save the plot to file
        plt.savefig(os.path.join(save_dir, f"action_{action_idx}.png"), bbox_inches='tight')
        
        # Save to PDF
        pdf.savefig(fig)
        plt.close()
        
        print(f"  Visualization saved for action {action_idx}")
    
    # Create a summary page in the PDF
    fig, ax = plt.subplots(figsize=(12, 8))
    ax.axis('off')
    text = "Summary of Topology Actions\n\n"
    for idx, info in action_info.items():
        text += f"Action {idx}: {info['type']}\n"
        if info['impacted_subs']:
            text += f"  Substations: {info['impacted_subs']}\n"
        if info['affected_lines']:
            text += f"  Lines: {info['affected_lines']}\n"
        text += "\n"
    
    plt.text(0.05, 0.95, text, transform=ax.transAxes, fontsize=12, 
             verticalalignment='top', family='monospace')
    
    pdf.savefig(fig)
    plt.close()
    
    # Close PDF
    pdf.close()
    
    print(f"\nAll visualizations saved to directory: {save_dir}")
    print(f"Combined PDF saved to: {pdf_path}")
    
    # Close environment
    env.close()

def main():
    parser = argparse.ArgumentParser(description="Visualize topology actions")
    parser.add_argument("--n_actions", type=int, default=10, 
                      help="Number of topology actions to visualize (default: 10)")
    parser.add_argument("--save_dir", type=str, default="topo_visualizations", 
                      help="Directory to save visualizations (default: 'topo_visualizations')")
    parser.add_argument("--backend", type=str, choices=["matplotlib", "plotly"], default="matplotlib",
                      help="Plotting backend to use (default: matplotlib)")
    
    args = parser.parse_args()
    
    start_time = time.time()
    visualize_topology_actions(args.n_actions, args.save_dir, args.backend)
    elapsed = time.time() - start_time
    print(f"Total execution time: {elapsed:.2f} seconds")

if __name__ == "__main__":
    main()


	c:\Users\admin\miniconda3\envs\rl2grid\python.exe -m pip install numba

usage: ipykernel_launcher.py [-h] [--n_actions N_ACTIONS]
                             [--save_dir SAVE_DIR]
                             [--backend {matplotlib,plotly}]
ipykernel_launcher.py: error: unrecognized arguments: --f=c:\Users\admin\AppData\Roaming\jupyter\runtime\kernel-v37c779d7f6e9792a0b1426df03478e338897d9ba2.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
