In [1]:
import os
import fsspec
from datetime import datetime, timedelta

import orjson
from kerchunk.hdf import SingleHdf5ToZarr 
from kerchunk.combine import MultiZarrToZarr
import xarray as xr
import dask

from pathlib import Path
import numpy as np

import matplotlib.pyplot as plt
import pandas as pd
import geopandas as gpd

import matplotlib as mpl
import matplotlib.style as mplstyle
from glob import glob

import matplotlib.dates as mdates
from multiprocessing import Pool

import cv2
import imutils

## Enter model run hour (00, 06, 12, 18)

In [2]:
hourlyrun='00'

# Prepare to create the json files - one for each netcdf file

In [3]:
fs = fsspec.filesystem('s3', anon='True', skip_instance_cache=True)

In [4]:
latest = sorted(fs.glob('s3://noaa-nwm-pds/nwm*'))[-1]

In [5]:
urls1 = sorted(fs.glob(f's3://{latest}/medium_range_mem1/nwm.t{hourlyrun}z.medium_range.channel*.nc'))
urls2 = sorted(fs.glob(f's3://{latest}/medium_range_mem2/nwm.t{hourlyrun}z.medium_range.channel*.nc'))
urls3 = sorted(fs.glob(f's3://{latest}/medium_range_mem3/nwm.t{hourlyrun}z.medium_range.channel*.nc'))
urls4 = sorted(fs.glob(f's3://{latest}/medium_range_mem4/nwm.t{hourlyrun}z.medium_range.channel*.nc'))
urls5 = sorted(fs.glob(f's3://{latest}/medium_range_mem5/nwm.t{hourlyrun}z.medium_range.channel*.nc'))
urls6 = sorted(fs.glob(f's3://{latest}/medium_range_mem6/nwm.t{hourlyrun}z.medium_range.channel*.nc'))
urls7 = sorted(fs.glob(f's3://{latest}/medium_range_mem7/nwm.t{hourlyrun}z.medium_range.channel*.nc'))

In [6]:
urls1=urls1[0:204]

In [7]:
urls11=urls1[0:102]
urls12=urls1[102:204]
urls21=urls2[0:102]
urls22=urls2[102:204]
urls31=urls3[0:102]
urls32=urls3[102:204]
urls41=urls4[0:102]
urls42=urls4[102:204]
urls51=urls5[0:102]
urls52=urls5[102:204]
urls61=urls6[0:102]
urls62=urls6[102:204]
urls71=urls7[0:102]
urls72=urls7[102:204]

In [8]:
json_dir11 = './jsons11/'
json_dir12 = './jsons12/'
json_dir21 = './jsons21/'
json_dir22 = './jsons22/'
json_dir31 = './jsons31/'
json_dir32 = './jsons32/'
json_dir41 = './jsons41/'
json_dir42 = './jsons42/'
json_dir51 = './jsons51/'
json_dir52 = './jsons52/'
json_dir61 = './jsons61/'
json_dir62 = './jsons62/'
json_dir71 = './jsons71/'
json_dir72 = './jsons72/'

In [9]:
import os
mydirs = ['./jsons11/', './jsons12/', './jsons21/', './jsons22/', './jsons31/', './jsons32/',
         './jsons41/', './jsons42/', './jsons51/', './jsons52/', './jsons61/', './jsons62/',
         './jsons71/', './jsons72/',]
for items in mydirs:
    if not os.path.exists(items):
        os.mkdir(items)

In [10]:
def delfiles(name):
    dir = name
    filelist = glob(os.path.join(dir, "*"))
    for f in filelist:
        os.remove(f)

for d in mydirs:
    delfiles(d)

In [11]:
json_dirs = [json_dir11, json_dir12, json_dir21, json_dir22, json_dir31, json_dir32, json_dir41,
            json_dir42, json_dir51, json_dir52, json_dir61, json_dir62, json_dir71, json_dir72]

## Spin up a dask cluster and write out the jsons

