<h1>Reading ERA5 data with xarray & Dask</h1>
Extract data from 12 monthly files, then process selected dataset for target lat/lon.  Each file is ~2gb.

In [None]:
import pandas as pd
import numpy as np
import xarray as xr
import datetime as dt
import glob
import sys

csvpath="output/era5/";outfile="";yr=2018
sitelon = 40.02 #example for Boulder, CO
sitelat = -105.27 #example for Boulder, CO

#Adj target lon to be on same scale as data
sitelon=sitelon%360

In [None]:
#Tricks to get around OS file cacheing so we can see actual read data performance...
#These are all pointing to the same files.
#e5path="/work/noaa/co2/kaushik/PVPRM/ERA5_PVPRM"#orion
e5path1='/home/ccg/mund/tmp/lns/e1/'
e5path2='/home/ccg/mund/tmp/lns/e2/'
e5path3='/home/ccg/mund/tmp/lns/e3/'


<h3>Possible approaches</h3>
<ul>
<li>Loop through each monthly file, exctract data and write into intermediate data structure
<li>Use xarray's open_mfdataset to combine files into 1 logical structure (41GB!), extract data and process.  This actually uses dask behind the scenes and runs in multi-threaded mode, so it's memory effecient and fairly fast    
<li>Use xarray's open_mfdataset in parallel processing mode
</ul>

<h4>Looping</h4>

In [None]:
#Remove this line to run... ~5-6 min
%%time
#Traditional looping method: 
outfile="looping_output.csv"
files=glob.glob(e5path1 + 'era5_daily_pvprm_%s*' %(yr))
n=0;frames=[]
for f in files:
    t=xr.open_dataset(f)
    tloc = t.sel(longitude=sitelon, latitude=sitelat, method ='nearest')
    frames.append(tloc)
looploc=xr.combine_by_coords(frames,combine_attrs='drop')
looploc

#CPU times: user 636 ms, sys: 1.4 s, total: 2.03 s
#Wall time: 5min 13s

<h4>Multi-threaded using Xarray open_mfdataset</h4>This uses Dask underneath with a multi-threaded cluster by default.  It is memory efficient, a cleaner algorithm and a little bit faster.

In [None]:
#Remove this line to run... ~4 min
%%time
outfile="mt_output.csv"
dsm = xr.open_mfdataset(e5path2 + 'era5_daily_pvprm_%s*' %(yr))

# subset location, put into dataframe 
dsmloc = dsm.sel(longitude=sitelon, latitude=sitelat, method ='nearest').compute()
#dsmloc

#CPU times: user 566 ms, sys: 1.44 s, total: 2 s
#Wall time: 4min

<h4>Multi-process using Xarray open_mfdataset</h4>
Use dask explicitly to create a multi-processing cluster 

In [None]:
from dask.distributed import Client, progress, SSHCluster
#client = Client()#Default mp cluster
client=Client(address='nimbus2:8750')#ssh nimbi cluster, started from command line
client

In [None]:
%%time
#CPU times: user 11.9 s, sys: 1.22 s, total: 13.1 s
#Wall time: 2min 56s
outfile="mp_output.csv"
dsm = xr.open_mfdataset(e5path3 + 'era5_daily_pvprm_%s*' %(yr),parallel=True,chunks={'latitude': 10, 'longitude': 10})
dsmloc = dsm.sel(longitude=sitelon, latitude=sitelat, method ='nearest').compute()
#dsmloc

#CPU times: user 854 ms, sys: 0 ns, total: 854 ms
#Wall time: 27.4 s

In [None]:
print("DSM size in GB:",dsm.nbytes / 1e9)
print("DSMloc size in GB:",dsmloc.nbytes / 1e9)

<h3>Process selected data into output</h3>
Filter data using Dask, then use Numpy, Pandas & Xarray to process.  This is fast because we're now working on a relatively small dataset.

In [None]:
%%time
# do some math/edit the arrays 

dfmloc=dsmloc.to_dataframe()

# avg soil data, convert stl1+stl2 to avg and append
soilt=(dfmloc.stl1+dfmloc.stl2)/2.
dfmloc['Ts.ERA5']=soilt

# drop stl1, stl2, lat, lon
dfmloc2=dfmloc.drop(['ssrd','stl1','stl2','latitude','longitude'],axis=1)
#del dfmloc

# rename t2m,par into new dataframe
dfmloc3=dfmloc2.rename(columns={"t2m":"Ta.ERA5","par":"PAR.ERA5"})
#del dfmloc2

# convert Ta and Ts to Celsius
dfmloc3['Ta.ERA5']=dfmloc3['Ta.ERA5']-273.15
dfmloc3['Ts.ERA5']=dfmloc3['Ts.ERA5']-273.15
# new time index going up to 23:30 on 12/31 -- era5 data ends at 23:00
t_index=pd.date_range(start='%s-01-01 00:00:00' %(yr), end='%s-12-31 23:30:00' %(yr), freq='30T')

#resample on 30min timestep, interpolate (default linear?), reindex to t_index to go up to 23:30, /
#forward fill last value, reset index so that time is one  of the columns
dfmloc_rs=dfmloc3.resample('30T').interpolate().reindex(t_index).ffill().reset_index()

# save to dataframe, rename index column to time
dfm=dfmloc_rs.rename(columns={"index":"time"})
#del dfmloc_rs

#Output to csv
dfm.to_csv(csvpath+outfile,index=False)
#del dfm