In [1]:
import netCDF4 as nc
import numpy as np
import matplotlib.pyplot as plt
import geopandas as gpd
import pandas as pd
import networkx as nx
import os


In [2]:
base_dir = '/Users/donghui/Box Sync/Research/PhD/Projects/Water_Supply_Drought'
data_dir = f'{base_dir}/data'

os.chdir(base_dir)

# Read Grids & Sort by Upstream-Downstream

In [None]:
# ---- Read geo files related to a given HUC4 basin ---- #

huc4 = '0601'

nhd_data_dir = '/Users/donghui/Box Sync/Research/PhD/Projects/Drought_Cycle_Analysis/Data'
crs = 'EPSG:4326'
huc2_conus = [f'0{i}' if i<10 else f'{i}' for i in range(1, 19)]

# read HUCs
huc2 = huc4[0:2]
gdb_file = f'{nhd_data_dir}/Raw/WBD/WBD_{huc2}_HU2_GDB.gdb'
gdf_huc2_all = gpd.read_file(gdb_file, layer='WBDHU2')
gdf_huc4_all = gpd.read_file(gdb_file, layer='WBDHU4')
gdf_huc6_all = gpd.read_file(gdb_file, layer='WBDHU6')
gdf_huc8_all = gpd.read_file(gdb_file, layer='WBDHU8')
gdf_huc10_all = gpd.read_file(gdb_file, layer='WBDHU10')

# set crs
gdf_huc2_all = gdf_huc2_all.set_crs(crs, inplace=False, allow_override=True)    # includes the huc2 region
gdf_huc4_all = gdf_huc4_all.set_crs(crs, inplace=False, allow_override=True)    # includes all huc4 subregions in this huc2 region
gdf_huc6_all = gdf_huc6_all.set_crs(crs, inplace=False, allow_override=True)    # includes all huc6 basins in this huc2 region
gdf_huc8_all = gdf_huc8_all.set_crs(crs, inplace=False, allow_override=True)    # includes all huc8 subbasins in this huc2 region
gdf_huc10_all = gdf_huc10_all.set_crs(crs, inplace=False, allow_override=True)    # includes all huc10 subbasins in this huc2 region

########## Prepare flow lines ##########

if huc2 == '03':    # multiple NHDP files for 03
    flow_attr_file_list = [f'{nhd_data_dir}/Raw/NHDPlus/NHDPlus{huc2}{i}/NHDPlusAttributes' for i in ['N','S','W']]
    hydro_file_list = [f'{nhd_data_dir}/Raw/NHDPlus/NHDPlus{huc2}{i}/NHDSnapshot/Hydrography' for i in ['N','S','W']]
elif huc2 == '10': 
    flow_attr_file_list = [f'{nhd_data_dir}/Raw/NHDPlus/NHDPlus{huc2}{i}/NHDPlusAttributes' for i in ['U','L']]
    hydro_file_list = [f'{nhd_data_dir}/Raw/NHDPlus/NHDPlus{huc2}{i}/NHDSnapshot/Hydrography' for i in ['U','L']]
else:
    flow_attr_file_list = [f'{nhd_data_dir}/Raw/NHDPlus/NHDPlus{huc2}/NHDPlusAttributes']
    hydro_file_list = [f'{nhd_data_dir}/Raw/NHDPlus/NHDPlus{huc2}/NHDSnapshot/Hydrography']

gdf_flow_list = []
for flow_attr_file, hydro_file in zip(flow_attr_file_list, hydro_file_list):
    gdf_fline_vaa = gpd.read_file(flow_attr_file, layer='PlusFlowlineVAA')
    gdf_fline = gpd.read_file(hydro_file, layer='NHDFlowline')

    # change COMID to ComID if the error exists
    if not 'ComID' in gdf_fline:
        gdf_fline.rename(columns={'COMID':'ComID'}, inplace=True)

    # change vaa file ComID to int
    to_int_var = ['ComID', 'StreamOrde', 'StreamCalc']
    gdf_fline_vaa[to_int_var] = gdf_fline_vaa[to_int_var].astype(int)

    # merge this two gdfs
    to_merge_vars = ['ComID', 'StreamOrde', 'StreamCalc', 'FromNode', 'ToNode']
    gdf_flow = gdf_fline.merge(gdf_fline_vaa[to_merge_vars], how='inner', on='ComID')
    
    gdf_flow_list.append(gdf_flow)

gdf_flow = pd.concat(gdf_flow_list)

# set crs
gdf_flow = gdf_flow.set_crs(crs, inplace=True, allow_override=True)