In [12]:
def gen_json(u, json_dir):
    fstem = Path(u).stem 
    outf = f'{json_dir}{fstem}.json'
    with fs.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        with open(outf, 'wb') as f:
            f.write(orjson.dumps(h5chunks.translate()));

In [13]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=36) 
client = Client(cluster)  

In [14]:
urls11 = [f"s3://{url}" for url in urls11]
urls12 = [f"s3://{url}" for url in urls12]
urls21 = [f"s3://{url}" for url in urls21]
urls22 = [f"s3://{url}" for url in urls22]
urls31 = [f"s3://{url}" for url in urls31]
urls32 = [f"s3://{url}" for url in urls32]
urls41 = [f"s3://{url}" for url in urls41]
urls42 = [f"s3://{url}" for url in urls42]
urls51 = [f"s3://{url}" for url in urls51]
urls52 = [f"s3://{url}" for url in urls52]
urls61 = [f"s3://{url}" for url in urls61]
urls62 = [f"s3://{url}" for url in urls62]
urls71 = [f"s3://{url}" for url in urls71]
urls72 = [f"s3://{url}" for url in urls72]

In [15]:
urls = [urls11, urls12, urls21, urls22, urls31, urls32, urls41, urls42, urls51, urls52, urls61, urls62, urls71, urls72]

In [16]:
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')

In [17]:
a = [dask.delayed(gen_json)(u,json_dirs[0]) for u in urls[0]] 
aa = [dask.delayed(gen_json)(u,json_dirs[1]) for u in urls[1]] 
b = [dask.delayed(gen_json)(u,json_dirs[2]) for u in urls[2]] 
bb = [dask.delayed(gen_json)(u,json_dirs[3]) for u in urls[3]] 
c = [dask.delayed(gen_json)(u,json_dirs[4]) for u in urls[4]] 
cc = [dask.delayed(gen_json)(u,json_dirs[5]) for u in urls[5]] 
d = [dask.delayed(gen_json)(u,json_dirs[6]) for u in urls[6]]
dd = [dask.delayed(gen_json)(u,json_dirs[7]) for u in urls[7]] 
e = [dask.delayed(gen_json)(u,json_dirs[8]) for u in urls[8]] 
ee = [dask.delayed(gen_json)(u,json_dirs[9]) for u in urls[9]] 
f = [dask.delayed(gen_json)(u,json_dirs[10]) for u in urls[10]] 
ff = [dask.delayed(gen_json)(u,json_dirs[11]) for u in urls[11]] 
g = [dask.delayed(gen_json)(u,json_dirs[12]) for u in urls[12]] 
gg = [dask.delayed(gen_json)(u,json_dirs[13]) for u in urls[13]]

In [18]:
%%time
_ = dask.compute(*a, *aa, *b, *bb, *c, *cc, *d, *dd, *e, *ee, *f, *ff, *g, *gg)

CPU times: user 32.4 s, sys: 9.59 s, total: 42 s
Wall time: 2min 56s


In [19]:
client.close() 
client.shutdown()

## Create 7 lists of json files - one for each ensemble member

In [20]:
flist1 = sorted(glob('./jsons11/*.json'))
flist2 = sorted(glob('./jsons12/*.json'))
flist3 = sorted(glob('./jsons21/*.json'))
flist4 = sorted(glob('./jsons22/*.json'))
flist5 = sorted(glob('./jsons31/*.json'))
flist6 = sorted(glob('./jsons32/*.json'))
flist7 = sorted(glob('./jsons41/*.json'))
flist8 = sorted(glob('./jsons42/*.json'))
flist9 = sorted(glob('./jsons51/*.json'))
flist10 = sorted(glob('./jsons52/*.json'))
flist11 = sorted(glob('./jsons61/*.json'))
flist12 = sorted(glob('./jsons62/*.json'))
flist13 = sorted(glob('./jsons71/*.json'))
flist14 = sorted(glob('./jsons72/*.json'))

In [21]:
fl1 = flist1+flist2
fl2 = flist3+flist4
fl3 = flist5+flist6
fl4 = flist7+flist8
fl5 = flist9+flist10
fl6 = flist11+flist12
fl7 = flist13+flist14

