In [1]:
from dask.distributed import Client, LocalCluster
from nwm import NWM
from tqdm import tqdm
import pandas as pd
import os
import sys
import logging

In [2]:
start_date_str = "2000-01-01"
end_date_str = "2022-12-31"

comids_file = os.path.join("data", "comid_huc8_boundary_conditions.csv")

comids_hucs = pd.read_csv(comids_file)
comids_hucs

Unnamed: 0,comid,huc8_from,huc8_to
0,721640,1010002,01010008
1,816963,1010010,
2,817437,1010005,
3,818525,1010011,
4,1025266,1030001,01030003
...,...,...,...
2107,948090631,18090203,
2108,948090648,18090202,18090203
2109,948091000,18090204,
2110,948091293,18090207,


In [3]:
n_workers = 10
threads_per_worker = 2
processes = False

In [4]:
scheduler = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker, processes=processes)

In [5]:
test_comids = comids_hucs.iloc[3:5].comid.to_list()
test_comids

[818525, 1025266]

In [6]:
logging.basicConfig(format='%(asctime)s | %(levelname)s : %(message)s', level=logging.INFO, stream=sys.stdout)


In [7]:
import s3fs
import zarr
import xarray as xr
import numpy as np
import dask
import datetime

NWM_URL = "s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr"
NWM_VARIABLES = ["streamflow", "velocity"]

start_date = datetime.datetime.strptime(start_date_str, "%Y-%m-%d")
end_date = datetime.datetime.strptime(end_date_str, "%Y-%m-%d")

client = Client(scheduler)
client

2023-01-11 09:00:49,286 | INFO : Receive client connection: Client-5a43bf5a-91b8-11ed-bb34-08d23e7f709b
2023-01-11 09:00:49,294 | INFO : Starting established connection to inproc://192.168.1.65/310068/24


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://192.168.1.65:8787/status,

0,1
Dashboard: http://192.168.1.65:8787/status,Workers: 10
Total threads: 20,Total memory: 127.76 GiB
Status: running,Using processes: False

0,1
Comm: inproc://192.168.1.65/310068/1,Workers: 10
Dashboard: http://192.168.1.65:8787/status,Total threads: 20
Started: Just now,Total memory: 127.76 GiB

0,1
Comm: inproc://192.168.1.65/310068/4,Total threads: 2
Dashboard: http://192.168.1.65:62286/status,Memory: 12.78 GiB
Nanny: None,
Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-oa5__heb,Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-oa5__heb

0,1
Comm: inproc://192.168.1.65/310068/5,Total threads: 2
Dashboard: http://192.168.1.65:62287/status,Memory: 12.78 GiB
Nanny: None,
Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-fqq2esom,Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-fqq2esom

0,1
Comm: inproc://192.168.1.65/310068/6,Total threads: 2
Dashboard: http://192.168.1.65:62288/status,Memory: 12.78 GiB
Nanny: None,
Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-ki2rpxr4,Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-ki2rpxr4

0,1
Comm: inproc://192.168.1.65/310068/7,Total threads: 2
Dashboard: http://192.168.1.65:62289/status,Memory: 12.78 GiB
Nanny: None,
Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-gy61blqz,Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-gy61blqz

0,1
Comm: inproc://192.168.1.65/310068/8,Total threads: 2
Dashboard: http://192.168.1.65:62290/status,Memory: 12.78 GiB
Nanny: None,
Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-2hujtrh5,Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-2hujtrh5

0,1
Comm: inproc://192.168.1.65/310068/9,Total threads: 2
Dashboard: http://192.168.1.65:62291/status,Memory: 12.78 GiB
Nanny: None,
Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-feqnej3p,Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-feqnej3p

0,1
Comm: inproc://192.168.1.65/310068/10,Total threads: 2
Dashboard: http://192.168.1.65:62292/status,Memory: 12.78 GiB
Nanny: None,
Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-7wwayatp,Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-7wwayatp

