Basic imports and path management

In [26]:
import pathlib
import sys
import time
import glob
from tqdm import tqdm
import multiprocessing

root = pathlib.Path("../..").resolve()
sys.path.append(str(root.joinpath("src", "python_framework_v02")))
sys.path.append(str(root.joinpath("src", "python_framework_v01")))
sys.path.append(".")
import nhd_io as nio
import compute_nhd_routing_SingleSeg as tr
import nhd_network_utilities_v01 as nnu
import nhd_reach_utilities as nru

custom_input_folder = root.joinpath("test", "input", "json")
custom_input_file = "florence_933020089.json"
run_pocono2_test = None

Read the primary data input from json

In [27]:
supernetwork_parameters = None
waterbody_parameters = None
if custom_input_file:
    (
        supernetwork_parameters,
        waterbody_parameters,
        forcing_parameters,
        restart_parameters,
        output_parameters,
        run_parameters,
    ) = nio.read_custom_input(custom_input_folder.joinpath(custom_input_file))

    break_network_at_waterbodies = run_parameters.get(
        "break_network_at_waterbodies", None
    )

    dt = run_parameters.get("dt", None)
    nts = run_parameters.get("nts", None)
    qts_subdivisions = run_parameters.get("qts_subdivisions", None)
    debuglevel = -1 * int(run_parameters.get("debuglevel", 0))
    verbose = run_parameters.get("verbose", None)
    showtiming = run_parameters.get("showtiming", None)
    percentage_complete = run_parameters.get("percentage_complete", None)
    do_network_analysis_only = run_parameters.get("do_network_analysis_only", None)
    assume_short_ts = run_parameters.get("assume_short_ts", None)
    parallel_compute = run_parameters.get("parallel_compute", None)
    cpu_pool = run_parameters.get("cpu_pool", None)
    sort_networks = run_parameters.get("sort_networks", None)

    csv_output = output_parameters.get("csv_output", None)
    nc_output_folder = output_parameters.get("nc_output_folder", None)

    qlat_const = forcing_parameters.get("qlat_const", None)
    qlat_input_file = forcing_parameters.get("qlat_input_file", None)
    qlat_input_folder = forcing_parameters.get("qlat_input_folder", None)
    qlat_file_pattern_filter = forcing_parameters.get(
        "qlat_file_pattern_filter", None
    )
    qlat_file_index_col = forcing_parameters.get("qlat_file_index_col", None)
    qlat_file_value_col = forcing_parameters.get("qlat_file_value_col", None)

    wrf_hydro_channel_restart_file = restart_parameters.get(
        "wrf_hydro_channel_restart_file", None
    )
    wrf_hydro_channel_ID_crosswalk_file = restart_parameters.get(
        "wrf_hydro_channel_ID_crosswalk_file", None
    )
    wrf_hydro_channel_ID_crosswalk_file_field_name = restart_parameters.get(
        "wrf_hydro_channel_ID_crosswalk_file_field_name", None
    )
    wrf_hydro_channel_restart_upstream_flow_field_name = restart_parameters.get(
        "wrf_hydro_channel_restart_upstream_flow_field_name", None
    )
    wrf_hydro_channel_restart_downstream_flow_field_name = restart_parameters.get(
        "wrf_hydro_channel_restart_downstream_flow_field_name", None
    )
    wrf_hydro_channel_restart_depth_flow_field_name = restart_parameters.get(
        "wrf_hydro_channel_restart_depth_flow_field_name", None
    )

    wrf_hydro_waterbody_restart_file = restart_parameters.get(
        "wrf_hydro_waterbody_restart_file", None
    )
    wrf_hydro_waterbody_ID_crosswalk_file = restart_parameters.get(
        "wrf_hydro_waterbody_ID_crosswalk_file", None
    )
    wrf_hydro_waterbody_ID_crosswalk_file_field_name = restart_parameters.get(
        "wrf_hydro_waterbody_ID_crosswalk_file_field_name", None
    )
    wrf_hydro_waterbody_crosswalk_filter_file = restart_parameters.get(
        "wrf_hydro_waterbody_crosswalk_filter_file", None
    )
    wrf_hydro_waterbody_crosswalk_filter_file_field_name = restart_parameters.get(
        "wrf_hydro_waterbody_crosswalk_filter_file_field_name", None
    )

