# **Final Presentation**



In this notebook team Power_Factor will describe its developed package: the different functionalities, the completed assignments that led to the developement of the package as well as some code demonstrations and collaboration discussion

In [96]:
from typing import List, Tuple

import networkx as nx
import random
import json
import pprint
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.sparse import csr_array
from IPython.display import display
from scipy.sparse.csgraph import connected_components
from power_grid_model.utils import json_deserialize, json_serialize
from power_grid_model.validation import ValidationException, assert_valid_batch_data, assert_valid_input_data
from power_grid_model import (
    CalculationMethod,
    CalculationType,
    ComponentType,
    DatasetType,
    PowerGridModel,
    initialize_array,
)

# **Assignment 1 -graph processing**

In assignment 1 the task is to build a graph processing class. For a given undirected graph as input, there are two functionalities to implement:
- Find downstream vertices - given a specific edge from the graph, find the downstream vertices of the edge, including the downstream vertex of the edge itself
- Find alternative edges - given a specific enabled edge from the graph, returns a list that indicates which currently disabled edges can be enabled so that the graph is again fully connected and acyclic
            

# **Check input validity**

Before the either of the two functionalities are used, the input data must be checked for validity. There are 7 criteria that are evaluated:
1. The vertex (node) and edge ids should be unique
2. The number of edges (or connections between two separate vertices) should have the same as the number of edge ids
3. The vertices connected by the specified edges should be valid vertex ids
4. The number of enabled/disabled edges should be the same as the number of edge ids
5. The id of the source vertex should be a valid vertex id
6. The graph should be fully connected
7. The graph should not contain cycles

Furthermore, find_alternative_edges functionality is important to check if the edge to be disabled has a valid id and whether it is already disabled.

In [97]:
class IDNotFoundError(Exception):
    "Raised when a source or edge id is not found/valid"


class InputLengthDoesNotMatchError(Exception):
    "Raised when number of enabled and disabled edges does not match number of total edges or number of vertex pairs does not match number of edges"


class IDNotUniqueError(Exception):
    "Raised when vertex or edge ids are not unique"


class GraphNotFullyConnectedError(Exception):
    "Raised when graph contains more than 1 component"


class GraphCycleError(Exception):
    "Raised when graph contains a cycle"


class EdgeAlreadyDisabledError(Exception):
    "Edge is already disabled"


def check_unique(vertex_ids, edge_ids):  # checks vertex ids or edge ids are unique
    ver = list(set(vertex_ids))  # a set is used because sets contain only unique values
    edge = list(set(edge_ids))
    if len(ver) != len(vertex_ids) or len(edge) != len(edge_ids):
        raise IDNotUniqueError("Vertex or edge ids are not unique")


def check_length_pairs(edge_vertex_id_pairs, edge_ids):  # checks if number of vertex pairs matches the number of edges
    if len(edge_vertex_id_pairs) != len(edge_ids):
        raise InputLengthDoesNotMatchError("Number of vertex pairs does not match number of edges")


def check_found_pairs(edge_vertex_id_pairs, vertex_ids):  # checks if all vertex pairs contain valid vertex ids
    if all(all(elem in vertex_ids for elem in t) for t in edge_vertex_id_pairs) == False:
        raise IDNotFoundError("Vertex id not found in edge array")


def check_length_enabled(
    edge_enabled, edge_ids
):  # checks if the number of enabled/disabled edges is the same as the number of edge ids
    if len(edge_enabled) != len(edge_ids):
        raise InputLengthDoesNotMatchError("Number of enabled and disabled edges does not match number of total edges")


def check_found_source(source_vertex_id, vertex_ids):  # checks if the source vertex id is valid
    if source_vertex_id not in vertex_ids:
        raise IDNotFoundError("Source vertex id not found")


def check_connect(vertex_ids, edge_enabled, edge_vertex_id_pairs):  # checks if graph is fully connected
    size = len(vertex_ids)
    sparseMatrix = [[0 for i in range(size)] for j in range(size)]
    for i in range(size):
        for j in range(size):
            if ((vertex_ids[i], vertex_ids[j]) in edge_vertex_id_pairs) and sparseMatrix[i][j] == 0:
                if edge_enabled[edge_vertex_id_pairs.index((vertex_ids[i], vertex_ids[j]))]:
                    sparseMatrix[i][j] = vertex_ids[j]
                    sparseMatrix[j][i] = vertex_ids[i]
            elif ((vertex_ids[j], vertex_ids[i]) in edge_vertex_id_pairs) and sparseMatrix[i][j] == 0:
                if edge_enabled[edge_vertex_id_pairs.index((vertex_ids[j], vertex_ids[i]))]:
                    sparseMatrix[i][j] = vertex_ids[j]
                    sparseMatrix[j][i] = vertex_ids[i]
    graph = csr_array(sparseMatrix)
    components = connected_components(graph)
    if components[0] > 1:
        raise GraphNotFullyConnectedError("Graph contains more than 1 component")


def check_cycle(edge_enabled, edge_vertex_id_pairs):  # checks if graph contains cycles
    G = nx.Graph()
    for (u, v), enabled in zip(edge_vertex_id_pairs, edge_enabled):
        if enabled:
            G.add_edge(u, v)

    has_cycle = nx.is_forest(G)  # A forest is a graph with no undirected cycles
    if has_cycle == False:
        raise GraphCycleError("Graph contains a cycle")


def check_found_edges(disabled_edge_id, all_edges):  # checks if the edge to be disabled has a valid edge id
    if disabled_edge_id not in all_edges:
        raise IDNotFoundError("Disabled edge id not found in edge array")


def check_disabled(disabled_edge_id, edge_ids, edge_enabled):  # checks if the edge to be disabled is already disabled
    if edge_enabled[edge_ids.index(disabled_edge_id)] == False:
        raise EdgeAlreadyDisabledError("Edge is already disabled")

With all of the necessary needed external functions declared, the graph processing class can now also be created

