In [1]:
!pip install swiftspec



In [2]:
!pip install zarr-swiftstore



In [3]:
import xarray as xr
import s3fs
import swiftspec
import zarr
import numpy as np
import dask.array as da
import xarray as xr
from dask.utils import parse_bytes
import math
import pandas as pd
import dask
from zarrswift import SwiftStore

# Read-only access with S3

In [4]:
# Anonymous (read-only) S3 access to the MUNI object store.
client_kwargs = dict(endpoint_url="https://object-store.cloud.muni.cz")
fs = s3fs.S3FileSystem(client_kwargs=client_kwargs, anon=True)

In [5]:
# List the top-level entries of the foss4g-data bucket (anonymous access).
fs.ls('foss4g-data')

['foss4g-data/CGLS_LTS_1999_2019',
 'foss4g-data/CGLS_LTS_1999_2019_Lombardia',
 'foss4g-data/test']

In [6]:
# Try to create a "directory" with anonymous access. The listing below shows no
# 'mydir' entry afterwards (presumably because S3 has no real directories and
# this filesystem is anonymous).
fs.mkdir('foss4g-data/mydir')

In [7]:
# List the bucket again to check whether 'mydir' was created.
fs.ls('foss4g-data')

['foss4g-data/CGLS_LTS_1999_2019',
 'foss4g-data/CGLS_LTS_1999_2019_Lombardia',
 'foss4g-data/test']

# Access with swiftspec

The following cells don't work: we have not yet managed to make swiftspec work.

In [8]:
# Keep the swiftspec filesystem under its own name so it does not shadow the
# anonymous S3 filesystem `fs` created above.
swift_fs = swiftspec.SWIFTFileSystem()

In [9]:
# Disabled: opening objects through swiftspec does not work yet (see the note above).
#import fsspec
#with fsspec.open("swift://object-store.cloud.muni.cz/swift/v1/pangeo-test/jupyterbook-html.zip", "r") as f:
#    print(f.read())

In [10]:
# Disabled: listing a Swift container through swiftspec does not work yet.
#fs.ls("swift://object-store.cloud.muni.cz/swift/pangeo-test/none")

# Zarr swift store

In [11]:
import os
from getpass import getpass

os.environ["OS_STORAGE_URL"] = "https://object-store.cloud.muni.cz/swift/v1"
# Never hardcode the auth token in the notebook: it ends up in version control
# and in every shared copy (any token previously committed here should be
# revoked). Reuse a token already exported in the environment, otherwise
# prompt for it without echoing.
if not os.environ.get("OS_AUTH_TOKEN"):
    os.environ["OS_AUTH_TOKEN"] = getpass("OpenStack Swift auth token (OS_AUTH_TOKEN): ")


In [12]:
# Pre-authenticated Swift credentials, passed to SwiftStore as storage_options.
auth = dict(
    preauthurl=os.environ["OS_STORAGE_URL"],
    preauthtoken=os.environ["OS_AUTH_TOKEN"],
)

In [13]:
# Tiny 4 x 5 demo dataset. A seeded generator makes the written values
# reproducible across runs.
demo_rng = np.random.default_rng(42)
ds = xr.Dataset(
    {"foo": (("x", "y"), demo_rng.random((4, 5)))},
    coords={
        "x": [10, 20, 30, 40],
        "y": [1, 2, 3, 4, 5],
    },
)

store = SwiftStore(container='demo', prefix='xarray-demo', storage_options=auth)

In [14]:
# Empty the store (presumably everything under container/prefix) so nothing
# from a previous run is left behind.
store.clear()

In [15]:
%%time
# mode='w' overwrites any existing copy; consolidated=True also writes the
# consolidated metadata so readers need fewer metadata requests.
ds.to_zarr(store=store, mode='w', consolidated=True)

CPU times: user 218 ms, sys: 16.8 ms, total: 234 ms
Wall time: 23.2 s


<xarray.backends.zarr.ZarrStore at 0x7f4a83a01a50>

Writing even such a small dataset already takes a long time (about 23 s), probably mostly spent on metadata and setup?