0,1
Comm: inproc://192.168.1.65/310068/11,Total threads: 2
Dashboard: http://192.168.1.65:62293/status,Memory: 12.78 GiB
Nanny: None,
Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-zaymnk1g,Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-zaymnk1g

0,1
Comm: inproc://192.168.1.65/310068/12,Total threads: 2
Dashboard: http://192.168.1.65:62294/status,Memory: 12.78 GiB
Nanny: None,
Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-omn8ab7v,Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-omn8ab7v

0,1
Comm: inproc://192.168.1.65/310068/13,Total threads: 2
Dashboard: http://192.168.1.65:62295/status,Memory: 12.78 GiB
Nanny: None,
Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-iyuds68u,Local directory: C:\Users\dsmith\AppData\Local\Temp\dask-worker-space\worker-iyuds68u


In [8]:
%%time
s3 = s3fs.S3FileSystem(anon=True)
store = s3fs.S3Map(root=NWM_URL, s3=s3, check=False)
ds = xr.open_zarr(store=store, consolidated=True)
ds

CPU times: total: 11.2 s
Wall time: 42.7 s


Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 10.59 MiB 10.59 MiB Shape (2776738,) (2776738,) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,39.72 MiB,39.72 MiB
Shape,"(2776738,)","(2776738,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,|S15 numpy.ndarray,|S15 numpy.ndarray
"Array Chunk Bytes 39.72 MiB 39.72 MiB Shape (2776738,) (2776738,) Dask graph 1 chunks in 2 graph layers Data type |S15 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,39.72 MiB,39.72 MiB
Shape,"(2776738,)","(2776738,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,|S15 numpy.ndarray,|S15 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 10.59 MiB 10.59 MiB Shape (2776738,) (2776738,) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 10.59 MiB 10.59 MiB Shape (2776738,) (2776738,) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray
"Array Chunk Bytes 10.59 MiB 10.59 MiB Shape (2776738,) (2776738,) Dask graph 1 chunks in 2 graph layers Data type int32 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.42 TiB,153.81 MiB
Shape,"(367439, 2776738)","(672, 30000)"
Dask graph,50871 chunks in 2 graph layers,50871 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 7.42 TiB 153.81 MiB Shape (367439, 2776738) (672, 30000) Dask graph 50871 chunks in 2 graph layers Data type float64 numpy.ndarray",2776738  367439,

Unnamed: 0,Array,Chunk
Bytes,7.42 TiB,153.81 MiB
Shape,"(367439, 2776738)","(672, 30000)"
Dask graph,50871 chunks in 2 graph layers,50871 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.42 TiB,153.81 MiB
Shape,"(367439, 2776738)","(672, 30000)"
Dask graph,50871 chunks in 2 graph layers,50871 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 7.42 TiB 153.81 MiB Shape (367439, 2776738) (672, 30000) Dask graph 50871 chunks in 2 graph layers Data type float64 numpy.ndarray",2776738  367439,

Unnamed: 0,Array,Chunk
Bytes,7.42 TiB,153.81 MiB
Shape,"(367439, 2776738)","(672, 30000)"
Dask graph,50871 chunks in 2 graph layers,50871 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [9]:
%%time
valid_comids = []
invalid_comids = []
exceptions = {}
found_comids = {}
for comid in comids_hucs.comid.to_list():
    try:
        _comid_test = ds.sel(feature_id=comid, method='nearest')
        found_comid = int(_comid_test.feature_id)
        valid_comids.append(comid)
        if found_comid != comid:
            found_comids[comid] = found_comid
    except Exception as e:
        exceptions[comid] = e
        invalid_comids.append(comid)
print(f"Valid COMIDs: {len(valid_comids)}, Invalid COMIDs: {len(invalid_comids)}, Nearest COMIDs: {len(found_comids)}")

Valid COMIDs: 2112, Invalid COMIDs: 0, Nearest COMIDs: 22
CPU times: total: 41.3 s
Wall time: 48.5 s


In [10]:
selected_comids = valid_comids[0:2]
selected_comids