In [98]:
class GraphProcessor(nx.Graph):
    def __init__(
        self,
        vertex_ids: List[int],
        edge_ids: List[int],
        edge_vertex_id_pairs: List[Tuple[int, int]],
        edge_enabled: List[bool],
        source_vertex_id: int,
    ) -> None:
        super().__init__()
        check_unique(vertex_ids, edge_ids)
        check_length_pairs(edge_vertex_id_pairs, edge_ids)
        check_found_pairs(edge_vertex_id_pairs, vertex_ids)
        check_length_enabled(edge_enabled, edge_ids)
        check_found_source(source_vertex_id, vertex_ids)
        check_connect(vertex_ids, edge_enabled, edge_vertex_id_pairs)
        check_cycle(edge_enabled, edge_vertex_id_pairs)

        self.source_vertex_id = source_vertex_id
        self.vertex_ids = vertex_ids
        self.edge_enabled = edge_enabled
        self.edge_ids = edge_ids
        self.edge_vertex_id_pairs = edge_vertex_id_pairs
        self.add_nodes_from(vertex_ids)
        for i, (u, v) in enumerate(edge_vertex_id_pairs):
            self.add_edge(u, v, id=edge_ids[i], enabled=edge_enabled[i])

        self.enabled_subgraph = nx.Graph()
        for (u, v), enabled in zip(edge_vertex_id_pairs, edge_enabled):
            if enabled:
                self.enabled_subgraph.add_edge(u, v)
        # DFS tree & parent map from source
        self.dfs_tree = nx.dfs_tree(self.enabled_subgraph, self.source_vertex_id)
        self.parent_map = {child: parent for parent, child in nx.dfs_edges(self.dfs_tree, source=self.source_vertex_id)}

    def find_downstream_vertices(self, edge_id: int) -> List[int]:
        if edge_id not in self.edge_ids:
            raise IDNotFoundError("Edge ID not found.")

        edge_index = self.edge_ids.index(edge_id)
        if not self.edge_enabled[edge_index]:
            return []

        u, v = self.edge_vertex_id_pairs[edge_index]

        # Ensure both u and v are reachable
        if u not in self.dfs_tree or v not in self.dfs_tree:
            return []

        # Determine downstream vertex (child in DFS tree)
        if self.parent_map.get(v) == u:
            downstream_root = v
        elif self.parent_map.get(u) == v:
            downstream_root = u
        else:
            # If neither is parent of the other, one of them is ancestor; pick the child
            # or fallback to whichever is deeper
            depth = nx.single_source_shortest_path_length(self.dfs_tree, self.source_vertex_id)
            downstream_root = v if depth.get(v, 0) > depth.get(u, 0) else u

        descendants = list(nx.descendants(self.dfs_tree, downstream_root))
        return [downstream_root] + descendants

    def find_alternative_edges(self, disabled_edge_id: int) -> List[int]:
        ans = []
        check_found_edges(disabled_edge_id, self.edge_ids)
        check_disabled(disabled_edge_id, self.edge_ids, self.edge_enabled)
        H = nx.Graph()
        for i, (u, v) in enumerate(self.edge_vertex_id_pairs):
            if self.edge_enabled[i] == True and self.edge_ids[i] != disabled_edge_id:
                H.add_edge(u, v)
        for i, (u, v) in enumerate(self.edge_vertex_id_pairs):
            if self.edge_enabled[i] == False and self.edge_ids[i] != disabled_edge_id:
                H.add_edge(u, v)
                if nx.number_connected_components(H) == 1 and nx.is_forest(H):
                    ans.append(self.edge_ids[i])
                H.remove_edge(u, v)
        return ans

With the class implemented, its functionalities can now be tested.

In [99]:
vertex_ids1 = [0, 2, 4, 6, 10]
edge_ids1 = [1, 3, 5, 7, 9, 8]
edge_vertex_id_pairs1 = [(0, 2), (0, 4), (0, 6), (2, 4), (2, 10), (4, 6)]
edge_enabled1 = [True, True, True, False, True, False]
source_vertex_id1 = 0

"""
vertex_0 (source) --edge_1(enabled)-- vertex_2 --edge_9(enabled)-- vertex_10
|                               |
|                           edge_7(disabled)
|                               |
-----------edge_3(enabled)-- vertex_4
|                               |
|                           edge_8(disabled)
|                               |
-----------edge_5(enabled)-- vertex_6
"""

test_graph1 = GraphProcessor(vertex_ids1, edge_ids1, edge_vertex_id_pairs1, edge_enabled1, source_vertex_id1)
edge_to_disable = 3
print(
    f"The list of alternative edges when disabling edge {edge_to_disable} is: {test_graph1.find_alternative_edges(edge_to_disable)}"
)

The list of alternative edges when disabling edge 3 is: [7, 8]


In [100]:
vertex_ids2 = [0, 2, 4, 6, 8, 10, 12]
edge_ids2 = [1, 3, 5, 7, 9, 11]
edge_vertex_id_pairs2 = [(0, 2), (2, 4), (2, 6), (4, 8), (8, 10), (6, 12)]
edge_enabled2 = [True, True, True, True, True, True]
source_vertex_id2 = 0

"""
    vertex_0 (source) --edge_1-- vertex_2 --edge_3-- vertex_4--edge 7--vertex 8 --edge 9--vertex 10
                                    |
                                  edge 5
                                    |
                                 vertex 6  --edge 11 --vertex 12
"""

test_graph2 = GraphProcessor(vertex_ids2, edge_ids2, edge_vertex_id_pairs2, edge_enabled2, source_vertex_id2)
edge_down = 3
print(f"The list of downstream vertices for edge {edge_down} is {test_graph2.find_downstream_vertices(edge_down)}")

The list of downstream vertices for edge 3 is [4, 8, 10]


# **Assignment 2 - power grid model**

For assignment 2, the first thing that needs to be done is to load the input data and batch profiles. If any of these are not valid a suitable exception is raised.

