# Google Cloud Parallel Data Read Speeds with Dask

## Imports & Client Initialization

In [None]:
import dask.array as dsa
import fsspec
import numpy as np
import dask.dataframe as dd
from contextlib import contextmanager
import xarray as xr
import intake
import time
import dask
from matplotlib import pyplot as plt
import pandas as pd

In [None]:
from dask.distributed import Client, LocalCluster
# Create a local Dask cluster with default settings; the benchmark loops below resize it with cluster.scale(...).
cluster = LocalCluster()
# Connect a client to that cluster; worker/thread counts are queried through it later.
client = Client(cluster)

## Benchmarking Setup

First, we create a null storage object. To measure throughput, every chunk of the data must actually be read; we achieve this by "storing" the data into this null storage target, which simply discards whatever is written to it.

In [None]:
class DevNullStore:
    """Write-only storage target that discards everything assigned to it.

    Used as the target of ``dask.array.store`` so that every chunk is read
    from its source without being kept anywhere.
    """

    def __setitem__(self, *args, **kwargs):
        """Accept ``store[key] = value`` and drop the value."""
        pass


# Shared sink used by all benchmark cells.
null_store = DevNullStore()

The Diagnostic Timer will keep track of data retrieval times and store them within a pandas dataframe for later processing and analysis.

In [None]:
class DiagnosticTimer:
    """Collect elapsed-time measurements together with caller-supplied labels.

    Each ``with timer.time(**labels)`` block that completes normally appends
    one record (the labels plus a ``runtime`` entry in seconds) to
    ``self.diagnostics``.
    """

    def __init__(self):
        # One dict per completed timing block, in execution order.
        self.diagnostics = []

    @contextmanager
    def time(self, **kwargs):
        """Time the body of a ``with`` block and record the result.

        Parameters
        ----------
        **kwargs
            Arbitrary labels stored alongside the measurement.

        Notes
        -----
        If the body raises, nothing is recorded and the exception propagates.
        """
        # perf_counter is monotonic, so the measured interval cannot be
        # distorted by system clock adjustments the way time.time() can.
        tic = time.perf_counter()
        yield
        toc = time.perf_counter()
        kwargs["runtime"] = toc - tic
        self.diagnostics.append(kwargs)

    def dataframe(self):
        """Return all recorded measurements as a pandas DataFrame, one row per block."""
        return pd.DataFrame(self.diagnostics)


# Single timer shared by all benchmark cells; results accumulate across formats.
diag_timer = DiagnosticTimer()

## Perform Benchmarking

Though we are accessing the data in different formats, the core process is exactly the same. Dask is lazy by default: nothing is read from the source until a computation is triggered, so we use the previously defined null_store to force the reads and measure throughput. Converting from DataFrames to Arrays, for example, will not affect the access speed because the data is not actually read from the source until we "store" it. So, these read speeds should be the same whether you initially point to the data using a dataframe or an array.

In [None]:
# Value passed as the `token` storage option to the gs:// readers in later cells.
# NOTE(review): machine-specific absolute path, presumably a Google Cloud credentials JSON file — adjust for your environment.
token = '/home/ubuntu/Cloud-Data-Transfer-Speed-Benchmarks/cloud-data-benchmarks.json'

In [None]:
def total_nthreads():
    """Return the sum of the per-worker values reported by ``client.nthreads()``."""
    return sum(client.nthreads().values())

def total_ncores():
    """Return the sum of the per-worker values reported by ``client.ncores()``."""
    per_worker = client.ncores()
    return sum(per_worker.values())

def total_workers():
    """Return the number of workers, i.e. the number of entries in ``client.ncores()``."""
    worker_map = client.ncores()
    return len(worker_map)

### CSV

#### Single File

In [None]:
# Time only the call that opens the remote CSV.
tic1 = time.time()
df0 = dd.read_csv(
    'gs://cloud-data-benchmarks/ETOPO1_Ice_g_gmt4.csv',
    storage_options={'token': token},
    assume_missing=True,
)
toc1 = time.time()
connectTime = toc1 - tic1

# Convert to a dask array; lengths=True asks dask to compute the chunk lengths.
da = df0.to_dask_array(lengths=True)
del df0
# Bytes per chunk = bytes per element * elements per chunk.
chunksize = da.dtype.itemsize * np.prod(da.chunksize)
da