[721640, 816963]

In [11]:
%%time
with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ds_feature = ds[NWM_VARIABLES].sel(
        time=slice(f"{start_date.year}-{start_date.month}-{start_date.day}", f"{end_date.year}-{end_date.month}-{end_date.day}")
    ).sel(
        feature_id=selected_comids, method='nearest'
    ).load(optimize_graph=True, traverse=False)

2023-01-11 09:04:15,949 | INFO : full garbage collection released 47.71 MiB from 1377 reference cycles (threshold: 9.54 MiB)
CPU times: total: 6min 56s
Wall time: 4min 14s


In [12]:
ds_feature

In [14]:
ds_df = ds_feature.to_dataframe()
ds_df

Unnamed: 0_level_0,Unnamed: 1_level_0,streamflow,velocity,elevation,gage_id,latitude,longitude,order
time,feature_id,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2000-01-01 00:00:00,721640,19.380000,0.77,192.559998,b' 01011000',47.073555,-69.068634,5
2000-01-01 00:00:00,816965,1.270000,0.57,98.980003,b' ',46.448925,-67.787590,2
2000-01-01 01:00:00,721640,19.330000,0.77,192.559998,b' 01011000',47.073555,-69.068634,5
2000-01-01 01:00:00,816965,1.270000,0.57,98.980003,b' ',46.448925,-67.787590,2
2000-01-01 02:00:00,721640,19.250000,0.77,192.559998,b' 01011000',47.073555,-69.068634,5
...,...,...,...,...,...,...,...,...
2020-12-31 21:00:00,816965,4.920000,0.91,98.980003,b' ',46.448925,-67.787590,2
2020-12-31 22:00:00,721640,69.359998,1.36,192.559998,b' 01011000',47.073555,-69.068634,5
2020-12-31 22:00:00,816965,4.890000,0.91,98.980003,b' ',46.448925,-67.787590,2
2020-12-31 23:00:00,721640,69.129998,1.36,192.559998,b' 01011000',47.073555,-69.068634,5


In [31]:
comid_df = ds_df.query(f"feature_id=={selected_comids[0]}")
comid_df

Unnamed: 0_level_0,Unnamed: 1_level_0,streamflow,velocity,elevation,gage_id,latitude,longitude,order
time,feature_id,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2000-01-01 00:00:00,721640,19.380000,0.77,192.559998,b' 01011000',47.073555,-69.068634,5
2000-01-01 01:00:00,721640,19.330000,0.77,192.559998,b' 01011000',47.073555,-69.068634,5
2000-01-01 02:00:00,721640,19.250000,0.77,192.559998,b' 01011000',47.073555,-69.068634,5
2000-01-01 03:00:00,721640,19.220000,0.77,192.559998,b' 01011000',47.073555,-69.068634,5
2000-01-01 04:00:00,721640,19.220000,0.77,192.559998,b' 01011000',47.073555,-69.068634,5
...,...,...,...,...,...,...,...,...
2020-12-31 19:00:00,721640,70.019998,1.36,192.559998,b' 01011000',47.073555,-69.068634,5
2020-12-31 20:00:00,721640,69.759998,1.36,192.559998,b' 01011000',47.073555,-69.068634,5
2020-12-31 21:00:00,721640,69.509998,1.36,192.559998,b' 01011000',47.073555,-69.068634,5
2020-12-31 22:00:00,721640,69.359998,1.36,192.559998,b' 01011000',47.073555,-69.068634,5


In [33]:
comid_csv = comid_df.to_csv()

In [39]:
import sys
sys.getsizeof(comid_csv)

63087750

In [40]:
comid_json = comid_df.to_json()
sys.getsizeof(comid_json)

74697725

In [48]:
min_comid_df = comid_df[["streamflow", "velocity"]]
min_comid_df