In [16]:
# Read the demo back into a separate variable so the in-memory `ds` is kept,
# and check that the round trip through Swift preserved dims, coords and values.
ds_readback = xr.open_zarr(store=store, consolidated=True)
xr.testing.assert_equal(ds_readback, ds)

# Performance test

## Dataset setup

In [17]:
def timeseries(
    chunk_per_worker=5,
    chunk_size="128 MB",
    num_nodes=12,
    worker_per_node=4,
    chunking_scheme=None,
    lat=320,
    lon=384,
    start="1980-01-01",
    freq="1H",
    nan=False,
):
    """Create a synthetic Xarray dataset filled with random data.

    The dataset holds one float64 variable ``sst`` with dimensions
    ``(time, lon, lat)``. Its total size is about
    ``chunk_size * num_nodes * worker_per_node * chunk_per_worker`` bytes;
    the number of time steps is chosen (rounded up) to reach that size.

    Parameters
    ----------
    chunk_per_worker : int
        Number of chunks placed per worker.
        See docs.dask.org, best practices, for chunk sizing.
        The best chunk size is around 100 MB, but each worker can
        hold many chunks, which lets dask parallelise automatically.
    chunk_size : str
        Chunk size as a byte string (e.g. "64 MiB", "256 KB"),
        parsed with ``dask.utils.parse_bytes``.
    num_nodes : int
        Number of compute nodes.
    worker_per_node : int
        Number of dask workers per node.
    chunking_scheme : {None, 'temporal', 'spatial'}
        Whether to chunk across the time dimension ('temporal') or the
        horizontal dimensions (lat, lon) ('spatial').
        If None, let dask choose chunk sizes along all dimensions ("auto"),
        using ``chunk_size`` as the target chunk size.
    lat : int
        Number of latitude values.
    lon : int
        Number of longitude values.
    start : datetime (or datetime-like string)
        Start of the time series.
    freq : str
        Pandas frequency string like '2s', '1H' or '12W' for the time axis.
    nan : bool
        Whether to replace negative values with NaN.

    Returns
    -------
    xarray.Dataset
        Dataset with a single dask-backed ``sst`` variable.

    Raises
    ------
    ValueError
        If ``chunking_scheme`` is not None, 'temporal' or 'spatial'.

    Examples
    --------
    >>> ds = timeseries(chunk_size='64 MiB', chunking_scheme='temporal',
    ...                 chunk_per_worker=1, num_nodes=1)
    >>> ds.sst.shape, ds.sst.data.chunksize
    ((274, 384, 320), (69, 384, 320))
    """
    # Fail loudly on typos like 'Temporal' instead of silently using "auto".
    if chunking_scheme not in (None, "temporal", "spatial"):
        raise ValueError(
            "chunking_scheme must be None, 'temporal' or 'spatial', "
            f"got {chunking_scheme!r}"
        )

    itemsize = np.dtype("f8").itemsize
    chunk_size = parse_bytes(chunk_size)
    total_bytes = chunk_size * num_nodes * worker_per_node * chunk_per_worker
    size = total_bytes / itemsize
    timesteps = math.ceil(size / (lat * lon))
    shape = (timesteps, lon, lat)

    if chunking_scheme == "temporal":
        # Whole (lon, lat) slabs; enough time steps per chunk to reach chunk_size.
        steps_per_chunk = math.ceil(chunk_size / (lon * lat * itemsize))
        chunks = (steps_per_chunk, lon, lat)
    elif chunking_scheme == "spatial":
        # Full time series per chunk; square horizontal tiles sized to chunk_size.
        tile = math.ceil(math.sqrt(chunk_size / (timesteps * itemsize)))
        chunks = (timesteps, tile, tile)
    else:
        chunks = "auto"

    lats = xr.DataArray(np.linspace(start=-90, stop=90, num=lat), dims=["lat"])
    lons = xr.DataArray(np.linspace(start=-180, stop=180, num=lon), dims=["lon"])
    times = xr.DataArray(pd.date_range(start=start, freq=freq, periods=timesteps), dims=["time"])
    if chunks == "auto":
        # Make dask's automatic chunking target the requested chunk size.
        with dask.config.set({"array.chunk-size": chunk_size}):
            random_data = randn(shape=shape, chunks=chunks, nan=nan)
    else:
        random_data = randn(shape=shape, chunks=chunks, nan=nan)
    ds = xr.DataArray(
        random_data,
        dims=["time", "lon", "lat"],
        coords={"time": times, "lon": lons, "lat": lats},
        name="sst",
        attrs={"units": "baz units", "description": "a description"},
    ).to_dataset()
    ds.attrs = {"history": "created for compute benchmarking"}

    return ds


