# Introduction

In this notebook, I am testing whether I can create `Ostrich`'s `TPL` files using `MESHFlow`'s Jinja2 templating engine.

This may require some changes in the `Jinja2` templating files to be able to accept strings as values instead of numbers.

In [1]:
import meshflow as mf
import pandas as pd

import re
import json
import os

from typing import (
    Dict,
    List,
    Sequence,
    Union,
)
from pathlib import Path
from io import StringIO

We just directly go ahead and call the templating functions. But before that, it is important to parse the `CLASS` files and build the three distinct dictionaries out of it for the templating functions.

In [2]:
# paths of the MESH CLASS and hydrology parameter files parsed in the cells below
class_file = './wolf-creek-research-basin/mesh/MESH_parameters_CLASS.ini'
hydrology_file = './wolf-creek-research-basin/mesh/MESH_parameters_hydrology.ini'

In [3]:
def _remove_comments(string) -> str:
    '''remove comment strings in the CLASS file strings'''
    #return re.sub(r'\s+\d{2}\s.*$', '', sections[0], flags=re.MULTILINE)
    return re.sub(r'\s+\d{2}\s(?:[^\n ]| (?! ))*$', '', string, flags=re.MULTILINE)

In [4]:
# split one CLASS computational-unit block into its named sub-sections
def class_section_divide(section: str, **read_csv_kwargs):
    '''Slice a CLASS block into named sub-sections by line position.

    The line offsets are hard-coded: lines 0-3 and 4-6 are the two
    vegetation groups, lines 7 and 8 the two surface/hydraulic lines,
    lines 9-11 the soil group, and lines 12, 13 and 14 the three
    prognostic lines. Multi-line groups are re-joined with newlines.

    Parameters
    ----------
    section : str
        Text of a single block; it must contain at least 15 lines.
    **read_csv_kwargs
        Accepted but not used by this function.

    Returns
    -------
    dict
        Keys `veg1`, `veg2`, `hyd1`, `hyd2`, `soil`, `prog1`, `prog2`
        and `prog3`, each mapped to the corresponding text.

    Raises
    ------
    IndexError
        If `section` has fewer than 15 lines.
    '''
    rows = section.splitlines()

    def _joined(start, stop):
        # re-assemble a multi-line group as a single string
        return "\n".join(rows[start:stop])

    return {
        # vegetation parameters
        'veg1': _joined(0, 4),
        'veg2': _joined(4, 7),
        # surface/hydraulic parameters
        'hyd1': rows[7],
        'hyd2': rows[8],
        # soil parameters
        'soil': _joined(9, 12),
        # prognostic parameters
        'prog1': rows[12],
        'prog2': rows[13],
        'prog3': rows[14],
    }

In [5]:
def _parse_class_meta_data(
    case_section : str,
) -> tuple[Dict, Dict]:
    """Parse the CLASS file's meta-data section into the `info_entry`
    and `case_entry` dictionaries needed to run MESHFlow's
    `meshflow.utility.render_class_template` function.

    Parameters
    ----------
    case_section : str
        The section of the CLASS file that contains the meta-data.
        It should be a string containing the first four lines of the
        CLASS file, which are:
        - Title (not used)
        - Author
        - Place
        - Case information (centroid latitude, longitude, reference heights, etc.)

    Returns
    -------
    info_entry : Dict
        A dictionary containing the author and location information.
    case_entry : Dict
        A dictionary containing the case information:
        - Centroid latitude and longitude (tokens 0 and 1)
        - Reference heights for wind speed, and for specific humidity
          and air temperature (tokens 2 and 3)
        - Reference height surface roughness (token 4)
        - `NL` and `NM`, the second-to-last and last tokens as integers
          (NOTE(review): the earlier documentation of this function
          disagreed on what NL/NM count -- confirm against MESH docs)

    Raises
    ------
    IndexError
        If the section has fewer than four lines, or the case line has
        fewer than five tokens.
    ValueError
        If a token of the case line cannot be converted to a number.
    """
    # remove comments from the section and split it once
    lines = _remove_comments(case_section).splitlines()

    # hard-coded positions based on the layout of the CLASS file;
    # line 0 is the title and is not needed
    author_line = lines[1]
    place_line = lines[2]
    case_line = lines[3]

    # now building dictionaries that MESHFlow needs just here for
    # simplicity
    info_entry = {
        "author": author_line.strip(),
        "location": place_line.strip(),
    }

    # the `case_entry` keys are hard-coded to match `MESHFlow`'s
    # requirements
    case_tokens = case_line.strip().split()
    case_entry = {
        "centroid_lat": float(case_tokens[0]),
        "centroid_lon": float(case_tokens[1]),
        "reference_height_wndspd": float(case_tokens[2]),
        "reference_height_spechum_airtemp": float(case_tokens[3]),
        "reference_height_surface_roughness": float(case_tokens[4]),
        # the two trailing integers are taken from the end of the line
        "NL": int(case_tokens[-2]),
        "NM": int(case_tokens[-1]),
    }

    return info_entry, case_entry

