In [1]:
import xarray as xr
import dask
import dask.array as da
import numpy as np
import pandas as pd
import dask.multiprocessing
import blosc

import zarr
import h5py
import gcsfs
from dask_kubernetes import KubeCluster
from dask.distributed import Client, progress, LocalCluster

  from ._conv import register_converters as _register_converters


In [2]:
# Start a Dask cluster on Kubernetes, asking for 20 workers up front.
cluster = KubeCluster(n_workers=20)
# Adaptive scaling is left switched off; uncomment the next line to enable it.
# cluster.adapt()
cluster


In [None]:
# NOTE(review): this teardown cell sits before the cell that creates the
# Client, so a top-to-bottom run closes the cluster before it is ever used.
# Run it by hand once the session is finished.
cluster.close()

In [3]:
# Attach this notebook session to the cluster created above; the trailing
# expression displays the client summary (scheduler, dashboard, workers).
# `Client` is already imported in the notebook's import cell.
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://10.20.207.9:46095  Dashboard: /user/kaipak/proxy/8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [4]:
def randn(shape, frac_nan=None, chunks=None, seed=0):
    """Return standard-normal random data, optionally with NaNs mixed in.

    :param shape: shape of the array to generate.
    :param frac_nan: if not None, fraction of elements to replace with NaN.
                     On the NumPy path indices are drawn with replacement, so
                     the realised fraction can be slightly lower. On the dask
                     path each element is masked independently with this
                     probability.
    :param chunks: if None, build an in-memory NumPy array; otherwise build a
                   lazy dask array with these chunks.
    :param seed: seed for the random state, making the output repeatable.
    :returns: ``numpy.ndarray`` when ``chunks`` is None, else a dask array.
    """
    if chunks is None:
        rng = np.random.RandomState(seed)
        x = rng.standard_normal(shape)
        if frac_nan is not None:
            # An integer population yields the same draws as range(x.size)
            # without first materialising every index as a Python object.
            inds = rng.choice(x.size, int(x.size * frac_nan))
            x.flat[inds] = np.nan
        return x

    import dask.array as da
    rng = da.random.RandomState(seed)
    x = rng.standard_normal(shape, chunks=chunks)
    if frac_nan is not None:
        # dask arrays expose no writable `.flat`, so the NaNs are applied
        # lazily through a random boolean mask instead of index assignment.
        nan_mask = rng.random_sample(shape, chunks=chunks) < frac_nan
        x = da.where(nan_mask, np.nan, x)
    return x

def randint(low, high=None, size=None, frac_minus=None, seed=0):
    """Return seeded random integers, optionally overwriting some with -1.

    :param low: lower bound passed to ``RandomState.randint``.
    :param high: upper bound passed to ``RandomState.randint``.
    :param size: output shape passed to ``RandomState.randint``.
    :param frac_minus: if not None, fraction of elements to set to -1
                       (indices are drawn with replacement, so the realised
                       fraction can be slightly lower).
    :param seed: seed for the random state, making the output repeatable.
    :returns: the generated integers.
    """
    rng = np.random.RandomState(seed)
    x = rng.randint(low, high, size)
    if frac_minus is not None:
        # An integer population yields the same draws as range(x.size)
        # without first materialising every index as a Python object.
        inds = rng.choice(x.size, int(x.size * frac_minus))
        x.flat[inds] = -1

    return x

def rand_numpy(nz=None, empty=True):
    """
    Generate a random 3D NumPy array of shape ``(nz, 256, 512)``.

    :param nz: number of slices along the first axis. When None, the value
               is read with ``getTestConfigValue("num_slices")``.
    :param empty: currently unused; kept so existing callers keep working.
    :returns: float64 array of uniform random values from ``np.random.rand``.
    :raises NotImplementedError: if the slice count is missing, zero or
                                 negative.
    """
    if nz is None:
        nz = getTestConfigValue("num_slices")
    if not nz or nz <= 0:
        raise NotImplementedError("num_slices invalid")
    ny = 256
    nx = 512
    dtype = 'f8'
    # Shape and dtype are used directly: the HDF5 dataset (`dset`) the array
    # was once sized from is no longer created, so referring to it raised
    # NameError on every call.
    data = np.random.rand(nz, ny, nx).astype(dtype)
    return data

