In [1]:
import json
import logging
import multiprocessing
from pathlib import Path
from typing import Any, Dict, List, Tuple

from dask.callbacks import Callback
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import dask_geopandas as dg
import geopandas as gpd
import numpy as np
from omegaconf import DictConfig, OmegaConf
import pandas as pd
from scipy.sparse import csr_matrix
from shapely.geometry import LineString, MultiLineString, Point
from tqdm import tqdm
import utm
import xarray as xr
import zarr

import sys
sys.path.append("..")

from marquette.merit._graph import _find_flowlines

log = logging.getLogger(__name__)

from dask.distributed import Client

client = Client(dashboard_address=':8989')
client
### TODO:
# - Preprocessing functions for catchment area of MERIT basins
# - Preprocessing function for flowline geometry (using custom UTM zones)
# - Preprocess function for the number of edges (the DX/buffer)
# - Create Edges

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8989/status,

0,1
Dashboard: http://127.0.0.1:8989/status,Workers: 12
Total threads: 144,Total memory: 503.74 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:39595,Workers: 12
Dashboard: http://127.0.0.1:8989/status,Total threads: 144
Started: Just now,Total memory: 503.74 GiB

0,1
Comm: tcp://127.0.0.1:37061,Total threads: 12
Dashboard: http://127.0.0.1:41285/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:36789,
Local directory: /tmp/dask-scratch-space/worker-uj_mq804,Local directory: /tmp/dask-scratch-space/worker-uj_mq804

0,1
Comm: tcp://127.0.0.1:33713,Total threads: 12
Dashboard: http://127.0.0.1:32785/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:43737,
Local directory: /tmp/dask-scratch-space/worker-xozys_lc,Local directory: /tmp/dask-scratch-space/worker-xozys_lc

0,1
Comm: tcp://127.0.0.1:36723,Total threads: 12
Dashboard: http://127.0.0.1:36719/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:40785,
Local directory: /tmp/dask-scratch-space/worker-yeftggzp,Local directory: /tmp/dask-scratch-space/worker-yeftggzp

0,1
Comm: tcp://127.0.0.1:33031,Total threads: 12
Dashboard: http://127.0.0.1:34699/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:37239,
Local directory: /tmp/dask-scratch-space/worker-io15e1h1,Local directory: /tmp/dask-scratch-space/worker-io15e1h1

0,1
Comm: tcp://127.0.0.1:36691,Total threads: 12
Dashboard: http://127.0.0.1:41681/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:36037,
Local directory: /tmp/dask-scratch-space/worker-gmz83sb_,Local directory: /tmp/dask-scratch-space/worker-gmz83sb_

0,1
Comm: tcp://127.0.0.1:43661,Total threads: 12
Dashboard: http://127.0.0.1:38949/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:42695,
Local directory: /tmp/dask-scratch-space/worker-0xy2vkv6,Local directory: /tmp/dask-scratch-space/worker-0xy2vkv6

0,1
Comm: tcp://127.0.0.1:46281,Total threads: 12
Dashboard: http://127.0.0.1:36017/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:39167,
Local directory: /tmp/dask-scratch-space/worker-hcuftgdw,Local directory: /tmp/dask-scratch-space/worker-hcuftgdw

0,1
Comm: tcp://127.0.0.1:35501,Total threads: 12
Dashboard: http://127.0.0.1:44961/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:38323,
Local directory: /tmp/dask-scratch-space/worker-iqt11tcn,Local directory: /tmp/dask-scratch-space/worker-iqt11tcn

0,1
Comm: tcp://127.0.0.1:37565,Total threads: 12
Dashboard: http://127.0.0.1:39819/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:34395,
Local directory: /tmp/dask-scratch-space/worker-p0_njtmp,Local directory: /tmp/dask-scratch-space/worker-p0_njtmp

0,1
Comm: tcp://127.0.0.1:38635,Total threads: 12
Dashboard: http://127.0.0.1:34159/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:36319,
Local directory: /tmp/dask-scratch-space/worker-1pdjhqmw,Local directory: /tmp/dask-scratch-space/worker-1pdjhqmw