def randn(shape, chunks=None, nan=False, seed=0):
    """Return a dask array of normal samples with mean 5 and standard deviation 3.

    The generator is seeded with ``seed``. When ``nan`` is true, every
    negative sample is replaced by NaN.
    """
    samples = da.random.RandomState(seed).standard_normal(shape, chunks=chunks)
    scaled = samples * 3 + 5
    return da.where(scaled < 0, np.nan, scaled) if nan else scaled

## Test without Dask cluster

In [18]:
# 4 chunks of ~64 MiB along time (1 node x default 4 workers x 1 chunk each).
ds = timeseries(
    chunk_per_worker=1,
    num_nodes=1,
    chunking_scheme="temporal",
    chunk_size="64 MiB",
)
ds

Unnamed: 0,Array,Chunk
Bytes,256.88 MiB,64.69 MiB
Shape,"(274, 384, 320)","(69, 384, 320)"
Count,12 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 256.88 MiB 64.69 MiB Shape (274, 384, 320) (69, 384, 320) Count 12 Tasks 4 Chunks Type float64 numpy.ndarray",320  384  274,

Unnamed: 0,Array,Chunk
Bytes,256.88 MiB,64.69 MiB
Shape,"(274, 384, 320)","(69, 384, 320)"
Count,12 Tasks,4 Chunks
Type,float64,numpy.ndarray


In [19]:
store = SwiftStore(
    container="pangeo-test",
    prefix="small-data-without-dask",
    storage_options=auth,
)
store.clear()  # start from an empty prefix

In [20]:
%%time
# Same write settings as the demo: overwrite, with consolidated metadata.
ds.to_zarr(store=store, mode='w', consolidated=True)

CPU times: user 1.71 s, sys: 335 ms, total: 2.04 s
Wall time: 30 s


<xarray.backends.zarr.ZarrStore at 0x7f4a7838c6d0>

This 4-chunk dataset takes a bit more time.

## Setup Dask gateway cluster

In [21]:
# Connect to the default Dask Gateway (presumably configured for this hub via dask config).
from dask_gateway import Gateway
gateway = Gateway()

In [22]:
# Start a gateway cluster and request 6 workers. `worker_memory` is a cluster
# option (presumably GiB per worker; TODO confirm against the gateway's options).
cluster = gateway.new_cluster(worker_memory=4)
cluster.scale(6)
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [23]:
from dask.distributed import Client
# Attach a client to the cluster and block until all 6 workers have joined,
# so the timings below do not include worker start-up.
client = Client(cluster)
client.wait_for_workers(6)
client

0,1
Connection method: Cluster object,Cluster type: dask_gateway.GatewayCluster
Dashboard: /jupyterhub/services/dask-gateway/clusters/daskhub.cb43bee2203544e8a0ff85b13e64b3ad/status,


In [24]:
from importlib.metadata import version

from dask.distributed import PipInstall

# Install on the workers exactly the zarr-swiftstore version used by this
# kernel, so client and workers run the same SwiftStore code.
plugin = PipInstall(
    packages=[f"zarr-swiftstore=={version('zarr-swiftstore')}"],
    pip_options=["--upgrade"],
)

In [25]:
# Run the pip-install plugin on the workers; the output reports status per worker.
client.register_worker_plugin(plugin)

{'tls://10.244.10.54:34601': {'status': 'OK'},
 'tls://10.244.3.84:35537': {'status': 'OK'},
 'tls://10.244.5.49:38591': {'status': 'OK'},
 'tls://10.244.7.68:42889': {'status': 'OK'},
 'tls://10.244.8.73:42697': {'status': 'OK'},
 'tls://10.244.9.56:37505': {'status': 'OK'}}

