<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#compute-the-LODF-matrix" data-toc-modified-id="compute-the-LODF-matrix-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>compute the LODF matrix</a></span></li><li><span><a href="#superposition-theorem-flow-computation" data-toc-modified-id="superposition-theorem-flow-computation-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>superposition theorem flow computation</a></span></li><li><span><a href="#Combinations-of-2-actions-of-same-kind" data-toc-modified-id="Combinations-of-2-actions-of-same-kind-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Combinations of 2 actions of same kind</a></span><ul class="toc-item"><li><span><a href="#2-line-reconnections" data-toc-modified-id="2-line-reconnections-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>2 line reconnections</a></span></li><li><span><a href="#2-line-disconnections" data-toc-modified-id="2-line-disconnections-3.2"><span class="toc-item-num">3.2&nbsp;&nbsp;</span>2 line disconnections</a></span></li><li><span><a href="#2-node-splitting" data-toc-modified-id="2-node-splitting-3.3"><span class="toc-item-num">3.3&nbsp;&nbsp;</span>2 node splitting</a></span></li><li><span><a href="#2-node-merging-actions" data-toc-modified-id="2-node-merging-actions-3.4"><span class="toc-item-num">3.4&nbsp;&nbsp;</span>2 node merging actions</a></span></li></ul></li><li><span><a href="#Four-action-combination-with-all-types" data-toc-modified-id="Four-action-combination-with-all-types-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Four action combination with all types</a></span></li></ul></div>

In [None]:
import sys
import grid2op
import numpy as np
from superposition_theorem import *
env = grid2op.make("l2rpn_case14_sandbox")
param = env.parameters
param.ENV_DC = True  # force the computation of the powerflow in DC mode
param.MAX_LINE_STATUS_CHANGED = 99999
param.MAX_SUB_CHANGED = 99999
param.NO_OVERFLOW_DISCONNECTION = True
env.change_parameters(param)
env.change_forecast_parameters(param)

In [None]:
time_series_id = 0

#if you want to reset the environment on your time serie, always do env.set_id first
env.set_id(time_series_id)
_ = env.reset()

In [None]:
paths = [i for i in sys.path]
for path in paths:
    sys.path.append(path)

# compute the LODF matrix 

This was the basis for the initial demonstrations. But we directly use the available grid states in the implementation now.

$$\delta_{i,j} = \frac{F_j^{i} - F_j}{F_i}$$

with:
- $\delta_{i,j}$ : "le coefficient de report"
- $F_k$: the active flow on line `k`
- $F_j^{i}$: the flow on line j if the line i is disconnected

We are interested, given all the $\delta_{i,j}$ to compute the $\delta_{\{i,j,k, ...\}, m}$ which are the flows on line `m` when the lines $\{i,j,k, ...\}$ are disconnected

**This is not explicitly needed anymore in our approach**

But is a common data structure used for all other approches such as LODF. So we keep its computation there

In [None]:
obs = env.reset()
#with env.copy() as tmp_env_init:
init_obs, *_ = obs.simulate(env.action_space(), 0)
F = 1. * init_obs.p_or

In [None]:
all_flows = np.zeros((env.n_line, env.n_line))
for l_id in range(env.n_line):
    obs_disco, *_ = obs.simulate(env.action_space({"set_line_status": [(l_id, -1)]}), time_step=0)
    all_flows[l_id, :] = obs_disco.p_or

In [None]:
A = (all_flows - F) / F.reshape(-1, 1)

In [None]:
from grid2op.PlotGrid import PlotMatplot
plot_helper = PlotMatplot(env.observation_space)
fig = plot_helper.plot_obs(init_obs,line_info="p")
fig.show()


# Flow computation: using the "superposition theorem"

Here is the extended superposition theroem for topological changes. The resulting powerflows is a linear combination of unitary change power flows:

\begin{align*}
    PF(T)=\alpha\times PF(T_{ref}) + \beta_1\times PF(T_{ref}\circ \tau_1) + \beta_2\times PF(T_{ref}\circ \tau_2) \\
    \text{with } T=T_{ref} \circ \tau_1 \circ \tau_2  \text{ and } \alpha=1-\beta_1-\beta_2