0,1
Comm: tcp://127.0.0.1:43703,Total threads: 12
Dashboard: http://127.0.0.1:46263/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:42919,
Local directory: /tmp/dask-scratch-space/worker-wuvbxw9s,Local directory: /tmp/dask-scratch-space/worker-wuvbxw9s

0,1
Comm: tcp://127.0.0.1:43109,Total threads: 12
Dashboard: http://127.0.0.1:41917/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:42241,
Local directory: /tmp/dask-scratch-space/worker-ri2m6rwr,Local directory: /tmp/dask-scratch-space/worker-ri2m6rwr


In [135]:
json_data = '''
{
  "name": "MERIT",
  "data_path": "/data/tkb5476/projects/marquette/data/",
  "dx": 2000,
  "buffer": 0.3334,
  "units": "mm/day",
  "date_codes": "${data_path}/date_codes.json",
  "crs": {
    "wgs": "epsg:4326",
    "utm18": "epsg:32618"
  },
  "is_streamflow_split": true,
  "start_date": "01-01-1980",
  "end_date": "12-31-2019",
  "num_cores": 20,
  "continent": 7,
  "area": 8,
  "save_name": "${name}_${continent}${area}",
  "save_paths": {
    "attributes": "${data_path}/${name}/streamflow/attributes.csv",
    "flow_lines": "${data_path}/${name}/raw/flowlines",
    "processed_flow_lines": "${data_path}/${name}/raw/flowlines",
    "streamflow_files": "${data_path}/${name}/streamflow/dpl_v2/dHBV",
    "huc_to_merit_tm": "${data_path}/${name}/streamflow/TMs/${save_name}_huc_10_merit_TM.csv.gz"
  },
  "csv": {
    "edges": "${data_path}/${name}/zarr/dpl_v2/${save_name}_edges/",
    "sorted_edges_keys": "${data_path}/${name}/zarr/dpl_v2/${save_name}_edge_keys/",
    "mapped_streamflow_dir": "${data_path}/${name}/processed_csvs/dpl_v2.1/${save_name}"
  }
}
'''
data_dict = json.loads(json_data)
cfg = OmegaConf.create(data_dict)


In [4]:
# def determine_utm_zone(geom):
#     """
#     Determine UTM zone for a geometry.
#     """
#     lon, lat = geom.centroid.x, geom.centroid.y
#     utm_zone = utm.from_latlon(lat, lon)[2]
#     return f"EPSG:326{utm_zone}" if lat > 0 else f"EPSG:327{utm_zone}"

# def calculate_length_in_utm(row):
#     """
#     Calculate length of a geometry in its appropriate UTM zone.
#     """
#     utm_crs = determine_utm_zone(row.geometry)
#     geom_utm = row.geometry.to_crs(utm_crs)
#     return geom_utm.length

# def preprocess_flowlines(cfg: DictConfig):
#     """
#     Preprocess flowlines: project to UTM and calculate lengths.

#     Parameters:
#     input_shp (str): Path to the input shapefile.
#     output_shp (str): Path to the output shapefile.
#     """
#     # Read the shapefile
#     gdf = gpd.read_file(input_shp)

#     # Determine UTM zone from the first feature (assumes all features are in the same UTM zone)
#     utm_crs = determine_utm_zone(gdf.geometry[0])

#     # Project to UTM
#     gdf_utm = gdf.to_crs(utm_crs)

#     # Calculate lengths in meters and store in a new column
#     gdf_utm['length_m'] = gdf_utm.geometry.length

#     # Write the updated GeoDataFrame to a new shapefile
#     gdf_utm.to_file(output_shp)

# # Example usage
# input_shapefile = '/path/to/your/input/flowline.shp'
# output_shapefile = '/path/to/your/output/flowline_processed.shp'
# preprocess_flowlines(input_shapefile, output_shapefile)