In [22]:
flist = [fl1, fl2, fl3, fl4, fl5, fl6, fl7]

## Get the feature ids for the gauge sites and merge them with the lat/lon info from the river shapefile

In [23]:
#sfile = gpd.read_file('./test/shapefile/GRR/nwm_reaches_conus_2.shp')
sfile = gpd.read_file('http://weather.eas.cmich.edu/nwm/shp/grr_shp.zip')
#allfeats=sfile['feature_id'].to_list()
fids=pd.read_parquet('http://weather.eas.cmich.edu/nwm/feature_ids.parquet')
fids['gage_id'] = fids['gage_id'].str.decode('utf-8')
fids = fids.rename(columns={'gage_id': 'ID'})
feats=fids['feature_id'].to_list()
fids.index.rename('None', inplace=True)
merged=pd.merge(sfile,fids, on='feature_id', how='inner')

## Create a single json file for each ensemble member, use jsons to create a list of 7 xarrays

In [24]:
def getdata(files, memnum):

    global times
    
    mzz = MultiZarrToZarr(
        files, 
        remote_protocol='s3',
        remote_options={'anon':True},
        xarray_open_kwargs={
            'decode_cf' : False,
            'mask_and_scale' : False,
            'decode_times' : False,
            'use_cftime' : False,
            'drop_variables': ['reference_time', 'crs'],
            'decode_coords' : False
        },
        xarray_concat_args={
            "join": "override",
            "combine_attrs": "override",
            "dim": "time"
        }
    )

    rpath = '/home/baxte1ma/notebooks/Real-time NWM/nwm_medium'+str(memnum)+'.json'
    
    if os.path.isfile(rpath):
        os.remove(rpath)
    else:    
        pass
    
    local_consolidated_json = rpath
    mzz.translate(local_consolidated_json)

    s_opts = {'requester_pays':True, 'skip_instance_cache':True}
    r_opts = {'anon':True}
    fs = fsspec.filesystem("reference", fo=rpath, ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)
    m = fs.get_mapper("")
    ds = xr.open_dataset(m, engine="zarr", backend_kwargs={'consolidated':False}, chunks=None)
    times=ds['time']
    dsnew = ds['streamflow']
    dsnew2 = dsnew.sel(feature_id=feats)

    return(dsnew2)

In [25]:
%%time
dsall=[]

for i in range(0,7):
    ds=getdata(flist[i],i)
    dsall.append(ds)

CPU times: user 10.2 s, sys: 623 ms, total: 10.8 s
Wall time: 13.2 s


## Combine the 7 xarrays into one xarray

In [26]:
%%time

dsout = xr.concat(dsall, dim= 'number')
nmems = len(dsout.number.values)
dsout['number'] = np.arange(0,nmems)

CPU times: user 1min 3s, sys: 4.42 s, total: 1min 7s
Wall time: 1min 29s


## Output the NWM forecast time weighted ensemble mean to pandas and a parquet file

In [43]:
mean = (dsout[0,:,:]*.35)+(dsout[1,:,:]*.25)+(dsout[2,:,:]*.15)+(dsout[3,:,:]*.10)+\
    (dsout[4,:,:]*.05)+(dsout[5,:,:]*.05)+(dsout[6,:,:]*.05)

In [58]:
mean_df=mean.to_dataframe()
mean_df.drop(['number'], axis=1, inplace=True)

In [60]:
mean_df.to_parquet('realtimeNWMmean.parquet')

## Output the NWM forecast data to pandas and a parquet file

In [27]:
dsout_df=dsout.to_dataframe()
dsout_df.to_parquet('realtimeNWM.parquet')

## Get model output times

In [29]:
runtime = times[0] - np.timedelta64(1, 'h')
yest = datetime.today() - timedelta(hours = 1 )
ts = pd.to_datetime(str(runtime.values)) 
d=ts.strftime('NWM %m-%d-%y %H UTC Forecast for')
timesnew=pd.to_datetime(times.values)

## Get station names