In [6]:
def _determine_gru_type(
    line : str
) -> int:
    """
    Return the 1-based column index of the first numeric token that is exactly '1.000'
    (or numerically equal to 1.0 with three decimal places) in a line of mixed data.

    The line may contain:
      - Multiple spaces between columns
      - Trailing non-numeric descriptor fields (tokens containing any letter)
      - Other numeric fields (including things like 05)

    Parsing stops once a token containing any alphabetic character appears, assuming
    the remainder are descriptors rather than data columns.

    Args:
        line: A string containing whitespace-separated columns.

    Returns:
        The 1-based column number where the first 1.000 occurs, or None if not found.
    """
    tokens = line.strip().split()
    slice_len = min(5, len(tokens))
    
    # to track mixed GRU types and also 
    gru_type_sum = 0
    
    # iterate over the first line of the vegetation parameter section
    for i in range(slice_len):
        # if a distinct GRU, look for 1.000 value
        if tokens[i] == "1.000":
            return i + 1  # 1-based

        # Calculate the sum until this for loop breaks
        # or ends
        gru_type_sum += float(tokens[i])

    # FIXME: if sum equals to 1, then that means we deal with a mixed GRU
    #        type, and we will have to add the relevant feature to both
    #        MESHFlow and MESHFIAT;
    #        For now, find the first column without non-zero value
    if gru_type_sum == 1:
        for i in range(slice_len):
            if float(tokens[i]) > 0:
                return i + 1

    # Raise an error if it is not a valid CLASS field
    if gru_type_sum == 0:
        raise ValueError("Invalid CLASS GRU type")


In [7]:
def _parse_class_veg1(
    veg_section : str,
    gru_idx : int,
) -> Dict[str, float]:
    """
    """
    # the `veg_section` must only be 4 lines
    veg_lines = veg_section.splitlines()
    
    if len(veg_lines) != 4:
        raise ValueError("The vegetation section must have exactly 4 lines.")

    # gru index is the 1-based index of the GRU type
    # so the index of the first column is gru_idx - 1
    # for the 5th type, the second section of each block
    # will have a value of `0`
    idx = gru_idx - 1

    # please note that the parameters are hard-coded and match the inputs
    # of MESHFlow's `meshflow.utility.render_class_template` function.
    if 1 <= gru_idx <= 4: # non-barren-land types
        veg_params = {
            # first-line parameters of the block
            'fcan': float(veg_lines[0].strip().split()[idx]),
            'lamx': float(veg_lines[0].strip().split()[idx + 5]),
            # second-line parameters
            'lnz0': float(veg_lines[1].strip().split()[idx]),
            'lamn': float(veg_lines[1].strip().split()[idx + 5]),
            # third-line parameters
            'alvc': float(veg_lines[2].strip().split()[idx]),
            'cmas': float(veg_lines[2].strip().split()[idx + 5]),
            # fourth-line parameters
            'alic': float(veg_lines[3].strip().split()[idx]),
            'root': float(veg_lines[3].strip().split()[idx + 5]),
        }

    elif gru_idx == 5:
        veg_params = {
            # first-line parameters of the block
            'fcan': float(veg_lines[0].strip().split()[idx]),
            'lamx': 0.0,
            # second-line parameters
            'lnz0': float(veg_lines[1].strip().split()[idx]),
            'lamn': 0.0,
            # third-line parameters
            'alvc': float(veg_lines[2].strip().split()[idx]),
            'cmas': 0.0,
            # fourth-line parameters
            'alic': float(veg_lines[3].strip().split()[idx]),
            'root': 0.0,
        }

    else:
        raise ValueError("Invalid GRU index. Must be between 1 and 5.")

    return veg_params

def _parse_class_veg2(
    veg_section : str,
    gru_idx : int,
) -> Dict[str, float]:
    """
    """
    # the `veg_section` must only be 3 lines
    veg_lines = veg_section.splitlines()
    
    if len(veg_lines) != 3:
        raise ValueError("The vegetation section must have exactly 4 lines.")

    # gru index is the 1-based index of the GRU type
    # so the index of the first column is gru_idx - 1
    # for the 5th type, the second section of each block
    # will have a value of `0`
    idx = gru_idx - 1

    # please note that the parameters are hard-coded and match the inputs
    # of MESHFlow's `meshflow.utility.render_class_template` function.
    if 1 <= gru_idx <= 4: # non-barren-land types
        veg_params = {
            # first-line parameters of the block
            'rsmn': float(veg_lines[0].strip().split()[idx]),
            'qa50': float(veg_lines[0].strip().split()[idx + 5]),
            # second-line parameters
            'vpda': float(veg_lines[1].strip().split()[idx]),
            'vpdb': float(veg_lines[1].strip().split()[idx + 5]),
            # third-line parameters
            'psga': float(veg_lines[2].strip().split()[idx]),
            'psgb': float(veg_lines[2].strip().split()[idx + 5]),
        }

    elif gru_idx == 5:
        param_names = ['rsmn', 'qa50', 'vpda', 'vpdb', 'psga', 'psgb']
        veg_params = {k: 0.0 for k in param_names}

    else:
        raise ValueError("Invalid GRU index. Must be between 1 and 5.")

    return veg_params