# Any specific commandline arguments will override the file
# TODO: There are probably some pathological collisions that could
# arise from this ordering ... check these out.

In [28]:
if run_pocono2_test:
    if verbose:
        print("running test case for Pocono_TEST2 domain")
    # Overwrite the following test defaults
    supernetwork = "Pocono_TEST2"
    break_network_at_waterbodies = False
    qts_subdivisions = 1  # change qts_subdivisions = 1 as  default
    dt = 300 / qts_subdivisions
    nts = 144 * qts_subdivisions
    csv_output = {"csv_output_folder": os.path.join(root, "test", "output", "text")}
    nc_output_folder = os.path.join(root, "test", "output", "text")
    # test 1. Take lateral flow from re-formatted wrf-hydro output from Pocono Basin simulation
    qlat_input_file = os.path.join(
       
        root, r"test/input/geo/PoconoSampleData2/Pocono_ql_testsamp1_nwm_mc.csv"
    )

In [29]:
if showtiming:
    program_start_time = time.time()
if verbose:
    print(f"begin program t-route ...")

# STEP 1: Read the supernetwork dataset and build the connections graph
if verbose:
    print("creating supernetwork connections set")
if showtiming:
    start_time = time.time()

if supernetwork_parameters:
    supernetwork_values = nnu.get_nhd_connections(
        supernetwork_parameters=supernetwork_parameters,
        verbose=False,
        debuglevel=debuglevel,
    )

else:
    test_folder = os.path.join(root, r"test")
    geo_input_folder = os.path.join(test_folder, r"input", r"geo")
    supernetwork_parameters, supernetwork_values = nnu.set_networks(
        supernetwork=supernetwork,
        geo_input_folder=geo_input_folder,
        verbose=False,
        debuglevel=debuglevel,
    )
    waterbody_parameters = nnu.set_waterbody_parameters(
        supernetwork=supernetwork,
        geo_input_folder=geo_input_folder,
        verbose=False,
        debuglevel=debuglevel,
    )

if verbose:
    print("supernetwork connections set complete")
if showtiming:
    print("... in %s seconds." % (time.time() - start_time))

connections = supernetwork_values[0]

begin program t-route ...
creating supernetwork connections set
supernetwork connections set complete
... in 0.22557783126831055 seconds.


In [30]:
# STEP 2: Separate the networks and build the sub-graph of reaches within each network
if showtiming:
    start_time = time.time()
if verbose:
    print("organizing connections into reaches ...")
networks = nru.compose_networks(
    supernetwork_values,
    break_network_at_waterbodies=break_network_at_waterbodies,
    verbose=False,
    debuglevel=debuglevel,
    showtiming=showtiming,
)

if verbose:
    print("reach organization complete")
if showtiming:
    print("... in %s seconds." % (time.time() - start_time))
    start_time = time.time()

organizing connections into reaches ...
reach organization complete
... in 0.0044155120849609375 seconds.


In [31]:
# STEP 3: Organize Network for Waterbodies
if break_network_at_waterbodies:
    if showtiming:
        start_time = time.time()
    if verbose:
        print("reading waterbody parameter file ...")

    ## STEP 3a: Read waterbody parameter file
    waterbodies_values = supernetwork_values[12]
    waterbodies_segments = supernetwork_values[13]
    connections_tailwaters = supernetwork_values[4]

    waterbodies_df = nio.read_waterbody_df(
        waterbody_parameters, waterbodies_values,
    )
    waterbodies_df = waterbodies_df.sort_index(axis="index").sort_index(
        axis="columns"
    )

    nru.order_networks(connections, networks, connections_tailwaters)

    if verbose:
        print("waterbodies complete")
    if showtiming:
        print("... in %s seconds." % (time.time() - start_time))
        start_time = time.time()

    ## STEP 3b: Order subnetworks above and below reservoirs
    if showtiming:
        start_time = time.time()
    if verbose:
        print("ordering waterbody subnetworks ...")

    max_network_seqorder = -1
    for network in networks:
        max_network_seqorder = max(
            networks[network]["network_seqorder"], max_network_seqorder
        )
    ordered_networks = {}

    for terminal_segment, network in networks.items():
        if network["network_seqorder"] not in ordered_networks:
            ordered_networks[network["network_seqorder"]] = []
        ordered_networks[network["network_seqorder"]].append(
            (terminal_segment, network)
        )

    if verbose:
        print("ordering waterbody subnetworks complete")
    if showtiming:
        print("... in %s seconds." % (time.time() - start_time))
        start_time = time.time()