# subset to the target huc4
gdf_flow_huc4 = gdf_flow.sjoin(gdf_huc4_all.loc[gdf_huc4_all['huc4']==huc4], how='inner', predicate='intersects')

########## End Prepare flow lines ##########


########## Read .nc files ##########
conus_grid_nc = f'{data_dir}/processed/LRR/input/conus_nldas_grid.nc'
conus_reservoir_nc = f'{data_dir}/processed/LRR/input/reservoirs.nc'
# Read CONUS grids
with nc.Dataset(conus_grid_nc) as conus_grid:
    lon_array = conus_grid.variables['lon'][:]
    lat_array = conus_grid.variables['lat'][:]
    grid_id_array = conus_grid.variables['id'][:, :]
    flow_dir_array = conus_grid.variables['flow_dir'][:, :]

In [None]:
nhd_data_dir

In [None]:
from shapely.geometry import Point

def get_grids_in_hu(lon_array, lat_array, gdf_huc):
    """
    Get grids (lon-lat) within the target HU
    
    gdf_huc: geodataframe of the target HU
    
    """
    
    # create lon-lat pairs from global vars: lon_array and lat_array
    lon_lat_all = [[Point(lon, lat), i, j] for i, lon in enumerate(lon_array) for j, lat in enumerate(lat_array)]    # [Point(lon, lat), i-lon index, j-lat index]
    
    # index the wanted hu from the gdf
    huc_geo = gdf_huc['geometry'].values   # the polygon for this huc
        
    # find (lon, lat) pairs within the area
    lon_lat_sub = [i for i in lon_lat_all if huc_geo.contains(i[0])[0]]

    # create point geodataframe for selected points and check
    d = {'lon index': [i[1] for i in lon_lat_sub], 'lat index': [i[2] for i in lon_lat_sub]}
    gdf_points = gpd.GeoDataFrame(d, 
                                  geometry=[i[0] for i in lon_lat_sub], crs='EPSG:4326')   # lon index, lat index, geometry
    
    result = {
        'grids_in_hu': gdf_points,    # gdf - lon index, lat index, geometry; the index represents index in .nc files
        'others': (lon_array, lat_array, gdf_huc)    # this is mainly for plot check
    }
    
    return(result)

def get_downstream_cell(i, j, direction):
    """
    Returns the downstream cell coordinates based on D8 flow direction.
    d8 directions:
    32  64 128
    16  x   1
    8   4   2

    Parameters:
    i (int): Row index of the current cell
    j (int): Column index of the current cell
    direction (int): D8 flow direction code of the current cell

    Returns:
    tuple: Coordinates (row, col) of the downstream cell, or None if no downstream
    """

    # Define the changes in row (di) and column (dj) for each flow direction
    # PLEASE note: the direction is defined as such, because the latitude array increases as the row index increases & longitude array increases as the col index increases!!!

    direction_map = {
        1: (0, 1),     # East: i, j+1
        2: (-1, 1),    # Southeast: i-1, j+1
        4: (-1, 0),    # South: i-1, j (move south, decrease row index)
        8: (-1, -1),   # Southwest: i-1, j-1
        16: (0, -1),   # West: i, j-1
        32: (1, -1),   # Northwest: i+1, j-1
        64: (1, 0),    # North: i+1, j (move north, increase row index)
        128: (1, 1)    # Northeast: i+1, j+1
    }


    # Get the changes in row and column for the given direction
    di, dj = direction_map.get(direction, (0, 0))    # if direction is not in the direction_map, return (0, 0)

    # If di and dj are both 0, it means the direction is not defined (e.g., a sink)
    if di == 0 and dj == 0:
        return None

    # Calculate the coordinates of the downstream cell
    downstream_i = i + di
    downstream_j = j + dj
    
    return (downstream_i, downstream_j)

# test
get_downstream_cell(2, 16, 64)


In [None]:
# Get the grids within the target HU
gdf_huc4 = gdf_huc4_all.loc[gdf_huc4_all['huc4']==huc4]
gdf_grid_index_in_hu = get_grids_in_hu(lon_array, lat_array, gdf_huc4)['grids_in_hu']
lon_index = gdf_grid_index_in_hu['lon index'].values    # contains duplicate values
lat_index = gdf_grid_index_in_hu['lat index'].values    # contains duplicate values

# Subset the flow direction array to the target HU basin (the rectangular area covering the HUC4 basin)
# technically, I didn't "subset", just set the flow direction values outside the HU to -9999
# first, set all conus grids outside the target HU to -9999
mask = np.ones_like(flow_dir_array, dtype=bool)
mask[lat_index, lon_index] = False
flow_dir_array[mask] = -1    # set all conus grids outside the target HU to -1
# second, subset the flow direction array to the target HU basin
lat_index_unique = np.unique(lat_index)
lon_index_unique = np.unique(lon_index)
flow_dir_array_huc4 = flow_dir_array[np.ix_(lat_index_unique, lon_index_unique)]

