![image](https://github.com/eWaterCycle/ewatercycle/raw/main/docs/examples/logo.png)

## This notebook accompanies the publication by Aerts et al. (review) to demonstrate how extensive model studies can be conducted using the eWaterCycle Platform.

## Case study 3: Parallel calibration of the KsatHorFrac Parameter of the wflow sbm hydrological model
In this notebook we show how the KsatHorFrac Parameter can be manually calibrated for the wflow sbm model using eWaterCycle. This experiment is setup as follows: <br>
    1. Generation of ERA5 model specific forcing.<br>
    2. Creation of multiple model instances according to a calibration value interval.<br>
    3. Storage of simulated streamflow output.<br>
    4. Analysis using  USGS streamflow observation based on the KGE objective function (Gupta et al., 2009).<br>
    5. Selection of optimal KsatHorFrac parameter value.<br>
    6. Preperations for evaluation model run.<br>

### Interested in using eWaterCycle, checkout the package https://ewatercycle.readthedocs.io/en/latest/ 

## Import statements
We'll be using the following modules.

In [2]:
import glob
import os
import shutil
import time

import geopandas as gpd
import hvplot.xarray
import HydroErr as he
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import xarray as xr

import ewatercycle.forcing
import ewatercycle.models
import ewatercycle.parameter_sets
from ewatercycle.observation.usgs import get_usgs_data

sns.set_theme(style="whitegrid")

import logging
import warnings
from datetime import date
from pathlib import Path

from pathos.threading import ThreadPool as Pool
# SWITCH TO MULTIPROCESSING STANDARD PYTHON LIB
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

logger = logging.getLogger("esmvalcore")
logger.setLevel(logging.WARNING)
logging.basicConfig(level=logging.WARN)

# Configuration
Here we specify the paths and basin id.

In [3]:
# Set Paths
ROOTDIR = Path(
    "/gpfs/home6/jaerts/eWaterCycle_example_data/wflow_camels_parameters/"
)
AUXDIR = Path("/gpfs/home6/jaerts/eWaterCycle_example_data/camels_input_files/")
OUTDIR = Path("/gpfs/home6/jaerts/eWaterCycle_example_data/ewatercycle_output/")

A loop can be inserted here to run multiple basins and resolutions consecutively.

In [4]:
# Specify Catchment ID
# Create list for running all basins
basin_id = "01123000"

# Specify model instance resolution
# Create list for runnning all basins
resolution = "3km"

# Get basin model instance directory
BASINDIR = Path(glob.glob(f"{ROOTDIR}/{resolution}/*{basin_id}*/")[0])

## Configure Parameter Set
Now we load the eWaterCycle configuration file and add the parameter set information.

In [5]:
# Load eWaterCycle config file
ewatercycle.CFG.load_from_file("./ewatercycle.yaml")

In [6]:
# Set wflow_sbm parameter set info
if ewatercycle.CFG["parameter_sets"] is None:
    ewatercycle.CFG["parameter_sets"] = {}

ewatercycle.CFG["parameter_sets"]["329_camels_06043500_3km"] = dict(
    directory=str(BASINDIR),
    config=f"{str(BASINDIR)}/wflow_sbm_calibration.ini",
    doi="N/A",
    target_model="wflow",
    supported_model_versions={"2020.1.1", "2020.1.2"},
)

# Save to file
ewatercycle.CFG.save_to_file()

PosixPath('/gpfs/home6/jaerts/eWaterCycle_example_notebooks/ewatercycle_example_notebooks/ewatercycle.yaml')

In [7]:
# Load parameter set
parameter_set = ewatercycle.parameter_sets.get_parameter_set("329_camels_06043500_3km")
print(parameter_set)

Parameter set
-------------
name=329_camels_06043500_3km
directory=/gpfs/home6/jaerts/eWaterCycle_example_data/wflow_camels_parameters/3km/13_camels_01123000_3km
config=/gpfs/home6/jaerts/eWaterCycle_example_data/wflow_camels_parameters/3km/13_camels_01123000_3km/wflow_sbm_calibration.ini
doi=N/A
target_model=wflow
supported_model_versions={'2020.1.1', '2020.1.2'}


# Forcing
The model specific ERA5 forcing is generated using the extent of a shapefile.

## Extract basin shapefile

In [8]:
# Read CAMELS shapefile
gdf = gpd.read_file(
    f"{AUXDIR}/camels_basin_shapefiles/HCDN_nhru_final_671.shp", index_col="hru_id"
)

# Fix missing leading zero Basin ID
index = gdf.hru_id.to_list()
index = [str(x) for x in index]
index = ["0" + x if len(x) == 7 else x for x in index]

# Extract sub-shapefile
gdf["basin_id"] = index
gdf = gdf.set_index("basin_id")
gdf = gdf.loc[basin_id]

# Save single basin shapefile
gpd.GeoDataFrame(geometry=[gdf.geometry]).to_file(f"{BASINDIR}/staticgeoms/basin.shp")

## Create Forcing
Here we generate the forcing for a specified time period.

In [9]:
wflow_forcing = ewatercycle.forcing.generate(
    target_model="wflow",
    dataset="ERA5",
    start_time="1996-01-01T00:00:00Z",
    end_time="2005-12-31T00:00:00Z",
    shape=f"{BASINDIR}/staticgeoms/basin.shp",
    model_specific_options={
        "dem_file": f"{BASINDIR}/staticmaps/wflow_dem.map",
    },
)

# Setting Up Model Run


## Get Streamflow Observation Data
First we download the USGS streamflow observation that is unit and timezone corrected. Next we retrieve the station location information.

In [11]:
# Download USGS streamflow data
ds_obs = get_usgs_data(
    basin_id,
    "1996-01-01",
    "2005-12-31",
    cache_dir="/gpfs/home6/jaerts/example_notebook_ewatercycle/obs_data/",
)

# Retrieve station latitude and longitude information
station_lat, station_lon = ds_obs.attrs["location"]

## Specify Calibration Information
We specify the calibration value interval which spawns model instances.

In [14]:
# Specify Manual Calibration KsatHorFrac Values (longer in Aerts et al.)
calibration_interval = [1, 10, 25, 50, 75, 100, 250, 500, 1000]


def set_calibration_parameter(model_instance_dir, parameter_value):
    parameter_file = f"{model_instance_dir}/intbl/KsatHorFrac.tbl"
    print(parameter_file)

    with open(parameter_file) as tablefile:
        table = tablefile.read()
        table = table.replace("100", parameter_value)
        table_new = open(parameter_file, "w")

        table_new.write(table)
    return

## Create list for parallel model run

We pass lists to the function that runs the model on a single core to be implemented on multiple cores.

In [24]:
# Calculate lenght of lists
list_length = len(calibration_interval)

# Create lists
basin_ids = list_length * [basin_id]
resolutions = list_length * [resolution]
station_lats = list_length * [station_lat]
station_lons = list_length * [station_lon]
cfg_dirs = []

# Pass custom string to output directory creation
for calibration_value in calibration_interval:
    cfg_dir = f'{OUTDIR}/wflow_calibration_{basin_id}_{resolution}_ksathorfrac_{calibration_value}'
    cfg_dirs.append(cfg_dir) 

# Parallel calibration run

## Single core function 

Here we set the function that runs the model for a single core as described in the first usecase

In [27]:
def model_calibration_run(
    basin_id,
    resolution,
    station_lat,
    station_lon,
    calibration_value,
    cfg_dir
):

    # Specify model, model version, parameter set, forcing info
    model = ewatercycle.models.Wflow(
        version="2020.1.2", parameter_set=parameter_set, forcing=wflow_forcing
    )

    # Setup the model    
    config_file, config_dir = model.setup(cfg_dir)

    # Set Calibration Parameter Values (KSatHorFrac)
    set_calibration_parameter(config_dir, str(calibration_value))

    # Initialize Model Instance
    model.initialize(config_file)

    # Create empty list for output storage
    output_dict = {}
    output_dict[f"KsatHorFrac_{calibration_value}"] = []

    # Run Loop until model end time
    while model.time < model.end_time:

        # Advance model with single timestep
        model.update()

        # Retrieve simultated streamflow value at observation station location
        output_dict[f"KsatHorFrac_{calibration_value}"].append(
            model.get_value_at_coords(
                "RiverRunoff", lon=[station_lon], lat=[station_lat]
            )[0]
        )

        print(model.time_as_isostr, end="\r")

    model.finalize()

    return output_dict

# Start parallel run

Next we call the single core function using list that is deployed on multiple cores

In [31]:
def parallel_model_calibration(
    basin_ids,
    resolution,
    station_lats,
    station_lons,
    calibration_interval,
    cfg_dirs,
    threads=None,
):
    # Set number of threads (cores) used for parallel run and map threads
    if threads is None:
        pool = Pool()
    else:
        pool = Pool(nodes=threads)
    # Run parallel models
    results = pool.map(
        model_calibration_run,
        basin_ids,
        resolutions,
        station_lats,
        station_lons,
        calibration_interval,
        cfg_dirs,
    )
    return results

In [None]:
%%time
results = {}
results = parallel_model_calibration(
    basin_ids,
    resolution,
    station_lats,
    station_lons,
    calibration_interval,
    cfg_dirs,
    threads=None,
)



In [None]:
results

# Store Output

We start with defining the general output metadata.

In [None]:
metadata = {
    "description": "simulated streamflow",
    "basin_id": basin_id,
    "parameter_set-name": parameter_set.name,
    "parameter_set-doi": parameter_set.doi,
    "parameter_set-target_model": parameter_set.target_model,
    "parameter_set-supported_model_versions": (
        list(parameter_set.supported_model_versions)[0],
        list(parameter_set.supported_model_versions)[1],
    ),
    "forcing-name": wflow_forcing.netcdfinput,
    "forcing-recipe": "recipe_wflow.yml",
    "forcing-diagnostic": "wflow.py",
    "forcing-esmvaltool_version": "2.3.0",
    "forcing-doi": "N/A",
    "author": "Jerom Aerts",
    "affiliation": "Delft University of Technology",
    "contact": "j.p.m.aerts@tudelft.nl",
    "creation date": str(date.today()),
}

Next we store the simulated streamflow grid.

In [None]:
model_instance_output = []

for key, model in models.items():
    print(key)
    # Concatenate xarray data arrays to dataset
    da_grid = xr.concat(streamflow_grid[key], dim="time")

    # Rename the data variable
    da_grid = da_grid.rename(f"{key}_{resolution}_Q")

    # Add basin id as dimension
    da_grid = da_grid.expand_dims(dim="basin_id")
    da_grid = da_grid.assign_coords({"basin_id": (["basin_id"], [basin_id])})

    # Add the metadata
    for key_dict, item_dict in metadata.items():
        da_grid.attrs[f"{key_dict}"] = item_dict

    # Store individual instance output netcdf
    da_grid.to_netcdf(f"{OUTDIR}/{key}_{resolution}_streamflow_grid.nc")

    # Append to list
    model_instance_output.append(da_grid)

# Concatenate model_instances
ds_grid = xr.merge(model_instance_output)
ds_grid.to_netcdf(f"{OUTDIR}/{basin_id}_{resolution}_parallel_calibration_streamflow_grid.nc")

### Check Output

Lets check the output for a single model instance

In [None]:
# Plot interactive plot
ds_grid.KsatHorFrac_1_3km_Q.sel(basin_id=basin_id).hvplot(
    groupby="time",  # adds a widget for time
    clim=(
        ds_grid.KsatHorFrac_1_3km_Q.sel(basin_id=basin_id).min(),
        ds_grid.KsatHorFrac_1_3km_Q.sel(basin_id=basin_id).max(),
    ),
    widget_location="bottom",
)

Next, we store the streamflow timeseries at the basin outlet/observation station location.

In [None]:
datasets = []

for key, model in streamflow_timeseries.items():

    # Get timeseries data
    series = streamflow_timeseries[key]

    # Define data variable
    data_vars = dict({f"{key}_{resolution}_Q": (["time"], series)})

    # define coordinates
    coords = dict({"time": (["time"], time)})

    # Create data array
    da_series = xr.Dataset(data_vars=data_vars, coords=coords)

    # Add basin id as dimension
    da_series = da_series.expand_dims(dim="basin_id")
    da_series = da_series.assign_coords({"basin_id": (["basin_id"], [basin_id])})

    # Add the metadata
    for key_dict, item_dict in metadata.items():
        da_series.attrs[f"{key_dict}"] = item_dict

    # Add basin specific metadata
    da_series.attrs["station_lat"] = station_lat
    da_series.attrs["station_lon"] = station_lon

    # Append to list
    datasets.append(da_series)


# datasets = xr.concat(datasets, dim='index')
ds_series = xr.merge(datasets)

### Check Output

Lets check the output for a single model instance

In [None]:
ds_series.KsatHorFrac_1_3km_Q.sel(basin_id=basin_id).hvplot(
    widget_location="bottom",
)

# Analyse Results

First we need to resample the hourly USGS observation data to daily values.


In [None]:
# Resample Hourly USGS Data to Daily Values
ds_obs = ds_obs.resample(time="1D").mean()

A dataframe is constructed for use with the hydroerr package

In [None]:
# Construct dataframes
df_obs = ds_obs.to_dataframe()
df_sim = ds_series.to_dataframe()

## Calculate Objective Functions

In [None]:
# Calculate Objective Functions
def calculate_objective_functions(
    simulation_dataframe, observation_dataframe, resolution
):
    # Construct empty dataframe and lists for results
    objective_function_dataframe = pd.DataFrame()

    basin_ids = []
    resolutions = []
    model_instances = []
    param_values = []

    kge_values = []
    nse_values = []

    # Iterate output of model instances
    for column in simulation_dataframe:
        # Append run info to lists
        basin_ids.append(simulation_dataframe.index[0][0])
        resolutions.append(resolution)
        model_instances.append(column)
        param_values.append(column.split("_")[1])

        # Merge dataframes
        df = simulation_dataframe[column].to_frame().join(observation_dataframe)

        # Calculate objective functions
        kge_values.append(he.kge_2012(df[column], df["streamflow"].values))
        nse_values.append(he.nse(df[column], df["streamflow"].values))

    # Construct output dataframe
    objective_function_dataframe["basin_id"] = basin_ids
    objective_function_dataframe["resolution"] = resolutions
    objective_function_dataframe["model_instance"] = model_instances
    objective_function_dataframe["KsatHorFrac_value"] = param_values
    objective_function_dataframe["kge_value"] = kge_values
    objective_function_dataframe["nse_value"] = nse_values

    return objective_function_dataframe


df_obj = calculate_objective_functions(df_sim, df_obs, "3km")

# Plot hydrograph

Here we construct the hydrograph for visual inspection

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(15, 10))