## Try example computation

In [26]:
# Monte Carlo estimate of pi, used as a cluster smoke test: 4 times the
# fraction of uniform points in [-1, 1]^2 that fall inside the unit circle.
# Seeded so the estimate is reproducible (dask.array is imported at the top).
sample = 10_000_000_000  # <- this is huge! (2 x 1e10 float64 values, ~160 GB)
xxyy = da.random.RandomState(0).uniform(-1, 1, size=(2, sample))
norm = da.linalg.norm(xxyy, axis=0)
summ = da.sum(norm <= 1)
insiders = summ.compute()
pi = 4 * insiders / sample
print(f"pi ~= {pi}")

pi ~= 3.1415656108


## Really small example dataset with Dask cluster

In [27]:
# Same shape and coordinates as the first demo dataset, but backed by a
# single dask chunk. Built in one expression (no self-referential
# `ds = ds.chunk()`), with a seeded generator for reproducible values.
tiny_rng = np.random.default_rng(42)
ds = xr.Dataset(
    {"foo": (("x", "y"), tiny_rng.random((4, 5)))},
    coords={
        "x": [10, 20, 30, 40],
        "y": [1, 2, 3, 4, 5],
    },
).chunk()
ds

Unnamed: 0,Array,Chunk
Bytes,160 B,160 B
Shape,"(4, 5)","(4, 5)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 160 B 160 B Shape (4, 5) (4, 5) Count 1 Tasks 1 Chunks Type float64 numpy.ndarray",5  4,

Unnamed: 0,Array,Chunk
Bytes,160 B,160 B
Shape,"(4, 5)","(4, 5)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [28]:
store = SwiftStore(
    container="demo",
    prefix="xarray-demo-dask",
    storage_options=auth,
)
store.clear()  # start from an empty prefix

In [29]:
%%time
# Same write settings as before, now for the dask-backed dataset.
ds.to_zarr(store=store, mode='w', consolidated=True)

CPU times: user 236 ms, sys: 18.5 ms, total: 254 ms
Wall time: 24.5 s


<xarray.backends.zarr.ZarrStore at 0x7f4a83a01b30>

Same time with or without the cluster for this dataset.

## Small dataset with Dask cluster

In [30]:
# Same 4-chunk dataset as in the test without a cluster.
ds = timeseries(
    chunk_per_worker=1,
    num_nodes=1,
    chunking_scheme="temporal",
    chunk_size="64 MiB",
)
ds

Unnamed: 0,Array,Chunk
Bytes,256.88 MiB,64.69 MiB
Shape,"(274, 384, 320)","(69, 384, 320)"
Count,12 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 256.88 MiB 64.69 MiB Shape (274, 384, 320) (69, 384, 320) Count 12 Tasks 4 Chunks Type float64 numpy.ndarray",320  384  274,

Unnamed: 0,Array,Chunk
Bytes,256.88 MiB,64.69 MiB
Shape,"(274, 384, 320)","(69, 384, 320)"
Count,12 Tasks,4 Chunks
Type,float64,numpy.ndarray


In [31]:
store = SwiftStore(
    container="pangeo-test",
    prefix="small-data-with-dask",
    storage_options=auth,
)
store.clear()  # start from an empty prefix

In [32]:
%%time
# Chunk writes are executed by the dask workers.
ds.to_zarr(store=store, mode='w', consolidated=True)

CPU times: user 315 ms, sys: 26.7 ms, total: 342 ms
Wall time: 33.1 s


<xarray.backends.zarr.ZarrStore at 0x7f4a74326510>

Just a bit slower with the cluster; the difference is not meaningful.

Same as without the Dask cluster, so about 30 s are spent just setting things up.

## Medium Dataset

In [33]:
# ~15 GiB dataset chunked along time in ~64 MiB chunks
# (default 12 nodes x 4 workers, 5 chunks per worker).
ds = timeseries(
    chunk_per_worker=5,
    chunking_scheme="temporal",
    chunk_size="64 MiB",
)
ds