# also, get the grid id array for the target HU basin for record
grid_id_array_huc4 = grid_id_array[np.ix_(lat_index_unique, lon_index_unique)]
lat_array_huc4 = lat_array[lat_index_unique]
lon_array_huc4 = lon_array[lon_index_unique]

########## Sort the grids ##########

# create a graph
G = nx.DiGraph()

# add nodes
nrows, ncols = flow_dir_array_huc4.shape
for i in range(nrows):
    for j in range(ncols):
        # add node for each grid
        # if the node flows out of the huc4 basin, skip
        if flow_dir_array_huc4[i, j] == -1:
            continue
        
        G.add_node((i, j), 
                   flow_dir=flow_dir_array_huc4[i, j], grid_id=grid_id_array_huc4[i, j], grid_lat=lat_array_huc4[i], grid_lon=lon_array_huc4[j])

        # determine downstream grids and add edges
        downstream_grid_ij = get_downstream_cell(i, j, flow_dir_array_huc4[i, j])

        if downstream_grid_ij is not None and flow_dir_array_huc4[downstream_grid_ij] != -1:
            # if downstream grid is not None AND the downstream grid is not outside the huc4 basin
            G.add_edge((i, j), downstream_grid_ij)

# typological sorting
sorted_grid_list = list(nx.topological_sort(G))

# store the sorted grid id and upstream grid id for each grid: {grid: [upstream grid list]}
# the grid order is following the topological sorting
upstream_grid_dict = {grid: [] for grid in sorted_grid_list}
for grid in sorted_grid_list:
    upstream_grid_dict[grid] = list(G.predecessors(grid))
# convert the grid index in upstream_grid_dict to grid id (the grid attribute in the node)
upstream_grid_id_dict = {G.nodes[grid]['grid_id']: [G.nodes[upstream_grid]['grid_id'] for upstream_grid in upstream_grid_dict[grid]] for grid in upstream_grid_dict}


In [None]:
G = nx.DiGraph()
nodes = [(0,0), (0,1), (1,0), (2,0), (1,1)]
G.add_nodes_from(nodes)
edges = [((0,0), (1,0)), ((0,1), (1,0)), ((1,0), (2,0)), ((1,1), (2,0))]
G.add_edges_from(edges)

# typological sorting
sorted_grid_list = list(nx.topological_sort(G))

# store the sorted grid id and upstream grid id for each grid: {grid: [upstream grid list]}
# the grid order is following the topological sorting
upstream_grid_dict = {grid: [] for grid in sorted_grid_list}
for grid in sorted_grid_list:
    upstream_grid_dict[grid] = list(G.predecessors(grid))

upstream_grid_dict

In [None]:
# ---- Visualize the graph on the map ---- #

nodes_data = pd.DataFrame.from_dict(dict(G.nodes(data=True)), orient='index')
geometry = [Point(xy) for xy in zip(nodes_data['grid_lon'], nodes_data['grid_lat'])]
gdf_nodes = gpd.GeoDataFrame(nodes_data, geometry=geometry, crs='EPSG:4326')


# ------------------- Plot ------------------- #
# Variables to store components of arrows
x = []
y = []
dx = []
dy = []

default_lon = lon_array_huc4[0]
default_lat = lat_array_huc4[0]
for edge in G.edges():
    start_node = G.nodes[edge[0]]
    end_node = G.nodes[edge[1]]
    try:
        x_start, y_start = start_node['grid_lon'], start_node['grid_lat']
        x_end, y_end = end_node['grid_lon'], end_node['grid_lat']
    except KeyError:
        x_start, y_start = default_lon, default_lat
        x_end, y_end = default_lon, default_lat

    x.append(x_start)
    y.append(y_start)
    dx.append(x_end - x_start)
    dy.append(y_end - y_start)

fig, ax = plt.subplots(figsize=(10, 10))

# Plot arrows
ax.quiver(x, y, dx, dy, angles='xy', scale_units='xy', scale=1, color='gray')

# Plot nodes
gdf_nodes.plot(ax=ax, marker='o', color='black', alpha=0.7, markersize=10)

gdf_huc4.plot(ax=ax, facecolor='none', edgecolor='tab:gray', linewidth=1)

# plot flow line
# specify to which stream order
max_order = gdf_flow_huc4['StreamOrde'].max()
min_order_to_keep = 4
gdf_flow_huc4.loc[gdf_flow_huc4['StreamOrde']>=min_order_to_keep].plot(ax=ax, linewidth=1)

