In [1]:
import xarray as xr
import dask.array as da
import pandas as pd
import numpy as np

In [None]:
def get_daylight(time, time_dims, lat, lat_dims):
    """
    Calculate the number of daylight hours for a given day of the year and latitude.

    Args:
        time (Union[da.Array, np.ndarray]): datetime
        lat: (Union[da.Array, np.ndarray]), latitude in decimal degrees

    Returns:
        daylit: dask.array or numpy array, the number of daylight hours
    """
   # Ensure the day of the year is within bounds
    j_date = time.dt.dayofyear
    j_date = da.clip(j_date, 1, 365)
    # Convert latitude to radians
    phi = da.radians(lat.values)
    # Calculate solar declination angle (in radians)
    decl = 0.41008 * da.sin((j_date - 82) * da.radians(1))
    # Reshape to enable broadcasting
    phi = lat.expand_dims({"valid_time": time_dims})  # Expand for valid_time
    decl = time.dt.dayofyear.expand_dims({"latitude": lat_dims})  # Expand for latitude
    # Compute daylight hours for all (latitude, declination) combinations
    daylit = 24 * (1 - da.arccos(da.clip(da.tan(da.radians(phi)) * da.tan(da.radians(decl)), -1, 1)) / da.pi)
    # Ensure the result is real (no complex values)
    daylit = da.real(daylit)
    return daylit

def get_eqilibrium_moisture_content(rh, t):
    emc = da.where(
        condition=rh > 50,
        x=21.0606 + 0.005565 * rh**2 - 0.00035 * rh * t - 0.483199 * rh,
        y=rh,
    )
    emc = da.where(
        condition=(rh > 10) & (rh <= 50), 
        x=2.22749 + 0.160107 * rh - 0.014784 * t, 
        y=emc,
    )
    return da.where(
        condition=rh <= 10, 
        x=0.03229 + 0.281073 * rh - 0.000578 * rh * t, 
        y=emc,
    )

def get_daily_aggregation(ds):
    daily_ds = ds.resample(valid_time="1D").map(
        lambda x: xr.Dataset(
            {
                "t2m_max": x["t2m"].max(dim="valid_time"),
                "t2m_min": x["t2m"].min(dim="valid_time"),
                "rh_max": x["rh"].max(dim="valid_time"),
                "rh_min": x["rh"].min(dim="valid_time"),
            }
        )
    )
    emc_daytime = get_eqilibrium_moisture_content(daily_ds["rh_min"], daily_ds["t2m_max"])
    emc_nighttime = get_eqilibrium_moisture_content(daily_ds["rh_max"], daily_ds["t2m_min"])
    daylight = (
        get_daylight(
            daily_ds["valid_time"], 
            daily_ds.dims["valid_time"], 
            daily_ds["latitude"], 
            daily_ds.dims["latitude"],
        )
    )
    daylight = daylight.expand_dims(dim={"longitude": daily_ds.dims["longitude"]}, axis=-1)
    daily_ds["emc"] = (daylight * emc_daytime + (24.0 - daylight) * emc_nighttime) / 24.0

    tp_inches = ds["tp"] * 39.3701
    WETRAT = 0.25 if CLIM_CLASS < 3 else 0.05
    daily_ds["tp_duration"] = (
        ((tp_inches / WETRAT) + 0.49).round()
        .resample(valid_time="1D")
        .sum()
        .clip(max=8)
    )
    
    return daily_ds

In [None]:
daily_ds = get_daily_aggregation(ds)
daily_ds.chunk({"valid_time": -1, "latitude": -1, "longitude": -1}).to_zarr("era5_derived_daily.zarr", mode="w")
daily_ds = xr.open_dataset("era5_derived_daily.zarr", chunks={"valid_time": -1, "latitude": -1, "longitude": -1})

tp_duration = daily_ds["tp_duration"]
emc = daily_ds["emc"]
daily_ds["boundary_100"] = ((24 - tp_duration) * emc + (0.5 * tp_duration + 41) * tp_duration) / 24
boundary_1000 = ((24 - tp_duration) * emc + (2.7 * tp_duration + 76) * tp_duration) / 24
bvave = boundary_1000.rolling(valid_time=7, center=False).mean()
daily_ds["bvave"] = xr.where(bvave.isnull(), 10 + 5 * CLIM_CLASS, bvave)
daily_ds

In [None]:
# daily_ds[["boundary_100", "bvave"]].to_zarr("dfmc_input.zarr")
dfmc_input_ds = xr.open_dataset("dfmc_input.zarr", chunks="auto")
ddf = dfmc_input_ds[["boundary_100", "bvave"]].to_dask_dataframe()
ddf = ddf.repartition(npartitions="auto")  # Adjust dynamically based on data size
df = ddf.compute()
df.head()

In [None]:
def compute_fm100(df):
    """
    Compute 100-hour fuel moisture (FM100) using a recursive approach 
    within each (latitude, longitude) group.

    Args:
        df (pandas.DataFrame): A DataFrame containing boundary conditions.

    Returns:
        pandas.DataFrame: The DataFrame with computed FM100 values.
    """
    fr100 = 0.3156  # Moisture retention factor
    ymc_init = 10   # Initial FM100 (starting moisture content)

    # Ensure time is sorted within each group
    df = df.sort_values("valid_time").copy()

    # Initialize ymc memory
    ymc = ymc_init
    fm100_values = []

    # Apply recursive calculation across rows
    for _, row in df.iterrows():
        fm100 = (row["boundary"] - ymc) * fr100 + ymc
        fm100_values.append(fm100)
        ymc = fm100  # Carry forward

    # Assign computed values to DataFrame
    df["fm100"] = fm100_values
    return df
    
# Apply function per (latitude, longitude) group
df = df.rename(columns={"boundary_100": "boundary"}).groupby(["latitude", "longitude"], group_keys=False).apply(compute_fm100)

# Check the structure
print(df.head())

In [None]:
def compute_fm1000(df):
    """
    Compute 1000-hour fuel moisture (FM1000) using a recursive approach 
    within each (latitude, longitude) group.

    Args:
        df (pandas.DataFrame): A DataFrame containing bvave values.

    Returns:
        pandas.DataFrame: The DataFrame with computed FM1000 values.
    """
    fr1 = 0.3068  # Moisture retention factor
    tmois_init = 10  # Initial FM1000 (starting moisture content)

    # Ensure time is sorted within each group
    df = df.sort_values("valid_time").copy()

    # Initialize tmois rolling memory (7-day history)
    tmois = [tmois_init] * 7
    fm1000_values = []

    # Apply recursive calculation across rows
    for _, row in df.iterrows():
        # Compute FM1000 using first value of tmois (most recent 7-day memory)
        fm1000 = tmois[0] + (row["bvave"] - tmois[0]) * fr1
        fm1000_values.append(fm1000)

        # Shift tmois values forward
        tmois = tmois[1:] + [fm1000]  # Remove first, append latest fm1000 value

    # Assign computed values to DataFrame
    df["fm1000"] = fm1000_values
    return df
    
df = df.groupby(["latitude", "longitude"], group_keys=False).apply(compute_fm1000)
df.head()


In [None]:
dfmc_ds = (
    df.drop(columns=["boundary", "bvave"])
    .set_index(["valid_time", "latitude", "longitude"])
    .to_xarray()
)
dfmc_ds


In [None]:
dfmc_ds.to_zarr("dfmc.zarr")