\end{align*}

We have $T_{ref}$ as the reference topology from which we apply topological changes $\tau_1$ and $\tau_2$ in indifferent order to reach a target topology $T$. 

Finding the betas simply stems from solving a linear system of dimension the number of considered changes. Only minimal information from individual power flow state is needed for this, without knowledge of any underlying grid properties or complete adjacency matrix.

In [None]:
from superposition_theorem.core.compute_power_flows import compute_flows_superposition_theorem_from_actions, compute_flows_superposition_theorem_from_unit_act_obs
import inspect
from IPython.display import Code

display(Code(inspect.getsource(compute_flows_superposition_theorem_from_unit_act_obs), language='python'))


This relies on the computation of beta coefficients from this function

In [None]:
from superposition_theorem.core.compute_beta_coefficients import get_betas_coeff_N_unit_acts_ultimate
display(Code(inspect.getsource(get_betas_coeff_N_unit_acts_ultimate), language='python'))


# Combinations of 2 actions of same kind

Notice: You can look at the tests of the package for extensive combinatorial use cases, and for getting a better grasp on the code

In [None]:
time_series_id = 0
decimal_accuracy = 4 # when assessing the accuracy of the superposition theorem computation compared to a full load flow computation

## 2 line reconnections

Compute the beta coefficients, compared to what betas are expected in the case we already know the target flows

In [None]:
from superposition_theorem.core.compute_power_flows import get_delta_theta_line

env.set_id(time_series_id)
env.reset()

id_l1 = 3  # 1#2#3
id_l2 = 7  # 2#4#7
unitary_action_list = [{"set_line_status": [(id_l1, +1)]},  # sub5
                       {"set_line_status": [(id_l2, +1)]},  # sub4
                       ]

# need opposite list of actions to start with an observation where those lines are disconnected
opposite_action_list = [{"set_line_status": [(id_l1, -1)]},
                        {"set_line_status": [(id_l2, -1)]}]

combined_opposite_action = env.action_space(opposite_action_list[0]) + env.action_space(opposite_action_list[1])
obs_start, reward, done, info  = env.step(combined_opposite_action)
if done:
    raise RuntimeError("The action you propose leads to a divergence of the power flow. No feasible state can be found.")
unitary_actions = [env.action_space(unitary_act) for unitary_act in unitary_action_list]

# computing obs_target load flow with the effect of combined action: this is the ground truth to compare to

combined_action = unitary_actions[0] + unitary_actions[1]
obs_target = obs_start.simulate(combined_action, time_step=0)[0]



In [None]:
print("initial state with lines disconnected to reconnect")
fig = plot_helper.plot_obs(obs_start,line_info="p")
fig.show()

In [None]:
print("final state with lines reconnected")
fig = plot_helper.plot_obs(obs_target, line_info="p")
fig.show()

"All by hand"

In [None]:
# computing intermediate states in which we applied one unitary action

obs1 = obs_start.simulate(unitary_actions[0], time_step=0)[0]
obs2 = obs_start.simulate(unitary_actions[1], time_step=0)[0]

# solving the linear system by "hand" and computing the superposition
a = np.array([[1, 1 - get_delta_theta_line(obs2, id_l1) / get_delta_theta_line(obs_start, id_l1)],
              [1 -  get_delta_theta_line(obs1, id_l2) / get_delta_theta_line(obs_start, id_l2), 1]])

b = np.ones(2)
betas = np.linalg.solve(a, b)

print(f"beta coefficients are: {betas}")

p_target_computed = betas[0] * obs1.p_or + betas[1] * obs2.p_or + (1 - betas.sum()) * obs_start.p_or

assert (np.all((np.round(obs_target.p_or - p_target_computed,decimal_accuracy ) == 0.0)))

print(p_target_computed)


You could have computed the betas with this function

In [None]:
idls_lines = [id_l1, id_l2]
unit_act_observations = [obs1, obs2]

