In [None]:
def interpolate_control_signals(control_register, t_steps):
    # Create dataframe and filter out points that are too close to each other to improve fit
    t_threshold = 10
    control_signal_df = pd.DataFrame(control_register)
    control_signal_df["next_t"] = control_signal_df['t'].shift(-1)
    control_signal_df = control_signal_df[
        (control_signal_df["next_t"] > (control_signal_df["t"] + t_threshold)) | \
        control_signal_df["next_t"].isnull() # Include last row where next_t is NaN
    ]

    # Do interpolation for each control signal type
    interpolated_controls_df = pd.DataFrame([{ "t": t } for t in t_steps])
    for control_type in control_register[0]:
        if control_type != "t":
            # Fit a B-spline that goes through all points
            X_Y_pairs = np.vstack((control_signal_df["t"], control_signal_df[control_type]))
            tck, u = splprep(X_Y_pairs, s=0.0)

            # Evaluate the B-spline
            x_new, y_new = splev(t_steps, tck)

            # Assign new values to dataframe
            interpolated_controls_df[control_type] = y_new

    return interpolated_controls_df

In [None]:
integrated_airflows = []
for time_period in airflows_at_t_steps:
    for t, airflow in enumerate(time_period):
        try:
            last_t = integrated_airflows[-1]["t"]
        except IndexError:
            last_t = 0
            
        integrated_airflows.append({
            "t": last_t + t,
            "airflow": airflow
        })

integrated_airflows_df = pd.DataFrame(integrated_airflows)

alt.Chart(integrated_airflows_df, width=1000).mark_point().encode(
    alt.X('t'),
    alt.Y("airflow:Q"),
    color=alt.value("blue")
).interactive()

In [None]:
timestep = 24

integrated_airflows = []
for t, airflow in zip(control_t_values[timestep], airflows[timestep]):
    integrated_airflows.append({
        "t": t,
        "airflow": airflow
    })

integrated_airflows_df = pd.DataFrame(integrated_airflows)

# integrated_airflows_df = integrated_airflows_df[integrated_airflows_df["t"] > 2]

integrated_airflows_df["next_t"] = integrated_airflows_df['t'].shift(-1)
integrated_airflows_df = integrated_airflows_df[(integrated_airflows_df["next_t"] > (integrated_airflows_df["t"] + 10)) | integrated_airflows_df["next_t"].isnull()]

# t_steps = np.linspace(0, 900, 15)
# control_function = UnivariateSpline(integrated_airflows_df["t"], integrated_airflows_df["airflow"], s=3)
# airflow_at_t_steps = control_function(t_steps)

# interpolated_airflows = []
# for t, airflow in zip(t_steps, airflow_at_t_steps):
#     interpolated_airflows.append({
#         "t": t,
#         "airflow": airflow
#     })
# interpolated_airflows_df = pd.DataFrame(interpolated_airflows)


X = np.array(integrated_airflows_df["t"])
Y = np.array(integrated_airflows_df["airflow"])
pts = np.vstack((X, Y))
# Find the B-spline representation of an N-dimensional curve
tck, u = splprep(pts, s=0.0)
u_new = np.linspace(u.min(), u.max(), 1000)
# Evaluate a B-spline
x_new, y_new = splev(u_new, tck)

interpolated_airflows = []
for t, airflow in zip(x_new, y_new):
    interpolated_airflows.append({
        "t": t,
        "airflow": airflow
    })
interpolated_airflows_df = pd.DataFrame(interpolated_airflows)



one = alt.Chart(integrated_airflows_df, width=1000).mark_point().encode(
    alt.X('t'),
    alt.Y("airflow"),
    color=alt.value("blue")
).interactive()

two = alt.Chart(interpolated_airflows_df, width=1000).mark_line().encode(
    alt.X('t'),
    alt.Y("airflow"),
    color=alt.value("red")
).interactive()

alt.layer(one, two).resolve_scale(y='independent', color="independent")