def _parse_class_hyd1(
    hyd_line : str,
) -> Dict[str, float]:
    """Parse the first surface/hydraulic line of a CLASS block.

    The trailing comment is removed and the first four tokens are
    converted to floats under the keys `drn`, `sdep`, `fare` and `dd`,
    which match the inputs of MESHFlow's
    `meshflow.utility.render_class_template` function.

    Raises `IndexError` when the line has fewer than four tokens and
    `ValueError` when a token is not numeric.
    """
    fields = _remove_comments(hyd_line).strip().split()

    # hard-coded parameter names, in column order
    keys = ('drn', 'sdep', 'fare', 'dd')

    return {key: float(fields[pos]) for pos, key in enumerate(keys)}

def _parse_class_hyd2(
    hyd_line : str,
) -> Dict[str, float]:
    """Parse the second surface/hydraulic line of a CLASS block.

    The trailing comment is removed; the first four tokens become the
    floats `xslp`, `xdrainh`, `mann` and `ksat`. The token at position 4
    is skipped and everything from position 5 onward is joined with single
    spaces into the string stored under `mid`.

    Raises `IndexError` when the line has fewer than four tokens and
    `ValueError` when one of the first four tokens is not numeric.
    """
    fields = _remove_comments(hyd_line).strip().split()

    # hard-coded numeric parameter names, in column order
    numeric_keys = ('xslp', 'xdrainh', 'mann', 'ksat')

    hyd_params = {key: float(fields[pos]) for pos, key in enumerate(numeric_keys)}
    # free-text tail of the line, kept as a string
    hyd_params['mid'] = " ".join(fields[5:])

    return hyd_params

def _parse_class_soil(
    soil_section : str,
) -> Dict[str, float]:
    """Parse the three-line soil group of a CLASS block.

    After trailing comments are removed, the first three tokens of lines
    0, 1 and 2 are read as floats and stored as `sand1..3`, `clay1..3`
    and `orgm1..3` respectively, matching the inputs of MESHFlow's
    `meshflow.utility.render_class_template` function.

    Raises `IndexError` when a line or token is missing and `ValueError`
    when a token is not numeric.
    """
    rows = _remove_comments(soil_section).splitlines()

    soil_params = {}
    # one parameter family per line, three values per family
    for row_no, family in enumerate(('sand', 'clay', 'orgm')):
        for layer in range(3):
            tokens = rows[row_no].strip().split()
            soil_params[f"{family}{layer + 1}"] = float(tokens[layer])

    return soil_params

def _parse_class_prog1(
    prog_line : str,
) -> Dict[str, float]:
    """Parse the first prognostic line of a CLASS block.

    The trailing comment is removed and the first six tokens are converted
    to floats under the keys `tbar1`, `tbar2`, `tbar3`, `tcan`, `tsno`
    and `tpnd`, matching the inputs of MESHFlow's
    `meshflow.utility.render_class_template` function.

    Raises `IndexError` when the line has fewer than six tokens and
    `ValueError` when a token is not numeric.
    """
    fields = _remove_comments(prog_line).strip().split()

    # hard-coded parameter names, in column order
    keys = ('tbar1', 'tbar2', 'tbar3', 'tcan', 'tsno', 'tpnd')

    return {key: float(fields[pos]) for pos, key in enumerate(keys)}

def _parse_class_prog2(
    prog_line : str,
) -> Dict[str, float]:
    """Parse the second prognostic line of a CLASS block.

    The trailing comment is removed and the first seven tokens are
    converted to floats under the keys `thlq1`, `thlq2`, `thlq3`,
    `thic1`, `thic2`, `thic3` and `zpnd`, matching the inputs of
    MESHFlow's `meshflow.utility.render_class_template` function.

    Raises `IndexError` when the line has fewer than seven tokens and
    `ValueError` when a token is not numeric.
    """
    fields = _remove_comments(prog_line).strip().split()

    # hard-coded parameter names, in column order
    keys = ('thlq1', 'thlq2', 'thlq3', 'thic1', 'thic2', 'thic3', 'zpnd')

    return {key: float(fields[pos]) for pos, key in enumerate(keys)}

def _parse_class_prog3(
    prog_line : str,
) -> Dict[str, float]:
    """Parse the third prognostic line of a CLASS block.

    The trailing comment is removed and the first six tokens are converted
    to floats under the keys `rcan`, `scan`, `sno`, `albs`, `rhos` and
    `gro`, matching the inputs of MESHFlow's
    `meshflow.utility.render_class_template` function.

    Raises `IndexError` when the line has fewer than six tokens and
    `ValueError` when a token is not numeric.
    """
    fields = _remove_comments(prog_line).strip().split()

    # hard-coded parameter names, in column order
    keys = ('rcan', 'scan', 'sno', 'albs', 'rhos', 'gro')

    return {key: float(fields[pos]) for pos, key in enumerate(keys)}

