In [1]:
import pandas as pd
import pendulum
import matplotlib.pyplot as plt
from pydantic import BaseModel
from flo import DGraph, DNode, DEdge
from dgraph_visualizer import DGraphVisualizer
from named_types import FloParamsHouse0


class SimulationParams(BaseModel):
    # Hourly prices and weather
    unix_start_s: list[int]
    usd_mwh: list[float]
    usd_mwh_alternative: list[float]
    oat_f: list[float]
    wind_speed_mph: list[float]
    # Hardware
    storage_gallons: int
    hp_power_kw: float
    cop_intercept: float
    cop_oat_coeff: float
    cop_min: float
    # House parameters
    alpha: float
    beta: float
    gamma: float
    intermediate_power_kw: float
    intermediate_rswt_f: float
    dd_power_kw: float
    dd_rswt_f: float
    dd_delta_t_f: float
    # Business
    max_payback_years: int


class Simulator():
    def __init__(self, params: SimulationParams):
        self.params = params
        self.simulation_length = len(params.usd_mwh)
        self.initial_top_temp = 120
        self.initial_middle_temp = 100
        self.initial_bottom_temp = 100
        self.initial_thermocline1 = 1
        self.initial_thermocline2 = 1
        # Results
        self.hour_cost_list = []
        self.hour_elec_used_list = []
        self.total_cost = 0
        self.total_elec_used = 0
        # Running the simulation
        self.simulate()
        self.print_results()
        self.plot_results()

    def simulate(self):
        for hour in range(self.simulation_length):
            self.simulate_hour(hour)

    def print_results(self):
        average_price_paid = self.total_cost / self.total_elec_used * 1000
        average_alternative_price = sum(self.params.usd_mwh_alternative)/len(self.params.usd_mwh_alternative)
        relative_decrease = (average_price_paid - average_alternative_price) / average_alternative_price
        would_have_paid = self.total_elec_used*average_alternative_price
        total_savings = self.total_cost - would_have_paid
        max_system_cost = total_savings * self.params.max_payback_years

        print(f"\nAverage alternative price during period: {round(average_alternative_price,2)} USD/MWh")
        print(f"Average price paid during period: {round(average_price_paid,2)} USD/MWh {-round(relative_decrease*100,2)}%")

        print(f"\nPaid {round(self.total_cost,2)} USD for {round(self.total_elec_used,2)} MWh")
        print(f"Would have paid {round(would_have_paid,2)} USD for {round(self.total_elec_used,2)} MWh")
        print(f"=> Total savings: {round(total_savings,2)} USD")
        print(f"=> The system would need to cost {round(max_system_cost,2)} USD to break even in {self.params.max_payback_years} years")

        # Sanity check
        if self.total_cost != sum(self.hour_cost_list):
            print("WARNING: Total cost is not equal to the sum of the hourly costs")
            print(f"Total cost: {round(self.total_cost,2)}, sum of hourly costs: {round(sum(self.hour_cost_list),2)}")
        if self.total_elec_used != sum(self.hour_elec_used_list):
            print("WARNING: Total elec used is not equal to the sum of the hourly elec used")
            print(f"Total elec: {round(self.total_elec_used,2)}, sum of hourly elec: {round(sum(self.hour_elec_used_list),2)}")

    def plot_results(self):
        fig, ax = plt.subplots(1,1, figsize=(12,3))
        ax.step(self.params.unix_start_s, self.params.usd_mwh)
        ax.plot(self.params.unix_start_s, [0]*len(self.params.unix_start_s), color='tab:blue', linestyle='dotted', alpha=0.5)
        ax2 = ax.twinx()
        ax2.step(self.params.unix_start_s, self.hour_elec_used_list, color='tab:orange')
        plt.show()

    def simulate_hour(self, hour: int):
        print(f"Simulating hour {hour}")
        
        flo_params = FloParamsHouse0(
            # Initial state
            InitialTopTempF = self.initial_top_temp,
            InitialMiddleTempF = self.initial_middle_temp,
            InitialBottomTempF = self.initial_bottom_temp,
            InitialThermocline1 = self.initial_thermocline1,
            InitialThermocline2 = self.initial_thermocline2,
            # Hourly forecasts
            StartUnixS = self.params.unix_start_s[hour],
            LmpForecast = self.params.lmp_usd_mwh[hour:hour+48],
            DistPriceForecast = self.params.dist_usd_mwh[hour:hour+48],
            RegPriceForecast = self.params.reg_usd_mwh[hour:hour+48],
            OatForecastF = self.params.oat_f[hour:hour+48],
            WindSpeedForecastMph = self.params.wind_speed_mph[hour:hour+48],
            # Hardware
            StorageVolumeGallons = self.params.storage_gallons,
            HpMaxElecKw = self.params.hp_power_kw,
            CopIntercept = self.params.cop_intercept,
            CopOatCoeff = self.params.cop_oat_coeff,
            CopMin = self.params.cop_min,
            # House parameters
            AlphaTimes10 = int(self.params.alpha*10),
            BetaTimes100 = int(self.params.beta*100),
            GammaEx6 = int(self.params.gamma*1e6),
            IntermediatePowerKw = self.params.intermediate_power_kw,
            IntermediateRswtF = self.params.intermediate_rswt_f,
            DdPowerKw = self.params.dd_power_kw,
            DdRswtF = self.params.dd_rswt_f,
            DdDeltaTF = self.params.dd_delta_t_f,
        )

        g = DGraph(flo_params)
        g.solve_dijkstra()
        g.find_initial_node()
        v = DGraphVisualizer(g)
        v.plot()

        next_node: DNode = g.initial_node.next_node
        self.initial_top_temp = next_node.top_temp
        self.initial_middle_temp = next_node.middle_temp
        self.initial_bottom_temp = next_node.bottom_temp
        self.initial_thermocline1 = next_node.thermocline1
        self.initial_thermocline2 = next_node.thermocline2

        edge_to_next_node: DEdge = [e for e in g.edges[g.initial_node] if e.head==next_node][0]
        hour_cost = edge_to_next_node.cost
        hour_elec_used = edge_to_next_node.hp_heat_out / g.params.COP(oat=g.params.oat_forecast[0])
        self.hour_cost_list.append(hour_cost)
        self.hour_elec_used_list.append(hour_elec_used)

        self.total_cost += hour_cost
        self.total_elec_used += hour_elec_used

In [29]:
df = pd.read_csv('simulation_data.csv')

params = SimulationParams(
    # Hourly prices and weather
    unix_start_s = list(df['hour_start_s']),
    usd_mwh = list(df['usd_mwh']),
    usd_mwh_alternative = [160]*len(df),
    oat_f = list(df['oat_f']),
    wind_speed_mph = list(df['ws_mph']),
    # Hardware
    storage_gallons = 360,
    hp_power_kw = 11,
    cop_intercept = 1.5,
    cop_oat_coeff = 0,
    cop_min = 1.5,
    # House parameters
    alpha = 0.0001,
    beta = 0.0001,
    gamma = 0.0001,
    intermediate_power_kw = 11,
    intermediate_rswt_f = 0.5,
    dd_power_kw = 11,
    dd_rswt_f = 0.5,
    dd_delta_t_f = 10,
    # Business
    max_payback_years = 10,
)