# Setup

In [None]:
from simulations import Simulation, StructuralModel, ModelInputParams, IVOptions, ModelResults, ModelMetrics
from estimation import ar_model_fit, ar_model_analysis
from plot_tools import *

import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from matplotlib.lines import Line2D
from matplotlib.legend_handler import HandlerPatch
from matplotlib.patches import Rectangle, FancyArrowPatch, Patch, FancyArrow
import mpl_toolkits.axisartist as axisartist

import networkx as nx

import pandas as pd
import numpy as np
import csv
import random
from typing import Literal

from statsmodels.stats import proportion
import statsmodels.stats.weightstats as ws


plt.style.use("seaborn-v0_8-whitegrid")

In [None]:
FOLDER:str = "results/"

def write_results_to_file(file_path: str, results_df: pd.DataFrame):
    results_df.to_csv(FOLDER+file_path+'.csv')

def read_results_from_file(file_path: str, header_lines:int=2) -> pd.DataFrame:
    return pd.read_csv(FOLDER+file_path, header=[i for i in range(header_lines)], index_col=[0])

# DAGs

Common definitions

In [None]:
NODE_POSITIONS = {
    "$W_{t-3}$": (-3, 5), "$W_{t-2}$": (0, 5), "$W_{t-1}$": (3, 5), "$W_t$": (6, 5), "$W_{t+1}$": (9, 5),
    "$P_{t-3}$": (-3, 3), "$P_{t-2}$": (0, 3), "$P_{t-1}$": (3, 3), "$P_t$": (6, 3), "$P_{t+1}$": (9, 3),
    "$D_{t-3}$": (-3, 0), "$D_{t-2}$": (0, 0), "$D_{t-1}$": (3, 0), "$D_t$": (6, 0), "$D_{t+1}$": (9, 0),
    }

ARROW_SIZE = 15

DAG_LINE_WIDTH = 1.5
DAG_XLIM = (-1.5, 7.5)

def get_base_dag():
    G = nx.DiGraph()

    G.add_node("$W_{t+1}$")
    G.add_node("$W_t$")
    G.add_node("$W_{t-1}$")
    G.add_node("$W_{t-2}$")

    G.add_node("$P_{t+1}$")
    G.add_node("$P_t$")
    G.add_node("$P_{t-1}$")
    G.add_node("$P_{t-2}$")

    G.add_node("$D_{t+1}$")
    G.add_node("$D_t$")
    G.add_node("$D_{t-1}$")
    G.add_node("$D_{t-2}$")

    return G

def add_wind_edges(G:nx.DiGraph, weight:float, color_t_1:str, ar:bool=True):
    if ar:
        G.add_edge("$W_{t-3}$", "$W_{t-2}$", color="k", weight=weight, style="dashed")
        G.add_edge("$W_{t-2}$", "$W_{t-1}$", color="k", weight=weight)
        G.add_edge("$W_{t-1}$", "$W_t$", color=color_t_1, weight=weight)
        G.add_edge("$W_t$", "$W_{t+1}$", color="k", weight=weight, style="dashed")

    G.add_edge("$W_{t-3}$", "$P_{t-3}$", color="k", weight=weight)
    G.add_edge("$W_{t-2}$", "$P_{t-2}$", color="k", weight=weight)
    G.add_edge("$W_{t-1}$", "$P_{t-1}$", color=color_t_1, weight=weight)
    G.add_edge("$W_t$", "$P_t$", color="k", weight=weight)
    G.add_edge("$W_{t+1}$", "$P_{t+1}$", color="k", weight=weight)

def add_elasticity_edges(G:nx.DiGraph, weight:float, color_t_1:str):
    G.add_edge("$P_{t-3}$", "$D_{t-3}$", color="k", weight=weight)
    G.add_edge("$P_{t-2}$", "$D_{t-2}$", color="k", weight=weight)
    G.add_edge("$P_{t-1}$", "$D_{t-1}$", color=color_t_1, weight=weight)
    G.add_edge("$P_t$", "$D_t$", color="red", weight=weight)
    G.add_edge("$P_{t+1}$", "$D_{t+1}$", color="k", weight=weight)


In [None]:
def add_equilibrium_edges_to_DAG(axes:Axes, node_positions:dict):
    new_edges = [
        ("$D_t$", "$P_t$", "k"),
        ("$D_{t-1}$", "$P_{t-1}$", "k"),
        ("$D_{t-2}$", "$P_{t-2}$", "k"),
    ]

    node_size = 0.85
    for edge in new_edges:
        source, target, color = edge
        rad = -0.4
        # Calculate the offset based on node size
        offset = 0.5 * node_size  # Adjust this factor as needed
        # Adjust source and target coordinates based on node size
        source_x, source_y = node_positions[source]
        target_x, target_y = node_positions[target]
        dx = target_x - source_x
        dy = target_y - source_y
        length = (dx ** 2 + dy ** 2) ** 0.5
        if length != 0:
            dx /= length
            dy /= length
        source_x += offset / 10 
        source_y += dy * offset 
        target_x += offset / 10 
        target_y += - dy * offset 

        # Draw the arrow with adjusted coordinates
        arrowprops=dict(arrowstyle="<|-|>",
                        mutation_scale=ARROW_SIZE,
                        color=color,
                        connectionstyle=f"arc3,rad={rad}",
                        linestyle='-',
                        lw=DAG_LINE_WIDTH,
                        alpha=1)
        axes.annotate("",
                    xy=(source_x, source_y),
                    xytext=(target_x, target_y),
                    arrowprops=arrowprops
                )

Model I: inertial demand

In [None]:
def plot_dag_model_I_on_axes(axes:Axes, demand_ar:bool=True, elasticity:bool=True, wind_ar:bool=True, color_open:str="dodgerblue"):

    # Create a Directed Graph (DAG)
    G = get_base_dag()

    # Add nodes
    add_wind_edges(G, DAG_LINE_WIDTH, color_open, wind_ar)

    if elasticity:
        add_elasticity_edges(G, DAG_LINE_WIDTH, color_open)
    

    weight=DAG_LINE_WIDTH
    
    if demand_ar:

        G.add_edge("$D_{t-3}$", "$D_{t-2}$", color="k", weight=weight, style="dashed")
        G.add_edge("$D_{t-2}$", "$D_{t-1}$",color="k", weight=weight)
        G.add_edge("$D_{t-1}$", "$D_t$", color=color_open, weight=weight)
        G.add_edge("$D_t$", "$D_{t+1}$", color="k", weight=weight, style="dashed")

        G.add_edge("$D_{t-3}$", "$P_{t-2}$", color="k", weight=weight, style="dashed")
        G.add_edge("$D_{t-2}$", "$P_{t-1}$", color="k", weight=weight)
        G.add_edge("$D_{t-1}$", "$P_t$", color="k", weight=weight)
        G.add_edge("$D_t$", "$P_{t+1}$", color="k", weight=weight, style="dashed")


    edges = G.edges()
    colors = [G[u][v]['color'] for u,v in edges]
    weights = [G[u][v]['weight'] for u,v in edges]
    styles = [G[u][v].get('style', '-') for u,v in edges]
    connection_styles = [G[u][v].get('connectionstyle', "arc3") for u, v in edges]

    # Draw the graph with specific node positions
    nx.draw(G, pos=NODE_POSITIONS, node_color='white', with_labels=True, labels={node: node for node in G.nodes()}, 
            font_color='black', node_size=800, font_size=10, edge_color=colors, width=weights, ax=axes, style=styles, 
            connectionstyle=connection_styles, arrowsize=ARROW_SIZE, edgecolors='black')
    
    add_equilibrium_edges_to_DAG(axes, NODE_POSITIONS)

    axes.set_aspect('equal', adjustable='box')


    axes.set_xlim(DAG_XLIM)
    axes.set_ylim(-1.75, 6.25)
    axes.set_xlabel("Demand response to prices propagates through time", fontsize="large")
    axes.set_title("Model I: Inertial Demand", fontsize="large")

In [None]:
def plot_model_I():
    fig, axs = plt.subplots(1, 1, sharex=False,  sharey=False, figsize=(REGULAR_WIDTH, HEIGHT))
    plot_dag_model_I_on_axes(axs)
    fig.show()

In [None]:
plot_model_I()

Model I (Figure 2)

In [None]:
def figure_3():
    fig, axs = plt.subplots(1, 1, sharex=False,  sharey=False, figsize=(REGULAR_WIDTH, HEIGHT))
    plot_dag_model_I_on_axes(axs)

    fig.tight_layout(pad=2)



    fig.show()

    save_figure(fig=fig, figname="figure_2_main_model")

figure_3()

Variants (Figure 4)

In [None]:
def figure_4():
    fig, axs = plt.subplots(1, 3, sharex=False,  sharey=False, figsize=(EXTRA_WIDTH, HEIGHT))
    plot_dag_model_I_on_axes(axs[0], wind_ar=False, color_open="k")
    plot_dag_model_I_on_axes(axs[1], demand_ar=False, color_open="k")
    plot_dag_model_I_on_axes(axs[2], elasticity=False, color_open="k")
    axs[0].set_title("a) I.i.d. instrument ($\\beta^W = 0$)")
    axs[1].set_title("b) No dir. str. autocor ($\\beta^{D1} = 0$)")
    axs[2].set_title("c) No price response ($\\beta^P = 0$)")
    fig.show()

    save_figure(fig=fig, figname="figure_4_variant_DAGs")


figure_4()