In [5]:
def _plot_gdf(gdf: gpd.GeoDataFrame) -> None:
    """
    A function to find the correct flowline of all MERIT basins using glob

    Parameters
    ----------
    gdf : gpd.GeoDataFrame
        The geodataframe you want to plot

    Returns
    -------
    None

    Raises
    ------
    None
    """
    import matplotlib.pyplot as plt
    fig, ax = plt.subplots(figsize=(10, 10))
    gdf.plot(ax=ax)
    ax.set_title("Polyline Plot")
    ax.set_xlabel("Longitude")
    ax.set_ylabel("Latitude")
    plt.show()

In [6]:
def _find_flowlines(cfg: DictConfig) -> Path:
    """
    A function to find the correct flowline of all MERIT basins using glob

    Parameters
    ----------
    cfg : DictConfig
        The cfg object

    Returns
    -------
    Path
        The file that we're going to create flowline connectivity for

    Raises
    ------
    IndexError
        Raised if no flowlines are found with your MERIT region code
    """
    flowline_path = Path(cfg.save_paths.flow_lines)
    region_id = f"_{cfg.continent}{cfg.area}_"
    matching_file = flowline_path.glob(f"*{region_id}*.shp")
    try:
        found_file = [file for file in matching_file][0]
        return found_file
    except IndexError as e:
        raise IndexError(f"No flowlines found using: *{region_id}*.shp")

# Segments

In [7]:
def create_segment_dict(row: pd.Series, segment_coords: List[Tuple[float, float]], crs: Any, dx: int, buffer: float) -> Dict[str, Any]:
    segment_dict = {
        'id': row["COMID"],
        'order': row["order"],
        'len': row["lengthkm"] * 1000,  # to meters
        'len_dir': row["lengthdir"] * 1000,  # to meters
        'ds': row["NextDownID"],
        # 'is_headwater': False,
        'up': [row[key] for key in ["up1", "up2", "up3", "up4"] if row[key] != 0] if row["maxup"] > 0 else ([] if row["order"] == 1 else []),
        'slope': row["slope"],
        'sinuosity': row["sinuosity"],
        'stream_drop': row["strmDrop_t"],
        'uparea': row["uparea"],
        'coords': segment_coords,
        'crs': crs,
    }

    return segment_dict

In [8]:
def create_segment(row: pd.Series, crs: Any, dx: int, buffer: float) -> dict:
    return create_segment_dict(row, row.geometry, crs, dx, buffer)

In [9]:
def calculate_num_edges(length, dx, buffer):
    """
    Calculate the number of edges and the length of each edge for a given segment.

    This function determines the number of edges a segment should be divided into, 
    based on its length, a desired edge length (dx), and a tolerance (buffer). 
    The function adjusts the number of edges to ensure that the deviation of the 
    actual edge length from dx is within the specified buffer.

    Parameters
    ----------
    length : float
        The length of the segment for which to calculate the number of edges.
    dx : float
        The desired length of each edge.
    buffer : float
        The acceptable deviation from the desired edge length (dx).

    Returns
    -------
    tuple
        A tuple containing two elements:
            - The first element is an integer representing the number of edges.
            - The second element is a float representing the actual length of each edge.

    Examples
    --------
    >> calculate_num_edges(100, 30, 5)
    (3, 33.333333333333336)

    >> calculate_num_edges(100, 25, 2)
    (4, 25.0)
    """
    num_edges = length // dx
    if num_edges == 0:
        num_edges = 1
        if dx - length < buffer:
            edge_len = length
        else:
            edge_len = dx
    else:
        edge_len = length / num_edges
        buf_dev = edge_len - dx
        while abs(buf_dev) > buffer:
            if buf_dev > dx:
                num_edges -= 1
            else:
                num_edges += 1
            edge_len = length / num_edges
            buf_dev = edge_len - dx
    return (int(num_edges), edge_len)

# Edges

