### Benchmarking SCINet GeoCDL Options

This notebook is designed to benchmark the throughput from various data sources to the USDA ARS HPC systems. Much of the code and inspiration for these metrics comes from a similiar effort via the Pangeo ([see this link](http://gallery.pangeo.io/repos/earthcube2020/ec20_abernathey_etal/cloud_storage.html) or this [preprint](https://www.authorea.com/doi/full/10.22541/au.160443768.88917719/v1))

This benchmark uses dask arrays and a dask distributed computing system to scale the number of workers fetching data from remote data repositories. To run a benchmark:
1. Copy the `template.ipynb` file.
2. Rename the file like:<br>
  * DataSource__FileType.ipynb (Note the double _ characters)
  * e.g. `template.ipynb` --> `aws__netcdf.ipynb`
  * e.g. `template.ipynb` --> `DukeFTP__Gdal_tif.ipynb`
3. Fill in the IO code in the "blank" section.
4. Run the entire notebook.
5. Confirm a file was written into the result folder.

In [2]:
import warnings
warnings.filterwarnings('ignore')
from utils import DevNullStore,DiagnosticTimer,total_nthreads,total_ncores,total_workers,get_chunksize
import time, datetime, os, dask
import pandas as pd
import dask.array as da
import hvplot.pandas
from tqdm.notebook import tqdm
from dask.distributed import LocalCluster,Client

In [3]:
## This environment set to optimized reading COG files - see https://github.com/pangeo-data/cog-best-practices

env = dict(GDAL_DISABLE_READDIR_ON_OPEN='EMPTY_DIR', 
           AWS_NO_SIGN_REQUEST='YES',
           GDAL_MAX_RAW_BLOCK_CACHE_SIZE='200000000',
           GDAL_SWATH_SIZE='200000000',
           VSI_CURL_CACHE_SIZE='200000000')

os.environ.update(env)

In [4]:
dask.config.set({'distributed.dashboard.link':'https://localhost:8888/proxy/8787/status'})
cluster = LocalCluster(threads_per_worker=2)
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:37309  Dashboard: https://localhost:8888/proxy/8787/status,Cluster  Workers: 6  Cores: 12  Memory: 26.70 GB


### Dataset Specific IO Code

Load data into a dask array named `data`. Ideally subset the data is it has ~100MB chunks and totals ~25 GBs. An example approach is:
```python
ds = xr.open_rasterio('http://hydrology.cee.duke.edu/POLARIS/PROPERTIES/v1.0/vrt/clay_mode_0_5.vrt',
                      chunks='auto')
data = ds.isel(x=slice(0,5400*5),y=slice(0,5400*5)).data
```

Define the following metadata:

**cloud_source** = Where is the data being housed <br>
**format** = Format of the data. For gdal drivers use gdal_drivername (aka gdal_tif). For other sources, use the file formatt suffix (aka zarr).<br>
**system** = The system (Ceres, Atlas, AWS, GCP, AZURE, etc...)

Example:
```python
cloud_source = 'aws_nasa_daac'
d_format = 'gdal_cog'
system = 'Ceres'
```

In [5]:
### ADD CODE BELOW ###
import xarray as xr
ds = xr.open_rasterio('http://hydrology.cee.duke.edu/POLARIS/PROPERTIES/v1.0/vrt/clay_mode_0_5.vrt',chunks='auto')
data = ds.isel(x=slice(0,5400*5),y=slice(0,5310*5)).data

d_format = 'gdal_vrt'
cloud_source = 'ftp_duke'
system = 'Ceres'
data

Unnamed: 0,Array,Chunk
Bytes,2.87 GB,114.70 MB
Shape,"(1, 26550, 27000)","(1, 5400, 5310)"
Count,751 Tasks,30 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 2.87 GB 114.70 MB Shape (1, 26550, 27000) (1, 5400, 5310) Count 751 Tasks 30 Chunks Type float32 numpy.ndarray",27000  26550  1,

Unnamed: 0,Array,Chunk
Bytes,2.87 GB,114.70 MB
Shape,"(1, 26550, 27000)","(1, 5400, 5310)"
Count,751 Tasks,30 Chunks
Type,float32,numpy.ndarray


### Run Diagnostics on Throughput

In [6]:
diag_timer = DiagnosticTimer()
devnull = DevNullStore()

chunksize = get_chunksize(data)
totalsize = data.nbytes*1e-9

diag_kwargs = dict(nbytes=data.nbytes,
                   chunksize=chunksize,
                   cloud_source=cloud_source,
                   system=system,
                   format=d_format)

for nworkers in tqdm([2,4,6]):
    client.restart()
    cluster.scale(nworkers)
    time.sleep(10)
    client.wait_for_workers(nworkers)
    with diag_timer.time(nthreads=total_nthreads(client),
                         ncores=total_ncores(client),
                         nworkers=total_workers(client),
                         **diag_kwargs):

        future = da.store(data, devnull, lock=False, compute=False)
        dask.compute(future, retries=5)
df = diag_timer.dataframe()
df['throughput_MBps'] = df.nbytes/1e6/df.runtime

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=3.0), HTML(value='')))






### Visualize and save the results

In [8]:
#Save results
df.to_csv('../results/Python__'+system+'__'+cloud_source+'__'+d_format+'__'+datetime.datetime.now().strftime("%Y%m%d_%H%M")+'.csv',index=False)

#Plot Throughput v. nworkers
pl = df.hvplot(x='nworkers',
               y='throughput_MBps',
               kind='line')*\
     df.hvplot(x='nworkers',
               y='throughput_MBps',
               s=50,
               kind='scatter').opts(title='System: '+system+'    |    Data Source: '+cloud_source+'    |    File Format: '+d_format)
pl