In [101]:
def load_input_data(filepath: str):
    """Read and deserialize input network data from a JSON file."""
    try:
        with open(filepath) as fp:
            try:
                data = fp.read()
            except Exception:
                raise ValidationException("Error reading input data file")
    except Exception:
        raise ValidationException("Error reading input data file")
    try:
        input_data = json_deserialize(data)
    except Exception:
        raise ValidationException("Error deserializing input data")
    return input_data


def load_batch_profiles(active_profile_path: str, reactive_profile_path: str):
    """
    Read active and reactive power profiles from parquet files,
    check matching timestamps, and return them.
    """
    try:
        active_power_profile = pd.read_parquet(active_profile_path)
        reactive_power_profile = pd.read_parquet(reactive_profile_path)
    except Exception as e:
        raise ValidationException(f"Error reading batch profile files: {e}")
    if not active_power_profile.index.equals(reactive_power_profile.index):
        raise ValidationException("Active and reactive batch data have different timestamps")
    return active_power_profile, reactive_power_profile

Then, the batch profiles can be loaded which will become the update data for the power flow calculations. The user can choose which profile to load by passing a string.
The transformer data is used for assignment 3, however, if nothing about the transformer is specified, the function will simply not create update data for the transformer, therefore making this function compatible with previous implementations.

In [102]:
def prepare_update_data(
    active_batch_profile: pd.DataFrame,
    reactive_batch_profile: pd.DataFrame,
    tap_pos: int = -1,
    transformer_id: int = -1,
    profile_type: str = "active",
):
    """
    Given:
      - active_batch_profile: DataFrame indexed by timestamps, columns=house IDs → active p
      - reactive_batch_profile: same shape → reactive q
      - transformer_id: the int ID of the MV/LV transformer
      - tap_pos: the integer tap position to use for every time step
    Build and return the full update_data dict for:
      1) sym_load (p_specified, q_specified)
      2) transformer (id, tap_pos, from_status, to_status)
    """
    # initialize the sym_load update
    load_profile = initialize_array(DatasetType.update, ComponentType.sym_load, active_batch_profile.shape)

    ids = active_batch_profile.columns.to_numpy()
    try:
        load_profile["id"] = ids
    except (ValueError, TypeError):
        load_profile["id"] = np.array(ids, dtype=object)

    # fill p_specified / q_specified
    data_active = active_batch_profile.to_numpy()
    data_reactive = reactive_batch_profile.to_numpy()
    if profile_type == "active":
        load_profile["p_specified"] = data_active
        load_profile["q_specified"] = 0.0
    elif profile_type == "reactive":
        load_profile["p_specified"] = 0.0
        load_profile["q_specified"] = data_reactive
    elif profile_type == "both":
        load_profile["p_specified"] = data_active
        load_profile["q_specified"] = data_reactive
    else:
        raise ValueError(f"Unknown profile_type '{profile_type}', must be 'active', 'reactive', or 'both'")

    # start composing the return dict
    update_data = {ComponentType.sym_load: load_profile}

    # If specified build the transformer update data
    if transformer_id >= 0:
        # initialize the transformer update array
        transformer_update = initialize_array(
            DatasetType.update, ComponentType.transformer, (active_batch_profile.shape[0], 1)
        )

        # assign the transformer ID
        transformer_update["id"] = np.array([transformer_id], dtype=int)

        # set the tap position for all timestamps
        transformer_update["tap_pos"] = tap_pos

        update_data[ComponentType.transformer] = transformer_update

    return update_data

The power flow is calculated with the input and update data, and if the calculation fails, a detailed error message will be given

In [103]:
def calculate_power_flow(input_data, update_data):
    """
    Assert validity and run the power flow calculation, returning output_data.
    """
    try:
        assert_valid_batch_data(
            input_data=input_data, update_data=update_data, calculation_type=CalculationType.power_flow
        )
        model = PowerGridModel(input_data=input_data)
        output_data = model.calculate_power_flow(
            update_data=update_data, calculation_method=CalculationMethod.newton_raphson, threading=0
        )
    except ValidationException as ex:
        # Print each error found
        for error in ex.errors:
            print(type(error).__name__, error.component, ":", error.ids)
        raise
    return output_data

Now, the power flow data can be analyzed with the following functions

In [104]:
def calculate_node_stats(output_data, batch_profile: pd.DataFrame):
    """
    For each node in batch_profile.index, compute min/max voltages and corresponding node indices.
    Returns a DataFrame with columns: ["id", "Max_voltage", "Max_voltage_node", "Min_voltage", "Min_voltage_node"].
    """
    ids_node = batch_profile.index
    n = len(ids_node)
    min_voltage = np.zeros(n)
    max_voltage = np.zeros(n)
    min_voltage_id = np.zeros(n, dtype=int)
    max_voltage_id = np.zeros(n, dtype=int)

    u_pu = output_data[ComponentType.node]["u_pu"]
    # For each timestamp index i, find min/max along the node axis
    for i in range(n):
        row = u_pu[i, :]
        min_idx = row.argmin()
        max_idx = row.argmax()
        min_voltage[i] = row[min_idx]
        max_voltage[i] = row[max_idx]
        # +1 to handle 0-based indexing in Python
        min_voltage_id[i] = min_idx + 1
        max_voltage_id[i] = max_idx + 1

    output_node = pd.DataFrame(
        {
            "id": ids_node,
            "Max_voltage": max_voltage,
            "Max_voltage_node": max_voltage_id,
            "Min_voltage": min_voltage,
            "Min_voltage_node": min_voltage_id,
        }
    )
    return output_node