Model II: heterogeneous demand

In [None]:
def add_marginalized_demand(axes:Axes, node_positions:dict, xlim:tuple[float, float]):

    new_edges = [
        ("$D_{t-3}$", "$D_{t-2}$", "k", "--"),
        ("$D_{t-2}$", "$D_{t-1}$", "k", "-"),
        ("$D_{t-1}$", "$D_t$", "darkorange", "-"),
        ("$D_t$", "$D_{t+1}$", "k", "--"),
    ]

    node_size = 0.85
    for i, edge in enumerate(new_edges):
        source, target, color, style = edge
        rad = -0.4
        # Calculate the offset based on node size
        offset = 0.5 * node_size  # Adjust this factor as needed
        # Adjust source and target coordinates based on node size
        source_x, source_y = node_positions[source]
        target_x, target_y = node_positions[target]

        source_x += offset / 10
        source_y += - offset
        target_x += - offset / 10
        target_y += - offset


        arrowstyle = "<|-|>"
        
        # Draw the arrow with adjusted coordinates
        arrowprops=dict(arrowstyle=arrowstyle,
                        mutation_scale=ARROW_SIZE,
                        color=color,
                        connectionstyle=f"arc3,rad={rad}" if i != 0 else f"arc3,rad={-rad}",
                        linestyle=style,
                        lw=DAG_LINE_WIDTH,
                        alpha=1)
        if i == 0:
            ann = axes.annotate("",
                    xytext=(source_x, source_y),
                    xy=(target_x, target_y),
                    arrowprops=arrowprops,
                    annotation_clip=False,
                    clip_on=True,
                )
        else:
            ann = axes.annotate("",
                        xy=(source_x, source_y),
                        xytext=(target_x, target_y),
                        arrowprops=arrowprops,
                        annotation_clip=False,
                        clip_on=True,
                    )
            
        if ann.arrow_patch is not None:
            ann.arrow_patch.set_clip_box(axes.bbox)

In [None]:
def plot_dag_model_II_on_axes(axes:Axes):
    # Create a Directed Graph (DAG)
    G = get_base_dag()
    
    add_wind_edges(G, DAG_LINE_WIDTH, "darkorange")
    add_elasticity_edges(G, DAG_LINE_WIDTH, "darkorange")

    weight=DAG_LINE_WIDTH

    # Add edges
    edges = G.edges()
    colors = [G[u][v]['color'] for u,v in edges]
    weights = [G[u][v]['weight'] for u,v in edges]
    styles = [G[u][v].get('style', '-') for u,v in edges]
    connection_styles = [G[u][v].get('connectionstyle', "arc3") for u, v in edges]
    print(connection_styles)

    # Draw the graph with specific node positions
    nx.draw(G, pos=NODE_POSITIONS, node_color='white', with_labels=True, labels={node: node for node in G.nodes()}, 
            font_color='black', node_size=800, font_size=10, edge_color=colors, width=weights, ax=axes, style=styles, 
            connectionstyle=connection_styles, arrowsize=ARROW_SIZE, edgecolors='black')

    add_equilibrium_edges_to_DAG(axes, NODE_POSITIONS)
    add_marginalized_demand(axes, NODE_POSITIONS, (-0.3, 2.3))


    axes.set_aspect('equal', adjustable='box')
        
    axes.set_xlim(DAG_XLIM)
    axes.set_ylim(-1.75, 6.25)
    axes.set_title("Model II: Partially responsive demand", fontsize="large")

In [None]:
def plot_model_II():
    fig, axs = plt.subplots(1, 1, sharex=False,  sharey=False, figsize=(REGULAR_WIDTH, HEIGHT))
    plot_dag_model_II_on_axes(axs)
    fig.show()

In [None]:
plot_model_II()

Model III: cross-price elasticity

In [None]:
def add_cross_price_elasticity_edges(G:nx.DiGraph, weight:float, color_t_1:str):
    G.add_edge("$P_{t-3}$", "$D_{t-2}$", color="k", weight=weight, style="dashed")
    G.add_edge("$P_{t-2}$", "$D_{t-1}$", color="k", weight=weight)
    G.add_edge("$P_{t-1}$", "$D_t$", color=color_t_1, weight=weight)
    G.add_edge("$P_t$", "$D_{t+1}$", color="k", weight=weight, style="dashed")

    G.add_edge("$P_{t-3}$", "$P_{t-2}$", color="k", weight=weight, style="dashed")
    G.add_edge("$P_{t-2}$", "$P_{t-1}$", color="k", weight=weight)
    G.add_edge("$P_{t-1}$", "$P_t$", color="k", weight=weight)
    G.add_edge("$P_t$", "$P_{t+1}$", color="k", weight=weight, style="dashed")

In [None]:
def plot_dag_model_III_on_axes(axes:Axes):
    # Create a Directed Graph (DAG)
    G = get_base_dag()
    
    add_wind_edges(G, DAG_LINE_WIDTH, "dodgerblue")
    add_elasticity_edges(G, DAG_LINE_WIDTH, "k")
    add_cross_price_elasticity_edges(G, DAG_LINE_WIDTH, "dodgerblue")

    weight=DAG_LINE_WIDTH

    # Add edges

    edges = G.edges()
    colors = [G[u][v]['color'] for u,v in edges]
    weights = [G[u][v]['weight'] for u,v in edges]
    styles = [G[u][v].get('style', '-') for u,v in edges]
    connection_styles = [G[u][v].get('connectionstyle', "arc3") for u, v in edges]
    print(connection_styles)

    
    # Draw the graph with specific node positions
    nx.draw(G, pos=NODE_POSITIONS, node_color='white', with_labels=True, labels={node: node for node in G.nodes()}, 
            font_color='black', node_size=800, font_size=10, edge_color=colors, width=weights, ax=axes, style=styles, 
            connectionstyle=connection_styles, arrowsize=ARROW_SIZE, edgecolors='black')

    add_equilibrium_edges_to_DAG(axes, NODE_POSITIONS)

    axes.set_aspect('equal', adjustable='box')
        

    axes.set_xlim(DAG_XLIM)
    axes.set_ylim(-1.75, 6.25)
    axes.set_title("Model III: Demand shifting", fontsize="large")

In [None]:
def plot_model_III():
    fig, axs = plt.subplots(1, 1, sharex=False,  sharey=False, figsize=(REGULAR_WIDTH, HEIGHT))
    plot_dag_model_III_on_axes(axs)
    fig.show()

In [None]:
plot_model_III()

Model II and III (Figure 3)

In [None]:
def figure_3():
    fig, axs = plt.subplots(1, 2, sharex=False,  sharey=False, figsize=(WIDE_WIDTH, HEIGHT))
    plot_dag_model_II_on_axes(axs[0])
    plot_dag_model_III_on_axes(axs[1])

    fig.tight_layout(pad=2)

    fig.show()

    save_figure(fig=fig, figname="figure_3_alternative_models")

figure_3()

# Punchline (Figure 1)

Data

In [None]:
def run_simulation_by_demand_arg(
    sample: int = 8760,
    runs: int = 10,
    linspace: int = 21,
    model:StructuralModel = StructuralModel.Model_I,
    estimators:list[ModelInputParams]=MODELS,
    wind_manual_ar_sum: float | None = None,
    ) -> pd.DataFrame:
    
    # Create the X axis
    
    xx = np.linspace(-200, 200, linspace)
    dd = np.linspace(-0.249, 0.999, linspace)

    # Create the empty lists to store the analysis output

    output_lists:dict[str, dict[str, list]] = {}
    for estimator in estimators:
        output_lists[estimator.name] = {"Mean":[],
                                        "Upper":[],
                                        "Lower":[]}


    for i, x in enumerate(xx):
        d = dd[i]

        simulation = Simulation(sample=sample)

        simulation.run_simulations_by_model(
            runs=runs,
            estimators=estimators,
            model=model,
            demand_arg=x if model == StructuralModel.Model_III else d,
            simulation_count=i,
            wind_manual_ar_sum=wind_manual_ar_sum
        )

        for estimator in estimators:

            try:

                output_lists[estimator.name]["Mean"].append(simulation.models[estimator.name].mean)
                output_lists[estimator.name]["Upper"].append(np.percentile(simulation.models[estimator.name].estimates, 95))
                output_lists[estimator.name]["Lower"].append(np.percentile(simulation.models[estimator.name].estimates, 5))

            except KeyError as e:
                
                output_lists[estimator.name]["Mean"].append(simulation.models[estimator.name+"_P0"].mean)
                output_lists[estimator.name]["Upper"].append(np.percentile(simulation.models[estimator.name+"_P0"].estimates, 95))
                output_lists[estimator.name]["Lower"].append(np.percentile(simulation.models[estimator.name+"_P0"].estimates, 5))

    # Organize results into a DataFrame

    sub_index = ["Mean", "Upper", "Lower"]
    index = pd.MultiIndex.from_product([[estimator.name for estimator in estimators], sub_index])
    
    columns = xx.tolist() if model in [StructuralModel.Model_III, StructuralModel.Model_IV] else dd.tolist()

    print(model, columns)

    data = []
    for estimator, dictionary in output_lists.items():
        data.extend([dictionary["Mean"], dictionary["Upper"], dictionary["Lower"]])

    results_df = pd.DataFrame(data, index=index, columns=columns).T

    return results_df