In [83]:
def create_edge_json(segment_row, up=None, ds=None, edge_id=None) -> dict:
    edge = {
        'id': edge_id,
        'merit_basin': segment_row['id'],
        'segment_sorting_index': segment_row['index'],
        'order': segment_row['order'],
        'len': segment_row['len'],
        'len_dir': segment_row['len_dir'],
        'ds': ds,
        'up': up,
        'slope': segment_row['slope'],
        'sinuosity': segment_row['sinuosity'],
        'stream_drop': segment_row['stream_drop'],
        'uparea': segment_row['uparea'],
        'coords': segment_row['coords'],
        'crs': segment_row['crs'],
    }
    return edge

def calculate_drainage_area(edge, idx, segment_das):
    prev_up_area = 0
    if edge['up']:
        try:
            prev_up_area = sum(segment_das[seg] for seg in edge['up'])
        except KeyError:
            prev_up_area = 0
            log.info("Missing upstream branch. Treating as head node")
        ratio = (edge["len"] * (idx + 1)) / edge["len_dir"]
        area_difference = edge['uparea'] - prev_up_area
        edge["uparea"] = prev_up_area + (area_difference * ratio)

def get_upstream_ids(row, edge_counts):
    if row['up'] is None:
        return []
    try:
        up_ids = [f"{up}_{edge_counts - 1}" for up in row['up']]
    except KeyError:
        log.error(f"KeyError with segment {row['id']}")
        return []
    return up_ids

In [108]:
def singular_segment_to_edge_partition(df, edge_info, segment_das):
    all_edges = []
    num_edges = 1
    for _, segment in tqdm(df.iterrows(), total=len(df)):
        edge_len = edge_info[segment['id']][1]
        up_ids = get_upstream_ids(segment, num_edges)
        edge = create_edge_json(
            segment,
            up=up_ids,
            ds=f"{segment['ds']}_0",
            edge_id=f"{segment['id']}_0",
        )
        edge["len"] = edge_len
        edge["len_dir"] = edge_len / segment["sinuosity"]
        all_edges.append(edge)
    return pd.DataFrame(all_edges)

def many_segment_to_edge_partition(df, edge_info, segment_das):
    all_edges = []
    for _, segment in tqdm(df.iterrows(), total=len(df), desc="Processing Segments"):
        num_edges, edge_len = edge_info[segment['id']]
        up_ids = get_upstream_ids(segment, num_edges)
        for i in range(num_edges):
            if i == 0:
                edge = create_edge_json(
                    segment,
                    up=up_ids,
                    ds=f"{segment['id']}_{i + 1}",
                    edge_id=f"{segment['id']}_{i}",
                )
            else:
                edge = create_edge_json(
                    segment,
                    up=[f"{segment['id']}_{i - 1}"],
                    ds=f"{segment['id']}_{i + 1}" if i < num_edges - 1 else f"{segment['ds']}_0",
                    edge_id=f"{segment['id']}_{i}",
                )
            edge["len"] = edge_len
            edge["len_dir"] = edge_len / segment["sinuosity"]
            calculate_drainage_area(edge, i, segment_das)
            all_edges.append(edge)
    return pd.DataFrame(all_edges)

In [79]:
# def apply_segments_to_edges(df, func, edge_counts, segment_das):
#     edges_list = df.apply(lambda row: func(row, edge_counts, segment_das), axis=1)
#     return pd.DataFrame(edges_list)


# The functions
### Read in the polylines and convert to dask dataframe

In [39]:
flowline_file: Path = _find_flowlines(cfg)
polyline_gdf: gpd.GeoDataFrame = gpd.read_file(flowline_file)
dx: int = cfg.dx  # Unit: Meters
buffer: float = cfg.buffer * dx  # Unit: Meters
for col in [
    "COMID",
    "NextDownID",
    "up1",
    "up2",
    "up3",
    "up4",
    "maxup",
    "order",
]:
    polyline_gdf[col] = polyline_gdf[col].astype(int)
crs: Any = polyline_gdf.crs
dask_gdf: dg.GeoDataFrame = dg.from_geopandas(polyline_gdf, npartitions=48) 