plt.show()



In [None]:
grid = np.zeros((3, 5), dtype=[
    ('var_1', int),
    ('var_2', float),
    ('var_3', str)
])

grid = np.zeros((3, 5), dtype=[('grid_id', int), ('elevation', float), ('slope', float), ('runoff', float), ('inflow', float), ('outflow', float), ('storage', float), ('has_reservoir', bool), ('reservoir_id', 'U10')])


grid

# Combine Daily NLDAS Runoff .nc to Single Input File

In [None]:
# ---- Read NLDAS daily runoff data & combine to a single file ---- #

nldas_runoff_dir = f'{data_dir}/processed/nldas_daily'

nc_files_list = [f for f in os.listdir(nldas_runoff_dir) if f.endswith('.nc')]
date_list = pd.to_datetime([f.split('.')[0] for f in nc_files_list]).sort_values().tolist()

# get basic dimension info from the first nc file
with nc.Dataset(f'{nldas_runoff_dir}/{nc_files_list[0]}') as daily_runoff:
    lat_array_conus = daily_runoff.variables['lat'][:]
    lon_array_conus = daily_runoff.variables['lon'][:]

# create a new nc dataset
combined_nc_file_path = f'{data_dir}/processed/LRR/input/nldas_runoff.nc'
with nc.Dataset(combined_nc_file_path, 'w') as combined_nc:
    # create dimensions
    combined_nc.createDimension('time', len(date_list))
    combined_nc.createDimension('lat', len(lat_array_conus))
    combined_nc.createDimension('lon', len(lon_array_conus))

    # create variables
    combined_nc.createVariable('time', 'S10', ('time',))
    combined_nc.createVariable('lat', 'f4', ('lat',))
    combined_nc.createVariable('lon', 'f4', ('lon',))
    combined_nc.createVariable('Qs', 'f8', ('time', 'lat', 'lon',))
    combined_nc.createVariable('Qsb', 'f8', ('time', 'lat', 'lon',))

    # variable attributes
    combined_nc.variables['time'].units = 'none'
    combined_nc.variables['time'].long_name = 'string datetime (yyyy-mm-dd) from 1980-01-01 to 2019-12-31'
    combined_nc.variables['lat'].long_name = 'latitude'
    combined_nc.variables['lon'].long_name = 'longitude'
    combined_nc.variables['Qs'].long_name = 'surface runoff (mm/d)'
    combined_nc.variables['Qsb'].long_name = 'baseflow (mm/d)'

    # write data
    combined_nc.variables['time'][:] = np.array(date_list).astype('datetime64[D]').astype(str)
    combined_nc.variables['lat'][:] = lat_array_conus
    combined_nc.variables['lon'][:] = lon_array_conus

    # write runoff data from each nc file
    for i, date in enumerate(date_list):
        if i % 100 == 0:
            print(f'Processing {date}')
        year = date.year
        month = f'{date.month:02d}'
        day = f'{date.day:02d}'

        nc_file_path = f'{nldas_runoff_dir}/{year}{month}{day}.nc'
        with nc.Dataset(nc_file_path) as daily_runoff:
            # PLEASE NOTE FOR THE RUNOFF DATA
            # The runoff data in the original nc files are hourly average values for each day (mm/hr)
            # So, here, I need to multiply the values by 24 to get the daily values (mm/day)
            combined_nc.variables['Qs'][date_list.index(date), :, :] = daily_runoff.variables['Qs'][:] * 24    # convert from mm/hr to mm/day
            combined_nc.variables['Qsb'][date_list.index(date), :, :] = daily_runoff.variables['Qsb'][:] * 24    # convert from mm/hr to mm/day

In [None]:
# ---- Test the runoff data ---- #

with nc.Dataset(combined_nc_file_path) as conus_runoff:
    # Qs = conus_runoff.variables['Qs'][:]
    # Qsb = conus_runoff.variables['Qsb'][:]

    time = conus_runoff.variables['time'][:]

start_date = '1980-01-01'
end_date = '1980-12-31'
start_date_index = np.where(time==start_date)[0][0]
end_date_index = np.where(time==end_date)[0][0]

time_sub = time[start_date_index:end_date_index+1]

# Prepare Observed Storage for Assimilation
Concat all storage series of the 452 reservoirs

In [57]:
res_ts_dir = '/Users/donghui/Box Sync/Research/PhD/Projects/DROM_CONUS_Analysis/Data/HydroShare/data_training'
res_meta_file = f'/Users/donghui/Box Sync/Research/PhD/Projects/DROM_CONUS_Analysis/Data/HydroShare/reservoir_metadata.csv'