else:
    # If we are not splitting the networks, we can put them all in one order
    max_network_seqorder = 0
    ordered_networks = {}
    ordered_networks[0] = [
        (terminal_segment, network)
        for terminal_segment, network in networks.items()
    ]

if do_network_analysis_only:
    sys.exit()

if break_network_at_waterbodies:
    ## STEP 3c: Handle Waterbody Initial States
    if showtiming:
        start_time = time.time()
    if verbose:
        print("setting waterbody initial states ...")

    if wrf_hydro_waterbody_restart_file:

        waterbody_initial_states_df = nio.get_reservoir_restart_from_wrf_hydro(
            wrf_hydro_waterbody_restart_file,
            wrf_hydro_waterbody_ID_crosswalk_file,
            wrf_hydro_waterbody_ID_crosswalk_file_field_name,
            wrf_hydro_waterbody_crosswalk_filter_file,
            wrf_hydro_waterbody_crosswalk_filter_file_field_name,
        )
    else:
        # TODO: Consider adding option to read cold state from route-link file
        waterbody_initial_ds_flow_const = 0.0
        waterbody_initial_depth_const = 0.0
        # Set initial states from cold-state
        waterbody_initial_states_df = pd.DataFrame(
            0, index=waterbodies_df.index, columns=["qd0", "h0",], dtype="float32"
        )
        # TODO: This assignment could probably by done in the above call
        waterbody_initial_states_df["qd0"] = waterbody_initial_ds_flow_const
        waterbody_initial_states_df["h0"] = waterbody_initial_depth_const
        waterbody_initial_states_df["index"] = range(
            len(waterbody_initial_states_df)
        )

    if verbose:
        print("waterbody initial states complete")
    if showtiming:
        print("... in %s seconds." % (time.time() - start_time))
        start_time = time.time()

reading waterbody parameter file ...
waterbodies complete
... in 0.04615163803100586 seconds.
ordering waterbody subnetworks ...
ordering waterbody subnetworks complete
... in 3.743171691894531e-05 seconds.
setting waterbody initial states ...
waterbody initial states complete
... in 0.09616303443908691 seconds.


In [32]:
# STEP 4: Handle Channel Initial States
if showtiming:
    start_time = time.time()
if verbose:
    print("setting channel initial states ...")

if wrf_hydro_channel_restart_file:

    channel_initial_states_df = nio.get_stream_restart_from_wrf_hydro(
        wrf_hydro_channel_restart_file,
        wrf_hydro_channel_ID_crosswalk_file,
        wrf_hydro_channel_ID_crosswalk_file_field_name,
        wrf_hydro_channel_restart_upstream_flow_field_name,
        wrf_hydro_channel_restart_downstream_flow_field_name,
        wrf_hydro_channel_restart_depth_flow_field_name,
    )
else:
    # TODO: Consider adding option to read cold state from route-link file
    channel_initial_us_flow_const = 0.0
    channel_initial_ds_flow_const = 0.0
    channel_initial_depth_const = 0.0
    # Set initial states from cold-state
    channel_initial_states_df = pd.DataFrame(
        0, index=connections.keys(), columns=["qu0", "qd0", "h0",], dtype="float32"
    )
    channel_initial_states_df["qu0"] = channel_initial_us_flow_const
    channel_initial_states_df["qd0"] = channel_initial_ds_flow_const
    channel_initial_states_df["h0"] = channel_initial_depth_const
    channel_initial_states_df["index"] = range(len(channel_initial_states_df))

if verbose:
    print("channel initial states complete")
if showtiming:
    print("... in %s seconds." % (time.time() - start_time))
    start_time = time.time()