In [None]:
filtered_airflows = []
for i, row in integrated_airflows_df.iterrows():
    try:
        next_row = integrated_airflows_df.iloc[i + 1]
    except IndexError:
        next_row = { "t": float("inf") }

    print(next_row["t"], (row["t"] + 1))
    if next_row["t"] > (row["t"] + 1):
        filtered_airflows.append({
            "t": row["t"],
            "airflow": row["airflow"]
        })


X = np.array(integrated_airflows_df["t"])
Y = np.array(integrated_airflows_df["airflow"])
pts = np.vstack((X, Y))
# Find the B-spline representation of an N-dimensional curve
tck, u = splprep(pts, s=0.0)
u_new = np.linspace(u.min(), u.max(), 1000)
# Evaluate a B-spline
x_new, y_new = splev(u_new, tck)

plt.plot(x_new, y_new, 'b--')
plt.show()

In [None]:
transformed_airflows = []

for time_period in airflows_at_t_steps:
    for result in time_period:
        transformed_airflows.append({
            "airflow": result
        })

airflows_df = pd.DataFrame(transformed_airflows)

alt.Chart(airflows_df.reset_index(), width=1000).mark_line().encode(
    alt.X('index'),
    alt.Y("airflow"),
).interactive()

In [None]:
class Greenhouse:
    def __init__(self, 
        time_period: s,
        # target_temp_range: [C, C],
        # target_humidity_range: [RH, RH]
    ):
        self.time_period: s = time_period
        # self.target_temp_range: [C, C] = target_temp_range
        # self.target_humidity_range: [RH, RH] = target_humidity_range

        # Init subsystems
        self.structure = Structure()
        self.dehumidifier = Dehumidifier()
        self.heatpump = HeatPump()
        self.crop = SweetBasil(
            time_period=self.time_period, 
            plants_per_barrel=self.structure.plants_per_barrel, 
            barrel_count=self.structure.barrel_count
        )
        self.light = AdaptiveLighting(time_period, self.structure, self.crop)

        # Init register to store previous period's values
        self.prev_period = {
            "humidity_ratio": 0.0085881067, # 70 RH at 1 bar and 25 Celsius
            "temp": 17,
            "CO2_concentration": 410,
        }

        self.prev_airflows_at_t_steps = []


    def run(self, timestamp, data):
        # SECTION 1: Get input from solar radiation
        # Get distribution of irradiance on different panels (solar and non-solar) of the greenhouse (factoring in sun position, tilt and azimuth angles)
        results: [W, W_per_m2] = self.structure.get_irradiance_by_panel_type(timestamp, data.solarradiation)
        solar_power_on_nonsolar_panels, irradiance_on_solar_panels = results

        # Get electrical energy generated and heat irradiated to the greenhouse
        results: [W, W, W_per_m2] = self.light.solarpanel.run(irradiance_on_solar_panels)
        solar_power_generated, solar_power_transmitted, transmitted_irradiance, transparency, _ = results
        solar_energy_generated: kWh = J_to_kWh(solar_power_generated * self.time_period)

        # Add energy from non-solar panels and portion of energy transmitted by solar panels to get total heat entering the greenhouse
        heat_transfer_rate_inside: W = solar_power_on_nonsolar_panels + solar_power_transmitted

        # Calculate energy used for lighting
        PPFD: umol_per_m2_s = irradiance_to_PPFD(transmitted_irradiance)
        PAR_inside: umol_per_m2 = PPFD * self.time_period
        lighting_results = self.light.run(timestamp, PAR_inside)

        # SECTION 2: Grow plants
        results: [mol_per_s, mol, mol_per_s, mol] = self.crop.grow(1e-6 * lighting_results["actual_PPFD_umol_per_m2_s"] * self.time_period)
        CO2_assimilation_rate, CO2_assimilated, H20_evaporation_rate, H2O_evaporated = results

        # SECTION 3: Deal with resulting CO2, water, heat
        # Calculate net energy
        total_energy_used: kWh = lighting_results["energy_used_for_lighting_kWh"] #+ energy_for_dehumidification + energy_for_heating
        total_energy_generated: kWh = solar_energy_generated
        net_energy: kWh = total_energy_generated - total_energy_used

        # Calculate CO2
        # results: [ppm, m3_per_s] = self.register_CO2(CO2_assimilated)
        # CO2_concentration, airflow = results

        input_values = {
            "H2O_mass_evaporation_rate": 18 * H20_evaporation_rate, # 18: molar mass of H20
            "CO2_assimilation_rate": CO2_assimilation_rate,
            "ambient_data": data, 
            "power_irradiated": transmitted_irradiance * self.structure.irradiated_area,
            "get_heat_transfer_rate": self.structure.get_heat_transfer_rate,
            "structure_volume": self.structure.volume
        }

        airflow_results = self.register_airflow(input_values)
        

        return {
            "natural_PPFD": PPFD,
            "total_energy_used_kWh": total_energy_used,
            "total_energy_generated_kWh": total_energy_generated,
            "net_energy_kWh": net_energy,

            "CO2_assimilation_rate_umol_per_s": CO2_assimilated / time_period * 1e6,

            **lighting_results,
            **airflow_results,
            
            "ambient_humidity": data.humidity,
            "ambient_temp": data.temp
        }

    def register_airflow(self, input_values):
        # Set up initial values for odeint
        init_values = [
            self.prev_period["humidity_ratio"], 
            self.prev_period["temp"], 
            self.prev_period["CO2_concentration"]
        ]

        # List of dicts where the t values and control signals of each integration steps can be stored
        control_register = []

        # Set up timesteps for odeint (we are interested only in the last one)
        t_steps = np.linspace(0, self.time_period, 15)
        t_max = t_steps[-1]

        # Solve diff equations
        results = solve_ivp(
            airflow_model, 
            (0, t_max,), 
            init_values, 
            t_eval=t_steps, 
            method="BDF", 
            dense_output=True, 
            args=(t_max, input_values, control_register, self.prev_airflows_at_t_steps)
        )

        new_humidity_ratio, new_temp, new_CO2_concentration = results["y"].T[-1]

        pressure: Pa = 101325 # TODO: get this from weather data
        new_humidity: RH = psychrolib.GetRelHumFromHumRatio(new_temp, new_humidity_ratio, pressure) * 100

        # Since odeint calculates the steps randomly, needs to be sorted based on t
        control_register = sorted(control_register, key=lambda x: x["t"])

        # Set up and call interpolation function to get control values at t_max
        control_results = {}
        control_t_values = [x["t"] for x in control_register]
        for control_type in control_register[0]:
            control_results[control_type] = control_register[-1][control_type]

        # control_register_df = pd.DataFrame(control_register)
        # control_register_df = control_register_df[control_register_df["t"] > 2]

        # control_signals_at_tmax = {}
        # for control_type in control_register[0]:
        #     if control_type != "t":
        #         control_function = UnivariateSpline(control_register_df["t"], control_register_df[control_type], k=2)
        #         control_signals_at_tmax[control_type] = control_function(t_max)

        # Set new values as starting values for next period
        self.prev_period["humidity_ratio"] = new_humidity_ratio
        self.prev_period["temp"] = new_temp
        self.prev_period["CO2_concentration"] = new_CO2_concentration

        # Interpolate control values for t_steps
        # interpolated_controls_df = interpolate_control_signals(control_register, t_steps)

        # control_signals_at_tmax = {}
        # for control_type in control_register[0]:
        #     if control_type != "t":
        #         control_function = UnivariateSpline(control_register_df["t"], control_register_df[control_type], k=2)
        #         control_signals_at_tmax[control_type] = control_function(t_max)
        

        self.prev_airflows_at_t_steps = interpolated_controls_df["airflow_m3_per_s"]

        return {
            "humidity": new_humidity,
            "humidity_ratio": new_humidity_ratio,
            "temp": new_temp,
            "CO2_concentration": new_CO2_concentration,
            **control_results,
            # **control_signals_at_tmax,
            "intermediate_results": results["y"].T,
            "airflow_at_t_steps": interpolated_controls_df["airflow_m3_per_s"],
            # "control_t_values": control_t_values,
            "airflows": [x["airflow_m3_per_s"] for x in control_register],
            "interpolated_controls_df": interpolated_controls_df
        }