In [8]:
def _analyze_class_file(
    class_file: Union[os.PathLike, str],
) -> tuple[Dict, Dict, Dict]:
    """
    Parse a CLASS file into the three dictionaries needed by MESHFlow's
    `meshflow.utility.render_class_template` function.

    The file is split on blank lines. The first section is the meta-data
    section, the last section (dates) is ignored, and every section in
    between is treated as one CLASS computational-unit block.

    Parameters
    ----------
    class_file : Union[PathLike | str]
        The path to the CLASS file to be analyzed.

    Returns
    -------
    case_entry : Dict
        Case information parsed from the meta-data section.
    info_entry : Dict
        Author and location parsed from the meta-data section.
    gru_entry : Dict
        One entry per block, keyed by the block's 1-based position in the
        file. Each entry holds a `class` name plus all vegetation,
        hydraulic, soil and prognostic parameters of the block.
    """
    # GRU type number -> class name, based on CLASS assumptions:
    #    1. needleleaf forest
    #    2. broadleaf forest
    #    3. cropland
    #    4. grassland
    #    5. urban, barren land, or impervious area
    class_name_dict = {
        1: "needleleaf",
        2: "broadleaf",
        3: "crop",
        4: "grassland",
        5: "urban",
    }
    # keywords in a block's `mid` text that mark a water-like land cover
    # NOTE(review): this is a substring test, so 'ice' also matches words
    # such as "rice" -- confirm that this is acceptable
    water_like = ('water', 'snow', 'ice')

    # read the text file
    text = Path(class_file).read_text(encoding="utf-8")

    # Split where there is at least one completely blank line (possibly with spaces)
    sections = re.split(r'\r?\n\s*\r?\n', text.strip())

    # building dictionaries out of the first section
    info_entry, case_entry = _parse_class_meta_data(sections[0])

    gru_entry = {}

    # iterating over the sections between the first and the last one
    for idx, section in enumerate(sections[1:-1], start=1):
        # divide the section into a dictionary of sub-sections
        class_section = class_section_divide(section=section)

        # the GRU type is read from the first line of the block
        section_landcover_type = _determine_gru_type(
            line=class_section['veg1'].splitlines()[0],
        )

        # parse the sub-sections -- hard-coded as there are no
        # other alternatives
        veg1_params = _parse_class_veg1(
            veg_section=class_section['veg1'],
            gru_idx=section_landcover_type,
        )
        veg2_params = _parse_class_veg2(
            veg_section=class_section['veg2'],
            gru_idx=section_landcover_type,
        )
        hyd1_params = _parse_class_hyd1(hyd_line=class_section['hyd1'])
        hyd2_params = _parse_class_hyd2(hyd_line=class_section['hyd2'])
        soil_params = _parse_class_soil(soil_section=class_section['soil'])
        prog1_params = _parse_class_prog1(prog_line=class_section['prog1'])
        prog2_params = _parse_class_prog2(prog_line=class_section['prog2'])
        prog3_params = _parse_class_prog3(prog_line=class_section['prog3'])

        # make sure to make an exception for water-like land covers
        description = hyd2_params['mid'].lower()
        if any(word in description for word in water_like):
            class_type = 'water'
        else:
            class_type = class_name_dict[section_landcover_type]

        # class type first, then every parameter group in file order
        entry = {'class': class_type}
        for params in (
            veg1_params,
            veg2_params,
            hyd1_params,
            hyd2_params,
            soil_params,
            prog1_params,
            prog2_params,
            prog3_params,
        ):
            entry.update(params)
        gru_entry[idx] = entry

    return case_entry, info_entry, gru_entry

In [9]:
# parse the CLASS file into the three dictionaries passed to the renderer below
case_entry, info_entry, gru_entry = _analyze_class_file(class_file=class_file)

Now that we have the necessary dictionary, we can reproduce the CLASS file:

In [10]:
# render the CLASS template from the parsed dictionaries
# (presumably returns the rendered file content -- confirm in MESHFlow)
c = mf.utility.render_class_template(
    class_case=case_entry,
    class_info=info_entry,
    class_grus=gru_entry,
)

Let's also analyze the `hydrology` file and build that as well:

In [11]:
def iter_sections(
    text: str,
    drop_separators: bool=True,
):
    """Yield `(header, body)` pairs for the `###`-delimited sections of `text`.

    A header is a line that starts with three or more `#` characters; its
    title is the remaining text with surrounding whitespace and trailing
    `#` characters removed. A header line that carries no title yields an
    empty title rather than borrowing the following line.

    Parameters
    ----------
    text : str
        The text to split.
    drop_separators : bool, default True
        When True, lines made of three or more dashes followed by `#`
        (e.g. `-----#`) are removed from the yielded bodies.

    Yields
    ------
    tuple of (str, str)
        `("Preamble", body)` first, for any non-empty text before the
        first header (or for the whole text when there is no header at
        all), followed by one `(title, body)` pair per header. Section
        bodies have leading and trailing newlines stripped and may be
        empty.
    """
    # `[^\S\n]*` is "whitespace except newline": the gap between the
    # leading hashes and the title must stay on the header line
    header_re = re.compile(r'^#{3,}[^\S\n]*(.*?)\s*#*\s*$', re.MULTILINE)
    sep_line_re = re.compile(r'^-{3,}#.*$')

    def _kept_lines(chunk):
        # lines of `chunk`, minus separator lines when requested
        return [
            line for line in chunk.splitlines()
            if not (drop_separators and sep_line_re.match(line))
        ]

    matches = list(header_re.finditer(text))

    # no header at all: the whole text is a single preamble
    if not matches:
        body = "\n".join(_kept_lines(text)).strip()
        if body:
            yield ("Preamble", body)
        return

    # preamble: whatever precedes the first header
    first_start = matches[0].start()
    if first_start > 0:
        pre = "\n".join(_kept_lines(text[:first_start])).strip()
        if pre:
            yield ("Preamble", pre)

    # each section runs from the end of its header to the next header
    for i, m in enumerate(matches):
        header = m.group(1).strip()
        body_end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        body = "\n".join(_kept_lines(text[m.end():body_end])).strip('\n')
        yield (header, body)