setting channel initial states ...
channel initial states complete
... in 0.02710890769958496 seconds.


In [33]:
# STEP 5: Read (or set) QLateral Inputs
if showtiming:
    start_time = time.time()
if verbose:
    print("creating qlateral array ...")

# initialize qlateral dict
qlateral = {}

if qlat_input_folder:
    qlat_files = []
    for pattern in qlat_file_pattern_filter:
        qlat_files.extend(glob.glob(qlat_input_folder + pattern))
    qlat_df = nio.get_ql_from_wrf_hydro(
        qlat_files=qlat_files,
        index_col=qlat_file_index_col,
        value_col=qlat_file_value_col,
    )

elif qlat_input_file:
    qlat_df = nio.get_ql_from_csv(qlat_input_file)

else:
    qlat_df = pd.DataFrame(
        qlat_const, index=connections.keys(), columns=range(nts), dtype="float32"
    )

for index, row in qlat_df.iterrows():
    qlateral[index] = row.tolist()

if verbose:
    print("qlateral array complete")
if showtiming:
    print("... in %s seconds." % (time.time() - start_time))
    start_time = time.time()

creating qlateral array ...
qlateral array complete
... in 6.405231952667236 seconds.


In [34]:
# STEP 6: Sort the ordered networks
if sort_networks:
    if showtiming:
        start_time = time.time()
    if verbose:
        print("sorting the ordered networks ...")

    for nsq in range(max_network_seqorder, -1, -1):
        sort_ordered_network(ordered_networks[nsq], True)

    if verbose:
        print("sorting complete")
    if showtiming:
        print("... in %s seconds." % (time.time() - start_time))
        start_time = time.time()



In [35]:
# Define the pool after we create the static global objects (and collect the garbage)
if parallel_compute:
    import gc

    gc.collect()
    pool = multiprocessing.Pool(cpu_pool)

flowveldepth_connect = (
    {}
)  # dict to contain values to transfer from upstream to downstream networks

writing netcdf output to --> ../../test/output/text/8777229.nc
writing netcdf output to --> ../../test/output/text/8777239.ncwriting netcdf output to --> ../../test/output/text/8777247.ncwriting netcdf output to --> ../../test/output/text/8777253.nc
writing netcdf output to --> ../../test/output/text/8777173.nc


writing netcdf output to --> ../../test/output/text/8777191.ncwriting netcdf output to --> ../../test/output/text/8777285.nc
writing netcdf output to --> ../../test/output/text/8778365.nc

writing netcdf output to --> ../../test/output/text/8777207.nc
writing netcdf output to --> ../../test/output/text/8777187.nc
writing netcdf output to --> ../../test/output/text/8777241.nc
writing netcdf output to --> ../../test/output/text/8777141.nc
writing netcdf output to --> ../../test/output/text/8777225.nc
writing netcdf output to --> ../../test/output/text/8777125.nc
writing netcdf output to --> ../../test/output/text/8778363.nc
writing netcdf output to --> ../../test/output/text/877

In [36]:
################### Main Execution Loop across ordered networks
if showtiming:
    main_start_time = time.time()
if verbose:
    print(f"executing routing computation ...")

compute_network_func = tr.compute_network

tr.connections = connections
tr.networks = networks
tr.qlateral = qlateral
tr.waterbodies_df = waterbodies_df
tr.waterbody_initial_states_df = waterbody_initial_states_df
tr.channel_initial_states_df = channel_initial_states_df

progress_count = 0
percentage_complete = True
if percentage_complete:
    for nsq in range(max_network_seqorder, -1, -1):
        for terminal_segment, network in ordered_networks[nsq]:
            progress_count += len(network["all_segments"])
    pbar = tqdm(total=(progress_count))