In [None]:
def get_results_punchline(file_path:str, estimators:list[ModelInputParams], sample:int, runs:int, linspace:int, model:StructuralModel) -> pd.DataFrame:
    try:
        df = read_results_from_file(file_path+'.csv')

        missing_estimators = []
        for estimator in estimators:
            if not estimator.name in df.columns.get_level_values(0):
                missing_estimators.append(estimator)
                print(estimator.name+" is missing for "+file_path)

        if missing_estimators: 
            df_b = run_simulation_by_demand_arg(sample=sample, runs=runs, linspace=linspace, estimators=missing_estimators, model=model)
            df = df.join(df_b)
            print(df.columns)

            write_results_to_file(file_path, df)
            
    except FileNotFoundError:
        df = run_simulation_by_demand_arg(sample=sample, runs=runs, linspace=linspace, estimators=estimators, model=model)
        write_results_to_file(file_path, df)

    return df

Panel

In [None]:
def plot_punchline_panel(axes:Axes, df:pd.DataFrame, color_estimator_dict:dict[str, ModelInputParams], xlim:tuple[float, float], xlabel:str, model_name:str, ylabel:str, add_label:bool):
    x_axis = df.index

    llss = ['-', '--', ':', '-.']

    model_labels = ["Naive IV: ", "Conditional IV: ", "Nuisance IV: "]


    for i, (color, estimator) in enumerate(color_estimator_dict.items()):

        axes.plot(x_axis, df[estimator.name]['Mean'], color=color, label=model_labels[i]+estimator.latex_name if add_label else "", ls=llss[i])
        axes.fill_between(x_axis, df[estimator.name]['Upper'], df[estimator.name]['Lower'], color=color, alpha=0.3)

    axes.axhline(0, xlim[0], max(1, xlim[1]), color='black', linewidth=0.8)
    axes.axvline(0, -500, 100, color='black', linewidth=0.8)
    axes.set_xlim(xlim)

    axes.set_xlabel(xlabel, size="large")
    axes.set_ylabel(ylabel=ylabel, size="large")
    # axes.set_title(model_name, size="large")

Legend

In [None]:
def punchline_plot_legend(estimators:list[ModelInputParams]):
    # Plot lines
    dodgerblue_line = Line2D([0, 1], [0, 1], color='dodgerblue', label=estimators[0].latex_name, ls="-")
    darkorange_line = Line2D([0, 1], [0, 0.5], color='darkorange', label=estimators[1].latex_name, ls="--")
    darkgreen_line = Line2D([0, 1], [0, 0.2], color='darkgreen', label=estimators[2].latex_name, ls=":")

    # Create arrows
    red_arrow = FancyArrowPatch((0.1, 0.9), (0.3, 0.7), color='red', arrowstyle='-|>', linewidth=2, mutation_scale=15, label='effect of interest ($\\beta^P=-100$)')
    blue_arrow = FancyArrowPatch((0.2, 0.6), (0.4, 0.4), color='dodgerblue', arrowstyle='-|>', linewidth=2, mutation_scale=15, label='unblocked path given $\\emptyset$')
    orange_arrow = FancyArrowPatch((0.3, 0.3), (0.5, 0.1), color='darkorange', arrowstyle='-|>', linewidth=2, mutation_scale=15, label='blocked path given $\\emptyset$')

    # Create custom legend
    legend_elements = [
        blue_arrow,
        orange_arrow,
        red_arrow,
        dodgerblue_line,
        darkorange_line,
        darkgreen_line
    ]

    return legend_elements

Figure

In [None]:
def punchline_plot(runs=10, linspace=21, sample=8760, color_estimator_dict:dict[str, ModelInputParams]={
    "dodgerblue":ModelInputParams(IVOptions.REGULAR, 0),
    "darkorange":ModelInputParams(IVOptions.CONDITIONAL_WIND, 26),
    "darkred":ModelInputParams(IVOptions.CONDITIONAL_DEMAND, 1),
    "darkgreen":ModelInputParams(IVOptions.NUISANCE_ORDER, 1),
    "pink":ModelInputParams(IVOptions.TRUNCATED_NUISANCE_ORDER, order=26, order_d=1),
    "brown":ModelInputParams(IVOptions.CLEAN_2dim_ORDER, order=26, order_price=1),
}, file:str="", 
models:list[StructuralModel] = [StructuralModel.Model_I, 
                                StructuralModel.Model_II,
                                ]
):

    name_dict = {
        StructuralModel.Model_I:"simple_model",
        StructuralModel.Model_II:"autocorrelated_error",
        # StructuralModel.Model_III:"cross_price",
        # StructuralModel.Model_IV:"complex_model",
    }

    demand_equation = {
        StructuralModel.Model_I:'$D_{t} = D_0 + \\beta^{P}P_{t} + \\beta^{D1}D_{t-1} + U_{t}^{d}$',
        StructuralModel.Model_II:'$D_t = D_0 + \\beta^{P}P_t + \\beta^{B1}B_{t-1} + U_{t}^{d}$',
        # StructuralModel.Model_III:'$D_t = D_0 + \\beta_{P0}P_t + \\beta_{P1}P_{t-1} + \\epsilon_t^d$',
        # StructuralModel.Model_IV:"complex_model",
    }

    demand_arg = {
        StructuralModel.Model_I:'str. autocorr. of demand',
        StructuralModel.Model_II:'str. autocorr. of demand',
        # StructuralModel.Model_III:'Cross-price elasticity ($\\beta_{P1}$)',
        # StructuralModel.Model_IV:"complex_model",
    }

    fig, axs = plt.subplots(2, len(models), sharex=False,  sharey=False, figsize=(WIDE_WIDTH, HEIGHT*2))

    for i, model in enumerate(models):

        name = f"punchline_output_{name_dict[model]}_{sample//8760}_years_{runs}_runs_{linspace}_linspace"+file
        df = get_results_punchline(name, list(color_estimator_dict.values()), sample, runs, linspace, model=model)


        plot_punchline_panel(axs[i][1], df, color_estimator_dict, 
                             xlim=(-0.25, 0.9) if model in [StructuralModel.Model_I, StructuralModel.Model_II] else (-200, 200), 
                             xlabel=demand_arg[model], 
                             model_name=demand_equation[model], 
                             ylabel='estimated demand response $\\hat{\\beta}^{P}$',
                             add_label=True if i == 0 else False)    
        axs[i][1].set_ylim(-300, 50)


    plot_dag_model_I_on_axes(axes=axs[0][0])
    plot_dag_model_II_on_axes(axes=axs[1][0])


    axs[0][0].set_title(axs[0][0].get_title(), fontsize="large")
    axs[0][1].set_title("Estimates from data generated using Model I", size="large")
    axs[1][0].set_title(axs[1][0].get_title(), fontsize="large")
    axs[1][1].set_title("Estimates from data generated using Model II", size="large")

    fig.tight_layout()

    handles = punchline_plot_legend(list(color_estimator_dict.values()))


    fig.legend(handles=handles, loc="upper center", frameon=False, bbox_to_anchor=(0.5, 0),
               fancybox=True, fontsize='large', ncol=2, 
               handler_map={FancyArrowPatch : HandlerPatch(patch_func=make_legend_arrow)})
    
    fig.show()

    fig_name = f"punchline_plot_{sample//8760}_years_{runs}_runs_{linspace}_linspace"+file

    save_figure(fig, fig_name, VERSION)

In [None]:
punchline_plot(runs=20, linspace=21, sample=8760*5, color_estimator_dict={
    "dodgerblue":ModelInputParams(IVOptions.REGULAR, order=26),
    "darkorange":ModelInputParams(IVOptions.CONDITIONAL_DEMAND, order=1, order_w=0),
    "darkgreen":ModelInputParams(IVOptions.IV_2dim_ORDER, order=26, order_d=1, order_price=1),
})

# Heatmap