This is the main loop, pulled from [Ryan Abernathey's demonstration](https://github.com/earthcube2020/ec20_abernathey_etal/blob/master/cloud_storage.ipynb). A few modifications have been made; most notably, we also want to measure the connection time to the cloud server. These times will differ depending on the connection method used, and in this case we are using the dask.dataframe.read_csv(...) function. This shouldn't matter if we are purely measuring access times, but it is important when we use different modules to connect to Google Cloud Storage.

In [None]:
# Metadata recorded with every timed run of this dataset.
diag_kwargs = {"nbytes": da.nbytes, "chunksize": chunksize, "format": "csv"}

for nworkers in (1, 3, 4):
    # Resize the cluster, pause, then wait for the requested workers.
    cluster.scale(nworkers)
    time.sleep(10)
    client.wait_for_workers(nworkers)
    print(nworkers)
    with diag_timer.time(
        nthreads=total_nthreads(),
        ncores=total_ncores(),
        nworkers=total_workers(),
        connectTime=connectTime,
        **diag_kwargs
    ):
        # Build the store operation lazily, then execute it so the data is read.
        future = dsa.store(da, null_store, lock=False, compute=False)
        dask.compute(future, retries=5)
    del future

In [None]:
# All runs recorded so far (the shared timer accumulates across formats).
df = diag_timer.dataframe()
# Use each row's own recorded size so the result stays correct when rows from
# several datasets are present, instead of applying the current array's size to all.
# NOTE: despite the column name, the value is megabytes (1e6 bytes) per second.
df['throughput_Mbps'] = df['nbytes'] / 1e6 / df['runtime']
del da
df

#### Multiple Files

In [None]:
# Time only the call that opens the partitioned remote CSV files.
tic1 = time.time()
df0 = dd.read_csv(
    'gs://cloud-data-benchmarks/csvpartitions/*',
    storage_options={'token': token},
    assume_missing=True,
)
toc1 = time.time()
connectTime = toc1 - tic1

# Convert to a dask array; lengths=True asks dask to compute the chunk lengths.
da = df0.to_dask_array(lengths=True)
del df0
# Bytes per chunk = bytes per element * elements per chunk.
chunksize = da.dtype.itemsize * np.prod(da.chunksize)
da

In [None]:
# Metadata recorded with every timed run of this dataset.
diag_kwargs = {"nbytes": da.nbytes, "chunksize": chunksize, "format": "part_csv"}

for nworkers in (1, 3, 4):
    # Resize the cluster, pause, then wait for the requested workers.
    cluster.scale(nworkers)
    time.sleep(10)
    client.wait_for_workers(nworkers)
    print(nworkers)
    with diag_timer.time(
        nthreads=total_nthreads(),
        ncores=total_ncores(),
        nworkers=total_workers(),
        connectTime=connectTime,
        **diag_kwargs
    ):
        # Build the store operation lazily, then execute it so the data is read.
        future = dsa.store(da, null_store, lock=False, compute=False)
        dask.compute(future, retries=5)
    del future

In [None]:
# All runs recorded so far (the shared timer accumulates across formats).
df = diag_timer.dataframe()
# Rows from earlier datasets have a different size than the current array, so
# divide each row's own recorded nbytes by its runtime.
# NOTE: despite the column name, the value is megabytes (1e6 bytes) per second.
df['throughput_Mbps'] = df['nbytes'] / 1e6 / df['runtime']
del da
df

### NetCDF

Load NetCDF using the intake library. This is a convenient way to load the data from GCS without using the gcsfs module directly. Depending on the NetCDF file contents, decoding the data using the engines included with xarray.open_dataset(...) can be troublesome. Read the Xarray documentation about reading files using xarray.open_dataset(...) [here](https://docs.xarray.dev/en/stable/user-guide/io.html).

The intake module is much easier to use, and offers some additional functionality in the form of catalogs. Read about catalogs and their uses [here](https://intake.readthedocs.io/en/latest/catalog.html).

In [None]:
# Time opening the remote NetCDF file through intake and converting it to an xarray object.
tic1 = time.time()
source = intake.open_netcdf('gs://cloud-data-benchmarks/ETOPO1_Ice_g_gmt4.nc')
data = source.to_dask()
toc1 = time.time()
connectTime = toc1 - tic1

# Dataset -> DataArray.
data = data.to_array()
# Wrap the DataArray's underlying values in a dask array.
# NOTE(review): no chunking is requested when opening the file, so the values may be
# loaded eagerly here — possibly related to the memory warning seen when computing; TODO confirm.
da = dsa.from_array(data.data)
# Bytes per chunk = bytes per element * elements per chunk.
chunksize = da.dtype.itemsize * np.prod(da.chunksize)
da

We encounter a large unmanaged memory warning in the dask.compute(future, retries=5) line, causing the data store to fail.

In [None]:
# Metadata recorded with every timed run of this dataset.
diag_kwargs = {"nbytes": da.nbytes, "chunksize": chunksize, "format": "NetCDF"}

for nworkers in (1, 3, 4):
    # Resize the cluster, pause, then wait for the requested workers.
    cluster.scale(nworkers)
    time.sleep(10)
    client.wait_for_workers(nworkers)
    print(nworkers)
    with diag_timer.time(
        nthreads=total_nthreads(),
        ncores=total_ncores(),
        nworkers=total_workers(),
        connectTime=connectTime,
        **diag_kwargs
    ):
        # Build the store operation lazily, then execute it so the data is read.
        future = dsa.store(da, null_store, lock=False, compute=False)
        dask.compute(future, retries=5)
    del future

In [None]:
# All runs recorded so far (the shared timer accumulates across formats).
df = diag_timer.dataframe()
# Rows from earlier datasets have a different size than the current array, so
# divide each row's own recorded nbytes by its runtime.
# NOTE: despite the column name, the value is megabytes (1e6 bytes) per second.
df['throughput_Mbps'] = df['nbytes'] / 1e6 / df['runtime']
del da
df

### Parquet

Throughput will be tested in two different ways:
* A CSV file was partitioned into a DataFrame by Dask, and 1 parquet file was written per DataFrame partition. All of these files will be read into the null storage target
* A single parquet file containing all of the original CSV information will be read into the null storage target

#### Multiple Files

In [None]:
# Time only the call that opens the partitioned remote parquet files.
tic1 = time.time()
df0 = dd.read_parquet(
    "gs://cloud-data-benchmarks/parquetpartitions/*"
)
toc1 = time.time()
connectTime = toc1 - tic1

# Convert to a dask array; lengths=True asks dask to compute the chunk lengths.
da = df0.to_dask_array(lengths=True)
del df0
# Bytes per chunk = bytes per element * elements per chunk.
chunksize = da.dtype.itemsize * np.prod(da.chunksize)
da

In [None]:
# Metadata recorded with every timed run of this dataset.
diag_kwargs = {"nbytes": da.nbytes, "chunksize": chunksize, "format": "part_parquet"}

for nworkers in (1, 3, 4):
    # Resize the cluster, pause, then wait for the requested workers.
    cluster.scale(nworkers)
    time.sleep(10)
    client.wait_for_workers(nworkers)
    print(nworkers)
    with diag_timer.time(
        nthreads=total_nthreads(),
        ncores=total_ncores(),
        nworkers=total_workers(),
        connectTime=connectTime,
        **diag_kwargs
    ):
        # Build the store operation lazily, then execute it so the data is read.
        future = dsa.store(da, null_store, lock=False, compute=False)
        dask.compute(future, retries=5)
    del future

In [None]:
# All runs recorded so far (the shared timer accumulates across formats).
df = diag_timer.dataframe()
# Rows from earlier datasets have a different size than the current array, so
# divide each row's own recorded nbytes by its runtime.
# NOTE: despite the column name, the value is megabytes (1e6 bytes) per second.
df['throughput_Mbps'] = df['nbytes'] / 1e6 / df['runtime']
del da
df

#### Single File

We encounter a problem with large unmanaged memory when attempting to compute the size of the DataFrame. Generally, it is recommended that parquet files be partitioned in advance to avoid this issue.

In [None]:
# Time only the call that opens the single remote parquet file.
tic1 = time.time()
df0 = dd.read_parquet(
    "gs://cloud-data-benchmarks/ETOPO1_Ice_g_gmt4.parquet"
)
toc1 = time.time()
connectTime = toc1 - tic1

# The memory problem occurs between these two print statements, similar to the NetCDF issue.
print("Calculating size of array")
da = df0.to_dask_array(lengths=True)
print("Size of array has been calculated")

del df0
# Bytes per chunk = bytes per element * elements per chunk.
chunksize = da.dtype.itemsize * np.prod(da.chunksize)
da

In [None]:
# Metadata recorded with every timed run of this dataset.
diag_kwargs = {"nbytes": da.nbytes, "chunksize": chunksize, "format": "parquet"}

for nworkers in (1, 3, 4):
    # Resize the cluster, pause, then wait for the requested workers.
    cluster.scale(nworkers)
    time.sleep(10)
    client.wait_for_workers(nworkers)
    print(nworkers)
    with diag_timer.time(
        nthreads=total_nthreads(),
        ncores=total_ncores(),
        nworkers=total_workers(),
        connectTime=connectTime,
        **diag_kwargs
    ):
        # Build the store operation lazily, then execute it so the data is read.
        future = dsa.store(da, null_store, lock=False, compute=False)
        dask.compute(future, retries=5)
    del future

In [None]:
# All runs recorded so far (the shared timer accumulates across formats).
df = diag_timer.dataframe()
# Rows from earlier datasets have a different size than the current array, so
# divide each row's own recorded nbytes by its runtime.
# NOTE: despite the column name, the value is megabytes (1e6 bytes) per second.
df['throughput_Mbps'] = df['nbytes'] / 1e6 / df['runtime']
del da
df

### Zarr

See <a href="https://www.programcreek.com/python/example/128207/dask.array.from_zarr#:~:text=.chunks)-,Example%205,-Project%3A%20napari">this python example</a> of dask.array.from_zarr(...) for further understanding of the differences between a Zarr Array & Zarr Group.

A Zarr Group is a very good choice of storage utility for multiscale data.

#### Zarr Array

In [None]:
# Time only the call that opens the remote Zarr array as a dask array.
tic1 = time.time()
da = dsa.from_zarr(
    'gs://cloud-data-benchmarks/ETOPO1_Ice_g_gmt4.zarray'
)
toc1 = time.time()
connectTime = toc1 - tic1

# Bytes per chunk = bytes per element * elements per chunk.
chunksize = da.dtype.itemsize * np.prod(da.chunksize)
da

In [None]:
# Metadata recorded with every timed run of this dataset.
diag_kwargs = {"nbytes": da.nbytes, "chunksize": chunksize, "format": "zarr_array"}

for nworkers in (1, 3, 4):
    # Resize the cluster, pause, then wait for the requested workers.
    cluster.scale(nworkers)
    time.sleep(10)
    client.wait_for_workers(nworkers)
    print(nworkers)
    with diag_timer.time(
        nthreads=total_nthreads(),
        ncores=total_ncores(),
        nworkers=total_workers(),
        connectTime=connectTime,
        **diag_kwargs
    ):
        # Build the store operation lazily, then execute it so the data is read.
        future = dsa.store(da, null_store, lock=False, compute=False)
        dask.compute(future, retries=5)
    del future

In [None]:
# All runs recorded so far (the shared timer accumulates across formats).
df = diag_timer.dataframe()
# Rows from earlier datasets have a different size than the current array, so
# divide each row's own recorded nbytes by its runtime.
# NOTE: despite the column name, the value is megabytes (1e6 bytes) per second.
df['throughput_Mbps'] = df['nbytes'] / 1e6 / df['runtime']
del da
df

#### Zarr Hierarchical Group

In [None]:
# Time only the call that opens the remote Zarr group with xarray.
tic1 = time.time()
zarr_ds = xr.open_zarr(
    store='gs://cloud-data-benchmarks/ETOPO1_Ice_g_gmt4.zarr',
    storage_options={'token': token},
    consolidated=True,
)
toc1 = time.time()
connectTime = toc1 - tic1

# Dataset -> DataArray, then take the array object it wraps.
darray = zarr_ds.to_array()
da = darray.data

# Bytes per chunk = bytes per element * elements per chunk.
chunksize = da.dtype.itemsize * np.prod(da.chunksize)
da

In [None]:
# Metadata recorded with every timed run of this dataset.
diag_kwargs = {"nbytes": da.nbytes, "chunksize": chunksize, "format": "zarr_group"}

for nworkers in (1, 3, 4):
    # Resize the cluster, pause, then wait for the requested workers.
    cluster.scale(nworkers)
    time.sleep(10)
    client.wait_for_workers(nworkers)
    print(nworkers)
    with diag_timer.time(
        nthreads=total_nthreads(),
        ncores=total_ncores(),
        nworkers=total_workers(),
        connectTime=connectTime,
        **diag_kwargs
    ):
        # Build the store operation lazily, then execute it so the data is read.
        future = dsa.store(da, null_store, lock=False, compute=False)
        dask.compute(future, retries=5)
    del future

In [None]:
# All runs recorded so far (the shared timer accumulates across formats).
df = diag_timer.dataframe()
# Rows from earlier datasets have a different size than the current array, so
# divide each row's own recorded nbytes by its runtime.
# NOTE: despite the column name, the value is megabytes (1e6 bytes) per second.
df['throughput_Mbps'] = df['nbytes'] / 1e6 / df['runtime']
del da
df

## Plots