In [105]:
def calculate_line_stats(output_data, batch_profile: pd.DataFrame):
    """
    For each line ID, compute min/max loading with timestamps, and total energy loss.
    Returns a DataFrame with columns:
      ["id", "Total_loss", "Max_loading", "Max_loading_timestamp", "Min_loading", "Min_loading_timestamp"].
    """
    # find all the unique line IDs
    ids = np.unique(output_data[ComponentType.line]["id"])
    n_lines = len(ids)
    min_loading = np.zeros(n_lines)
    max_loading = np.zeros(n_lines)
    min_loading_timestamp = np.empty(n_lines, dtype=object)
    max_loading_timestamp = np.empty(n_lines, dtype=object)

    loading = output_data[ComponentType.line]["loading"]
    timestamps = batch_profile.index

    # Compute min/max and find matching timestamp
    for i in range(n_lines):
        col = loading[:, i]
        min_val = col.min()
        max_val = col.max()
        min_loading[i] = min_val
        max_loading[i] = max_val

        min_idx = int(np.where(col == min_val)[0][0])
        max_idx = int(np.where(col == max_val)[0][0])
        min_loading_timestamp[i] = timestamps[min_idx]
        max_loading_timestamp[i] = timestamps[max_idx]

    # Compute total energy loss with trapezoidal rule:
    p_from = output_data[ComponentType.line]["p_from"]
    p_to = output_data[ComponentType.line]["p_to"]
    total_loss = np.trapezoid(p_from + p_to, dx=3600 * (10**9), axis=0) / (3.6 * (10**15))

    output_line = pd.DataFrame(
        {
            "id": ids,
            "Total_loss": total_loss,
            "Max_loading": max_loading,
            "Max_loading_timestamp": max_loading_timestamp,
            "Min_loading": min_loading,
            "Min_loading_timestamp": min_loading_timestamp,
        }
    )
    return output_line

Change working directory to keep spaghetti code running:

In [106]:
#import os
#print("Current working directory:", os.getcwd())
#os.chdir('C:\\Users\\Kossyo\\Desktop\\Уни\\Power System computation and simulation\\Github desktop project\\power-system-simulation-Power_Factor')

The data can be shown using the following functions

In [107]:
def display_results(output_node: pd.DataFrame, output_line: pd.DataFrame):
    """Display the node and line result tables."""
    print("\nSelf-made output node table: ")
    display(output_node)
    print("\nSelf-made line table: ")
    display(output_line)


def compare_with_expected(timestamp_table_path: str, line_table_path: str):
    """Read expected output tables and display them for comparison."""
    try:
        expected_output_timestamp = pd.read_parquet(timestamp_table_path)
        expected_output_line = pd.read_parquet(line_table_path)
    except Exception as e:
        raise ValidationException(f"Error reading expected output files: {e}")

    # print("\nProvided timestamp table: ")
    # display(expected_output_timestamp)
    # print("\nProvided line table: ")
    # display(expected_output_line)


def power_flow_results(update_data, batch_profile):
    """
    This function performs an analysis of the power flow results by:
      - Asserting and computing power flow
      - Calculating node stats
      - Calculating line stats (including energy loss)
      - Displaying results and comparing with expected tables
    """
    # Compute power flow
    try:
        output_data = calculate_power_flow(input_data=input_data, update_data=update_data)
    except ValidationException:
        return

    # Node calculation
    output_node = calculate_node_stats(output_data, batch_profile)

    # Line calculation
    output_line = calculate_line_stats(output_data, batch_profile)

    display_results(output_node, output_line)

    # Compare with expected outputs
    compare_with_expected(
        "data/expected_output/output_table_row_per_timestamp.parquet",
        "data/expected_output/output_table_row_per_line.parquet",
    )


def main():
    """Executes all the functions: load data, prepare update_data, and call power_flow_results."""
    # Load input network data
    input_filepath = "data/input/input_network_data.json"
    try:
        global input_data  # so power_flow_results can access it by name as in original code
        input_data = load_input_data(input_filepath)
    except ValidationException as e:
        print(f"Failed to load input data: {e}")
        return

    # Load batch profiles
    try:
        active_power_profile, reactive_power_profile = load_batch_profiles(
            "data/input/active_power_profile.parquet",
            "data/input/reactive_power_profile.parquet",
        )
    except ValidationException as e:
        print(f"Failed to load batch profiles: {e}")
        return

    # Prepare update_data for active profile, you can also add reactive now
    update_data = prepare_update_data(active_power_profile, reactive_power_profile, profile_type="both")
    # Run the analysis
    power_flow_results(update_data, active_power_profile)


if __name__ == "__main__":
    main()


Self-made output node table: 


Unnamed: 0,id,Max_voltage,Max_voltage_node,Min_voltage,Min_voltage_node
0,2024-01-01 00:00:00,1.004847,1,1.00345,3
1,2024-01-01 01:00:00,1.012053,3,1.007998,1
2,2024-01-01 02:00:00,0.997474,1,0.984365,4
3,2024-01-01 03:00:00,1.006557,4,1.00519,1
4,2024-01-01 04:00:00,1.011007,4,1.005877,1
5,2024-01-01 05:00:00,1.020486,4,1.01057,1
6,2024-01-01 06:00:00,1.006342,1,0.998868,4
7,2024-01-01 07:00:00,1.004306,1,1.002822,3
8,2024-01-01 08:00:00,1.020402,4,1.010501,1
9,2024-01-01 09:00:00,1.015742,4,1.010084,1



Self-made line table: 


Unnamed: 0,id,Total_loss,Max_loading,Max_loading_timestamp,Min_loading,Min_loading_timestamp
0,5,63.294763,0.14983,2024-01-01 06:00:00,0.063798,2024-01-01 03:00:00
1,6,36.775143,0.111039,2024-01-01 05:00:00,0.037184,2024-01-01 00:00:00
2,7,14.872359,0.0717,2024-01-01 02:00:00,0.02038,2024-01-01 01:00:00


# **Assignment 3 - developing a power system simulation package**

Now that graph processing and power grid model have been implemented, they can be combined to form a functional user package to simulate power systems. The package has 4 functionalities:

1. Input data validity check
2. EV penetration level
3. Optimal tap position
4. N-1 calculation

In [108]:
with open(
    "data/assignment_3_input/input_network_data.json"
) as fp:
    data = fp.read()

input_data = json_deserialize(data)

with open("data/assignment_3_input/meta_data.json") as fp:
    meta = fp.read()

meta_data = json.loads(meta)
pprint.pprint(meta_data)