In [None]:
def run_simulations_for_heatmap(
    sample: int = 8760,
    runs: int = 10,
    linspace: int = 21,
    estimator: ModelInputParams = ModelInputParams(IVOptions.REGULAR, order=0),
    model:StructuralModel = StructuralModel.Model_I,
    wind_lags: int = 1,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    
    # Create the X and Y axis
    
    alpha_d = np.linspace(0, 0.999, linspace)
    alpha_w = np.linspace(0, 0.999, linspace)


    demand_grid, wind_grid = np.meshgrid(alpha_d, alpha_w)
    error_values = np.zeros_like(demand_grid)

    # For each point on the grid, run {runs} simulations with a demand type that corresponds to the specified model

    for i, d in enumerate(alpha_d):
        for j, w in enumerate(alpha_w):

            simulation = Simulation(sample)

            simulation.run_simulations_by_model(runs=runs, estimators=[estimator], model=model, 
                                                demand_arg=d, wind_manual_ar_sum=w, wind_lags=wind_lags,
                                                simulation_count=i*linspace*runs + j*runs)
                
            error_values[i, j] = simulation.models[estimator.name].average_percentage_error
            

    return alpha_d, alpha_w, error_values

In [None]:
def export_error_grid_to_csv(alpha_values, beta_values, error_values, filename):
    """
    Export Error values to a CSV file.
    
    Parameters:
        alpha_values (numpy.ndarray): Array of alpha values.
        beta_values (numpy.ndarray): Array of beta values.
        e_values (numpy.ndarray): Array of E-values.
        filename (str): Name of the CSV file to export.
    """
    with open(FOLDER+filename+".csv", 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        # Write header row
        writer.writerow(['alpha_demand', 'alpha_wind', 'error'])
        # Write data rows
        for i in range(len(alpha_values)):
            for j in range(len(beta_values)):
                writer.writerow([alpha_values[i], beta_values[j], error_values[i, j]])

def retrieve_error_grid_from_csv(filename):
    """
    Retrieve Error values from a CSV file in a grid format
    
    Parameters:
        filename (str): Name of the CSV file to retrieve data from.
    
    Returns:
        alpha_grid (numpy.ndarray): 2D grid of alpha values.
        beta_grid (numpy.ndarray): 2D grid of beta values.
        error_grid (numpy.ndarray): 2D grid of error values.
    """
    alpha_values = []
    beta_values = []
    e_values = []
    
    with open(FOLDER+filename+'.csv', 'r') as csvfile:
        reader = csv.reader(csvfile)
        next(reader)  # Skip header row
        for row in reader:
            alpha_values.append(float(row[0]))
            beta_values.append(float(row[1]))
            e_values.append(float(row[2]))
    
    alpha_values = np.unique(alpha_values)
    beta_values = np.unique(beta_values)
    e_values = np.array(e_values).reshape(len(alpha_values), len(beta_values))

    alpha_grid, beta_grid = np.meshgrid(alpha_values, beta_values)
    
    return alpha_grid, beta_grid, e_values.reshape(alpha_grid.shape)

def get_results_heatmap(file_path:str, estimator:ModelInputParams, sample:int, runs:int, linspace:int, wind_lags:int, model:StructuralModel) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    try:
        alpha_demand, alpha_wind, error_grid = retrieve_error_grid_from_csv(file_path)            
    except FileNotFoundError as e:
        print(e)
        alpha_demand, alpha_wind, error_grid = run_simulations_for_heatmap(sample=sample, runs=runs, linspace=linspace, estimator=estimator, model=model, wind_lags=wind_lags)
        export_error_grid_to_csv(alpha_demand, alpha_wind, error_grid, file_path)

    return alpha_demand, alpha_wind, error_grid 

In [None]:
def plot_heatmap_panel(fig:Figure, axes:Axes, alpha_grid, beta_grid, error_values:np.ndarray, xlabel:str, model_name:str, ylabel:str):
    axes.set_xlabel(xlabel, fontsize='large')
    axes.set_ylabel(ylabel, fontsize='large')
    axes.set_title(model_name, fontsize='large')
    axes.set_ylim(-0.01, 1)
    axes.set_xlim(-0.01, 1)

    
    axes.contour(alpha_grid, beta_grid, abs(error_values.transpose()), levels=[1], colors='darkgreen', linestyles='solid', lw=2)
    axes.contour(alpha_grid, beta_grid, abs(error_values.transpose()), levels=[5], colors='lightgreen', linestyles=[(5, (10, 3))])
    axes.contour(alpha_grid, beta_grid, abs(error_values.transpose()), levels=[10], colors='gold', linestyles='dashed')
    axes.contour(alpha_grid, beta_grid, abs(error_values.transpose()), levels=[25], colors='darkorange', linestyles='dashdot')
    axes.contour(alpha_grid, beta_grid, abs(error_values.transpose()), levels=[50], colors='red', linestyles='dotted')
    axes.contour(alpha_grid, beta_grid, abs(error_values.transpose()), levels=[100], colors='purple', linestyles=[(5, (1, 4))])

In [None]:
def heatmap_legend():
    handles=[Line2D([0], [0], color="darkgreen", ls="solid", lw=2),
            Line2D([0], [0], color="lightgreen", ls=(5, (10, 3)), lw=2),
            Line2D([0], [0], color="gold", ls='dashed', lw=2),
            Line2D([0], [0], color="darkorange", ls="dashdot", lw=2),
            Line2D([0], [0], color="red", ls='dotted', lw=2),
            Line2D([0], [0], color="purple", ls=(5, (1, 4)), lw=2)]
    
    labels=["1% error line", 
            "5% error line", 
            "10% error line", 
            "25% error line", 
            "50% error line", 
            "100% error line"]
    
    return handles, labels

In [None]:
def heatmap(runs=10, linspace=25, sample=8760, wind_lags:int=1,
            estimator1:ModelInputParams=ModelInputParams(IVOptions.REGULAR, 0), 
            estimator2:ModelInputParams=ModelInputParams(IVOptions.CONDITIONAL_DEMAND, 1, order_w=0), 
            file:str="", 
            model_1:StructuralModel = StructuralModel.Model_I,
            model_2:StructuralModel = StructuralModel.Model_II,
            double:bool = True,
):

    name_dict = {
        StructuralModel.Model_I:"simple_model",
        StructuralModel.Model_III:"cross_price",
        StructuralModel.Model_II:"autocorrelated_error",
        StructuralModel.Model_IV:"complex_model",
    }

    name_1 = f"heatmap_{name_dict[model_1]}_{estimator1.strategy}_{sample//8760}_years_{runs}_runs_{linspace}_linspace_{wind_lags}_windlags".replace("IVOptions.", "")
    name_2 = f"heatmap_{name_dict[model_2]}_{estimator2.strategy}_{sample//8760}_years_{runs}_runs_{linspace}_linspace_{wind_lags}_windlags".replace("IVOptions.", "")

    if double:
        fig, axs = plt.subplots(1, 2, sharex=False,  sharey=False, figsize=(WIDE_WIDTH, HEIGHT))
    else:
        fig, axs = plt.subplots(1, 1, sharex=False,  sharey=False, figsize=(4.5, 4.5))


    fig_name = f"heatmap_{sample//8760}_years_{runs}_runs_{linspace}_linspace_{wind_lags}_windlags{'_double' if double else '_single'}"+file
    
    def _model_name(model:StructuralModel) -> str:
        return str(model).replace("StructuralModel.", "").replace("_", " ")
    
    num1 = "#1 " if estimator1.strategy == IVOptions.REGULAR else "#3 " if estimator1.strategy == IVOptions.CONDITIONAL_DEMAND else ""
    num2 = "#1 " if estimator2.strategy == IVOptions.REGULAR else "#3 " if estimator2.strategy == IVOptions.CONDITIONAL_DEMAND else ""

    alpha_demand, alpha_wind, error_grid = get_results_heatmap(name_1, estimator1, sample, runs, linspace, model=model_1, wind_lags=wind_lags)
    plot_heatmap_panel(fig, axs[0] if double else axs, alpha_demand, alpha_wind, error_grid,
                       ylabel="str. autocorrelation of demand ($\\beta^{D1}$)" if model_1 == StructuralModel.Model_I else "str. autocorrelation of price-insensitive demand ($\\beta^{B1}$)",
                       xlabel="str. autocorrelation of wind ($\\beta^W$)",
                       model_name=num1+estimator1.latex_name+' on '+_model_name(model_1))
    if double:
        alpha_demand, alpha_wind, error_grid = get_results_heatmap(name_2, estimator2, sample, runs, linspace, model=model_2, wind_lags=wind_lags)
        plot_heatmap_panel(fig, axs[1], alpha_demand, alpha_wind, error_grid, 
                        ylabel="str. autocorrelation of non price\nresponsive demand $\\beta^{B1}$",
                        xlabel="str. autocorrelation of wind $\\beta^W1$",
                        model_name=num2+estimator2.latex_name+' on '+_model_name(model_2))
    
    handles, labels = heatmap_legend()

    fig.legend(handles=handles, labels=labels,
                loc="upper center", frameon=False, bbox_to_anchor=(0.5, 0), fancybox=True, fontsize='large', ncols=3)
    fig.tight_layout()
    fig.show()

    save_figure(fig, fig_name, VERSION)


Figure 5

In [None]:
heatmap(20, 10, 8760*5, double=False)


Appendix figure

In [None]:
heatmap(20, 10, 8760*5, double=False, estimator1=ModelInputParams(IVOptions.CONDITIONAL_DEMAND, 1, order_w=0), model_1=StructuralModel.Model_II, file="_appendix_version")


# Observed AR

In [None]:
def run_bias_and_observed_autocorrelation(
    sample: int = 8760,
    runs: int = 3,
    linspace: int = 50,
    wind_manual_ar_sum: float | None = None,
    estimator: ModelInputParams = ModelInputParams(IVOptions.REGULAR, order=0),
    models:list[StructuralModel] = [StructuralModel.Model_I, StructuralModel.Model_II, StructuralModel.Model_III]
) -> dict[str, list[tuple[float, float]]]:
    
    name_dict = {
        StructuralModel.Model_I:"simple_model",
        StructuralModel.Model_II:"autocorrelated_error",
        StructuralModel.Model_III:"cross_price",
        StructuralModel.Model_IV:"complex_model",
    }

    # Create the X axis
    
    xx = np.linspace(-250, 250, linspace)
    dd = np.linspace(-0.249, 0.999, linspace)

    models_results_dict:dict[str, list[tuple[float, float]]] = {}

    # For each point on the x axis, run {runs} simulations with a demand type that corresponds to the specified model
    for model in models:
        model_list:list[tuple[float, float]] = []
        for i in range(linspace):
            d = dd[i]
            x = xx[i]
            random_integers: list[int] = random.sample(range(1, 1000001), runs)

            for n, seed in enumerate(random_integers):
                np.random.seed(seed)

                simulation = Simulation(sample)
                eq = simulation.get_equilibrium(demand=simulation.get_demand(model, d if model in [StructuralModel.Model_I, StructuralModel.Model_II] else x),
                                                supply=simulation.get_supply(wind_manual_ar_sum, wind_lags=1))                
                
                ar_params = ar_model_analysis(ar_model_fit(eq.clearing.demand, 1, exog=None))

                simulation.analyze_equilibrium(eq, [estimator], False, False)
                
                error = (-100-simulation.models[estimator.name].estimates[-1])/(-100)*100
                model_list.append((ar_params.lags_dict[-1], error))

                print(f"Simulation {i*runs+n} seed {seed} ({model})")

        models_results_dict[name_dict[model]] = model_list
            

    return models_results_dict


In [None]:
def save_observed_ar_to_csv(models_results_dict: dict[str, list[tuple[float, float]]], filename: str):
    with open(FOLDER+filename+'.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        for key, value in models_results_dict.items():
            writer.writerow([key] + [f'{x},{y}' for x, y in value])

def load_observed_ar_from_csv(filename: str) -> dict[str, list[tuple[float, float]]]:
    models_results_dict = {}
    with open(FOLDER+filename+'.csv', 'r') as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            key = row[0]
            values = [(float(pair.split(',')[0]), float(pair.split(',')[1])) for pair in row[1:]]
            models_results_dict[key] = values
    return models_results_dict

def get_observed_ar_results(file_path:str, estimator:ModelInputParams, sample:int, runs:int, linspace:int, models:list[StructuralModel]) -> dict[str, list[tuple[float, float]]]:
    try:
        models_results_dict = load_observed_ar_from_csv(file_path)            
    except FileNotFoundError:
        models_results_dict = run_bias_and_observed_autocorrelation(sample=sample, runs=runs, linspace=linspace, estimator=estimator, models=models)
        save_observed_ar_to_csv(models_results_dict, file_path)

    return models_results_dict

In [None]:
def plot_observed_ar_panel(axes:Axes, models_results_dict: dict[str, list[tuple[float, float]]], runs:int, title:str, legend:bool=True):
    model_names = ["Model I", "Model II", "Model III"]
    markers = ['.', '*', '^']
    cmaps = ['Blues', 'Oranges', 'Greens']
    colors = ['dodgerblue', 'darkorange', 'darkgreen']

    axes.axhline(0, 0, 1, color='black', linewidth=0.8)


    zz = []
    n = len(models_results_dict['simple_model'])
    for z in np.linspace(-250, 250, n//runs):
        for i in range(runs):
            zz.append(z)
    
    for i, (key, values) in enumerate(models_results_dict.items()):
        sorted_values = sorted(values[:n-runs], key=lambda x: x[0])
        
        xx = [pair[0] for pair in values[:n-runs]]
        yy = [pair[1] for pair in values[:n-runs]]
        # axes.scatter(xx, yy, c=colors[i], label=model_names[i]+f" ({key})", marker=markers[i], alpha=0.5)
        axes.scatter(xx, yy, c=zz[:n-runs], cmap=cmaps[i], label=model_names[i]+f" ({key})", marker=markers[i], alpha=1)


    axes.set_ylim(-305, 305)
    axes.set_xlim(0, 1)


    axes.set_xlabel('observed autocorrelation of demand $\\alpha^{D1}$', size="large")
    axes.set_ylabel('percentage error', size="large")
    axes.set_title(title)
    if legend:
        axes.legend()
    axes.grid(True)
    # plt.show()

In [None]:
def observed_ar_legend():
    def _get_mid_color(cmap_name:str) -> str:
        cmap = plt.get_cmap(cmap_name)
        norm = plt.Normalize(vmin=0, vmax=1)    # type: ignore

        sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
        sm.set_array([])

        color_at_middle = sm.to_rgba(0.7)       # type: ignore

        return mcolors.to_hex(color_at_middle)  # type: ignore

    handles = [
        Line2D([0], [0], marker='.', color='w', markerfacecolor=_get_mid_color('Blues'), markersize=10),
        Line2D([0], [0], marker='*', color='w', markerfacecolor=_get_mid_color('Oranges'), markersize=10),
        Line2D([0], [0], marker='^', color='w', markerfacecolor=_get_mid_color('Greens'), markersize=8)
    ]

    labels = ['Model I', 'Model II', 'Model III']

    return handles, labels

In [None]:
def observed_ar(runs=10, linspace=21, sample=8760, estimator:ModelInputParams= ModelInputParams(IVOptions.REGULAR, 0)):
    fig, axes = plt.subplots(1, 1, sharex=True, sharey=True, figsize=(REGULAR_WIDTH, HEIGHT))
    fig_name = f"observed_ar_{sample//8760}_years_{runs}_runs_{linspace}_linspace_{estimator.strategy}".replace("IVOptions.", "")

    results = get_observed_ar_results(fig_name, estimator, sample, runs, linspace, 
                                      models=[StructuralModel.Model_I, StructuralModel.Model_II, StructuralModel.Model_III])
    
    num = "#1 " if estimator.strategy == IVOptions.REGULAR else "#3 " if estimator.strategy == IVOptions.CONDITIONAL_DEMAND else ""

    plot_observed_ar_panel(axes, results, runs, title=num+estimator.latex_name, legend=False)

    handles, labels = observed_ar_legend()

    fig.legend(handles, labels, loc="upper center", frameon=False, bbox_to_anchor=(0.5, 0), fancybox=True, fontsize='large', ncol=3)

    save_figure(fig, fig_name)



In [None]:
observed_ar(4, 100, sample=8760*5, estimator=ModelInputParams(IVOptions.REGULAR, order=0))


In [None]:
observed_ar(4, 100, sample=8760*5, estimator=ModelInputParams(IVOptions.CONDITIONAL_DEMAND, order=1, order_w=0))

In [None]:
def double_panel_observed_ar(runs=10, linspace=21, sample=8760, 
                             estimator1:ModelInputParams= ModelInputParams(IVOptions.REGULAR, 0), 
                             estimator2:ModelInputParams= ModelInputParams(IVOptions.CONDITIONAL_DEMAND, 1, order_w=0)):
    fig, [ax1, ax2] = plt.subplots(1, 2, sharex=True, sharey=True, figsize=(WIDE_WIDTH, HEIGHT))

    fig_name_1 = f"observed_ar_{sample//8760}_years_{runs}_runs_{linspace}_linspace_{estimator1.strategy}".replace("IVOptions.", "")
    fig_name_2 = f"observed_ar_{sample//8760}_years_{runs}_runs_{linspace}_linspace_{estimator2.strategy}".replace("IVOptions.", "")

    models = [StructuralModel.Model_I, StructuralModel.Model_II, StructuralModel.Model_III]

    results1 = get_observed_ar_results(fig_name_1, estimator1, sample, runs, linspace, 
                                       models=models)
    results2 = get_observed_ar_results(fig_name_2, estimator2, sample, runs, linspace, 
                                       models=models)

    plot_observed_ar_panel(ax1, results1, runs, "a)   "+estimator1.latex_name, False)
    plot_observed_ar_panel(ax2, results2, runs, "b)   "+estimator2.latex_name, False)
    fig.tight_layout()

    handles, labels = observed_ar_legend()

    fig.legend(handles, labels, loc="upper center", frameon=False, bbox_to_anchor=(0.5, 0), fancybox=True, fontsize='large')


    save_figure(fig, f"double_panel_observed_ar_{sample//8760}_years_{runs}_runs_{linspace}_linspace")
    fig.show()


In [None]:
double_panel_observed_ar(4, 100, 8760*5)

# Combined heatmap and observed AR

In [None]:
def combined_heatmap_and_observed_ar_plot(
        heatmap_runs:int,
        heatmap_linspace:int,
        observed_ar_runs:int,
        observed_ar_linspace:int,
        sample:int,
        estimator:ModelInputParams):
    
    if estimator.strategy == IVOptions.REGULAR:
        model = StructuralModel.Model_I
    elif estimator.strategy == IVOptions.CONDITIONAL_DEMAND:
        model = StructuralModel.Model_II
    else:
        raise NotImplementedError()
    
    name_dict = {
        StructuralModel.Model_I:"simple_model",
        StructuralModel.Model_III:"cross_price",
        StructuralModel.Model_II:"autocorrelated_error",
        StructuralModel.Model_IV:"complex_model",
    }

    # HEATMAP

    heatmap_name = f"heatmap_{name_dict[model]}_{estimator.strategy}_{sample//8760}_years_{heatmap_runs}_runs_{heatmap_linspace}_linspace".replace("IVOptions.", "")
    
    fig, [ax1, ax2] = plt.subplots(1, 2, sharex=False, sharey=False, figsize=(WIDE_WIDTH, HEIGHT))

    alpha_grid, beta_grid, errors = get_results_heatmap(heatmap_name, estimator, sample, heatmap_runs, heatmap_linspace, 1, model)
    
    plot_heatmap_panel(fig, ax1, alpha_grid, beta_grid, errors,
                       ylabel="str. autocorrelation of demand ($\\beta^{D1}$)",
                       xlabel="str. autocorrelation of wind ($\\beta^W$)",
                       model_name='a) Bias of '+estimator.latex_name+f' on {model}'.replace('_', ' ').replace("StructuralModel.", ""))

    # OBSERVED AR

    observed_ar_name = f"observed_ar_{sample//8760}_years_{observed_ar_runs}_runs_{observed_ar_linspace}_linspace_{estimator.strategy}".replace("IVOptions.", "")

    models_results_dict = get_observed_ar_results(observed_ar_name, estimator, sample, observed_ar_runs, observed_ar_linspace, 
                                                  models=[StructuralModel.Model_I, StructuralModel.Model_II, StructuralModel.Model_III])

    title = "b) Lack of correlation between $\\alpha^{D1}$ and bias"

    plot_observed_ar_panel(ax2, models_results_dict, observed_ar_runs, title, legend=False)

    handles_1, labels_1 = heatmap_legend()

    handles_2, labels_2 = observed_ar_legend()

    handles = handles_1+handles_2
    labels = labels_1+labels_2

    fig.legend(handles, labels, loc="upper center", frameon=False, bbox_to_anchor=(0.5, 0), fancybox=True, fontsize='large', ncol=3)

    save_figure(fig, f"combined_heatmap_and_observed_ar")
    fig.show()
    

In [None]:
# combined_heatmap_and_observed_ar_plot(heatmap_runs=20, heatmap_linspace=10,
#                                       observed_ar_runs=4, observed_ar_linspace=100,
#                                       sample=8760*5, 
#                                       estimator=ModelInputParams(IVOptions.REGULAR, 0))

# Indicators

In [None]:
def run_estimator_indicator_analysis(estimator:ModelInputParams, max_sample:int=50, runs:int=10, linspace:int=7, wind_manual_ar_sum=None, alpha=0.05, model:StructuralModel=StructuralModel.Model_I, min_sample=1):
    xx = [round(x) for x in np.linspace(min_sample, max_sample, linspace)]

    yy_error = []
    yy_error_lower = []
    yy_error_upper = []

    yy_distance = []
    yy_distance_lower = []
    yy_distance_upper = []

    yy_coverage = []
    yy_coverage_lower = []
    yy_coverage_upper = []

    for i, x in enumerate(xx):
        sample = x * 8760

        simulation = Simulation(sample)

        simulation.run_simulations_by_model(runs, 
                                            estimators=[estimator],
                                            model=model,
                                            demand_arg=None, 
                                            simulation_count=i*runs)

        if estimator.name in simulation.models.keys():
            name = estimator.name
        else:
            name = estimator.name+"_P0"

        yy_error.append(simulation.models[name].average_absolute_percentage_error)
        error_low, error_up = ws.DescrStatsW(simulation.models[name].absolute_percentage_errors).tconfint_mean(alpha=alpha)
        if any(e < 0 for e in simulation.models[name].absolute_percentage_errors):
            raise Exception("Some errors are negative")
        print(f"ERRORS\nfrom {simulation.models[name].absolute_percentage_errors}\n\tmean:{simulation.models[name].average_absolute_percentage_error}\n\tupper:{error_up}\n\tlower:{error_low}")
        yy_error_lower.append(error_low)
        yy_error_upper.append(error_up)

        yy_distance.append(simulation.models[name].average_distance)
        distance_low, distance_up = ws.DescrStatsW(simulation.models[name].ci_upper_bound).tconfint_mean(alpha=alpha)
        if any(e < 0 for e in simulation.models[name].ci_upper_bound):
            raise Exception("Some distances are negative")
        yy_distance_lower.append(distance_low)
        yy_distance_upper.append(distance_up)


        yy_coverage.append(simulation.models[name].average_coverage)
        coverage_low, coverage_up = proportion.proportion_confint(simulation.models[name].coverage_count, simulation.models[name].n, alpha=alpha)
        if coverage_low < 0:
            raise Exception("Coverage low is lower than 0!")
        if coverage_up > 1:
            raise Exception("Coverage up is higher than 1!")
        yy_coverage_lower.append(coverage_low)
        yy_coverage_upper.append(coverage_up)

    # Organize results into a DataFrame

    sub_index = ["Mean", "Upper", "Lower"]
    index = pd.MultiIndex.from_product([["Error", "Distance", "Coverage"], sub_index])
    
    data = [
        yy_error,
        yy_error_upper,
        yy_error_lower,
        yy_distance,
        yy_distance_upper,
        yy_distance_lower,
        yy_coverage,
        yy_coverage_upper,
        yy_coverage_lower,
    ]

    results_df = pd.DataFrame(data, index=index, columns=xx).T

    return results_df

In [None]:
def plot_goodness_panel_from_indicator(axes:Axes, dfs:list[pd.DataFrame], names:list[str], indicator:str, lables:bool, model:StructuralModel, xlabel:bool=False):
    color1 = '#377eb8'
    color2 = '#ff7f00'
    color3 = '#4daf4a'

    colors = [color1, color2, color3]
    hatches = ['++', '..', 'XX']

    xx = dfs[0].index

    indicator_name_dict:dict[str, str] = {
        'Coverage':'coverage',
        'Error':'absolute percentage error',
        'Distance':'length of confidence interval'
    }

    indicator_ylim:dict[str, tuple[float, float]] = {
        'Coverage':(-0.01, 1.01),
        'Error':(0, 25 if model == StructuralModel.Model_II else 20),
        'Distance':(0, 50 if model == StructuralModel.Model_II else 40)
    }
    
    axes.set_ylabel(indicator_name_dict[indicator], color='k', size="large")

    for i, df in enumerate(dfs):

        axes.plot(xx, df[indicator]['Mean'], color=colors[i], label=names[i] if lables else '')
        axes.fill_between(xx, df[indicator]['Upper'], df[indicator]['Lower'], color=colors[i], alpha=0.4, hatch=hatches[i])
        axes.tick_params('y', colors='k')
    
        axes.set_xlim(1, max(xx))
        axes.set_ylim(indicator_ylim[indicator])
        if xlabel:
            axes.set_xlabel("years [sample size / 8760]", size="large")
        axes.set_xticks(xx)

    # Create custom legend handles
    legend_elements = [
        # Line2D([0], [0], color=colors[i], lw=2, label=names[i] if lables else '') for i in range(len(dfs))
    # ] + [
        Patch(edgecolor=colors[i], facecolor=colors[i], hatch=hatches[i], label=names[i] if lables else '', alpha=0.5) for i in range(len(dfs))
    ]

    return legend_elements



In [None]:
def plot_indicators(linspace:int=20, runs:int=50, max_sample:int=30, model:StructuralModel=StructuralModel.Model_I, min_sample:int=1):
    fig, axss  = plt.subplots(1, 3, sharex=True, sharey=False, figsize=(EXTRA_WIDTH, HEIGHT))
    [axs1, axs2, axs3] = axss

    name_dict = {
        StructuralModel.Model_I:"simple_model",
        StructuralModel.Model_II:"autocorrelated_error",
        StructuralModel.Model_III:"cross_price",
        StructuralModel.Model_IV:"complex_model",
    }
    _model = name_dict[model]

    estimator1 = ModelInputParams(IVOptions.CONDITIONAL_WIND, order=26)
    estimator2 = ModelInputParams(IVOptions.CONDITIONAL_H0, order=26)
    estimator3 = ModelInputParams(IVOptions.IV_2dim_ORDER, order=26, order_price=1)

    estimator_number = {
        IVOptions.CONDITIONAL_WIND:2,
        IVOptions.IV_2dim_ORDER:8,
        IVOptions.RESIDUAL_WIND:8,
        IVOptions.CONDITIONAL_H0:4,
    }


    name_1 = f"goodness_output_{_model}_{estimator1.strategy}_{max_sample}_years_{runs}_runs_{linspace}_linspace".replace("IVOptions.", "")
    name_2 = f"goodness_output_{_model}_{estimator2.strategy}_{max_sample}_years_{runs}_runs_{linspace}_linspace".replace("IVOptions.", "")
    name_3 = f"goodness_output_{_model}_{estimator3.strategy}_{max_sample}_years_{runs}_runs_{linspace}_linspace".replace("IVOptions.", "")

    fig_name = f"goodness_plot_{_model}_{max_sample}_years_{runs}_runs_{linspace}_linspace"

    try:
        df_1 = read_results_from_file(name_1+'.csv')
    except FileNotFoundError as e:
        print(e) 
        df_1 = run_estimator_indicator_analysis(min_sample=min_sample, max_sample=max_sample, runs=runs, linspace=linspace, estimator=estimator1, model=model)
        write_results_to_file(name_1, df_1)

    try:
        df_2 = read_results_from_file(name_2+'.csv')
    except FileNotFoundError as e:
        print(e)
        df_2 = run_estimator_indicator_analysis(min_sample=min_sample, max_sample=max_sample, runs=runs, linspace=linspace, estimator=estimator2, model=model)
        write_results_to_file(name_2, df_2)

    try:
        df_3 = read_results_from_file(name_3+'.csv')
    except FileNotFoundError as e:
        print(e)
        df_3 = run_estimator_indicator_analysis(min_sample=min_sample, max_sample=max_sample, runs=runs, linspace=linspace, estimator=estimator3, model=model)
        write_results_to_file(name_3, df_3)

    estimator1_label = f"#{estimator_number[estimator1.strategy]} {estimator1.latex_name}"
    estimator2_label = f"#{estimator_number[estimator2.strategy]} {estimator2.latex_name}"
    estimator3_label = f"#{estimator_number[estimator3.strategy]} {estimator3.latex_name}"

    labels = [estimator1_label, estimator2_label, estimator3_label]

    legend_elements = plot_goodness_panel_from_indicator(axs1, [df_1, df_2, df_3], labels, "Coverage", True, model, True)
    plot_goodness_panel_from_indicator(axs2, [df_1, df_2, df_3], labels, "Error", False, model, True)
    plot_goodness_panel_from_indicator(axs3, [df_1, df_2, df_3], labels, "Distance", False, model, True)

    fig.legend(handles=legend_elements, loc="upper center", frameon=False, bbox_to_anchor=(0.5, 0), fancybox=True, fontsize='large', ncols=1)
    fig.show()

    save_figure(fig, fig_name)

In [None]:
def run_single_estimator_indicator(model:StructuralModel, min_sample:int, max_sample:int, runs:int, linspace:int, estimator:ModelInputParams):
    name_dict = {
        StructuralModel.Model_I:"simple_model",
        StructuralModel.Model_II:"autocorrelated_error",
        StructuralModel.Model_III:"cross_price",
        StructuralModel.Model_IV:"complex_model",
    }
    _model = name_dict[model]

    if min_sample == 1:
        name_1 = f"goodness_output_{_model}_{estimator.strategy}_{max_sample}_years_{runs}_runs_{linspace}_linspace".replace("IVOptions.", "")
    else:
        name_1 = f"goodness_output_{_model}_{estimator.strategy}_{min_sample}-{max_sample}_years_{runs}_runs_{linspace}_linspace".replace("IVOptions.", "")
    df_1 = run_estimator_indicator_analysis(min_sample=min_sample, max_sample=max_sample, runs=runs, linspace=linspace, estimator=estimator, model=model)
    write_results_to_file(name_1, df_1)

In [None]:
# run_single_estimator_indicator(StructuralModel.Model_I, 15, 21, 1, 6, 
#                                ModelInputParams(IVOptions.CONDITIONAL_H0, order=26))

In [None]:
plot_indicators(11, 50, 21, StructuralModel.Model_I, 1)

In [None]:
plot_indicators(11, 50, 21, StructuralModel.Model_II, 1)


In [None]:
plot_indicators(11, 50, 21, StructuralModel.Model_III, 1)


# Application panel

In [None]:
models=[
        ModelInputParams(IVOptions.REGULAR, order=0),
        ModelInputParams(IVOptions.CONDITIONAL_WIND, order=26),
        ModelInputParams(IVOptions.CONDITIONAL_DEMAND, order=1, order_w=0),
        ModelInputParams(IVOptions.CONDITIONAL_H0, order=26),
        ModelInputParams(IVOptions.TRUNCATED_NUISANCE_ORDER, order=26, order_d=1),
        ModelInputParams(IVOptions.CLEAN_2dim_ORDER, order=26, order_price=1),
        ModelInputParams(IVOptions.TRUNCATED_IV_2dim_ORDER, order=26, order_price=1),
        ModelInputParams(IVOptions.IV_2dim_ORDER, order=26, order_price=1),
        ]


def run_model_simulation(sample:int, estimators:list[ModelInputParams], runs:int, model:StructuralModel) -> tuple[list[list[float]], list[list[float]], list[str]]:
    simulation = Simulation(sample=sample)

    
    simulation.run_simulations_by_model(runs=runs,
                                        estimators=estimators,
                                        model=model,
                                        demand_arg=0.7 if model == StructuralModel.Model_I else None)
    
    results_p0:list[list[float]] = []
    results_p1:list[list[float]] = []
    names = [est.latex_name for est in estimators]
    prev_model = list(simulation.models.keys())[0].split("_")[0]

    for name, estimator_results in simulation.models.items():
        print(name, estimator_results.results)

        new_model = name.split("_")[0]
        if new_model != prev_model:
            prev_model = new_model
            for res_list in [results_p1]:
                if len(res_list) < len(results_p0):
                    res_list.append([])
                
        if "_" not in name or name.endswith("_P0"):
            results_p0.append([res.estimate for res in estimator_results.results])
        elif name.endswith("_P1"):
            results_p1.append([res.estimate for res in estimator_results.results])
    # This is needed just in case the last one also does not have an estimate for all of them!
    for res_list in [results_p1]:
        if len(res_list) < len(results_p0):
            res_list.append([])

    return results_p0, results_p1, names

In [None]:
def read_application_boxplot_results_from_csv(file_path:str) -> tuple[list[list[float]], list[list[float]], list[str]]:
    try:
        df:pd.DataFrame = pd.read_csv(FOLDER+file_path, index_col=0)
        results_p0, results_p1, names = [], [], []

        for col in df.columns:
            if col.endswith("_P0"):
                results_p0.append(list(df[col].values))
                names.append(col.removesuffix("_P0"))

            elif col.endswith("_P1"):
                results_p1.append(list(df[col].values))

            else:
                raise ValueError(f"Unexpected column name {col} in file {file_path}")
            
        return results_p0, results_p1, names
    
    except FileNotFoundError as e:
        raise e

def save_application_boxplot_results(results_p0:list[list[float]], results_p1:list[list[float]], names:list[str], file_path:str, runs:int):
    data:dict[str, list[float]] = {}

    for i, name in enumerate(names):
        data[name+"_P0"] = results_p0[i] if results_p0[i] else [np.nan for i in range(runs)]
        data[name+"_P1"] = results_p1[i] if results_p1[i] else [np.nan for i in range(runs)]

    df = pd.DataFrame(data)
    df.to_csv(FOLDER+file_path)

In [None]:
def get_application_boxplot_results(model:StructuralModel, sample:int, runs:int, estimators:list[ModelInputParams], file_path:str) -> tuple[list[list[float]], list[list[float]], list[str]]:
    file_path = "application_figure_"+file_path+f"_{sample//8760}_years_{runs}_runs.csv"
    
    try:
        results_p0, results_p1, names = read_application_boxplot_results_from_csv(file_path) 

        # If there are less results that asked for, run the estimators that are missing
        missing_estimators = [estimator for estimator in estimators if estimator.latex_name not in names]
        if missing_estimators:
            print("There are missing estimators:\n", [estimator.name for estimator in missing_estimators], "\n are not in \n", names)
            new_results_p0, new_results_p1, new_names = run_model_simulation(sample=sample, runs=runs, estimators=missing_estimators, model=model)
            results_p0 = results_p0 + new_results_p0
            results_p1 = results_p1 + new_results_p1
            names = names + new_names
            save_application_boxplot_results(results_p0, results_p1, names, file_path, runs=runs)
        
        # If there are more results that asked for, drop unwanted results
        if len(names) > len(estimators):
            estim_names = [estimator.latex_name for estimator in estimators]
            filtered_results = [(p0, p1, name) for p0, p1, name in zip(results_p0, results_p1, names) if name in estim_names]
            results_p0, results_p1, names = map(list, zip(*filtered_results)) if filtered_results else ([], [], [])


        # Ensure the order matches the order of the input estimators
        estim_name_to_index = {estimator.latex_name: i for i, estimator in enumerate(estimators)}
        ordered_results = sorted(zip(results_p0, results_p1, names), key=lambda x: estim_name_to_index[x[2]])

        # Unzip the sorted list back into individual lists
        results_p0, results_p1, names = map(list, zip(*ordered_results))


    except FileNotFoundError as e:
        print(e, "\nStarting simulation")
        results_p0, results_p1, names = run_model_simulation(sample=sample, runs=runs, estimators=estimators, model=model)
        save_application_boxplot_results(results_p0, results_p1, names, file_path, runs=runs)

    return results_p0, results_p1, names


In [None]:
def application_boxplot(results_p0:list[list[float]],
                        results_p1:list[list[float]], 
                        names:list[str], 
                        output_file:str, 
                        xlim, 
                        own_price_elasticity, 
                        cross_price_elasticity,
                        replace_ellipsis:bool):
        axs:Axes
        fig, axs = plt.subplots(1, 1, sharex=False,  sharey=True, figsize=(WIDE_WIDTH, HEIGHT), subplot_kw={'axes_class': axisartist.Axes})
   
        names = [f"#{i+1}   {n} " for i, n in enumerate(names)]
        if replace_ellipsis:
             names = [func_replace_ellipsis(name) for name in names]
        xx_p0: list[list[float]] = list(reversed([[result for result in method_results] for method_results in results_p0]))
        xx_p1: list[list[float]] = list(reversed([[result for result in method_results] for method_results in results_p1]))
       
        ticks: list[str] = list(reversed(names))
   
        bplot1 = axs.boxplot(xx_p0, vert=False, patch_artist=True)
        bplot2 = axs.boxplot(xx_p1, vert=False, patch_artist=True, notch=True)
   
        colors = ["darkorange", "dodgerblue"]
        bbpp = [bplot1, bplot2]
   
        for plot, color in zip(bbpp, colors):
            for patch in plot['boxes']:
                patch.set_facecolor(color)
            for median in plot['medians']:
                median.set_color('black')
   
        legend_elements = [
            bplot1["boxes"][0], 
            bplot2["boxes"][0],
            Line2D([0], [0], color='darkorange', linestyle="--",),
            Line2D([0], [0], color='dodgerblue', linestyle=":",),
        ]
        legend_labels = [
            "estimated own-price demand response $\\hat{\\beta}^P$ ($P_t\\rightarrow D_t$)", 
            "estimated cross-price demand response $\\hat{\\beta}^{P1}$ ($P_{t-1}\\rightarrow D_t$)",
            "true own-price demand response $\\beta^P$",
            "true cross-price demand response $\\beta^{P1}$",
        ]
   
        axs.tick_params(axis='y', which='both', left=False, labelleft=True, reset=True)
   
        axs.set_yticks(range(1, len(ticks)+1))
        axs.set_yticklabels(ticks, fontsize="large")
        axs.yaxis.grid(visible=False)
        axs.axis["left"].major_ticklabels.set_ha("left")    # type: ignore
        axs.axis["left"].major_ticks.set_ticksize(0)        # type: ignore
        

        axs.set_ylim(0.5, len(ticks)+0.5)
        axs.set_xlim(xlim)
        
        axs.axvline(x=0, ls='-', color='k', lw=1, alpha=1)
        axs.axvline(x=own_price_elasticity, ls='--', color='darkorange', lw=2, alpha=1)
        axs.axvline(x=cross_price_elasticity, ls=':', color='dodgerblue', lw=2, alpha=1)
   
        for h in range(0, len(xx_p0)+1):
            axs.axhline(y=h + 0.5, ls="-", color="grey", xmin=-0.425, xmax=1, clip_on=False)
    
        # Add estimator validity table
        plot_table(fig)
    
        # Add the legend to the figure,
        fig.legend(handles=legend_elements, labels=legend_labels, loc='upper center', 
                   ncol=2, bbox_to_anchor=(0.5, 0), fancybox=True, frameon=False, fontsize="large")
    
        save_figure(fig, output_file+"_application_figure", VERSION)
   
  

In [None]:
def plot_application_boxplot(sample:int, runs:int, output_file:str, model:StructuralModel, estimators:list[ModelInputParams]):
    
    results_p0, results_p1, names = get_application_boxplot_results(model=model,
                                                                   estimators=estimators,
                                                                   sample=sample,
                                                                   runs=runs,
                                                                   file_path=output_file)
    
    print(names)
    
    xlims:dict[StructuralModel, tuple[float, float]] = {
        StructuralModel.Model_I:(-400, 100),
        StructuralModel.Model_II:(-200, 100),
        StructuralModel.Model_III:(-200, 100),
    }

    application_boxplot(
        results_p0=results_p0,
        results_p1=results_p1,
        names=names,
        output_file=output_file,
        xlim=xlims[model],
        own_price_elasticity=-100,
        cross_price_elasticity=50 if model == StructuralModel.Model_III else 0,
        replace_ellipsis=False
    )

In [None]:
plot_application_boxplot(8760*5, 100, "model_I", StructuralModel.Model_I, models)

In [None]:
plot_application_boxplot(8760*5, 100, "model_II", StructuralModel.Model_II, models)

In [None]:
plot_application_boxplot(8760*5, 100, "model_III", StructuralModel.Model_III, models)

# Singular values

In [None]:
# This plot and analysis is not included in the current draft

In [None]:
def covariance(x, y):
    return np.mean(x * y) - np.mean(x) * np.mean(y)

def cross_covariance_matrix(vector_x, vector_y):
    num_x = len(vector_x)
    num_y = len(vector_y)

    assert num_y >= num_x, "There must be at least as many instruments as endogeneous variables"

    matrix = np.zeros((num_x, num_y))

    for i in range(num_x):
        for j in range(num_y):
            matrix[i, j] = covariance(vector_x[i], vector_y[j])

    return matrix

def calculate_eigenvalues(matrix):
    eigenvalues, _ = np.linalg.eig(matrix)
    return eigenvalues

def calculate_singular_values(matrix):
    U, singular_values, VT = np.linalg.svd(matrix)
    return singular_values

In [None]:
def run_singular_values_analysis(
    sample: int = 8760,
    runs: int = 10,
    linspace: int = 21,
    wind_manual_ar_sum: float | None = None,
    estimator:ModelInputParams = ModelInputParams(IVOptions.NUISANCE_ORDER, 1),
    model:StructuralModel = StructuralModel.Model_III,
    instruments: int = 2,
    metric:Literal["eigenvalues", "singular values"] = "singular values"
) -> pd.DataFrame:
    
    # Create the X axis
    xx = np.linspace(-200, 200, linspace)
    # dd = np.linspace(-0.249, 0.499, linspace)
    dd = np.linspace(-0.249, 0.899, linspace)

    # Create the empty lists to store the analysis output
    mean = []
    lower_bound = []
    upper_bound = []
    matrix_list = []

    # Create the simulation instances

    for i, x in enumerate(xx):
        d = dd[i]

        min_metric_value = []
        matrix_at_d = []

        random_integers: list[int] = random.sample(range(1, 1000001), runs)

        for n, seed in enumerate(random_integers):
            np.random.seed(seed)

            simulation = Simulation(sample)

            equilibrium = simulation.get_equilibrium(
                demand=simulation.get_demand(model,
                                            x if model == StructuralModel.Model_III else d),
                supply=simulation.get_supply(wind_manual_ar_sum, 26)
            )

                
            matrix = cross_covariance_matrix(vector_x=[equilibrium.clearing.price, equilibrium.clearing.demand.shift(1)],
                                            vector_y=[equilibrium.supply.wind.shift(i) for i in range(instruments)])
            
            matrix_at_d.append(matrix)
            # min_eigenvalue.append(min(calculate_eigenvalues(matrix)))
            if metric == "eigenvalues":
                min_metric_value.append(min([abs(v) for v in calculate_eigenvalues(matrix)]))
            elif metric == "singular values":
                min_metric_value.append(min([abs(v) for v in calculate_singular_values(matrix)]))
            else:
                raise ValueError(f"Metric {metric} not implemented")
            run = i*runs+n+1
            print(f"Run {run}/{runs*linspace} finished ({(run)/(runs*linspace)*100}%)")
             
        # For each simulation store the output estimates average, upper bound and lower bound

        mean.append(np.average(min_metric_value))
        low, up = ws.DescrStatsW(min_metric_value).tconfint_mean(alpha=0.05)
        lower_bound.append(low)
        upper_bound.append(up)
        matrix_array = np.array(matrix_at_d)
        averaged_matrix = np.mean(matrix_array, axis=0)
        matrix_list.append(averaged_matrix)

    # return mean, lower_bound, upper_bound, matrix_list
        
    data = {'mean': mean, 'lower': lower_bound, 'upper': upper_bound}
    df = pd.DataFrame(data, index=xx)
    return df




In [None]:
def plot_singular_value_panel(axes:Axes, df:pd.DataFrame, title:str=""):

    xx = df.index

    axes.plot(xx, df["mean"], color='dodgerblue', label='Minimum singular value')
    axes.fill_between(xx, np.array(df["lower"].values), np.array(df["upper"].values), color='dodgerblue', alpha=0.3)
    axes.set_xlabel("Cross price elasticity ($\\beta^{P1}$)")
    axes.set_ylabel("Min. abs. Singular Value")
    axes.set_ylim(0, 40000)
    axes.set_xlim(-200, 200)
    axes.set_title(title)

In [None]:
def get_singular_values_results(sample:int=8760*5, linspace:int=100, runs:int=50, model:StructuralModel=StructuralModel.Model_III, estimator:ModelInputParams=ModelInputParams(IVOptions.NUISANCE_ORDER, 1), instruments:int=2, metric:Literal['singular values', 'eigenvalues']="singular values"):
    name_dict = {
        StructuralModel.Model_I:"simple_model",
        StructuralModel.Model_II:"autocorrelated_error",
        StructuralModel.Model_III:"cross_price",
        StructuralModel.Model_IV:"complex_model",
    }
    _model = name_dict[model]
    
    file_name = f"{metric.replace(' ', '_')}_{_model}_{estimator.strategy}_{sample//8760}_years_{runs}_runs_{linspace}_linspace_{instruments}_instruments".replace("IVOptions.", "")

    try:
        df = read_results_from_file(file_name+'.csv', 1)
    except FileNotFoundError as e:
        print(e)
        df = run_singular_values_analysis(sample=sample, runs=runs, linspace=linspace, estimator=estimator, model=model, instruments=instruments, metric=metric)
        write_results_to_file(file_name, df)
    
    return df


In [None]:
def singular_values_plot(sample:int=8760*5, linspace:int=100, runs:int=50, model:StructuralModel=StructuralModel.Model_III, estimator:ModelInputParams=ModelInputParams(IVOptions.NUISANCE_ORDER, 1), instruments:int=2, metric:Literal['singular values', 'eigenvalues']="singular values"):
    
    name_dict = {
        StructuralModel.Model_I:"simple_model",
        StructuralModel.Model_II:"autocorrelated_error",
        StructuralModel.Model_III:"cross_price",
        StructuralModel.Model_IV:"complex_model",
    }
    _model = name_dict[model]
    file_name = f"{metric.replace(' ', '_')}_{_model}_{estimator.strategy}_{sample//8760}_years_{runs}_runs_{linspace}_linspace_{instruments}_instruments".replace("IVOptions.", "")
    
    df = get_singular_values_results(sample=sample, 
                                        linspace=linspace, 
                                        runs=runs, 
                                        model=model, 
                                        instruments=instruments,
                                        metric=metric)

    fig, axes = plt.subplots(1, 1, sharex=True, sharey=True, figsize=(REGULAR_WIDTH, HEIGHT))
    plot_singular_value_panel(axes, df, title="Singular values of the cross-correlation matrix\nfor 2 instruments ($W_0, W_1$) and 2 endogeneous variables ($P_0, D_1$)")
    fig.tight_layout()
    
    save_figure(fig, file_name)

In [None]:
# singular_values_plot()

In [None]:
# fig, axes = plt.subplots(1, 2, sharex=True, sharey=True, figsize=(6*2, 3.6))
# plot_singular_value_panel(axes[0], get_singular_values_results(), title="With 2 instruments ($W_t, W_{t-1}$)")
# plot_singular_value_panel(axes[1], get_singular_values_results(instruments=27), title="With 27 instruments ($W_t, ..., W_{t-26}$)")