# getting delta thetas and ative power flows
delta_theta_unit_act_lines = np.array(
    [[get_delta_theta_line(obs_connect_idl, id_lj) for id_lj in idls_lines] for obs_connect_idl in
     unit_act_observations])

p_or_unit_act_lines = np.array([[obs_connect_idl.p_or[id_lj] for id_lj in idls_lines] for obs_connect_idl in
                                    unit_act_observations])#[0:n_line_actions]])

# obs_start
delta_theta_obs_start_lines = np.array([get_delta_theta_line(obs_start, id_lj) for id_lj in idls_lines])
p_or_obs_start_lines = np.array([obs_start.p_or[id_lj] for id_lj in idls_lines])


betas = get_betas_coeff_N_unit_acts_ultimate(delta_theta_unit_act_lines,
                                             delta_theta_obs_start_lines,
                                             p_or_unit_act_lines,
                                             p_or_obs_start_lines,
                                             delta_theta_obs_target=None,
                                             p_or_obs_target=None,
                                             idls=idls_lines,
                                             verbose=True)

print(f"beta coefficients are: {betas}")

Or you could have directly computed the resulting flows with the superposition theorem


In [None]:
idls_subs = []
p_or_combined_action = compute_flows_superposition_theorem_from_unit_act_obs(idls_lines,
                                                                             idls_subs,
                                                                             obs_start,
                                                                             obs_target,
                                                                             unit_act_observations)
print("check target flows")
decimal_digit_precision = 4
target_obs, *_ = obs_start.simulate(combined_action, time_step=0)
print(target_obs.p_or)
print(p_or_combined_action)
assert (np.all((np.round(target_obs.p_or - p_or_combined_action, decimal_digit_precision) == 0.0)))
print(f"Max difference (MW): {np.max(np.abs(target_obs.p_or - p_or_combined_action))}")

## 2 line disconnections

compute the beta coefficients, compared to what betas are expected in the case we already know the target flows

In [None]:
env.set_id(time_series_id)
env.reset()

id_l1 = 3  # 1#2#3
id_l2 = 7  # 2#4#7
unitary_action_list = [{"set_line_status": [(id_l1, -1)]},  # sub5
                       {"set_line_status": [(id_l2, -1)]},  # sub4
                       ]


obs_start, *_  = env.step(env.action_space({}))

unitary_actions = [env.action_space(unitary_act) for unitary_act in unitary_action_list]

# computing obs_target load flow with the effect of combined action: this is the ground truth to compare to

combined_action = unitary_actions[0] + unitary_actions[1]
obs_target = obs_start.simulate(combined_action, time_step=0)[0]



In [None]:
print("initial state with lines disconnected to reconnect")
fig = plot_helper.plot_obs(obs_start, line_info="p")
fig.show()

In [None]:
print("final state with lines reconnected")
fig = plot_helper.plot_obs(obs_target, line_info="p")
fig.show()

"All by hand"

In [None]:
# computing intermediate states in which we applied one unitary action

obs1 = obs_start.simulate(unitary_actions[0], time_step=0)[0]
obs2 = obs_start.simulate(unitary_actions[1], time_step=0)[0]

# solving the linear system by "hand" and computing the superposition
a = np.array([[1, 1 - obs2.p_or[id_l1] / obs_start.p_or[id_l1]],
              [1 - obs1.p_or[id_l2] / obs_start.p_or[id_l2], 1]])

b = np.ones(2)
betas = np.linalg.solve(a, b)

print("beta coefficients are: "+str(betas))

p_target_computed = betas[0] * obs1.p_or + betas[1] * obs2.p_or + (1 - betas.sum()) * obs_start.p_or

print(f"Max difference (MW): {np.max(np.abs(obs_target.p_or - p_target_computed))}")

print(p_target_computed)


Or you could have directly computed the resulting flows with the superposition theorem


In [None]:
idls_subs=[]
idls_lines = [id_l1, id_l2]
unit_act_observations = [obs1, obs2]