for nsq in range(max_network_seqorder, -1, -1):

    if parallel_compute:
        nslist = []
    results = []

    current_index_total = 0

    for terminal_segment, network in ordered_networks[nsq]:

        if percentage_complete:
            if current_index_total == 0:
                pbar.update(0)

        if break_network_at_waterbodies:
            waterbody = waterbodies_segments.get(terminal_segment)
        else:
            waterbody = None
        if not parallel_compute:  # serial execution
            if showtiming:
                start_time = time.time()
            if verbose:
                print(
                    f"routing ordered reaches for terminal segment {terminal_segment} ..."
                )

            results.append(
                compute_network_func(
                    flowveldepth_connect=flowveldepth_connect,
                    terminal_segment=terminal_segment,
                    supernetwork_parameters=supernetwork_parameters,
                    waterbody_parameters=waterbody_parameters,
                    waterbody=waterbody,
                    nts=nts,
                    dt=dt,
                    qts_subdivisions=qts_subdivisions,
                    verbose=verbose,
                    debuglevel=debuglevel,
                    csv_output=csv_output,
                    nc_output_folder=nc_output_folder,
                    assume_short_ts=assume_short_ts,
                )
            )

            if showtiming:
                print("... complete in %s seconds." % (time.time() - start_time))
            if percentage_complete:
                pbar.update(len(network["all_segments"]))

        else:  # parallel execution
            nslist.append(
                [
                    flowveldepth_connect,
                    terminal_segment,
                    supernetwork_parameters,  # TODO: This should probably be global...
                    waterbody_parameters,
                    waterbody,
                    nts,
                    dt,
                    qts_subdivisions,
                    verbose,
                    debuglevel,
                    csv_output,
                    nc_output_folder,
                    assume_short_ts,
                ]
            )

    if parallel_compute:
        if verbose:
            print(f"routing ordered reaches for networks of order {nsq} ... ")
        if debuglevel <= -2:
            print(f"reaches to be routed include:")
            print(f"{[network[0] for network in ordered_networks[nsq]]}")
        # with pool:
        # with multiprocessing.Pool() as pool:
        results = pool.starmap(compute_network_func, nslist)

        if showtiming:
            print("... complete in %s seconds." % (time.time() - start_time))
        if percentage_complete:
            pbar.update(
                sum(
                    len(network[1]["all_segments"])
                    for network in ordered_networks[nsq]
                )
            )
            # print(f"{[network[0] for network in ordered_networks[nsq]]}")

    max_courant = 0
    maxa = []
    for result in results:
        for seg in result:
            maxa.extend(result[seg][:,8:9])
    max_courant = max(maxa)
    print(f"max_courant: {max_courant}")

    if (
        nsq > 0
    ):  # We skip this step for zero-order networks, i.e., those that have no downstream dependents
        flowveldepth_connect = (
            {}
        )  # There is no need to preserve previously passed on values -- so we clear the dictionary
        for i, (terminal_segment, network) in enumerate(ordered_networks[nsq]):
            # seg = network["reaches"][network["terminal_reach"]]["reach_tail"]
            seg = terminal_segment
            flowveldepth_connect[seg] = {}
            flowveldepth_connect[seg] = results[i][seg]
            # TODO: The value passed here could be much more specific to
            # TODO: exactly and only the most recent time step for the passing reach

if parallel_compute:
    pool.close()

if percentage_complete:
    pbar.close()

if verbose:
    print("ordered reach computation complete")
if showtiming:
    print("... in %s seconds." % (time.time() - main_start_time))
if verbose:
    print("program complete")
if showtiming:
    print("... in %s seconds." % (time.time() - program_start_time))



  0%|          | 0/1108 [00:00<?, ?it/s][A

executing routing computation ...
routing ordered reaches for networks of order 2 ... 



 42%|████▏     | 467/1108 [00:05<00:07, 81.18it/s][A

... complete in 25.412950038909912 seconds.
max_courant: [27.53112221]
routing ordered reaches for networks of order 1 ... 



 45%|████▌     | 499/1108 [00:08<00:19, 30.65it/s][A

... complete in 27.97378373146057 seconds.
max_courant: [0.]
routing ordered reaches for networks of order 0 ... 



100%|██████████| 1108/1108 [00:18<00:00, 36.15it/s][A

... complete in 37.766319274902344 seconds.


100%|██████████| 1108/1108 [00:18<00:00, 59.41it/s]

max_courant: [126.81378174]
ordered reach computation complete
... in 18.672638654708862 seconds.
program complete
... in 57.19902944564819 seconds.