def hydrology_section_divide(
    hydrology_file: os.PathLike | str,
) -> List[str]:
    """Return the section bodies of a hydrology file, in file order.

    The file is read as UTF-8 and split with `iter_sections`; the section
    headers are discarded and only the bodies are kept.
    """
    content = Path(hydrology_file).read_text(encoding="utf-8")
    return [body for _, body in iter_sections(content)]

In [12]:
def _analyze_hydrology(
    hydrology_file : Union[os.PathLike, str],
) -> tuple[List[Dict], Dict]:
    """Parse the routing and hydrology tables of a MESH hydrology file.

    The file is split with `hydrology_section_divide`; the section at
    index 2 is read as the routing table and the section at index 4 as
    the hydrology table. Both are whitespace-separated tables whose first
    column holds the parameter names, which are lower-cased.

    Parameters
    ----------
    hydrology_file : Union[PathLike, str]
        Path to the hydrology file.

    Returns
    -------
    routing_dict : list of dict
        One `{parameter: value}` dictionary per value column of the
        routing table, in column order.
    hydrology_dict : dict
        `{column label: {parameter: value}}` for the hydrology table;
        pandas labels the value columns 1, 2, ... because no header row
        is read.
    """
    # extract sections from the hydrology file
    sections = hydrology_section_divide(hydrology_file)

    # first, the routing table; its first row is skipped
    routing_df = pd.read_csv(
        StringIO(sections[2]),
        comment='#',
        sep=r'\s+',  # raw string: '\s' is an invalid escape in a plain literal
        index_col=0,
        skiprows=1,
        header=None,
    )
    routing_df.index = routing_df.index.str.lower()
    # we should return a list of values, one dictionary per column
    routing_dict = list(routing_df.to_dict().values())

    # and second, the hydrology table; its first two rows are skipped
    hydrology_df = pd.read_csv(
        StringIO(sections[4]),
        comment='#',
        sep=r'\s+',
        index_col=0,
        skiprows=2,
        header=None,
    )
    hydrology_df.index = hydrology_df.index.str.lower()
    # and we return a dictionary of this
    hydrology_dict = hydrology_df.to_dict()

    return routing_dict, hydrology_dict

In [13]:
# parse the hydrology file into the routing list and the hydrology dictionary
routing_dict, hydrology_dict = _analyze_hydrology(hydrology_file)

In [14]:
# render the hydrology template from the parsed parameters
# (presumably returns the rendered file content -- confirm in MESHFlow)
d = mf.utility.render_hydrology_template(
    routing_params=routing_dict,
    hydrology_params=hydrology_dict
)

Here, we try to make template files that `Ostrich` can read, as an example. The choice of `Ostrich` is just an example, and can be anything else. A friendly reminder to NOT hyperventilate.

____

Let's focus on the first dictionary, `gru_entry` describing `CLASS` parameters:

In [15]:
# custom type hint: anything accepted as a computational-unit or parameter
# name by the template-naming helpers below (converted with `str()` there)
NameType = Union[str, int, float]

In [16]:
def param_name_gen(
    computational_unit: NameType,
    name: NameType,
) -> str:
    """Build the template name of one parameter of one computational unit.

    Both arguments are converted to strings and upper-cased, then joined
    behind a leading underscore: `_` + unit + name. For example, unit `1`
    and name `'sno'` give `'_1SNO'`. The same scheme can be applied to the
    computational units (gru, hru, etc.) of any model.
    """
    unit_part = str(computational_unit).upper()
    name_part = str(name).upper()

    return f"_{unit_part}{name_part}"

In [17]:
def param_list_gen(
    names : Dict[NameType, Sequence[NameType]],
) -> Dict[NameType, Dict[NameType, NameType]]:
    """Generate template names for every parameter entering calibration.

    Parameters
    ----------
    names : dict
        Maps each computational unit of the model of interest to the
        sequence of parameter names to be calibrated for that unit.

    Returns
    -------
    dict
        Same keys as `names`; each value maps a parameter name to the
        template name produced by `param_name_gen` for that unit.
    """
    return {
        unit: {name: param_name_gen(unit, name) for name in members}
        for unit, members in names.items()
    }

In [18]:
# CLASS parameters selected for calibration, keyed by computational unit
params = {
    1: ['sno', 'rhos', 'clay1', 'sand1', 'clay3'],
    3: ['lamx', 'cmas', 'clay1', 'clay2'],
}

In [19]:
# generate the template name of every selected parameter
template_params = param_list_gen(params)
# show the result as the cell output
template_params

{1: {'sno': '_1SNO',
  'rhos': '_1RHOS',
  'clay1': '_1CLAY1',
  'sand1': '_1SAND1',
  'clay3': '_1CLAY3'},
 3: {'lamx': '_3LAMX',
  'cmas': '_3CMAS',
  'clay1': '_3CLAY1',
  'clay2': '_3CLAY2'}}