In [30]:
stns=pd.read_csv('http://weather.eas.cmich.edu/nwm/stationlatlon.csv')
fids = fids.astype({"ID":int})
merge2=pd.merge(stns,fids, on='ID', how='inner')
stnnm=merge2.loc[merge2['feature_id'] == dsout.feature_id[0].values].NAME[0]

## Open the m-climate and o-climate files, find unique dates from model that we need from these files

In [31]:
mclim=pd.read_parquet('http://weather.eas.cmich.edu/nwm/nwm_gauge_stats.parquet')
oclim=pd.read_parquet('http://weather.eas.cmich.edu/nwm/obs_gauge_stats.parquet')

In [32]:
uniqds=timesnew[:].map(lambda t: t.date()).unique()
uniq12s=timesnew[timesnew[:].map(lambda t: t.time().hour==12)]

In [33]:
mdlist=[]
mdlistall=[]
for f in range(0,9):
    mdlist=uniqds[f].month,uniqds[f].day
    mdlistall.append(mdlist)
    
months=[row[0] for row in mdlistall]
days=[row[1] for row in mdlistall]

## Get the data from the m-climate and o-climate files for the unique dates, save as pandas

In [34]:
mclimlist=[]
for f in range(0,9):
    mclimtemp=mclim[(mclim.index.get_level_values('month') == months[f]) & (mclim.index.get_level_values('day') == days[f])]
    mclimlist.append(mclimtemp)
mclimlist = pd.concat(mclimlist)

oclimlist=[]
for f in range(0,9):
    oclimtemp=oclim[(oclim.index.get_level_values('month') == months[f]) & (oclim.index.get_level_values('day') == days[f])]
    oclimlist.append(oclimtemp)
oclimlist = pd.concat(oclimlist)

## Set up xarrays that are filled with NaN - as we will only have data for each day, not hourly

In [35]:
blankm = np.empty([204,154])
blankm[:,:] = np.nan

blanko = np.empty([204,154])
blanko[:,:] = np.nan

unames=mclimlist.index.get_level_values('NAME').unique()

In [36]:
dao = xr.DataArray(
    data=blanko,
    dims=["time", "NAME"],
    coords=dict(time=times,
               NAME=unames,))

dam = xr.DataArray(
    data=blankm,
    dims=["time", "NAME"],
    coords=dict(time=times,
               NAME=unames,))

## Fill the xarrays with the m-climate and o-climate data from pandas

In [37]:
for s in range(0,154):
    for f in range(0,9):
        dam.loc[uniq12s[f], unames[s]] = mclimlist.loc[mclimlist.index.get_level_values(0)[s]]['streamflow','q50'].values[f]
    
for s in range(0,154):
    for f in range(0,9):
        dao.loc[uniq12s[f], unames[s]] = oclimlist.loc[oclimlist.index.get_level_values(0)[s]]['streamflow','q50'].values[f]    

## Use multiprocessing to make the plots simultaneously

In [39]:
%%time

x=times

def main():
    pool = Pool()
    num_figs = 154
    input = zip(np.array(range(0,num_figs)),range(num_figs))
    pool.map(plot, input)
    pool.close()  
    pool.join() 

def plot(args):
    num, i = args
    mpl.rcParams['path.simplify'] = True
    mpl.rcParams['path.simplify_threshold'] = 1.0
    mplstyle.use('fast')
    fig,ax1=plt.subplots(figsize=(18,12))
    
    val = dsout.feature_id[num].values
    stnnm=merge2.loc[merge2['feature_id'] == val]
    stnnm2=stnnm.NAME.values[0]

    d1 = dsout[0,:,num]
    d2 = dsout[1,:,num]
    d3 = dsout[2,:,num]
    d4 = dsout[3,:,num]
    d5 = dsout[4,:,num]
    d6 = dsout[5,:,num]
    d7 = dsout[6,:,num]
    
#    mean = dsout[:,:,num].mean(axis=0)