p_or_combined_action = compute_flows_superposition_theorem_from_unit_act_obs(idls_lines,
                                                                             idls_subs,
                                                                             obs_start,
                                                                             obs_target,
                                                                             unit_act_observations)
print("check target flows")
decimal_digit_precision = 4
target_obs, *_ = obs_start.simulate(combined_action, time_step=0)
print(target_obs.p_or)
print(p_or_combined_action)
print(f"Max difference (MW): {np.max(np.abs(target_obs.p_or - p_or_combined_action))}")

## 2 node splitting

compute the beta coefficients, compared to what betas are expected in the case we already know the target flows

In [None]:
env.set_id(time_series_id)
env.reset()

id_sub1 = 5  
id_sub2 = 4  
unitary_action_list = [{'set_bus': {'substations_id': [(5, (1, 1, 2, 2, 1, 2, 2))]}},  # sub5
                       {'set_bus': {'substations_id': [(4, (2, 1, 2, 1, 2))]}},  # sub4
                       ]


obs_start, *_  = env.step(env.action_space({}))

unitary_actions = [env.action_space(unitary_act) for unitary_act in unitary_action_list]

# computing obs_target load flow with the effect of combined action: this is the ground truth to compare to

combined_action = unitary_actions[0] + unitary_actions[1]
obs_target = obs_start.simulate(combined_action, time_step=0)[0]

In [None]:
print("initial state with lines disconnected to reconnect")
fig = plot_helper.plot_obs(obs_start,line_info="p")
fig.show()

In [None]:
print("final state with lines reconnected")
fig = plot_helper.plot_obs(obs_target,line_info="p")
fig.show()

"All by hand"

In [None]:
from superposition_theorem.core.compute_power_flows import get_sub_node1_idsflow,get_virtual_line_flow
# computing intermediate states in which we applied one unitary action

obs1 = obs_start.simulate(unitary_actions[0], time_step=0)[0]
obs2 = obs_start.simulate(unitary_actions[1], time_step=0)[0]

(ind_load_node1_sub1, ind_prod_node1_sub1, ind_lor_node1_sub1, ind_lex_node1_sub1) = get_sub_node1_idsflow(obs1,
                                                                                                           id_sub1)
(ind_load_node1_sub2, ind_prod_node1_sub2, ind_lor_node1_sub2, ind_lex_node1_sub2) = get_sub_node1_idsflow(obs2,
                                                                                                           id_sub2)

# virtual line flows: this is not directly computed by the load flow solver.
# you need to recover the ids of the element of one of the that will appear after splitting
# and compute the imbalance of flows for these elements before the substation gets split:
# this gives the equivalent of virtual line flows between the two nodes that will appear
p_start_sub1 = get_virtual_line_flow(obs_start, ind_load_node1_sub1, ind_prod_node1_sub1, ind_lor_node1_sub1,
                                     ind_lex_node1_sub1)
p_start_sub2 = get_virtual_line_flow(obs_start, ind_load_node1_sub2, ind_prod_node1_sub2, ind_lor_node1_sub2,
                                     ind_lex_node1_sub2)

p_obs1_sub2 = get_virtual_line_flow(obs1, ind_load_node1_sub2, ind_prod_node1_sub2, ind_lor_node1_sub2,
                                    ind_lex_node1_sub2)

p_obs2_sub1 = get_virtual_line_flow(obs2, ind_load_node1_sub1, ind_prod_node1_sub1, ind_lor_node1_sub1,
                                    ind_lex_node1_sub1)

# solving the linear system by "hand" and computing the superposition
a = np.array([[1, 1 - p_obs2_sub1 / p_start_sub1],
              [1 - p_obs1_sub2 / p_start_sub2, 1]])

b = np.ones(2)
betas = np.linalg.solve(a, b)

p_target_computed = betas[0] * obs1.p_or + betas[1] * obs2.p_or + (1 - betas.sum()) * obs_start.p_or
assert (np.all((np.round(obs_target.p_or - p_target_computed,decimal_accuracy ) == 0.0)))

print(p_target_computed)