Now that I have new names for the parameters of certain computational units (in MESH, these are GRUs -- don't hyperventilate MC), it's time to update the parameter values and make template files for `OSTRICH`.

In [20]:
# replace the values of the selected parameters with their template names,
# one computational unit at a time
for unit, unit_params in template_params.items():
    for p, template_name in unit_params.items():
        # parameters missing from the unit are left alone
        if p not in gru_entry[unit]:
            continue
        # updating the target gru_entry dictionary
        gru_entry[unit][p] = template_name

Let's dump this into a JSON file and see how it looks:

In [21]:
# make sure the scratch directory is there before writing into it
os.makedirs('./junk_files', exist_ok=True)
# serialize `gru_entry` as indented JSON and write it out
with open('./junk_files/class.json', 'w') as f:
    f.write(json.dumps(gru_entry, indent=2))

____

Now, we have manually changed the `class_changed.json` file, let's read it back and see if we can decode it properly:

In [22]:
# Patterns that decide whether a string looks like an integer or a float;
# compiled once at import time
_INT_RE = re.compile(r'^[-+]?\d+$')
_FLOAT_RE = re.compile(
    r"""^[-+]?(                # optional sign
        (?:\d+\.\d*|\d*\.\d+)  # something with a decimal point
        (?:[eE][-+]?\d+)?      # optional exponent
        |
        \d+[eE][-+]?\d+        # or integer with exponent (e.g. 1e6)
    )$""",
    re.X
)

def parse_numeric_string(s: str):
    """
    Convert a numeric-looking string to a number.

    Integer-looking strings become `int`, strings with a decimal point
    or an exponent become `float`, and anything else is returned as-is.
    """
    # the integer pattern is tried first so that "42" stays an int
    for pattern, caster in ((_INT_RE, int), (_FLOAT_RE, float)):
        if pattern.match(s):
            return caster(s)
    return s  # not numeric-looking

def convert_numeric_strings(obj):
    """
    Return `obj` with every numeric-looking string turned into a number.

    Strings are stripped and passed to `parse_numeric_string`; lists and
    dict values are converted recursively into new containers (dict keys
    are kept unchanged); every other object is returned untouched.
    """
    if isinstance(obj, str):
        return parse_numeric_string(obj.strip())
    if isinstance(obj, list):
        return [convert_numeric_strings(item) for item in obj]
    if isinstance(obj, dict):
        return {key: convert_numeric_strings(value) for key, value in obj.items()}
    return obj  # int, float, bool, None, etc.


def make_object_hook():
    """Build an `object_hook` for `json.load` that converts numeric-looking
    string values of every decoded object in place."""
    def object_hook(d):
        # values are converted first, then written back into the same dict
        d.update({key: convert_numeric_strings(value) for key, value in d.items()})
        return d
    return object_hook

# read the manually edited JSON back, converting numeric-looking strings
# to numbers while decoding; object keys are not converted by the hook
with open('./junk_files/class_changed.json', "r", encoding="utf-8") as f:
    gru_entry_modified = json.load(f, object_hook=make_object_hook())

In [23]:
# render the CLASS template again, this time from the dictionary read back
# from the edited JSON file
c_modified = mf.utility.render_class_template(
    class_case=case_entry,
    class_info=info_entry,
    class_grus=gru_entry_modified,
)

____

Now, we have to make a Jinja2 template of the parameters, and their ranges:

In [24]:
# third-party libraries
from jinja2 import (
    Environment,
    FileSystemLoader,
    PackageLoader,
)

# built-in libraries
import os
import json
import copy
import time

# global variables and helper functions
def raise_helper(msg):
    """Raise a generic `Exception` carrying `msg`.

    Registered on the Jinja2 environment under the name `raise` so that
    templates can abort rendering with a message.
    """
    error = Exception(msg)
    raise error

# Jinja2 environment setup
# NOTE: templates are loaded from a path relative to the working directory;
# the recorded run at the end of this notebook failed with TemplateNotFound
# for this search path.
environment = Environment(
    # loader=PackageLoader("meshflow", "templates"),
    loader=FileSystemLoader('../src/fiatmodel/calibration/ostrich/templates/'),
    trim_blocks=True,
    lstrip_blocks=True,
    line_comment_prefix='##',
)

# expose `raise_helper` to the templates under the name `raise`
environment.globals['raise'] = raise_helper

In [25]:
# calibration bounds of the CLASS parameters, keyed by computational unit
# and then by parameter name (presumably [lower, upper] -- confirm)
gru_entry_bounds = {
    1: {
        'sno': [0.0, 0.5],
        'rhos': [0.0, 0.5],
        'clay1': [10, 50],
        'sand1': [30, 70],
        'clay3': [20, 30],
    },
    3: {
        'lamx': [0.0, 1.0],
        'cmas': [0.0, 1.0],
        'clay2': [20, 60],
        'clay1': [0, 10],
    },
}

____

Now, working on constraints. Note that this is a `MESH`-specific workflow; the generalized form of this example should also be applicable to other models, and the calibration components should be able to interpret it.

In [26]:
# show the bounds dictionary as the cell output
gru_entry_bounds

{1: {'sno': [0.0, 0.5],
  'rhos': [0.0, 0.5],
  'clay1': [10, 50],
  'sand1': [30, 70],
  'clay3': [20, 30]},
 3: {'lamx': [0.0, 1.0],
  'cmas': [0.0, 1.0],
  'clay2': [20, 60],
  'clay1': [0, 10]}}

In [27]:
# parameter families that take part in constraints; these are MESH-specific
constraints_params_template = ['clay', 'sand']
# individual (per-layer) parameter names, filled in below
constraint_params = []

# MESH is assumed to have 3 soil layers, numbered from 1
for layer in range(1, 4):
    # one lower-cased name per family for this layer, in template order
    constraint_params.extend(
        f"{family.lower()}{layer}" for family in constraints_params_template
    )


In [28]:
# show the generated constraint parameter names as the cell output
constraint_params

['clay1', 'sand1', 'clay2', 'sand2', 'clay3', 'sand3']

In [29]:
# show the bounds dictionary again, next to the constraint names above
gru_entry_bounds

{1: {'sno': [0.0, 0.5],
  'rhos': [0.0, 0.5],
  'clay1': [10, 50],
  'sand1': [30, 70],
  'clay3': [20, 30]},
 3: {'lamx': [0.0, 1.0],
  'cmas': [0.0, 1.0],
  'clay2': [20, 60],
  'clay1': [0, 10]}}

In [30]:
# iterate over `gru_entry_bounds`; how to pick the dictionary to iterate over
# will be investigated later, as there are multiple dictionaries to go over,
# so this needs to be logically detected and targeted.

# calibration constraints for each computational unit
calibration_constraints = {}

for unit, unit_bounds in gru_entry_bounds.items():
    # the set of parameters to be calibrated for this computational unit
    calibrated_set = set(unit_bounds)

    # keep the `constraint_params` elements that are calibrated in this
    # unit, in `constraint_params` order
    match = [x for x in constraint_params if x in calibrated_set]

    # set it aside only if a match is found; a list is never None, so the
    # emptiness of the list is what has to be tested
    if match:
        calibration_constraints[unit] = match

In [31]:
# show the constrained parameters of each unit as the cell output
calibration_constraints

{1: ['clay1', 'sand1', 'clay3'], 3: ['clay1', 'clay2']}

____

There are two types of dictionaries that we need to deal with during the calibration process:
1. `parameters` dictionary and
2. `calibration_bounds` dictionary.

All the elements in `calibration_bounds` necessarily need to be present in `parameters` dictionary.

In this example, I just focus on the `parameters:gru_entry` and `calibration_bounds:gru_entry_bounds` dictionaries.

To derive the constraints for soil parameters, we need a clear idea of the `clay`, `sand` and `silt` values. In `MESH`, only the `clay` and `sand` values are explicitly defined, and `silt = 100 - clay - sand`.

Therefore, constraints are defined as follows:

`clay_x = (_XCLY_ / _XSUM_SOIL_)`

and 

`_XSUM_SOIL_ = _XCLY_ + _XSND_ + _XSLT_`

And all values are in percentage.

Note that, one or two soil parameters can be calibrated, for each soil layer. There is no limitation.

____

In [32]:
# defining bounds for various parameters

## Hydrology parameters

In [33]:
# hydrology parameters selected for calibration, keyed by computational unit
hydro_params = {
    1: ['zsnl', 'zplg'],
    12: ['zpls', 'zsnl', 'zplg'],
}

In [34]:
# generate the template name of every selected hydrology parameter
template_hydro_params = param_list_gen(hydro_params)
# show the result as the cell output
template_hydro_params

{1: {'zsnl': '_1ZSNL', 'zplg': '_1ZPLG'},
 12: {'zpls': '_12ZPLS', 'zsnl': '_12ZSNL', 'zplg': '_12ZPLG'}}

In [35]:
# replace the values of the selected hydrology parameters with their
# template names, one computational unit at a time
for unit, unit_params in template_hydro_params.items():
    for p, template_name in unit_params.items():
        # parameters missing from the unit are left alone
        if p not in hydrology_dict[unit]:
            continue
        # updating the target hydrology_dict dictionary
        hydrology_dict[unit][p] = template_name

In [36]:
# show the hydrology dictionary after the template names were inserted
hydrology_dict

{1: {'zsnl': '_1ZSNL', 'zpls': 0.109, 'zplg': '_1ZPLG', 'iwf': 1.0},
 2: {'zsnl': 0.134, 'zpls': 0.109, 'zplg': 0.312, 'iwf': 1.0},
 3: {'zsnl': 0.172, 'zpls': 0.122, 'zplg': 0.223, 'iwf': 1.0},
 4: {'zsnl': 0.578, 'zpls': 0.051, 'zplg': 0.13, 'iwf': 1.0},
 5: {'zsnl': 0.257, 'zpls': 0.09, 'zplg': 0.26, 'iwf': 1.0},
 6: {'zsnl': 0.057, 'zpls': 0.021, 'zplg': 0.02, 'iwf': 1.0},
 7: {'zsnl': 0.057, 'zpls': 0.021, 'zplg': 0.02, 'iwf': 1.0},
 8: {'zsnl': 0.21, 'zpls': 0.134, 'zplg': 0.134, 'iwf': 1.0},
 9: {'zsnl': 0.1, 'zpls': 0.13, 'zplg': 0.13, 'iwf': 1.0},
 10: {'zsnl': 0.35, 'zpls': 0.09, 'zplg': 0.26, 'iwf': 1.0},
 11: {'zsnl': 0.11, 'zpls': 0.09, 'zplg': 0.26, 'iwf': 0.0},
 12: {'zsnl': '_12ZSNL', 'zpls': '_12ZPLS', 'zplg': '_12ZPLG', 'iwf': 1.0}}

In [37]:
# calibration bounds of the hydrology parameters, keyed by computational
# unit and then by parameter name (presumably [lower, upper] -- confirm)
hydrology_dict_bounds = {
    1: {
        'zsnl': [0, 10],
        'zplg': [5, 15],
       },
    12: {
        'zpls': [0, 42],
        'zsnl': [0, 10],
        'zplg': [5, 15],
    },
}

## Routing parameters

In [38]:
# routing parameters selected for calibration, keyed by 1-based unit number
routing_params = {
    1: ['r2n', 'r1n'],
    3: ['pwr', 'flz'],
}

In [39]:
# generate the template name of every selected routing parameter
routing_template_params = param_list_gen(routing_params)
# show the result as the cell output
routing_template_params

{1: {'r2n': '_1R2N', 'r1n': '_1R1N'}, 3: {'pwr': '_3PWR', 'flz': '_3FLZ'}}

In [40]:
# replace the values of the selected routing parameters with their template
# names; `routing_dict` is a list, so the 1-based unit is shifted by one
for unit, unit_params in routing_template_params.items():
    for p, template_name in unit_params.items():
        # parameters missing from the unit are left alone
        if p not in routing_dict[unit - 1]:
            continue
        # updating the target routing_dict entry
        routing_dict[unit - 1][p] = template_name

In [41]:
# calibration bounds of the routing parameters, keyed by 1-based unit
# number and then by parameter name (presumably [lower, upper] -- confirm)
routing_dict_bounds = {
    1: {
        'r2n': [0.0, 1.0],
        'r1n': [0.0, 1.0],
    },
    3: {
        'pwr': [0.0, 1.0],
        'flz': [0.0, 1.0],
    },

}

## CLASS parameters

In [42]:
# Already done above, as constrains also needed to be defined right after

## Calibration file generation

In [43]:
# everything handed to the calibration template as `info`: algorithm
# settings plus the parameter, bounds and constraint dictionaries built above
calibration_config = {
    # seeded from the current time, so the value differs between runs
    'random_seed': int(time.time()),
    'algorithm': 'DDS',
    'algorithm_specs': { # use calibration software specific keys
        'PerturbationValue': 0.2,
        'MaxIteration': 10_000,
        'UseRandomParamValue': None,
    },
    'metric': 'kge_2012',
    # parameter dictionaries, with template names in place of the values
    # to be calibrated
    'parameters': {
        'gru_entry': gru_entry,
        'hydrology_dict': hydrology_dict,
        'routing_dict': routing_dict,
    },
    # bounds, using the same top-level keys as `parameters`
    'calibration_bounds': {
        'gru_entry': gru_entry_bounds,
        'hydrology_dict': hydrology_dict_bounds,
        'routing_dict': routing_dict_bounds,
        # 'routing_dict': None,
    },
    # constraints are only defined for the CLASS (gru_entry) parameters
    'constraints': {
        'gru_entry': calibration_constraints,
        'hydrology_dict': None,
        'routing_dict': None,
    },
}

In [44]:
# name of the Jinja2 template, looked up in the environment's search path
template_file = 'ostIn.txt.jinja2'

In [45]:
# show the routing list after the template names were inserted
routing_dict

[{'r2n': '_1R2N', 'r1n': '_1R1N', 'pwr': 1.361, 'flz': 4.2e-05},
 {'r2n': 0.05, 'r1n': 0.119, 'pwr': 1.361, 'flz': 4.2e-05},
 {'r2n': 0.05, 'r1n': 0.119, 'pwr': '_3PWR', 'flz': '_3FLZ'},
 {'r2n': 0.05, 'r1n': 0.119, 'pwr': 1.361, 'flz': 4.2e-05},
 {'r2n': 0.05, 'r1n': 0.119, 'pwr': 1.361, 'flz': 4.2e-05}]

In [46]:
# show the routing bounds as the cell output
routing_dict_bounds

{1: {'r2n': [0.0, 1.0], 'r1n': [0.0, 1.0]},
 3: {'pwr': [0.0, 1.0], 'flz': [0.0, 1.0]}}

In [47]:
# load the template from the Jinja2 environment; the recorded run raised
# TemplateNotFound at this call
template = environment.get_template(template_file)

# default dictionaries for each calibration software, made available to
# the template as the global `default_dicts`
import default_dicts as default_dicts
template.globals["default_dicts"] = default_dicts

# create content: the configuration is passed to the template as `info`
content = template.render(
    info=calibration_config,
)

TemplateNotFound: 'ostIn.txt.jinja2' not found in search path: '../src/fiatmodel/calibration/ostrich/templates/'

In [None]:
# print the rendered content (only defined if the render cell succeeded)
print(content)