In [None]:
transformed_results = []

for time_period in intermediate_results:
    for result in time_period:
        transformed_results.append({
            "humidity_ratio": result[0],
            "temp": result[1],
            "CO2": result[2]
        })

results_df = pd.DataFrame(transformed_results)


one = alt.Chart(results_df.reset_index(), width=1000).mark_line().encode(
    alt.X('index'),
    alt.Y("humidity_ratio"),
).interactive()

two = alt.Chart(results_df.reset_index(), width=1000).mark_line().encode(
    alt.X('index'),
    alt.Y("temp"),
    color=alt.value("red")
).interactive()

alt.layer(one, two).resolve_scale(y='independent', color="independent")

In [None]:
# OLD STUFF
def register_CO2(self, CO2_assimilated: mol):
    ambient_CO2: ppm = 410
    CO2_assimilated_per_s: mol_per_s = CO2_assimilated / self.time_period
    initial_CO2_concentration: ppm = self.prev_period["CO2"]

    # List of tuples where the t and y values of each integration steps can be stored
    airflows = []

    # Set up model for odeint
    def CO2_model(y, t):
        CO2_concentration: ppm = y[0]

        # Define airflow to minimize difference between ambient and inside CO2
        airflow = (ambient_CO2 - CO2_concentration) / ambient_CO2

        # Avoid duplicates to ensure strictly monotonically increasing list as it is a requirement of InterpolatedUnivariateSpline
        if t not in [x[0] for x in airflows]: 
            airflows.append((t, airflow,))

        CO2_inflow: mol_per_s = ppm_to_amount(ambient_CO2, airflow)
        CO2_outflow: mol_per_s = ppm_to_amount(CO2_concentration, airflow)
        net_change_amount: mol_per_s = CO2_inflow - CO2_assimilated_per_s - CO2_outflow
        net_change_concentration: ppm_per_s = amount_to_ppm(net_change_amount, self.structure.volume)

        return net_change_concentration

    # Set up timesteps for odeint (we are interested only in the last one)
    t_steps = np.linspace(0, self.time_period, 15)

    # Run odeint
    CO2_concentrations_in_period: list[ppm] = odeint(CO2_model, initial_CO2_concentration, t_steps, args=(t_steps[-1],))

    # Get last value
    new_CO2_concentration = CO2_concentrations_in_period[-1][0]

    # Since odeint calculates the steps randomly, needs to be sorted based on t
    airflows = sorted(airflows, key=lambda tup: tup[0])

    # Set up and call interpolation function to get airflow at t_max
    airflow_function = InterpolatedUnivariateSpline([x[0] for x in airflows], [x[1] for x in airflows], k=3)
    airflow_end: m3_per_s = airflow_function(t_steps[-1])

    # Make sure values stay realistic in extreme cases too
    if new_CO2_concentration < 0:
        new_CO2_concentration = 0

    # Save new concentration to be used in next period as starting value
    self.prev_period["CO2"] = new_CO2_concentration

    return new_CO2_concentration, airflow_end

In [None]:
print(f"Total Net Energy: {round(df['net_energy_kWh'].sum(), 2)} kWh")
print(f"Total Wasted PAR: {round(df['wasted_PAR_total_umol'].sum() * 1e6, 2)} mol")

plot_multiline(
    df.resample('D').mean() if is_long else df, 
    ["total_energy_used_kWh", "total_energy_generated_kWh", "net_energy_kWh"],
    width=900, 
    height=235, 
    title="", 
    y_label="kWh", 
    legend_label="", 
    date_format="",
    colors=["blue", "green", "orange"]
) & \
plot_multiline(
    df.resample('D').mean() if is_long else df, 
    ["actual_PPFD_umol_per_m2_s", "wasted_PPFD_umol_per_m2_s", "natural_PPFD"],
    width=900, 
    height=235, 
    title="", 
    y_label="umol / m2 / s", 
    legend_label="", 
    date_format="",
    colors=["blue", "green", "orange"]
)

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=2f5dc715-67f7-4c8c-98f7-a87b736d3338' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>