def rand_xarray(nt=None):
    """
    Generate a synthetic geoscience-like xarray Dataset filled with random
    data.

    The dataset holds two ``(time, lon, lat)`` variables (``foo`` and
    ``bar``) and one float32 ``(lon, lat)`` variable (``baz``) on a
    1000 x 1000 lon/lat grid, with daily timestamps starting 1970-01-01.
    Each variable is generated by ``randn`` with ``frac_nan=0.2``.

    Note: ``foo`` and ``bar`` are both drawn with ``randn``'s default seed
    and identical arguments, so they contain the same values.

    :param nt: number of timesteps for data. Primary control over how large
               the dataset is. When None, the value is read with
               ``getTestConfigValue("ntime_slices")``.
    :returns: A synthetic xarray dataset that mimics geodata.
    """
    if nt is None:
        nt = getTestConfigValue("ntime_slices")
    ny = 1000
    nx = 1000

    # Coordinate axes shared by every variable.
    times = pd.date_range('1970-01-01', periods=nt, freq='D')
    lons = xr.DataArray(np.linspace(0, 360, nx), dims=('lon', ),
                        attrs={'units': 'degrees east',
                               'long_name': 'longitude'})
    lats = xr.DataArray(np.linspace(-90, 90, ny), dims=('lat', ),
                        attrs={'units': 'degrees north',
                               'long_name': 'latitude'})

    # `encoding=None` is no longer passed to DataArray: it was only the
    # default value, and dropping it keeps the constructor call valid on
    # xarray versions whose DataArray takes no `encoding` argument.
    ds = xr.Dataset()
    ds['foo'] = xr.DataArray(randn((nt, nx, ny), frac_nan=0.2),
                             coords={'lon': lons, 'lat': lats, 'time': times},
                             dims=('time', 'lon', 'lat'),
                             name='foo',
                             attrs={'units': 'foo units',
                                    'description': 'a description'})
    ds['bar'] = xr.DataArray(randn((nt, nx, ny), frac_nan=0.2),
                             coords={'lon': lons, 'lat': lats, 'time': times},
                             dims=('time', 'lon', 'lat'),
                             name='bar',
                             attrs={'units': 'bar units',
                                    'description': 'a description'})
    ds['baz'] = xr.DataArray(randn((nx, ny), frac_nan=0.2).astype(np.float32),
                             coords={'lon': lons, 'lat': lats},
                             dims=('lon', 'lat'),
                             name='baz',
                             attrs={'units': 'baz units',
                                    'description': 'a description'})

    ds.attrs = {'history': 'created for xarray benchmarking'}

    return ds



In [25]:
# Build a 125-timestep synthetic dataset and show its size in MiB.
xr_ds = rand_xarray(nt=125)
xr_ds.nbytes / 2**20

250.5068130493164

In [26]:
# Lazy array of normally distributed samples (mean 10, std 0.1), chunked in
# slabs of 5 along the first axis; the trailing expression is its size in GiB.
size = (1350, 1000, 1000)
chunks = (5, 1000, 1000)
dask_arr = da.random.normal(loc=10, scale=0.1, size=size, chunks=chunks)
dask_arr.nbytes / 2**30

10.058283805847168

In [6]:
# Baseline: time a mean over the randomly generated dask array itself,
# before any storage backend is involved.
%time dask_arr.mean().compute()

CPU times: user 147 ms, sys: 20 ms, total: 167 ms
Wall time: 623 ms


9.99998969124158

In [7]:
# Clear any previous copy of the benchmark Zarr store from the bucket.
# Destructive: recursively and forcibly deletes everything under this prefix.
! gsutil -q -m rm -rf gs://storage-benchmarks/gcsfs-test-nb-zarr

In [8]:
# Open the GCS project; token='cache' presumably reuses previously stored
# credentials — confirm against the gcsfs version in use.
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='cache')
# NOTE(review): no `gcs=` filesystem is passed here, unlike the GCSMap built
# for the Zarr store in a later cell — confirm that is intended.
gcsmapcache =  gcsfs.GCSMap('storage-benchmarks/gcsfs-test-nb')
# Keep the session's credentials object so it can seed a second filesystem.
token = fs.session.credentials

In [9]:
# NOTE(review): repeats the assignment that ends the previous cell.
token = fs.session.credentials

In [10]:
# Second filesystem handle, built from the explicit credentials object
# instead of the 'cache' token string.
fs2 = gcsfs.GCSFileSystem(project='pangeo-181919', token=token)

In [11]:
# Mapping over the bucket prefix that will back the Zarr array, bound to fs2.
gcsmap_zarr = gcsfs.GCSMap('storage-benchmarks/gcsfs-test-nb-zarr', gcs=fs2)

In [12]:
# Allocate a Zarr array in the GCS-backed store with the same shape, dtype
# and chunking as dask_arr; overwrite=True is passed so an existing array at
# that location does not block creation.
zarr_ds = zarr.create(dask_arr.shape, chunks=chunks, 
                       dtype=dask_arr.dtype, store=gcsmap_zarr, overwrite=True)

In [None]:
def test_store(zarr_arr):
    """Time storing the global ``dask_arr`` into ``zarr_arr`` under three schedulers.

    :param zarr_arr: target array handed to ``dask_arr.store``.

    NOTE(review): ``dask.set_options(get=...)`` was superseded by
    ``dask.config.set(scheduler=...)`` in later dask releases — confirm
    against the installed dask version before relying on this helper.
    """
    # Synchronous, threaded and multiprocessing `get` functions, in that order.
    for get in [dask.get, dask.threaded.get, dask.multiprocessing.get]:
        with dask.set_options(get=get):
            # IPython magic: this function only works inside a notebook/IPython.
            %time dask_arr.store(zarr_arr, lock=False)