active_power_profile = pd.read_parquet(
    "data/assignment_3_input/active_power_profile.parquet"
)
reactive_power_profile = pd.read_parquet(
    "data/assignment_3_input/reactive_power_profile.parquet"
)
ev_active_power_profile = pd.read_parquet(
    "data/assignment_3_input/ev_active_power_profile.parquet"
)

dtype = {
    "names": [
        "id",
        "from_node",
        "to_node",
        "from_status",
        "to_status",
        "r1",
        "x1",
        "c1",
        "tan1",
        "r0",
        "x0",
        "c0",
        "tan0",
        "i_n",
    ]
}
df = pd.DataFrame(
    input_data[ComponentType.line], columns=dtype["names"]
)  # get the data for the lines of the grid as a dataframe

{'lv_busbar': 1,
 'lv_feeders': [16, 20],
 'mv_source_node': 0,
 'source': 10,
 'transformer': 11}


# **Input data validity check**

Check the following validity criteria for the input data. Raise or passthrough relevant errors.

In [109]:
class MoreThanOneTransformerOrSource(Exception):
    "Raised when there is more than one transformer or source in meta_data.json"


class InvalidLVIds(Exception):
    "Raised when LV Feeder IDs are not valid line IDs."


class NonMatchingTransformerLineNodes(Exception):
    "Raised when the lines in the LV Feeder IDs do not have the from_node the same as the to_node of the transformer."


class NonMatchingTimestamps(Exception):
    "Raised when the timestamps are not matching between the active load profile, reactive load profile, and EV charging profile."


class InvalidProfileIds(Exception):
    "Raised when the IDs in active load profile and reactive load profile are not matching."


class InvalidSymloadIds(Exception):
    "Raised when the IDs in active load profile and reactive load profile are not matching the symload IDs."


class InvalidNumberOfEVProfiles(Exception):
    "raised when the number of EV charging profile is at least the same as the number of sym_load."


class InvalidLineIds(Exception):
    "Raised when the given Line ID to disconnect is not a valid"


class NonConnnected(Exception):
    "Raised when the given Line ID is not connected at both sides in the base case"


class InvalidCriteria(Exception):
    "Raised when the criteria for optimal tap position is not valid. Use 'Voltage_deviation' or 'Total_loss'."


def check_source_transformer(meta_data):
    if (type(meta_data["source"]) is not int) or (type(meta_data["transformer"]) is not int):
        raise MoreThanOneTransformerOrSource("LV grid contains more than one source or transformer")


def check_valid_LV_ids(LV_ids, Line_ids):
    if not all(item in Line_ids for item in LV_ids):
        raise InvalidLVIds("LV feeders contain invalid ids")


def check_line_transformer_nodes(lines_from_nodes, transformer_to_node):
    if not all(element == transformer_to_node for element in lines_from_nodes):
        raise NonMatchingTransformerLineNodes(
            "The lines in the LV Feeder IDs do not have the from_node the same as the to_node of the transformer"
        )


def check_timestamps(active_timestamp, reactive_timestamp, ev_timestamp):
    if not (active_timestamp.equals(reactive_timestamp) and active_timestamp.equals(ev_timestamp)):
        raise NonMatchingTimestamps("Timestamps between the active, reactive and ev profiles do not match")


def check_profile_ids(active_ids, reactive_ids):
    if not active_ids.equals(reactive_ids):
        raise InvalidProfileIds("The active and reactive load profile IDs are not matching")


def check_symload_ids(active_ids, reactive_ids, symload_ids):
    if not (all(item in symload_ids for item in active_ids) and (all(item in symload_ids for item in reactive_ids))):
        raise InvalidSymloadIds("The active and reactive load profile IDs are not matching the sym_load IDs")


def check_number_of_ev_profiles(ev_profiles, symload_profiles):
    if len(ev_profiles) > len(symload_profiles):
        raise InvalidNumberOfEVProfiles("The number of EV charging profile is larger than the number of sym_loads")


def check_valid_line_ids(id, Line_ids):
    if id not in Line_ids:
        raise InvalidLineIds("Invalid Line ID")


def check_line_id_connected(fr, to):
    if not (fr.item() == 1 and to.item() == 1):
        raise NonConnnected("Line ID not connected at both sides in the base case")

In [110]:
def input_data_validity_check(input_data, meta_data):

    assert_valid_input_data(
        input_data=input_data, calculation_type=CalculationType.power_flow
    )  # check if input data is valid
    check_source_transformer(meta_data)  # check if LV grid has exactly one transformer, and one source.

    check_valid_LV_ids(
        meta_data["lv_feeders"], input_data[ComponentType.line]["id"]
    )  # check if All IDs in the LV Feeder IDs are valid line IDs
    check_line_transformer_nodes(
        df[df["id"].isin(meta_data["lv_feeders"])]["from_node"].tolist(),
        input_data[ComponentType.transformer]["to_node"],
    )  # check if all the lines in the LV Feeder IDs have the from_node the same as the to_node of the transformer

    transformer_tuple = list(
        zip(
            input_data[ComponentType.transformer]["from_node"].tolist(),
            input_data[ComponentType.transformer]["to_node"].tolist(),
        )
    )  # transformer also connects two nodes
    line_nodes_id_pairs = (
        list(
            zip(
                input_data[ComponentType.line]["from_node"].tolist(), input_data[ComponentType.line]["to_node"].tolist()
            )
        )
        + transformer_tuple
    )  # add nodes connected by transformer to list of lines connecting nodes
    status_list = list(input_data[ComponentType.line]["to_status"].tolist()) + list(
        input_data[ComponentType.transformer]["to_status"].tolist()
    )  # add transformer connection status to list of lines' statuses

    check_connect(
        input_data[ComponentType.node]["id"], status_list, line_nodes_id_pairs
    )  # check if the grid is fully connected in the initial state
    check_cycle(status_list, line_nodes_id_pairs)  # check if the grid has no cycles in the initial state

    check_timestamps(
        active_power_profile.index, reactive_power_profile.index, ev_active_power_profile.index
    )  # checks if timestamps are matching
    check_profile_ids(
        active_power_profile.columns, reactive_power_profile.columns
    )  # checks if number of active and reactive profiles are matching
    check_symload_ids(
        active_power_profile.columns, reactive_power_profile.columns, input_data[ComponentType.sym_load]["id"]
    )  # checks if IDs are matching
    check_number_of_ev_profiles(
        ev_active_power_profile.columns, input_data[ComponentType.sym_load]["id"]
    )  # checks if number of EV profiles does not exceed number of sym_loads
    print("Input data is valid!")