### Create segments and find the ordering of the segments by drainage area

In [76]:
meta = pd.Series([], dtype=object)
with ProgressBar():
    computed_series: dd.Series = dask_gdf.map_partitions(
        lambda df: df.apply(create_segment, args=(polyline_gdf.crs, dx, buffer), axis=1),
        meta=meta
    ).compute()
    
segments_dict = computed_series.to_dict()
sorted_keys = sorted(segments_dict, key=lambda key: segments_dict[key]['uparea'])
segment_das = {segment['id']: segment['uparea'] for segment in segments_dict.values()}

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


In [99]:
num_edges_dict = {segment_["id"]: calculate_num_edges(segment_["len"], dx, buffer) for seg_id, segment_ in tqdm(segments_dict.items(), desc="Processing Number of Edges")}
one_edge_segment = {seg_id: edge_info for seg_id, edge_info in tqdm(num_edges_dict.items(), desc="Filtering Segments == 1") if edge_info[0] == 1}
many_edge_segment = {seg_id: edge_info for seg_id, edge_info in tqdm(num_edges_dict.items(), desc="Filtering Segments > 1") if edge_info[0] > 1} 

Processing Number of Edges: 100%|█████████████████████████████████████████████████████████████████████████████████| 28489/28489 [00:00<00:00, 246050.61it/s]
Filtering Segments == 1: 100%|████████████████████████████████████████████████████████████████████████████████████| 28489/28489 [00:00<00:00, 625132.24it/s]
Filtering Segments > 1: 100%|█████████████████████████████████████████████████████████████████████████████████████| 28489/28489 [00:00<00:00, 612475.53it/s]


In [77]:
segments_with_more_than_one_edge = {}
segments_with_one_edge = {}

for i, segment in segments_dict.items():
    segment_id = segment["id"]
    segment["index"] = i
    
    if segment_id in more_than_one_edge:
        segments_with_more_than_one_edge[segment_id] = segment
    elif segment_id in one_edge_segment:
        segments_with_one_edge[segment_id] = segment
    else:
        print(f"MISSING ID: {segment_id}")

df_one = pd.DataFrame.from_dict(segments_with_one_edge, orient='index')
df_many = pd.DataFrame.from_dict(segments_with_more_than_one_edge, orient='index')
ddf_one = dd.from_pandas(df_one, npartitions=48)
ddf_many = dd.from_pandas(df_many, npartitions=48)

In [109]:
many_segment_to_edge_partition(df_many, many_edge_segment, segment_das).head()

Processing Segments: 100%|██████████████████████████████████████████████████████████████████████████████████████████| 22690/22690 [00:19<00:00, 1136.23it/s]