Unnamed: 0,Array,Chunk
Bytes,15.00 GiB,64.69 MiB
Shape,"(16384, 384, 320)","(69, 384, 320)"
Count,714 Tasks,238 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 15.00 GiB 64.69 MiB Shape (16384, 384, 320) (69, 384, 320) Count 714 Tasks 238 Chunks Type float64 numpy.ndarray",320  384  16384,

Unnamed: 0,Array,Chunk
Bytes,15.00 GiB,64.69 MiB
Shape,"(16384, 384, 320)","(69, 384, 320)"
Count,714 Tasks,238 Chunks
Type,float64,numpy.ndarray


In [34]:
store = SwiftStore(
    container="pangeo-test",
    prefix="random-data",
    storage_options=auth,
)

In [35]:
%%time
# Time the deletion of everything under this prefix (many chunk objects if a
# previous run wrote here).
store.clear()

CPU times: user 1.03 s, sys: 64.7 ms, total: 1.09 s
Wall time: 2min


Deleting many chunks takes a long time!

In [36]:
%%time
# Write the ~15 GiB dataset with 6 workers.
ds.to_zarr(store=store, mode='w', consolidated=True)

CPU times: user 707 ms, sys: 85.2 ms, total: 792 ms
Wall time: 1min 53s


<xarray.backends.zarr.ZarrStore at 0x7f4a744e9270>

Writing each chunk takes between 1.5 s and 2 s, on top of about 30 s of incompressible setup time. Rough estimate with 6 workers:

In [37]:
# Rough write-time model: setup overhead + per-chunk write time * chunks / workers.
setup_s = 30       # observed incompressible setup time (s)
per_chunk_s = 2    # upper end of the observed per-chunk write time (s)
n_chunks = 238     # number of chunks in the medium dataset
n_workers = 6
est_write_s = setup_s + per_chunk_s * n_chunks / n_workers
est_write_s

109.33333333333333

## Write with small chunks (bad)

In [38]:
# Deliberately tiny chunks to expose per-request latency: asking for 256 KB
# rounds up to one full (384 x 320) time step per chunk.
ds_ios = timeseries(
    chunk_size="256 KB",
    chunking_scheme="temporal",
    chunk_per_worker=100,
)
ds_ios

Unnamed: 0,Array,Chunk
Bytes,1.14 GiB,0.94 MiB
Shape,"(1250, 384, 320)","(1, 384, 320)"
Count,3750 Tasks,1250 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.14 GiB 0.94 MiB Shape (1250, 384, 320) (1, 384, 320) Count 3750 Tasks 1250 Chunks Type float64 numpy.ndarray",320  384  1250,

Unnamed: 0,Array,Chunk
Bytes,1.14 GiB,0.94 MiB
Shape,"(1250, 384, 320)","(1, 384, 320)"
Count,3750 Tasks,1250 Chunks
Type,float64,numpy.ndarray


In [39]:
store_ios = SwiftStore(
    container="pangeo-test",
    prefix="random-data-iops",
    storage_options=auth,
)
store_ios.clear()  # start from an empty prefix

The `clear()` call above was not timed, but it took a really long time.

In [40]:
%%time
ds_ios.to_zarr(store_ios)

CPU times: user 2 s, sys: 174 ms, total: 2.17 s
Wall time: 3min 55s


<xarray.backends.zarr.ZarrStore at 0x7f4a5bfd5cf0>

Each chunk write takes about 0.5 s (the estimate below uses a more conservative 0.8 s), so what we measure here is mostly latency. It also takes more than 30 s for writing to really start.

In [41]:
# Same write-time model for the small-chunk dataset.
setup_s = 30       # observed incompressible setup time (s)
per_chunk_s = 0.8  # assumed per-chunk write time (s)
n_chunks = 1250    # one chunk per time step
n_workers = 6
est_write_s = setup_s + per_chunk_s * n_chunks / n_workers
est_write_s

196.66666666666666

## Scale and Write