Validity check tested on big network

In [111]:
input_data_validity_check(input_data, meta_data)

Input data is valid!


# **EV penetration**

Given a (user-provided) input of electrical vehicle (EV) penetration level, i.e. the percentage of houses which has EV charged at home, randomly add EV charging profiles to the houses according to the following creteria.

In [112]:
def ev_penetration(
    input_data,
    meta_data,
    ev_profiles_df: pd.DataFrame,
    penetration_level,
    random_seed: int = None,
):
    """
    Assign EV charging profiles to sym_load houses based on penetration level usiing a dandom seed for replicability

    Returns:
        dict: sym_load node id -> EV profile column name (or None if no EV assigned)
    """
    # Set up randomness based on seed
    if random_seed is not None:
        random.seed(random_seed)

    # Extract sym_load nodes
    sym_load_nodes = [load["node"] for load in input_data[ComponentType.sym_load]]

    # Total houses
    total_houses = len(sym_load_nodes)
    # Number of feeders
    lv_feeders = meta_data["lv_feeders"]
    num_feeders = len(lv_feeders)

    # Calculate EVs per feeder after checking to avoid devide by 0 error
    if num_feeders == 0:
        return [], pd.DataFrame()  # or appropriate empty outputs
    evs_per_feeder = round(penetration_level * total_houses / num_feeders)

    line_nodes_id_pairs = [(l["from_node"], l["to_node"]) for l in input_data[ComponentType.line]]
    line_id_list = [l["id"] for l in input_data[ComponentType.line]]
    status_list = [l["to_status"] for l in input_data[ComponentType.line]]

    transformer_tuples = [(t["from_node"], t["to_node"]) for t in input_data[ComponentType.transformer]]
    transformer_ids = [t["id"] for t in input_data[ComponentType.transformer]]
    transformer_statuses = [t["to_status"] for t in input_data[ComponentType.transformer]]

    # Combine everything
    line_nodes_id_pairs += transformer_tuples
    line_id_list += transformer_ids
    status_list += transformer_statuses

    transformer_tuple = list(
        zip(
            input_data[ComponentType.transformer]["from_node"].tolist(),
            input_data[ComponentType.transformer]["to_node"].tolist(),
        )
    )  # transformer also connects two nodes
    line_nodes_id_pairs = (
        list(
            zip(
                input_data[ComponentType.line]["from_node"].tolist(), input_data[ComponentType.line]["to_node"].tolist()
            )
        )
        + transformer_tuple
    )  # add nodes connected by transformer to list of lines connecting nodes
    status_list = list(df["to_status"].tolist()) + list(
        input_data[ComponentType.transformer]["to_status"].tolist()
    )  # add transformer connection status to list of lines' statuses
    line_id_list = df["id"].tolist()
    new_id = (df["id"].iloc[-1] + 1).tolist()
    line_id_list.append(new_id)  # add another line id to mimic the transformer connection

    graph_processor = GraphProcessor(
        input_data[ComponentType.node]["id"].tolist(),
        line_id_list,
        line_nodes_id_pairs,
        status_list,
        meta_data["mv_source_node"],
    )
    # Mapping sym_load node -> assigned EV profile (None initially)
    ev_assignment = {node: None for node in sym_load_nodes}

    # Keep track of EV profiles assigned
    assigned_profiles = set()

    # EV profiles available (columns of ev_profiles_df)
    ev_profile_ids = list(ev_profiles_df.columns)

    # Assign EVs per feeder
    for feeder_line in lv_feeders:
        # Find which houses are connected to wih feeder
        downstream = graph_processor.find_downstream_vertices(feeder_line)
        # Nodes that are houses and ar downstream of feeder are saved
        feeder_houses = set(downstream).intersection(sym_load_nodes)

        # Error in case more evs per feeder than houses per feeder
        if evs_per_feeder > len(feeder_houses):
            print("Error Number of ev feeders larger than feeder houses")

        # Chose randomly which houses to apply ev profiles to
        selected_houses = random.sample(list(feeder_houses), evs_per_feeder)

        # Create a mapping for assigning random ev profiles to the randomly selected houses
        for house in selected_houses:
            # Find an EV profile not assigned yet
            available_profiles = [p for p in ev_profile_ids if p not in assigned_profiles]
            if not available_profiles:
                raise RuntimeError("Not enough EV profiles to assign uniquely.")
            # Choose random ev profile
            chosen_profile = random.choice(available_profiles)
            # Assign random ev profile
            ev_assignment[house] = chosen_profile
            # Add assigned profile to list as to not assign it twice
            assigned_profiles.add(chosen_profile)

    # Load the baseline active power profile
    active_power_profile = pd.read_parquet(
        "data/assignment_3_input/active_power_profile.parquet"
    )  # Update path if needed
    reactive_power_profile = pd.read_parquet("data/assignment_3_input/reactive_power_profile.parquet")
    # Create a copy to avoid modifying the original directly
    merged_active_profile = active_power_profile.copy()
    merged_reactive_profile = reactive_power_profile.copy()
    # Convert node number to id
    sym_df = input_data[ComponentType.sym_load]
    node_to_id = dict(zip(sym_df["node"].tolist(), sym_df["id"].tolist()))

    # Add EV profiles to corresponding house profiles
    for node_num, ev_profile_id in ev_assignment.items():
        if ev_profile_id is not None:
            node_id = node_to_id[node_num]  # convert node number to node ID
            # Sum the EV profile power to the house's active power profile by node_id
            merged_active_profile[node_id] += ev_profiles_df[ev_profile_id]

    # Run powerflow using assignment 2
    update_data = prepare_update_data(merged_active_profile, merged_reactive_profile, "both")
    output_data = calculate_power_flow(input_data, update_data)

    output_line = calculate_line_stats(output_data, merged_active_profile)
    output_node = calculate_node_stats(output_data, merged_active_profile)

    return output_line, output_node