In [13]:
# Time writing the random array into the GCS-backed Zarr array.
%time dask_arr.store(zarr_ds, lock=False)

CPU times: user 183 ms, sys: 20 ms, total: 203 ms
Wall time: 2.35 s


In [14]:
# Wrap the GCS-backed Zarr array as a dask array, reusing the chunking it
# was created with.
my_data = da.from_array(zarr_ds, chunks=chunks)

In [18]:
# Compute speed: time a mean reduction over the dask array backed by the
# GCS Zarr store.

%time my_data.mean().compute()

distributed.utils - ERROR - ('mean_agg-aggregate-94eeb873a29e1fd4ca739027ddc210a2',)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 1368, in _gather
    st = self.futures[key]
KeyError: "('mean_agg-aggregate-94eeb873a29e1fd4ca739027ddc210a2',)"

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/utils.py", line 238, in f
    result[0] = yield make_coro()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 1374, in _gather
    None)
  File "/opt/conda/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
concurrent.futures._base.CancelledError

KeyboardInterrupt: 

In [17]:
# Load into memory speed: time materialising the whole array.
# NOTE(review): nothing visible here gives `chunks=` a meaning for compute()
# — confirm it has any effect. The bare call also displays the full array as
# cell output; consider binding the result to a name instead.
#%time my_data.compute()
%time my_data.compute(chunks=chunks)

CPU times: user 701 ms, sys: 1.27 s, total: 1.98 s
Wall time: 2.4 s


array([[[ 9.90752591, 10.06445579,  9.88981395, ..., 10.05880353,
          9.96036056, 10.09790597],
        [10.1913001 ,  9.92676068, 10.01722427, ...,  9.93059448,
          9.8320846 , 10.0549677 ],
        [ 9.96285515,  9.97769982, 10.09459049, ..., 10.00732404,
         10.22431458, 10.1654628 ],
        ...,
        [10.06214599,  9.77444611,  9.91541195, ...,  9.98536641,
          9.9460669 , 10.00774887],
        [10.02309033,  9.97913896,  9.91090013, ..., 10.10994026,
          9.99908138,  9.94868469],
        [ 9.8737191 ,  9.89996031, 10.12805808, ..., 10.15413996,
          9.95267696,  9.98252853]],

       [[10.03976515, 10.17830554,  9.92715904, ...,  9.81387834,
         10.06988266, 10.07868017],
        [ 9.95105995,  9.91293287, 10.04498203, ..., 10.14565279,
          9.91354668,  9.977298  ],
        [ 9.84141481,  9.88191308, 10.07421307, ...,  9.92596298,
          9.90128391, 10.09494545],
        ...,
        [ 9.96270756, 10.26202102,  9.94513356, ..., 1

In [None]:
# Random (time, y, x) cube, one chunk per timestep, wrapped as a Dataset
# with a single variable named 'foo'.
dsa = (
    xr.DataArray(np.random.rand(100, 1000, 1000), dims=['time', 'y', 'x'])
    .chunk({'time': 1})
    .to_dataset(name='foo')
)

In [None]:
# Size of the dataset in MiB.
dsa.nbytes / 1024**2

In [None]:
# Time writing the xarray dataset to a Zarr store.
# NOTE(review): `gcsmapcache2` is not defined anywhere in the visible
# notebook (only `gcsmapcache` is) — this cell fails on a fresh kernel unless
# the name is defined elsewhere; confirm the intended target.
%time dsa.to_zarr(gcsmapcache2)

In [8]:
# Create the same array layout at a filesystem path instead of going through
# a gcsfs mapping.
# NOTE(review): '/gcs/storage-benchmarks' looks like a FUSE mount of the
# bucket — confirm it is mounted on the machine running this notebook.
zloc = zarr.create(shape=dask_arr.shape, chunks=chunks, 
                   dtype=dask_arr.dtype, store='/gcs/storage-benchmarks/foo', 
                   overwrite=True)

In [10]:
# Write the random array into the path-backed Zarr array.
dask_arr.store(zloc, lock=False)

In [16]:
# Wrap the path-backed Zarr array as a dask array with the same chunking.
my_fusedata = da.from_array(zloc, chunks=chunks)

In [23]:
# Time a mean reduction over the path-backed array, for comparison with the
# gcsfs-backed run.
%time my_fusedata.mean().compute()

CPU times: user 1.18 s, sys: 83 ms, total: 1.26 s
Wall time: 2.51 s


10.000005148122924

In [None]:
# NOTE(review): leftover cell — %time is given no statement to time.
%time