df_res_meta = pd.read_csv(res_meta_file) 

# Initialize a dataframe
df_storage_all = pd.DataFrame()

res_ts_files = [f for f in os.listdir(res_ts_dir) if f.endswith('.csv')]
for res_ts_file in res_ts_files:
    gid = res_ts_file.split('.')[0]
    df_res_i = pd.read_csv(f'{res_ts_dir}/{res_ts_file}')
    df_res_i['Time'] = pd.to_datetime(df_res_i['Time'])
    df_res_i.set_index('Time', inplace=True)

    # remove duplicate indices
    df_res_i = df_res_i[~df_res_i.index.duplicated(keep='first')]

    # use metadata to convert normalized storage to actual storage
    max_storage = df_res_meta.loc[df_res_meta['ID']==int(gid), 'Maximum Storage'].values[0]
    df_res_i['Storage'] = df_res_i['Storage'] * max_storage

    # merge to the combined dataframe
    # rename for merge
    df_res_i.rename(columns={'Storage': gid}, inplace=True)
    df_storage_all = df_storage_all.merge(df_res_i[gid], how='outer', left_index=True, right_index=True)

    # replace all NaN with -9999
    df_storage_all.fillna(-9999, inplace=True)

In [58]:
df_storage_all.to_csv(f'{data_dir}/processed/LRR/input/reservoir_storage.csv')

In [28]:
df = pd.read_csv(f'{data_dir}/processed/LRR/input/reservoir_storage.csv', index_col=0)
df.index = pd.to_datetime(df.index)

In [66]:
def read_storage_assimilation(storage_file_path: str) -> pd.DataFrame:
    """
    Read the reservoir storage time series file for assimilation.

    Returns
    -------
    df_storage_assimilation : pd.DataFrame
        index: Time [pd.Timestamp]
        cols: 'Reservoir gid' - reservoir storage [acft]
    """

    df_storage_assimilation = pd.read_csv(storage_file_path, index_col=0)
    df_storage_assimilation.index = pd.to_datetime(df_storage_assimilation.index)
    return df_storage_assimilation

storage_file_path = f'{data_dir}/processed/LRR/input/reservoir_storage.csv'
df_storage_assimilation = read_storage_assimilation(storage_file_path)


In [71]:
for col in df_storage_assimilation.columns:
    smin = df_storage_assimilation.loc[df_storage_assimilation[col]>=0, col].min()
    smax = df_storage_assimilation.loc[df_storage_assimilation[col]>=0, col].max()
    print(f'{col}: {smin/smax}')

545: 0.0762488233206277
223: 0.22556079794718978
1774: 0.2352372386115238
7311: 0.0281205561968869
1006: 0.49457610435653815
592: 0.0516711833785004
753: 0.41810073660862757
784: 0.0852216748768472
948: 0.25041369
974: 0.0003060042403444
169: 0.2329598398213392
97: 0.06568181818181809
182: 0.139683478501
431: 0.11689986459475143
1827: 0.698705925827796
1833: 0.3814427220325704
419: 0.008830716376744884
1600: 0.0388928546756626
1615: 0.266118186852306
41: 0.0
55: 0.0451094306365261
168: 0.7720074876049783
975: 0.2808438737664546
961: 0.3371327824400073
1761: 0.3990247112150899
1007: 0.0436563462814423
1775: 0.4112100046985274
546: 0.0
1763: 0.4476906935732303
7306: 0.915145360863515
585: 0.07172418534304961
963: 0.029205057502185695
80: 0.2603775989999999
94: 0.2520940341120013
57: 0.0783933676194443
1818: 0.3425645154737413
1824: 0.5758326789578165
368: 0.0
1171: 0.20341685899999998
1617: 0.3775576567111911
396: 0.0045481336099235
1164: 0.263883944
1170: 0.239491223
382: 0.007471018695

In [37]:
start_date = '1980-01-01'
end_date = '1980-12-31'

t = 100
date = pd.date_range(start_date, end_date)[t]

In [41]:
date.month == 4 and date.day == 10

True

In [62]:
date = pd.to_datetime('2003-01-20')

In [63]:
df_storage_assimilation.loc[date, '99']

Time
2003-01-20    3301.0
2003-01-20    3301.0
Name: 99, dtype: float64

In [43]:
df_storage_assimilation.columns

Index(['545', '223', '1774', '7311', '1006', '592', '753', '784', '948', '974',
       ...
       '1619', '1194', '372', '616', '789', '1023', '1037', '7308', '574',
       '1786'],
      dtype='object', length=452)