In [113]:
penetration_level = 0.8  # 80% penetration example
output_line, output_node = ev_penetration(input_data, meta_data, ev_active_power_profile, penetration_level, random_seed=11)
print(output_line, output_node)

   id  Total_loss   Max_loading Max_loading_timestamp   Min_loading  \
0  16   26.281068  6.718844e-05   2025-01-07 10:15:00  1.255112e-05   
1  17    1.092804  1.633293e-03   2025-01-01 08:15:00  2.665765e-04   
2  18    8.861640  3.367277e-05   2025-01-07 10:45:00  5.516527e-06   
3  19    1.179838  1.520156e-03   2025-01-07 10:45:00  2.479016e-04   
4  20   27.080377  7.046508e-05   2025-01-07 10:45:00  1.171162e-05   
5  21    1.100656  1.626440e-03   2025-01-07 10:30:00  2.525038e-04   
6  22    8.425266  3.851372e-05   2025-01-07 10:45:00  5.668094e-06   
7  23    1.313918  1.491263e-03   2025-01-07 10:45:00  2.192279e-04   
8  24    2.725891  2.447440e-07   2025-01-08 17:30:00  2.433297e-07   

  Min_loading_timestamp  
0   2025-01-08 12:30:00  
1   2025-01-08 11:30:00  
2   2025-01-05 17:45:00  
3   2025-01-05 17:45:00  
4   2025-01-02 14:30:00  
5   2025-01-05 15:00:00  
6   2025-01-02 12:30:00  
7   2025-01-02 12:30:00  
8   2025-01-07 10:45:00                        id  Max_

In [114]:
penetration_level = 0.8  # 80% penetration example
output_line, output_node = ev_penetration(input_data, meta_data, ev_active_power_profile, penetration_level, random_seed=21)
print(output_line, output_node)

   id  Total_loss   Max_loading Max_loading_timestamp   Min_loading  \
0  16   26.281069  6.718844e-05   2025-01-07 10:15:00  1.255112e-05   
1  17    1.092420  1.633293e-03   2025-01-01 08:15:00  2.665765e-04   
2  18    8.863623  3.367277e-05   2025-01-07 10:45:00  5.516527e-06   
3  19    1.180176  1.520156e-03   2025-01-07 10:45:00  2.479016e-04   
4  20   27.080377  7.046508e-05   2025-01-07 10:45:00  1.171162e-05   
5  21    1.100656  1.626440e-03   2025-01-07 10:30:00  2.525038e-04   
6  22    8.425266  3.851372e-05   2025-01-07 10:45:00  5.668094e-06   
7  23    1.313918  1.491263e-03   2025-01-07 10:45:00  2.192279e-04   
8  24    2.725891  2.447440e-07   2025-01-08 17:30:00  2.433297e-07   

  Min_loading_timestamp  
0   2025-01-08 12:30:00  
1   2025-01-08 11:30:00  
2   2025-01-05 17:45:00  
3   2025-01-05 17:45:00  
4   2025-01-02 14:30:00  
5   2025-01-05 15:00:00  
6   2025-01-02 12:30:00  
7   2025-01-02 12:30:00  
8   2025-01-07 10:45:00                        id  Max_

# **Optimal tap position**

In this functionality, the user would like to optimize the tap position of the transformer in the LV grid. The user can specifiy which optimization criteria is chosen to determine the optimal tap position.

In [115]:

def optimal_tap_position(criteria):
    """calculates the optimal tap position for the transformer based on the specified criteria"""
    if criteria not in ["Voltage_deviation", "Total_loss"]:
        raise InvalidCriteria("Invalid criteria specified. Use 'Voltage_deviation' or 'Total_loss'.")

    transformer_data = input_data[ComponentType.transformer]
    tap_min, tap_max = int(transformer_data["tap_min"][0]), int(transformer_data["tap_max"][0])
    tap_steps = list(range(tap_max, tap_min + 1))

    id = transformer_data["id"]
    total_loss = []
    total_deviation = []
    for tap in tap_steps:
        update_data = prepare_update_data(active_power_profile, reactive_power_profile, tap, id, "both")
        output_data = calculate_power_flow(input_data, update_data)

        output_line = calculate_line_stats(output_data, active_power_profile)
        output_node = calculate_node_stats(output_data, active_power_profile)
        # a2.display_results(output_node, output_line)

        total_loss.append(output_line["Total_loss"].sum())

        deviation_max = abs(output_node["Max_voltage"] - 1).sum()
        deviation_min = abs(output_node["Min_voltage"] - 1).sum()
        total_deviation.append(
            (deviation_max + deviation_min) / (len(output_node) * 2)
        )  # Divide total deviation by number of nodes (max and min) and number of timestamps

    if criteria == "Voltage_deviation":
        optimal_tap = tap_steps[np.argmin(total_deviation)]
        display(f"Optimal tap position: {optimal_tap}, with total deviation: {min(total_deviation)}")

    elif criteria == "Total_loss":
        optimal_tap = tap_steps[np.argmin(total_loss)]
        display(f"Optimal tap position: {optimal_tap}, with total loss: {min(total_loss)}")

In [116]:
optimal_tap_position("Voltage_deviation")

'Optimal tap position: 1, with total deviation: 0.03754316068350102'

# **N-1 calculation**

In this functionality, the user would like to know alternative grid topology when a given line is out of service. The user will provide the Line ID which is going to be out of service.

In [117]:
def check_valid_line_ids(id, Line_ids):
    if id not in Line_ids:
        raise InvalidLineIds("Invalid Line ID")