Or you could have directly computed the resulting flows with the superposition theorem


In [None]:
idls_subs=[id_sub1,id_sub2]
idls_lines = []
unit_act_observations = [obs1,obs2]

p_or_combined_action = compute_flows_superposition_theorem_from_unit_act_obs(idls_lines,
                                                                             idls_subs,
                                                                             obs_start,
                                                                             obs_target,
                                                                             unit_act_observations)
print("check target flows")
decimal_digit_precision = 4
target_obs, *_ = obs_start.simulate(combined_action, time_step=0)
print(target_obs.p_or)
print(p_or_combined_action)
assert (np.all((np.round(target_obs.p_or - p_or_combined_action, decimal_digit_precision) == 0.0)))
print(f"Max difference (MW): {np.max(np.abs(target_obs.p_or - p_or_combined_action))}")

## 2 node merging actions

compute the beta coefficients, compared to what betas are expected in the case we already know the target flows

In [None]:
env.set_id(time_series_id)
env.reset()

id_sub1 = 5  
id_sub2 = 4  

unitary_action_list = [{'set_bus': {'substations_id': [(5, (1, 1, 1, 1, 1, 1, 1))]}},  # sub5
                       {'set_bus': {'substations_id': [(4, (1, 1, 1, 1, 1))]}},  # sub4
                       ]

# need ooposite list of actions to start with an observation where those lines are disconnected
opposite_action_list = [{'set_bus': {'substations_id': [(5, (1, 1, 2, 2, 1, 2, 2))]}},  # sub5
                       {'set_bus': {'substations_id': [(4, (2, 1, 2, 1, 2))]}},  # sub4
                       ]

combined_opposite_action = env.action_space(opposite_action_list[0])+env.action_space(opposite_action_list[1])
obs_start, *_  = env.step(combined_opposite_action)

unitary_actions = [env.action_space(unitary_act) for unitary_act in unitary_action_list]

# computing obs_target load flow with the effect of combined action: this is the ground truth to compare to

combined_action = unitary_actions[0] + unitary_actions[1]
obs_target = obs_start.simulate(combined_action, time_step=0)[0]



In [None]:
print("initial state with lines disconnected to reconnect")
fig = plot_helper.plot_obs(obs_start,line_info="p")
fig.show()

In [None]:
print("final state with lines reconnected")
fig = plot_helper.plot_obs(obs_target,line_info="p")
fig.show()

"All by hand"

In [None]:
from superposition_theorem.core.compute_power_flows import get_delta_theta_sub_2nodes
# computing intermediate states in which we applied one unitary action

obs1 = obs_start.simulate(unitary_actions[0], time_step=0)[0]
obs2 = obs_start.simulate(unitary_actions[1], time_step=0)[0]

# computing delta thetas at a substation between nodes split
delta_theta_sub1_obs2 = get_delta_theta_sub_2nodes(obs2,id_sub1)
delta_theta_sub1_obs_start = get_delta_theta_sub_2nodes(obs_start, id_sub1)

delta_theta_sub2_obs1 = get_delta_theta_sub_2nodes(obs1,id_sub2)
delta_theta_sub2_obs_start = get_delta_theta_sub_2nodes(obs_start, id_sub2)

# solving the linear system by "hand" and computing the superposition
a = np.array([[1, 1 - delta_theta_sub1_obs2 / delta_theta_sub1_obs_start],
                      [1 - delta_theta_sub2_obs1 / delta_theta_sub2_obs_start, 1]])

b = np.ones(2)
betas = np.linalg.solve(a, b)

p_target_computed = betas[0] * obs1.p_or + betas[1] * obs2.p_or + (1 - betas.sum()) * obs_start.p_or
assert (np.all((np.round(obs_target.p_or - p_target_computed,decimal_accuracy ) == 0.0)))

print(p_target_computed)


Or you could have directly computed the resulting flows with the superposition theorem


In [None]:
idls_subs=[id_sub1, id_sub2]
idls_lines = []
unit_act_observations = [obs1,obs2]