Unnamed: 0,id,merit_basin,segment_sorting_index,order,len,len_dir,ds,up,slope,sinuosity,stream_drop,uparea,coords,crs
0,78000001_0,78000001,0,3,2560.284421,1742.96467,78000001_1,"[78000002_2, 78000003_2]",0.005052,1.468925,38.9,1842.457826,LINESTRING (-133.15583333333333 59.00666666666...,EPSG:4326
1,78000001_1,78000001,0,3,2560.284421,1742.96467,78000001_2,[78000001_0],0.005052,1.468925,38.9,3684.915652,LINESTRING (-133.15583333333333 59.00666666666...,EPSG:4326
2,78000001_2,78000001,0,3,2560.284421,1742.96467,78000369_0,[78000001_1],0.005052,1.468925,38.9,5527.373478,LINESTRING (-133.15583333333333 59.00666666666...,EPSG:4326
3,78000002_0,78000002,1,3,2043.085156,1511.372348,78000002_1,"[78000399_9, 78000627_9]",0.007258,1.351808,148.7,671.234704,"LINESTRING (-133.215 59.042500000000004, -133....",EPSG:4326
4,78000002_1,78000002,1,3,2043.085156,1511.372348,78000002_2,[78000002_0],0.007258,1.351808,148.7,1342.469408,"LINESTRING (-133.215 59.042500000000004, -133....",EPSG:4326


### Processing flowline segments to river graph edges

In [111]:
meta = pd.DataFrame({
    'id': pd.Series(dtype='str'),
    'merit_basin': pd.Series(dtype='int'),
    'segment_sorting_index': pd.Series(dtype='int'),
    'order': pd.Series(dtype='int'),
    'len': pd.Series(dtype='float'),
    'len_dir': pd.Series(dtype='float'),
    'ds': pd.Series(dtype='str'),
    'up': pd.Series(dtype='object'),  # List or array
    'slope': pd.Series(dtype='float'),
    'sinuosity': pd.Series(dtype='float'),
    'stream_drop': pd.Series(dtype='float'),
    'uparea': pd.Series(dtype='float'),
    'coords': gpd.GeoSeries(dtype='geometry'),  # Assuming this is a geometry column
    'crs': pd.Series(dtype='object'),  # CRS object
})

edges_results_one = ddf_one.map_partitions(
    singular_segment_to_edge_partition,
    edge_info=one_edge_segment, 
    segment_das=segment_das,
    meta=meta
)
edges_results_many = ddf_many.map_partitions(
    many_segment_to_edge_partition,
    edge_info=many_edge_segment, 
    segment_das=segment_das,
    meta=meta
)

In [113]:
edges_results_one_df = edges_results_one.compute()
edges_results_many_df = edges_results_many.compute()

  0%|          | 0/121 [00:00<?, ?it/s]
  0%|          | 0/120 [00:00<?, ?it/s][A

  0%|          | 0/121 [00:00<?, ?it/s][A[A


100%|██████████| 121/121 [00:00<00:00, 2611.01it/s]
100%|██████████| 121/121 [00:00<00:00, 1638.68it/s]
100%|██████████| 120/120 [00:00<00:00, 1439.97it/s]
100%|██████████| 121/121 [00:00<00:00, 1421.96it/s]
  0%|          | 0/121 [00:00<?, ?it/s]
100%|██████████| 121/121 [00:00<00:00, 6735.38it/s]
100%|██████████| 121/121 [00:00<00:00, 3560.18it/s]
  0%|          | 0/121 [00:00<?, ?it/s]
100%|██████████| 121/121 [00:00<00:00, 4201.14it/s]
100%|██████████| 121/121 [00:00<00:00, 3453.58it/s]
  0%|          | 0/121 [00:00<?, ?it/s]
100%|██████████| 121/121 [00:00<00:00, 3842.57it/s]
100%|██████████| 121/121 [00:00<00:00, 3432.21it/s]
  0%|          | 0/120 [00:00<?, ?it/s]
100%|██████████| 121/121 [00:00<00:00, 4425.10it/s]
100%|██████████| 120/120 [00:00<00:00, 3246.78it/s]
  0%|          | 0/121 [00:00<?, ?it/s]
  0%|          | 0/120 [00:00<?, ?it/s][A



NameError: name 'df' is not defined

In [130]:
merged_df = pd.concat([edges_results_one_df, edges_results_many_df])
for col in ["id", "ds", "up", "coords", "crs"]:
    merged_df[col] = merged_df[col].astype(str)
print(merged_df.dtypes)

id                        object
merit_basin                int64
segment_sorting_index      int64
order                      int64
len                      float64
len_dir                  float64
ds                        object
up                        object
slope                    float64
sinuosity                float64
stream_drop              float64
uparea                   float64
coords                    object
crs                       object
dtype: object


In [136]:
xr_dataset = xr.Dataset.from_dataframe(merged_df)
xr_dataset.to_zarr(Path(cfg.csv.edges), mode='w')

<xarray.backends.zarr.ZarrStore at 0x7f3fb00ffdc0>

In [137]:
sorted_keys_array = np.array(sorted_keys)
zarr.save(cfg.csv.sorted_edges_keys, sorted_keys_array)