params = {
    "legend.fontsize": 30,
    "legend.handlelength": 2,
    "xtick.labelsize": 30,
    "ytick.labelsize": 30,
}

# sns.lineplot(ax=ax, x=df.index, y=df['KsatHorFrac_100'],  data=df, label=key, alpha=0.8)

# Plot Streamflow observations
sns.lineplot(
    ax=ax,
    x=df_obs.index,
    y=df_obs.streamflow,
    data=df_obs,
    linewidth=3.5,
    color="tab:red",
    label="Observations",
    alpha=0.8,
)

# Plot Calibration Interval
df_sim.index = df_sim.index.droplevel()
for column in df_sim:
    sns.lineplot(
        ax=ax,
        x=df_sim.index,
        y=df_sim[column].values,
        data=df_sim,
        label=column,
        alpha=0.8,
        linewidth=2.5,
    )

# Plot objective function results table
df = df_obj.drop(columns="model_instance")
df = df.round(2)
table = plt.table(
    cellText=df.values,
    rowLabels=None,
    colLabels=df.columns,
    bbox=(-0.001, -0.2, 1, 0.1),
)
table.auto_set_font_size(False)
table.set_fontsize(12)


# Set axis and Title
ax.set_xlabel("Date", size=20)
ax.set_ylabel("Q (m3*s-1)", fontsize=20)
ax.set_title(f"Basin ID:{basin_id} - Resolution:{resolution}", size=30)

# Select best calibration Parameter

Now we select and set the best KsatHorFrac parameter based on the KGE objective score

In [23]:
# Sort the objective function results dataframe
df_obj = df_obj.sort_values(by="kge_value", ascending=False)

# Select the ksathorfrac parameter with the highest KGE score
ksathorfrac_param = df_obj.KsatHorFrac_value.values[0]

# Set the parameter value in the parameter set
set_calibration_parameter(BASINDIR, ksathorfrac_param)

NameError: name 'df_obj' is not defined

# Thank you for viewing!

### Interested in using eWaterCycle, checkout the package https://ewatercycle.readthedocs.io/en/latest/ 