Unnamed: 0_level_0,Unnamed: 1_level_0,streamflow,velocity
time,feature_id,Unnamed: 2_level_1,Unnamed: 3_level_1
2000-01-01 00:00:00,721640,19.380000,0.77
2000-01-01 01:00:00,721640,19.330000,0.77
2000-01-01 02:00:00,721640,19.250000,0.77
2000-01-01 03:00:00,721640,19.220000,0.77
2000-01-01 04:00:00,721640,19.220000,0.77
...,...,...,...
2020-12-31 19:00:00,721640,70.019998,1.36
2020-12-31 20:00:00,721640,69.759998,1.36
2020-12-31 21:00:00,721640,69.509998,1.36
2020-12-31 22:00:00,721640,69.359998,1.36


In [49]:
output_test_path = os.path.join("data", f"{selected_comids[0]}.h5")
min_comid_df.to_hdf(output_test_path, key="df", mode='w')

In [50]:
h5_read = pd.read_hdf(output_test_path)
h5_read

Unnamed: 0_level_0,Unnamed: 1_level_0,streamflow,velocity
time,feature_id,Unnamed: 2_level_1,Unnamed: 3_level_1
2000-01-01 00:00:00,721640,19.380000,0.77
2000-01-01 01:00:00,721640,19.330000,0.77
2000-01-01 02:00:00,721640,19.250000,0.77
2000-01-01 03:00:00,721640,19.220000,0.77
2000-01-01 04:00:00,721640,19.220000,0.77
...,...,...,...
2020-12-31 19:00:00,721640,70.019998,1.36
2020-12-31 20:00:00,721640,69.759998,1.36
2020-12-31 21:00:00,721640,69.509998,1.36
2020-12-31 22:00:00,721640,69.359998,1.36


In [53]:
import sqlite3
db_file = os.path.join("data", "test-database.sqlite")
conn = sqlite3.connect(db_file)

In [54]:
min_comid_df.to_sql('data', con=conn, if_exists='append')

184104

In [59]:
_comid_df = min_comid_df.reset_index()
_comid_df = _comid_df.set_index(["time"])
_comid_df

Unnamed: 0_level_0,feature_id,streamflow,velocity
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2000-01-01 00:00:00,721640,19.380000,0.77
2000-01-01 01:00:00,721640,19.330000,0.77
2000-01-01 02:00:00,721640,19.250000,0.77
2000-01-01 03:00:00,721640,19.220000,0.77
2000-01-01 04:00:00,721640,19.220000,0.77
...,...,...,...
2020-12-31 19:00:00,721640,70.019998,1.36
2020-12-31 20:00:00,721640,69.759998,1.36
2020-12-31 21:00:00,721640,69.509998,1.36
2020-12-31 22:00:00,721640,69.359998,1.36


In [60]:
%%time
daily_streamflow = _comid_df["streamflow"] .resample("D").sum()
daily_streamflow

CPU times: total: 15.6 ms
Wall time: 19 ms


time
2000-01-01     455.579990
2000-01-02     440.469990
2000-01-03     430.539990
2000-01-04     419.819991
2000-01-05     407.599991
                 ...     
2020-12-27    2119.979953
2020-12-28    2229.829950
2020-12-29    1944.559957
2020-12-30    1844.949959
2020-12-31    1721.569962
Freq: D, Name: streamflow, Length: 7671, dtype: float64

In [61]:
%%time
daily_velocity = _comid_df["velocity"] .resample("D").mean()
daily_velocity

CPU times: total: 15.6 ms
Wall time: 6.01 ms


time
2000-01-01    0.763333
2000-01-02    0.755000
2000-01-03    0.750000
2000-01-04    0.742083
2000-01-05    0.736250
                ...   
2020-12-27    1.417500
2020-12-28    1.430417
2020-12-29    1.396250
2020-12-30    1.384583
2020-12-31    1.368750
Freq: D, Name: velocity, Length: 7671, dtype: float64

In [64]:
min_comid_df.shape[0]

184104

In [75]:
ds_df.index.unique(level=1).to_list()

[721640, 816965]