#    weights=np.array([.35,.25,.15,.10,.5,.5,.5]*dsout[])
#    weights.name=weights

    mean = (dsout[0,:,num]*.35)+(dsout[1,:,num]*.25)+(dsout[2,:,num]*.15)+(dsout[3,:,num]*.10)+\
    (dsout[4,:,num]*.05)+(dsout[5,:,num]*.05)+(dsout[6,:,num]*.05)
    
    dam2=dam[:,num]
    dammask = np.isfinite(dam2)
    
    dao2=dao[:,num]
    daomask = np.isfinite(dao2)
    
    ax1.plot(x, d1, lw=6, label='Current GFS Precip', color='#005a32')
    ax1.plot(x, d2, lw=6, label='-6 hr GFS Precip', color='#238b45')
    ax1.plot(x, d3, lw=6, label='-12 hr GFS Precip', color='#41ab5d')
    ax1.plot(x, d4, lw=6, label='-18 hr GFS Precip', color='#74c476')
    ax1.plot(x, d5, lw=6, label='-24 hr GFS Precip', color='#a1d99b')
    ax1.plot(x, d6, lw=6, label='-30 hr GFS Precip', color='#c7e9c0')
    ax1.plot(x, d7, lw=6, label='-36 hr GFS Precip', color='#edf8e9')
    ax1.plot(x, mean, lw=8, label='Time Weighted Mean', color='k')
    
    ax1.plot(x[dammask], dam2[dammask], lw=8, label='M-Climate', color='b', marker='o', markersize=12)
    ax1.plot(x[daomask], dao2[daomask], lw=8, label='O-Climate', color='r', marker='o', markersize=12)
    
    
    ax1.legend(prop={'size':10})


    ax1.xaxis.set_major_formatter(mdates.DateFormatter("%m-%d"))

    
    ax1.set_ylabel('$m^3/s$', fontsize=25)
    ax1.grid(True)
    fig.patch.set_facecolor('antiquewhite')

    plt.setp(ax1.get_xticklabels(), fontsize=25) 
    plt.setp(ax1.get_yticklabels(), fontsize=25) 
    title_string=d                
    plt.title(stnnm2, fontsize=30)
    plt.suptitle(title_string, fontsize=30)

    fileout='./test/temp_fig_%i.png' % i
#    fig.savefig(fileout, dpi=100, bbox_inches="tight", pad_inches=0.5)
    fig.savefig(fileout, dpi=100)
    
    img = cv2.imread(fileout, 1)
    img_inter = imutils.resize(img, width=1571)
    cv2.imwrite(fileout, img_inter)

main()

CPU times: user 126 ms, sys: 1.23 s, total: 1.35 s
Wall time: 5.13 s


In [40]:
!tar -czvf tseries.tar.gz ./test/temp*

./test/temp_fig_00.png
./test/temp_fig_01.png
./test/temp_fig_02.png
./test/temp_fig_03.png
./test/temp_fig_04.png
./test/temp_fig_05.png
./test/temp_fig_06.png
./test/temp_fig_07.png
./test/temp_fig_08.png
./test/temp_fig_09.png
./test/temp_fig_0.png
./test/temp_fig_100.png
./test/temp_fig_101.png
./test/temp_fig_102.png
./test/temp_fig_103.png
./test/temp_fig_104.png
./test/temp_fig_105.png
./test/temp_fig_106.png
./test/temp_fig_107.png
./test/temp_fig_108.png
./test/temp_fig_109.png
./test/temp_fig_10.png
./test/temp_fig_110.png
./test/temp_fig_111.png
./test/temp_fig_112.png
./test/temp_fig_113.png
./test/temp_fig_114.png
./test/temp_fig_115.png
./test/temp_fig_116.png
./test/temp_fig_117.png
./test/temp_fig_118.png
./test/temp_fig_119.png
./test/temp_fig_11.png
./test/temp_fig_120.png
./test/temp_fig_121.png
./test/temp_fig_122.png
./test/temp_fig_123.png
./test/temp_fig_124.png
./test/temp_fig_125.png
./test/temp_fig_126.png
./test/temp_fig_127.png
./test/temp_fig_128.png
./test