Bug fixes #7

Merged · 6 commits · Jan 30, 2024
19 changes: 8 additions & 11 deletions marquette/conf/config.yaml
@@ -6,30 +6,27 @@ units: mm/day
date_codes: ${data_path}/date_codes.json
is_streamflow_split: true
start_date: 01-01-1980
#end_date: 12-31-2019
end_date: 12-31-2020
end_date: 12-31-2019
#end_date: 12-31-2020
drainage_area_treshold: 0.1
continent: 7
area: 1
area: 3
num_partitions: 64
zone: ${continent}${area}
streamflow_version: dpl_v3
streamflow_version: dpl_v1
filter: True
pad_gage_id: True
save_paths:
gage_locations: ${data_path}/HUC/raw_observations/gages3000Info.csv
usgs_flowline_intersections: ${data_path}/HUC/raw_observations/gage_9322_intersection.shp
attributes: ${data_path}/streamflow/attributes_dpl_v2.csv
attributes: ${data_path}/streamflow/attributes_${streamflow_version}.csv
basins: ${data_path}/${name}/raw/basins/cat_pfaf_${continent}${area}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
flow_lines: ${data_path}/${name}/raw/flowlines
huc10: ${data_path}/HUC/raw_observations/huc_10_CONUS.shp
streamflow_files: ${data_path}/streamflow/predictions/${streamflow_version}/
csv:
gage_information: ${data_path}/MERIT/gage_information/gages_3000_merit_info.csv
zone_gage_information: ${data_path}/MERIT/gage_information/${zone}.csv
json:
gage_to_zone_mapping: ${data_path}/MERIT/json/gage_to_zone_mapping.json
zone_to_gage_mapping: ${data_path}/MERIT/json/zone_to_gage_mapping.json
usgs_flowline_mapping: ${data_path}/MERIT/json/usgs_merit_flowline_mapping.json
gage_information: ${data_path}/${name}/gage_information/gages_3000_merit_info.csv
zone_gage_information: ${data_path}/${name}/gage_information/${zone}.csv
zarr:
edges: ${data_path}/${name}/zarr/graph/edges/
gage_coo_indices: ${data_path}/MERIT/zarr/gage_coo_indices
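Note on the config change above: the attributes path now uses the `${streamflow_version}` interpolation instead of the hard-coded `attributes_dpl_v2.csv`, so it tracks whatever streamflow version is set at the top of the file. A minimal sketch of how OmegaConf (which backs the `DictConfig` objects used in this repo) resolves such references — the snippet mirrors a subset of `config.yaml` but is illustrative only, not part of this PR:

```python
from omegaconf import OmegaConf

# Illustrative subset of marquette/conf/config.yaml.
cfg = OmegaConf.create({
    "data_path": "/data/tkb5476/projects/marquette/data",
    "continent": 7,
    "area": 3,
    "zone": "${continent}${area}",
    "streamflow_version": "dpl_v1",
    "save_paths": {
        "attributes": "${data_path}/streamflow/attributes_${streamflow_version}.csv",
    },
})

print(cfg.zone)                   # "73"
print(cfg.save_paths.attributes)  # ".../data/streamflow/attributes_dpl_v1.csv"
```

Changing `streamflow_version` in one place therefore updates both the attributes path and the predictions directory together.
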
34 changes: 34 additions & 0 deletions marquette/conf/saved_configs/grdc_config.yaml
@@ -0,0 +1,34 @@
name: MERIT
data_path: /data/tkb5476/projects/marquette/data/
dx: 2000
buffer: 0.3334
units: mm/day
date_codes: ${data_path}/date_codes.json
is_streamflow_split: true
start_date: 01-01-1980
end_date: 12-31-2020
drainage_area_treshold: 0.1
continent: 7
area: 4
num_partitions: 64
zone: ${continent}${area}
streamflow_version: dpl_v3
filter: False
pad_gage_id: False
save_paths:
gage_locations: ${data_path}/HUC/raw_observations/gages3000Info.csv
usgs_flowline_intersections: ${data_path}/HUC/raw_observations/large_basin_formatted_intersection.shp
attributes: ${data_path}/streamflow/attributes_dpl_v2.csv
basins: ${data_path}/${name}/raw/basins/cat_pfaf_${continent}${area}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
flow_lines: ${data_path}/${name}/raw/flowlines
huc10: ${data_path}/HUC/raw_observations/huc_10_CONUS.shp
streamflow_files: ${data_path}/streamflow/predictions/${streamflow_version}/
csv:
gage_information: ${data_path}/MERIT/gage_information/large_basin_merit_info.csv
zone_gage_information: ${data_path}/MERIT/gage_information/large_basin_${zone}.csv
zarr:
edges: ${data_path}/${name}/zarr/graph/edges/
gage_coo_indices: ${data_path}/MERIT/zarr/gage_coo_indices
HUC_TM: ${data_path}/${name}/zarr/TMs/PFAF_${continent}${area}
MERIT_TM: ${data_path}/${name}/zarr/TMs/MERIT_FLOWLINES_${continent}${area}
streamflow: ${data_path}/streamflow/zarr/${streamflow_version}/${zone}
35 changes: 35 additions & 0 deletions marquette/conf/saved_configs/srb_config.yaml
@@ -0,0 +1,35 @@
name: MERIT
data_path: /data/tkb5476/projects/marquette/data/
dx: 2000
buffer: 0.3334
units: mm/day
date_codes: ${data_path}/date_codes.json
is_streamflow_split: true
start_date: 01-01-1980
#end_date: 12-31-2019
end_date: 12-31-2020
drainage_area_treshold: 0.1
continent: 7
area: 2
num_partitions: 64
zone: ${continent}${area}
streamflow_version: dpl_v3
filter: False
pad_gage_id: True
save_paths:
gage_locations: ${data_path}/HUC/raw_observations/gages3000Info.csv
usgs_flowline_intersections: ${data_path}/HUC/raw_observations/srb_formatted_gage_points.shp
attributes: ${data_path}/streamflow/attributes_dpl_v2.csv
basins: ${data_path}/${name}/raw/basins/cat_pfaf_${continent}${area}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
flow_lines: ${data_path}/${name}/raw/flowlines
huc10: ${data_path}/HUC/raw_observations/huc_10_CONUS.shp
streamflow_files: ${data_path}/streamflow/predictions/${streamflow_version}/
csv:
gage_information: ${data_path}/MERIT/gage_information/srb_merit_info.csv
zone_gage_information: ${data_path}/MERIT/gage_information/srb_${zone}.csv
zarr:
edges: ${data_path}/${name}/zarr/graph/edges/
gage_coo_indices: ${data_path}/MERIT/zarr/gage_coo_indices
HUC_TM: ${data_path}/${name}/zarr/TMs/PFAF_${continent}${area}
MERIT_TM: ${data_path}/${name}/zarr/TMs/MERIT_FLOWLINES_${continent}${area}
streamflow: ${data_path}/streamflow/zarr/${streamflow_version}/${zone}
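
The two files above snapshot run-specific settings under `saved_configs/` (GRDC large basins, zone 74, and the SRB, zone 72). Assuming the usual OmegaConf tooling, a hypothetical way to inspect one of them fully resolved (paths assume the repository root as working directory):

```python
from omegaconf import OmegaConf

# Hypothetical inspection script, not part of this PR.
cfg = OmegaConf.load("marquette/conf/saved_configs/srb_config.yaml")

# resolve=True expands every ${...} reference into its concrete value.
resolved = OmegaConf.to_container(cfg, resolve=True)
print(resolved["zone"])                          # "72"
print(resolved["csv"]["zone_gage_information"])  # ".../gage_information/srb_72.csv"
```
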
29 changes: 10 additions & 19 deletions marquette/merit/_TM_calculations.py
@@ -13,7 +13,7 @@
log = logging.getLogger(__name__)


def create_HUC_MERIT_TM(cfg: DictConfig, gdf: gpd.GeoDataFrame) -> zarr.hierarchy.Group:
def create_HUC_MERIT_TM(cfg: DictConfig, edges: zarr.hierarchy.Group, gdf: gpd.GeoDataFrame) -> None:
"""
Create a Transfer Matrix (TM) from GeoDataFrame.

@@ -23,9 +23,8 @@ def create_HUC_MERIT_TM(cfg: DictConfig, gdf: gpd.GeoDataFrame) -> zarr.hierarch
"""
gdf = gdf.dropna(subset=['HUC10'])
huc10_ids = gdf["HUC10"].unique()
merit_ids = gdf["COMID"].unique()
huc10_ids.sort()
merit_ids.sort()
merit_ids = np.unique(edges.merit_basin[:]) # already sorted
data_array = xr.DataArray(np.zeros((len(huc10_ids), len(merit_ids))),
dims=["HUC10", "COMID"],
coords={"HUC10": huc10_ids, "COMID": merit_ids})
@@ -45,11 +44,10 @@ def create_MERIT_FLOW_TM(
zarr_path = Path(cfg.zarr.HUC_TM)
xr_dataset.to_zarr(zarr_path, mode='w')
zarr_hierarchy = zarr.open_group(Path(cfg.zarr.HUC_TM), mode='r')
return zarr_hierarchy


def create_MERIT_FLOW_TM(
cfg: DictConfig, edges: zarr.hierarchy.Group, huc_to_merit_TM: zarr.hierarchy.Group
cfg: DictConfig, edges: zarr.hierarchy.Group
) -> zarr.hierarchy.Group:
"""
Creating a TM that maps MERIT basins to their reaches. Flow predictions are distributed
@@ -59,11 +57,15 @@ def create_MERIT_FLOW_TM(
:param huc_to_merit_TM:
:return:
"""
COMIDs = huc_to_merit_TM.COMID[:]
COMIDs = np.unique(edges.merit_basin[:]) # already sorted
river_graph_ids = edges.id[:]
merit_basin = edges.merit_basin[:]
river_graph_len = edges.len[:]
proportion_array = np.zeros((len(COMIDs), len(river_graph_ids)))
data_array = xr.DataArray(
data=np.zeros((len(COMIDs), len(river_graph_ids))),
dims=["COMID", "EDGEID"], # Explicitly naming the dimensions
coords={"COMID": COMIDs, "EDGEID": river_graph_ids} # Adding coordinates
)
for i, basin_id in enumerate(tqdm(COMIDs, desc="Processing River flowlines")):
indices = np.where(merit_basin == basin_id)[0]

@@ -74,25 +76,14 @@
proportions = river_graph_len[indices] / total_length
for idx, proportion in zip(indices, proportions):
column_index = np.where(river_graph_ids == river_graph_ids[idx])[0][0]
proportion_array[i, column_index] = proportion

data_array = xr.DataArray(
data=proportion_array,
dims=["COMID", "EDGEID"], # Explicitly naming the dimensions
coords={"COMID": COMIDs, "EDGEID": river_graph_ids} # Adding coordinates
)
data_array.loc[i, column_index] = proportion
xr_dataset = xr.Dataset(
data_vars={"TM": data_array},
attrs={"description": "MERIT -> Edge Transition Matrix"}
)
zarr_path = Path(cfg.zarr.MERIT_TM)
xr_dataset.to_zarr(zarr_path, mode='w')
zarr_hierarchy = zarr.open_group(Path(cfg.zarr.MERIT_TM), mode='r')
return zarr_hierarchy
# zarr_group = zarr.open_group(Path(cfg.zarr.MERIT_TM), mode='w')
# zarr_group.create_dataset('TM', data=proportion_array)
# zarr_group.create_dataset('COMIDs', data=COMIDs)
# zarr_group.create_dataset('EDGEIDs', data=river_graph_ids)


def join_geospatial_data(cfg: DictConfig) -> gpd.GeoDataFrame:
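The reworked `create_MERIT_FLOW_TM` now derives its COMID list directly from `edges.merit_basin` and writes proportions straight into the xarray `DataArray`, but the underlying rule is unchanged: each MERIT basin's flow is split across its river-graph edges in proportion to edge length, so every basin row sums to one. A standalone sketch of that length-weighted split with made-up IDs and lengths (independent of the zarr/xarray plumbing above):

```python
import numpy as np

# Toy data: three edges belong to basin 101, two to basin 202 (hypothetical IDs).
merit_basin = np.array([101, 101, 101, 202, 202])
edge_len = np.array([2000.0, 1000.0, 1000.0, 3000.0, 1000.0])

comids = np.unique(merit_basin)          # unique basin IDs, already sorted
tm = np.zeros((len(comids), len(edge_len)))

for i, basin_id in enumerate(comids):
    idx = np.where(merit_basin == basin_id)[0]
    total = edge_len[idx].sum()
    if total > 0:
        tm[i, idx] = edge_len[idx] / total   # length-weighted proportions

print(tm)
# [[0.5  0.25 0.25 0.   0.  ]
#  [0.   0.   0.   0.75 0.25]]
```
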
39 changes: 25 additions & 14 deletions marquette/merit/_connectivity_matrix.py
@@ -152,15 +152,21 @@ def find_closest_edge(
unique_gdf["drainage_area_percent_error"] <= cfg.drainage_area_treshold
]

try:
result_df["LNG_GAGE"] = result_df["LON_GAGE"]
result_df["LAT_GAGE"] = result_df["Latitude"]
except KeyError:
pass

columns = [
"STAID",
"STANAME",
"MERIT_ZONE",
"HUC02",
# "MERIT_ZONE",
# "HUC02",
"DRAIN_SQKM",
"LAT_GAGE",
"LNG_GAGE",
"STATE",
# "STATE",
"COMID",
"edge_intersection",
"zone_edge_id",
@@ -191,6 +197,7 @@ def find_closest_edge(


def create_gage_connectivity(
cfg: DictConfig,
edges: zarr.hierarchy.Group,
gage_coo_root: zarr.hierarchy.Group,
zone_csv: gpd.GeoDataFrame,
@@ -255,7 +262,7 @@ def stack_traversal(gage_id: str, id_: str, idx: int, merit_flowlines: zarr.Grou
return river_graph

def create_coo_data(
gage_output, padded_gage_id: str, root: zarr.Group
gage_output, _gage_id: str, root: zarr.Group
) -> List[Tuple[Any, Any]]:
"""
Creates coordinate format (COO) data from river graph output for a specific gage.
Expand Down Expand Up @@ -291,30 +298,34 @@ def create_coo_data(
pairs.append((ds, up))

# Create a Zarr dataset for this specific gage
single_gage_csr_data = root.require_group(padded_gage_id)
single_gage_csr_data = root.require_group(_gage_id)
single_gage_csr_data.create_dataset(
"pairs", data=np.array(pairs), chunks=(10000,), dtype="float32"
)

return pairs

def find_connections(row, coo_root, zone_attributes):
gage_id = str(row["STAID"]).zfill(8)
def find_connections(row, coo_root, zone_attributes, _pad_gage_id=True):
if _pad_gage_id:
_gage_id = str(row["STAID"]).zfill(8)
else:
_gage_id = str(row["STAID"])
edge_id = row["edge_intersection"]
zone_edge_id = row["zone_edge_id"]
if gage_id not in coo_root:
if _gage_id not in coo_root:
gage_output = stack_traversal(
gage_id, edge_id, zone_edge_id, zone_attributes
_gage_id, edge_id, zone_edge_id, zone_attributes
)
create_coo_data(gage_output, gage_id, coo_root)
create_coo_data(gage_output, _gage_id, coo_root)

def apply_find_connections(row, gage_coo_root, edges):
return find_connections(row, gage_coo_root, edges)
def apply_find_connections(row, gage_coo_root, edges, pad_gage_id):
return find_connections(row, gage_coo_root, edges, pad_gage_id)

dask_df = dd.from_pandas(zone_csv, npartitions=10)
pad_gage_id = cfg.pad_gage_id
dask_df = dd.from_pandas(zone_csv, npartitions=16)
result = dask_df.apply(
apply_find_connections,
args=(gage_coo_root, edges),
args=(gage_coo_root, edges, pad_gage_id),
axis=1,
meta=(None, "object"),
)
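The connectivity changes above add `cfg` to `create_gage_connectivity` so the new `pad_gage_id` flag reaches `find_connections`, which controls whether station IDs are zero-padded before being used as zarr group names; the Dask partition count also goes from 10 to 16. A minimal sketch of the padding plus the row-wise Dask apply pattern, with made-up station IDs:

```python
import dask.dataframe as dd
import pandas as pd

def gage_key(staid, pad_gage_id=True):
    # Station IDs stored as integers lose leading zeros; zfill(8) restores an
    # 8-character form when padding is requested (as the USGS-style configs do).
    return str(staid).zfill(8) if pad_gage_id else str(staid)

df = pd.DataFrame({"STAID": [1013500, 9423350]})   # hypothetical gage IDs
ddf = dd.from_pandas(df, npartitions=2)
keys = ddf.apply(
    lambda row: gage_key(row["STAID"], pad_gage_id=True),
    axis=1,
    meta=(None, "object"),                         # same meta convention as the PR
)
print(keys.compute().tolist())                     # ['01013500', '09423350']
```
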