def check_line_id_connected(fr, to):
    if not (fr.item() == 1 and to.item() == 1):
        raise NonConnnected("Line ID not connected at both sides in the base case")


def find_alternative_lines(
    vertex_ids, edge_ids, edge_vertex_id_pairs, edge_enabled, source_vertex_id, id_to_disconnect
):
    test = GraphProcessor(
        vertex_ids, edge_ids, edge_vertex_id_pairs, edge_enabled, source_vertex_id
    )  # create a graph from the provided data about nodes and lines
    return test.find_alternative_edges(
        id_to_disconnect
    )  # find the alternative edges to make the graph fully connected, when the line is disconnected


def power_flow_calc(active_power_profile, alt_lines_list, line_id_list):

    load_profile_active = initialize_array(DatasetType.update, ComponentType.sym_load, active_power_profile.shape)
    load_profile_active["id"] = active_power_profile.columns.to_numpy()
    load_profile_active["p_specified"] = active_power_profile.to_numpy()
    load_profile_active["q_specified"] = 0.0
    update_data = {ComponentType.sym_load: load_profile_active}

    input_data[ComponentType.line]["from_status"][
        line_id_list.index(id_to_disconnect)
    ] = 0  # disconnect the line that the user wants to disconnect
    input_data[ComponentType.line]["to_status"][
        line_id_list.index(id_to_disconnect)
    ] = 0  # disconnect the line that the user wants to disconnect

    max_loading_alt = np.zeros(len(alt_lines_list))  # initialize max_loading column values
    max_line_alt = np.zeros(len(alt_lines_list))  # initialize max_line_alt column values
    max_loading_timestamp_alt = np.zeros(
        len(alt_lines_list), dtype=object
    )  # initialize max_loading_timestamp column values
    for k in range(len(alt_lines_list)):
        input_data[ComponentType.line]["from_status"][k] = 1
        input_data[ComponentType.line]["to_status"][k] = 1  # connect the kth alternative line
        model = PowerGridModel(input_data=input_data)
        result = model.calculate_power_flow(
            update_data=update_data, calculation_method=CalculationMethod.newton_raphson
        )  # perform the power flow analysis when the kth line is connected
        # print(alt_lines_list[k])
        # print(result)
        ids = np.unique(result[ComponentType.line]["id"])
        max_loading = np.zeros(len(ids))
        max_loading_timestamp = np.zeros(len(ids), dtype=object)
        temp_max = -99999999
        for i in range(len(ids)):
            max_loading[i] = result[ComponentType.line]["loading"][:, i].max()
            for j in range(len(active_power_profile.index)):
                if max_loading[i] == result[ComponentType.line]["loading"][j, i]:
                    max_loading_timestamp[i] = active_power_profile.index[j]
                    if max_loading[i] > temp_max:
                        temp_max = max_loading[i]
                        max_loading_alt[k] = max_loading[i]
                        max_line_alt[k] = result[ComponentType.line]["id"][j, i]
                        max_loading_timestamp_alt[k] = active_power_profile.index[j]
                        # print(active_power_profile.index[j])
        input_data[ComponentType.line]["from_status"][k] = 0
        input_data[ComponentType.line]["to_status"][k] = 0  # disconnect kth line

    input_data[ComponentType.line]["from_status"][
        line_id_list.index(id_to_disconnect)
    ] = 1  # reconnect the line that the user wants to disconnect
    input_data[ComponentType.line]["to_status"][
        line_id_list.index(id_to_disconnect)
    ] = 1  # reconnect the line that the user wants to disconnect

    output_data = pd.DataFrame()  # generate specified table from assignment 3
    output_data["Alternative line ID"] = alt_lines_list
    output_data["Max_loading"] = max_loading_alt
    output_data["Max_line_id"] = max_line_alt
    output_data["Max_timestamp"] = max_loading_timestamp_alt
    display(output_data)
    return output_data

In [118]:
def N_minus_one_calculation(id_to_disconnect):

    check_valid_line_ids(id_to_disconnect, input_data[ComponentType.line]["id"])  # check if ID is valid
    check_line_id_connected(
        df[df["id"] == id_to_disconnect]["from_status"], df[df["id"] == id_to_disconnect]["to_status"]
    )  # check if line with selected ID is connected

    transformer_tuple = list(
        zip(
            input_data[ComponentType.transformer]["from_node"].tolist(),
            input_data[ComponentType.transformer]["to_node"].tolist(),
        )
    )  # transformer also connects two nodes
    line_nodes_id_pairs = (
        list(
            zip(
                input_data[ComponentType.line]["from_node"].tolist(), input_data[ComponentType.line]["to_node"].tolist()
            )
        )
        + transformer_tuple
    )  # add nodes connected by transformer to list of lines connecting nodes
    status_list = list(df["to_status"].tolist()) + list(
        input_data[ComponentType.transformer]["to_status"].tolist()
    )  # add transformer connection status to list of lines' statuses
    line_id_list = df["id"].tolist()
    new_id = (df["id"].iloc[-1] + 1).tolist()
    line_id_list.append(new_id)  # add another line id to mimic the transformer connection
    alt_lines_list = find_alternative_lines(
        input_data[ComponentType.node]["id"].tolist(),
        line_id_list,
        line_nodes_id_pairs,
        status_list,
        meta_data["mv_source_node"],
        id_to_disconnect,
    )

    print(
        f"To make the grid fully connected, the following lines need to be connected: {alt_lines_list}"
    )  # find alternative currently disconnected lines to make the grid fully connected

    power_flow_calc(active_power_profile, alt_lines_list, line_id_list)

In [119]:
id_to_disconnect = 22  # returns [2010] list and a table of 1 row
N_minus_one_calculation(id_to_disconnect)

To make the grid fully connected, the following lines need to be connected: [24]


Unnamed: 0,Alternative line ID,Max_loading,Max_line_id,Max_timestamp
0,24,0.001631,17.0,2025-01-01 08:15:00