p_or_combined_action = compute_flows_superposition_theorem_from_unit_act_obs(idls_lines,
                                                                             idls_subs,
                                                                             obs_start,
                                                                             obs_target,
                                                                             unit_act_observations)
print("check target flows")
decimal_digit_precision = 4
target_obs, *_ = obs_start.simulate(combined_action, time_step=0)
print(target_obs.p_or)
print(p_or_combined_action)
assert (np.all((np.round(target_obs.p_or - p_or_combined_action, decimal_digit_precision) == 0.0)))
print(f"Max difference (MW): {np.max(np.abs(target_obs.p_or - p_or_combined_action))}")

# Four action combination with all types

Initial state starts with one substation with node splitting on which we apply merging, and one line disconnected that we will reconnect.

Conversely we will split nodes at another substation and disconnect another line

WARNING: be careful if you do a line action and substation action that belongs to the same substation, Grid2op might return that it is an ambiguous action and don't play them.

In [None]:
env.set_id(time_series_id)
env.reset()

#be careful with ambiguous actions when combining actions on lines and substations such as
#Grid2OpException AmbiguousAction InvalidLineStatus InvalidLineStatus('You ask to disconnect a powerline but also to connect it to a certain bus.')

id_l1 = 3  # 1#2#3
id_l2 = 2  # 2#4#7
id_sub1 = 5
id_sub2 = 4
idls_lines = [id_l1, id_l2]
idls_subs = [id_sub1, id_sub2]

unitary_action_list = [{"set_line_status": [(id_l1, +1)]},  # reconnection
                       {"set_line_status": [(id_l2, -1)]}, # disconnection
                       {'set_bus': {'substations_id': [(5, (1, 1, 1, 1, 1, 1, 1))]}},  # merging
                       {'set_bus': {'substations_id': [(4, (2, 1, 2, 1, 2))]}},  # splitting
                       ]

# need ooposite list of actions to start with an observation where those lines are disconnected
opposite_action_list = [{"set_line_status": [(id_l1, -1)]},
                        {"set_line_status": [(id_l2, +1)]},
                        {'set_bus': {'substations_id': [(5, (1, 1, 2, 2, 1, 2, 2))]}},
                        {'set_bus': {'substations_id': [(4, (1, 1, 1, 1, 1))]}},
                        ]

combined_opposite_action=env.action_space({})
for unit_act in opposite_action_list:
    combined_opposite_action+=env.action_space(unit_act)
obs_start, *_ = env.step(combined_opposite_action)

unitary_actions = [env.action_space(unitary_act) for unitary_act in unitary_action_list]

# computing obs_target load flow with the effect of combined action: this is the ground truth to compare to

n_actions = len(unitary_actions)
combined_action = unitary_actions[0]
for i in range(1, n_actions):
    combined_action += unitary_actions[i]
obs_target, *_ = obs_start.simulate(combined_action, time_step=0)

In [None]:
print("initial state with lines disconnected to reconnect")
fig = plot_helper.plot_obs(obs_start,line_info="p")
fig.show()

In [None]:
print("final state with lines reconnected")
fig = plot_helper.plot_obs(obs_target,line_info="p")
fig.show()

In [None]:
unitary_actions = [env.action_space(unitary_act) for unitary_act in unitary_action_list]


# running superposition theorem function
unit_act_observations = [obs_start.simulate(action, time_step=0)[0] for action in unitary_actions]


p_or_combined_action = compute_flows_superposition_theorem_from_unit_act_obs(idls_lines,
                                                                             idls_subs,
                                                                             obs_start,
                                                                             target_obs=None,
                                                                             unit_act_observations = unit_act_observations
                                                                             )

print("check target flows")
decimal_digit_precision = 4
target_obs, *_ = obs_start.simulate(combined_action, time_step=0)
print(target_obs.p_or)
print(p_or_combined_action)
assert (np.all((np.round(target_obs.p_or - p_or_combined_action, decimal_digit_precision) == 0.0)))
print(f"Max difference (MW): {np.max(np.abs(target_obs.p_or - p_or_combined_action))}")