In [42]:
# Double the cluster and wait for all 12 workers before re-running the write.
cluster.scale(12)
client.wait_for_workers(12)
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [43]:
# Register the plugin again after scaling up; the output lists all 12 workers.
client.register_worker_plugin(plugin)

{'tls://10.244.10.54:34601': {'status': 'OK'},
 'tls://10.244.10.55:33379': {'status': 'OK'},
 'tls://10.244.2.149:43999': {'status': 'OK'},
 'tls://10.244.3.84:35537': {'status': 'OK'},
 'tls://10.244.3.85:35563': {'status': 'OK'},
 'tls://10.244.4.71:42021': {'status': 'OK'},
 'tls://10.244.5.49:38591': {'status': 'OK'},
 'tls://10.244.7.68:42889': {'status': 'OK'},
 'tls://10.244.7.69:43259': {'status': 'OK'},
 'tls://10.244.8.73:42697': {'status': 'OK'},
 'tls://10.244.8.74:45263': {'status': 'OK'},
 'tls://10.244.9.56:37505': {'status': 'OK'}}

In [44]:
# Same ~15 GiB medium dataset, now written with 12 workers.
ds = timeseries(
    chunk_per_worker=5,
    chunking_scheme="temporal",
    chunk_size="64 MiB",
)
ds

Unnamed: 0,Array,Chunk
Bytes,15.00 GiB,64.69 MiB
Shape,"(16384, 384, 320)","(69, 384, 320)"
Count,714 Tasks,238 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 15.00 GiB 64.69 MiB Shape (16384, 384, 320) (69, 384, 320) Count 714 Tasks 238 Chunks Type float64 numpy.ndarray",320  384  16384,

Unnamed: 0,Array,Chunk
Bytes,15.00 GiB,64.69 MiB
Shape,"(16384, 384, 320)","(69, 384, 320)"
Count,714 Tasks,238 Chunks
Type,float64,numpy.ndarray


In [45]:
%%time
store = SwiftStore(container='pangeo-test', prefix='random-data', storage_options=auth)
store.clear()

CPU times: user 1.04 s, sys: 104 ms, total: 1.14 s
Wall time: 1min 56s


In [46]:
%%time
# Same write as with 6 workers, to measure the speed-up.
ds.to_zarr(store=store, mode='w', consolidated=True)

CPU times: user 651 ms, sys: 48.9 ms, total: 700 ms
Wall time: 1min 15s


<xarray.backends.zarr.ZarrStore at 0x7f4a744e8ba0>

With 12 workers, each chunk write takes a bit longer, between 1.5 s and sometimes close to 2.5 s. Considering also the initial incompressible time, the write is clearly not twice as fast.

## Read with Zarr-swift

Here we assume the mean computation is negligible compared to I/O.

In [47]:
%%time
ds_read = xr.open_zarr(store)
ds_read.sst.mean().compute()

CPU times: user 434 ms, sys: 14.1 ms, total: 448 ms
Wall time: 33.4 s


Each chunk takes between 1 s and 1.6 s to read.

## Read with S3

In [48]:
# Recreate the anonymous S3 filesystem (`fs` may have been rebound earlier).
client_kwargs = dict(endpoint_url="https://object-store.cloud.muni.cz")
fs = s3fs.S3FileSystem(client_kwargs=client_kwargs, anon=True)

In [49]:
# Key-value view over the same container/prefix written through SwiftStore above.
store_s3 = fs.get_mapper("pangeo-test/random-data", check=False)

In [50]:
%%time
ds_read_s3 = xr.open_zarr(store_s3)
ds_read_s3.sst.mean().compute()

CPU times: user 140 ms, sys: 519 µs, total: 140 ms
Wall time: 25.7 s


Reading seems faster through the S3 interface?
Each chunk takes between 800 ms and 1.6 s to read.

## Clean resources

In [51]:
# Close the client first: shutting the cluster down underneath a connected
# client is what produced the close_rpc warning in the original run.
client.close()
cluster.shutdown()

  self.scheduler_comm.close_rpc()


In [52]:
# Release the client connection (safe to call even if it is already closed).
client.close()