In [1]:
!pip install swiftspec

Collecting swiftspec
  Using cached swiftspec-0.0.4-py3-none-any.whl (7.4 kB)
Installing collected packages: swiftspec
Successfully installed swiftspec-0.0.4


In [2]:
!pip install zarr-swiftstore

Collecting zarr-swiftstore
  Using cached zarr_swiftstore-1.2.3-py3-none-any.whl
Collecting python-swiftclient>=3.10.0
  Using cached python_swiftclient-4.1.0-py3-none-any.whl (87 kB)
Installing collected packages: python-swiftclient, zarr-swiftstore
Successfully installed python-swiftclient-4.1.0 zarr-swiftstore-1.2.3


In [3]:
import xarray as xr
import s3fs
import swiftspec
import zarr
import numpy as np
import dask.array as da
import xarray as xr
from dask.utils import parse_bytes
import math
import pandas as pd
import dask
from zarrswift import SwiftStore

# Read-only access with S3

In [4]:
# Endpoint of the object store; kept in a variable so it can be inspected or reused.
client_kwargs = dict(endpoint_url='https://object-store.cloud.muni.cz')

# Anonymous client: no credentials are sent.
s3 = s3fs.S3FileSystem(client_kwargs=client_kwargs, anon=True)

In [5]:
s3.ls('foss4g-data')

['foss4g-data/CGLS_LTS_1999_2019',
 'foss4g-data/CGLS_LTS_1999_2019_Lombardia',
 'foss4g-data/test']

In [6]:
try:
    # `s3` was created with anon=True, so this write attempt is expected to be refused.
    s3.touch('foss4g-data/myfile')
except Exception as e:
    # Print the error instead of raising: the failure is what this cell demonstrates.
    print(e)

An error occurred (AccessDenied) when calling the PutObject operation: Unknown


# Read write access with CESNET S3 credentials

First, set your access and secret key, obtained following [../EGI-CLI-Swift.md](../EGI-CLI-Swift.md).

In [7]:
#!aws configure set aws_access_key_id yourkey
#!aws configure set aws_secret_access_key yoursecret

Next, we read them back, because they must be passed explicitly to the Dask workers for distributed writes to work.

In [8]:
# `name = !cmd` captures the command's output as a list of lines;
# the first line is taken as the value.
# NOTE(review): if the key is not configured the list is presumably empty and [0] raises IndexError.
access_key = !aws configure get aws_access_key_id
access_key = access_key[0]
secret_key = !aws configure get aws_secret_access_key
secret_key = secret_key[0]

In [9]:
client_kwargs = dict(endpoint_url='https://object-store.cloud.muni.cz')

# Works only when using s3 in this Notebook, not with distributed:
#s3 = s3fs.S3FileSystem(anon=False, client_kwargs=client_kwargs)
# Hence the key pair is passed explicitly.
s3 = s3fs.S3FileSystem(
    client_kwargs=client_kwargs,
    key=access_key,
    secret=secret_key,
)

In [10]:
s3.ls('foss4g-data')

['foss4g-data/CGLS_LTS_1999_2019',
 'foss4g-data/CGLS_LTS_1999_2019_Lombardia',
 'foss4g-data/test']

In [11]:
s3.touch('foss4g-data/myfile')

{'ResponseMetadata': {'RequestId': 'tx0000000000000001114d2-006324d6cb-6dd439d2-default',
  'HostId': '',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'accept-ranges': 'bytes',
   'content-length': '0',
   'date': 'Fri, 16 Sep 2022 20:04:28 GMT',
   'etag': '"d41d8cd98f00b204e9800998ecf8427e"',
   'x-amz-request-id': 'tx0000000000000001114d2-006324d6cb-6dd439d2-default'},
  'RetryAttempts': 0},
 'ETag': '"d41d8cd98f00b204e9800998ecf8427e"'}

In [12]:
s3.ls('foss4g-data')

['foss4g-data/CGLS_LTS_1999_2019',
 'foss4g-data/CGLS_LTS_1999_2019_Lombardia',
 'foss4g-data/myfile',
 'foss4g-data/test']

In [13]:
s3.rm('foss4g-data/myfile')

In [14]:
s3.ls('foss4g-data')

['foss4g-data/CGLS_LTS_1999_2019',
 'foss4g-data/CGLS_LTS_1999_2019_Lombardia',
 'foss4g-data/test']

# Access with swiftspec

The following cells do not work: we are currently unable to get swiftspec working.

In [15]:
swift = swiftspec.SWIFTFileSystem()

In [16]:
#import fsspec
#with fsspec.open("swift://object-store.cloud.muni.cz/swift/v1/pangeo-test/jupyterbook-html.zip", "r") as f:
#    print(f.read())

In [17]:
#fs.ls("swift://object-store.cloud.muni.cz/swift/pangeo-test/none")

# Zarr swift store

In [18]:
import os
from getpass import getpass

# Public Swift endpoint of the object store.
os.environ["OS_STORAGE_URL"] = "https://object-store.cloud.muni.cz/swift/v1"

# SECURITY: never commit an auth token to a notebook. A token was previously
# hardcoded in this cell; it must be considered leaked and should be revoked.
# The token is now taken from the environment if already set, otherwise it is
# requested interactively without being echoed or stored in the notebook.
if not os.environ.get("OS_AUTH_TOKEN"):
    os.environ["OS_AUTH_TOKEN"] = getpass("OpenStack auth token (OS_AUTH_TOKEN): ")

In [19]:
# Pre-authenticated Swift connection options, read from the environment
# variables set in the previous cell.
auth = dict(
    preauthurl=os.environ["OS_STORAGE_URL"],
    preauthtoken=os.environ["OS_AUTH_TOKEN"],
)

In [20]:
# Tiny 4 x 5 random dataset used to measure the fixed cost of a write.
ds = xr.Dataset(
    data_vars={"foo": (("x", "y"), np.random.rand(4, 5))},
    coords={"x": [10, 20, 30, 40], "y": [1, 2, 3, 4, 5]},
)

# Zarr store backed by the 'demo' Swift container, under the 'xarray-demo' prefix.
store = SwiftStore(
    container='demo',
    prefix='xarray-demo',
    storage_options=auth,
)

In [21]:
store.clear()

In [22]:
%%time
ds.to_zarr(store=store, mode='w', consolidated=True)

CPU times: user 209 ms, sys: 12.5 ms, total: 221 ms
Wall time: 22.7 s


<xarray.backends.zarr.ZarrStore at 0x7f17058c0820>

We observe that writing even such a small dataset takes time, probably mostly spent on metadata and on setting things up.

In [23]:
ds = xr.open_zarr(store=store, consolidated=True)

# Performance test

## Dataset setup

In [24]:
def timeseries(
    chunk_per_worker=5,
    chunk_size="128 MB",
    num_nodes=12,
    worker_per_node=4,
    chunking_scheme=None,
    lat=320,
    lon=384,
    start="1980-01-01",
    freq="1H",
    nan=False,
):
    """Create a synthetic Xarray dataset filled with random data.

    The total size of the float64 array is
    ``chunk_size * num_nodes * worker_per_node * chunk_per_worker`` bytes;
    the number of time steps is derived from it (rounded up) for the given
    ``lat`` x ``lon`` grid.

    Parameters
    ----------
    chunk_per_worker : int
        Number of chunks placed per worker.
        See docs.dask.org, best practices, for chunks: a chunk size around
        100 MB is recommended, but each worker can hold many chunks, which
        is what lets dask parallelise the work.
    chunk_size : str
        Target chunk size, parsed with ``dask.utils.parse_bytes``
        (e.g. ``"128 MB"``, ``"64 MiB"``).
    num_nodes : int
        Number of compute nodes.
    worker_per_node : int
        Number of dask workers per node.
    chunking_scheme : str or None
        ``'temporal'`` chunks along the time dimension only (each chunk
        spans the full lon/lat grid); ``'spatial'`` chunks along the
        horizontal dimensions only (each chunk spans all time steps).
        Any other value, including None, lets dask determine chunk sizes
        along all dimensions ("auto"), with ``chunk_size`` as the target.
    lat : int
        Number of latitude values.
    lon : int
        Number of longitude values.
    start : datetime (or datetime-like string)
        Start of the time series.
    freq : str
        String like '2s' or '1H' or '12W' for the time series frequency.
    nan : bool
        Whether to include NaN in the generated data.

    Returns
    -------
    xarray.Dataset
        Dataset with a single variable ``sst`` of dimensions
        ``(time, lon, lat)`` and a ``history`` attribute.

    Examples
    --------
    >>> ds = timeseries(
    ...     chunk_size='128MB', chunk_per_worker=5, num_nodes=1,
    ...     worker_per_node=1, chunking_scheme='spatial',
    ...     lat=500, lon=600, freq='1D',
    ... )
    >>> ds.sst.shape
    (267, 600, 500)
    """

    itemsize = np.dtype("f8").itemsize
    chunk_size = parse_bytes(chunk_size)
    total_bytes = chunk_size * num_nodes * worker_per_node * chunk_per_worker
    size = total_bytes / itemsize  # total number of float64 elements
    timesteps = math.ceil(size / (lat * lon))
    shape = (timesteps, lon, lat)

    if chunking_scheme == "temporal":
        # As many full lon/lat slices as fit in one chunk (rounded up).
        steps_per_chunk = math.ceil(chunk_size / (lon * lat * itemsize))
        chunks = (steps_per_chunk, lon, lat)
    elif chunking_scheme == "spatial":
        # Square horizontal tiles that span every time step.
        side = math.ceil(math.sqrt(chunk_size / (timesteps * itemsize)))
        chunks = (timesteps, side, side)
    else:
        chunks = "auto"

    lats = xr.DataArray(np.linspace(start=-90, stop=90, num=lat), dims=["lat"])
    lons = xr.DataArray(np.linspace(start=-180, stop=180, num=lon), dims=["lon"])
    times = xr.DataArray(pd.date_range(start=start, freq=freq, periods=timesteps), dims=["time"])

    if chunks == "auto":
        # Make dask's automatic chunking target the requested chunk size.
        with dask.config.set({"array.chunk-size": chunk_size}):
            random_data = randn(shape=shape, chunks=chunks, nan=nan)
    else:
        random_data = randn(shape=shape, chunks=chunks, nan=nan)

    ds = xr.DataArray(
        random_data,
        dims=["time", "lon", "lat"],
        coords={"time": times, "lon": lons, "lat": lats},
        name="sst",
        attrs={"units": "baz units", "description": "a description"},
    ).to_dataset()
    ds.attrs = {"history": "created for compute benchmarking"}

    return ds


def randn(shape, chunks=None, nan=False, seed=0):
    """Return a dask array of seeded normal samples, scaled by 3 and shifted by 5.

    Parameters
    ----------
    shape : tuple of int
        Shape of the generated array.
    chunks : optional
        Chunk specification forwarded to ``standard_normal``.
    nan : bool
        When True, negative values are replaced with NaN.
    seed : int
        Seed of the dask ``RandomState``.
    """
    state = da.random.RandomState(seed)
    values = 5 + 3 * state.standard_normal(shape, chunks=chunks)
    return da.where(values < 0, np.nan, values) if nan else values

## Test without Dask cluster

In [25]:
ds = timeseries(chunk_size='64 MiB', chunking_scheme='temporal', chunk_per_worker=1, num_nodes=1)
ds

Unnamed: 0,Array,Chunk
Bytes,256.88 MiB,64.69 MiB
Shape,"(274, 384, 320)","(69, 384, 320)"
Count,12 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 256.88 MiB 64.69 MiB Shape (274, 384, 320) (69, 384, 320) Count 12 Tasks 4 Chunks Type float64 numpy.ndarray",320  384  274,

Unnamed: 0,Array,Chunk
Bytes,256.88 MiB,64.69 MiB
Shape,"(274, 384, 320)","(69, 384, 320)"
Count,12 Tasks,4 Chunks
Type,float64,numpy.ndarray


In [26]:
store = SwiftStore(container='pangeo-test', prefix='small-data-without-dask', storage_options=auth)
store.clear()

In [27]:
%%time
ds.to_zarr(store=store, mode='w', consolidated=True)

CPU times: user 1.72 s, sys: 388 ms, total: 2.11 s
Wall time: 30.2 s


<xarray.backends.zarr.ZarrStore at 0x7f16cd91f6d0>

Slightly more time for this 4-chunk dataset.

## Setup Dask gateway cluster

In [28]:
from dask_gateway import Gateway
gateway = Gateway()

In [29]:
cluster = gateway.new_cluster(worker_memory=4)
cluster.scale(6)
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [30]:
from dask.distributed import Client
client = Client(cluster)
client.wait_for_workers(6)
client


+-------------+----------+-----------+---------+
| Package     | client   | scheduler | workers |
+-------------+----------+-----------+---------+
| dask        | 2022.7.0 | 2022.8.0  | None    |
| distributed | 2022.7.0 | 2022.8.0  | None    |
+-------------+----------+-----------+---------+


0,1
Connection method: Cluster object,Cluster type: dask_gateway.GatewayCluster
Dashboard: /jupyterhub/services/dask-gateway/clusters/daskhub.4f9bd84bbefd47ce84d881e99f1f2a8f/status,


In [31]:
from dask.distributed import PipInstall
# Pin the same zarr-swiftstore version the client installed (1.2.3) so that
# client and workers cannot drift apart when a newer release is published.
plugin = PipInstall(packages=["zarr-swiftstore==1.2.3"], pip_options=["--upgrade"])

In [32]:
client.register_worker_plugin(plugin)

{'tls://10.244.10.57:40253': {'status': 'OK'},
 'tls://10.244.3.87:33043': {'status': 'OK'},
 'tls://10.244.4.73:38619': {'status': 'OK'},
 'tls://10.244.5.51:34553': {'status': 'OK'},
 'tls://10.244.7.71:45353': {'status': 'OK'},
 'tls://10.244.8.76:34651': {'status': 'OK'}}

## Try example computation

In [33]:
import dask.array as da

# Monte Carlo estimate of pi: draw points uniformly in [-1, 1]^2 and count
# the fraction that falls inside the unit circle.
sample = 10_000_000_000  # <- this is huge!
xxyy = da.random.uniform(-1, 1, size=(2, sample))
norm = da.linalg.norm(xxyy, axis=0)
summ = da.sum(norm <= 1)
insiders = summ.compute()
pi = 4 * insiders / sample
print(f"pi ~= {pi}")

pi ~= 3.1415952216


## Really small example dataset with Dask cluster

In [34]:
# Same tiny 4 x 5 random dataset as before, converted to dask-backed arrays
# so the write goes through the cluster.
ds = xr.Dataset(
    data_vars={"foo": (("x", "y"), np.random.rand(4, 5))},
    coords={"x": [10, 20, 30, 40], "y": [1, 2, 3, 4, 5]},
).chunk()
ds

Unnamed: 0,Array,Chunk
Bytes,160 B,160 B
Shape,"(4, 5)","(4, 5)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 160 B 160 B Shape (4, 5) (4, 5) Count 1 Tasks 1 Chunks Type float64 numpy.ndarray",5  4,

Unnamed: 0,Array,Chunk
Bytes,160 B,160 B
Shape,"(4, 5)","(4, 5)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [35]:
store = SwiftStore(container='demo', prefix='xarray-demo-dask', storage_options=auth)
store.clear()

In [36]:
%%time
ds.to_zarr(store=store, mode='w', consolidated=True)

CPU times: user 239 ms, sys: 14.3 ms, total: 253 ms
Wall time: 23.2 s


<xarray.backends.zarr.ZarrStore at 0x7f16cd91f900>

Same time with or without cluster for this dataset.

## Small example with Dask cluster and S3

In [37]:
store_s3 = s3fs.S3Map(root='demo/xarray-demo-dask-s3',
                   s3=s3,
                   check=False)
store_s3.clear()

In [38]:
%%time
ds.to_zarr(store=store_s3, mode='w', consolidated=True)

CPU times: user 406 ms, sys: 22.7 ms, total: 429 ms
Wall time: 29 s


<xarray.backends.zarr.ZarrStore at 0x7f16ccbcddd0>

## Small dataset with Dask cluster

In [39]:
ds = timeseries(chunk_size='64 MiB', chunking_scheme='temporal', chunk_per_worker=1, num_nodes=1)
ds

Unnamed: 0,Array,Chunk
Bytes,256.88 MiB,64.69 MiB
Shape,"(274, 384, 320)","(69, 384, 320)"
Count,12 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 256.88 MiB 64.69 MiB Shape (274, 384, 320) (69, 384, 320) Count 12 Tasks 4 Chunks Type float64 numpy.ndarray",320  384  274,

Unnamed: 0,Array,Chunk
Bytes,256.88 MiB,64.69 MiB
Shape,"(274, 384, 320)","(69, 384, 320)"
Count,12 Tasks,4 Chunks
Type,float64,numpy.ndarray


In [40]:
store = SwiftStore(container='pangeo-test', prefix='small-data-with-dask', storage_options=auth)
store.clear()

In [41]:
%%time
ds.to_zarr(store=store, mode='w', consolidated=True)

CPU times: user 328 ms, sys: 13.3 ms, total: 341 ms
Wall time: 30.5 s


<xarray.backends.zarr.ZarrStore at 0x7f16ccbcbf90>

Just a bit slower with the cluster, not meaningful.

Same as without the Dask cluster, so about 30 s is spent just setting things up.

## Medium Dataset

In [42]:
ds = timeseries(chunk_size='64 MiB', chunking_scheme='temporal', chunk_per_worker=5)
ds

Unnamed: 0,Array,Chunk
Bytes,15.00 GiB,64.69 MiB
Shape,"(16384, 384, 320)","(69, 384, 320)"
Count,714 Tasks,238 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 15.00 GiB 64.69 MiB Shape (16384, 384, 320) (69, 384, 320) Count 714 Tasks 238 Chunks Type float64 numpy.ndarray",320  384  16384,

Unnamed: 0,Array,Chunk
Bytes,15.00 GiB,64.69 MiB
Shape,"(16384, 384, 320)","(69, 384, 320)"
Count,714 Tasks,238 Chunks
Type,float64,numpy.ndarray


In [43]:
store = SwiftStore(container='pangeo-test', prefix='random-data', storage_options=auth)

In [44]:
%%time
store.clear()

CPU times: user 1.03 s, sys: 57.4 ms, total: 1.09 s
Wall time: 1min 59s


Deleting several chunks takes time!!

In [45]:
%%time
ds.to_zarr(store=store, mode='w', consolidated=True)

CPU times: user 764 ms, sys: 47.7 ms, total: 812 ms
Wall time: 1min 56s


<xarray.backends.zarr.ZarrStore at 0x7f16ccbd7660>

Writing each chunk takes between 1.5 s and 2 s, on top of roughly 30 s of incompressible startup time.

In [46]:
30 + 2*238/6

109.33333333333333

## Write with small chunks (bad)

In [47]:
ds_ios = timeseries(chunk_per_worker=100, chunking_scheme='temporal',chunk_size="256 KB")
ds_ios

Unnamed: 0,Array,Chunk
Bytes,1.14 GiB,0.94 MiB
Shape,"(1250, 384, 320)","(1, 384, 320)"
Count,3750 Tasks,1250 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.14 GiB 0.94 MiB Shape (1250, 384, 320) (1, 384, 320) Count 3750 Tasks 1250 Chunks Type float64 numpy.ndarray",320  384  1250,

Unnamed: 0,Array,Chunk
Bytes,1.14 GiB,0.94 MiB
Shape,"(1250, 384, 320)","(1, 384, 320)"
Count,3750 Tasks,1250 Chunks
Type,float64,numpy.ndarray


In [48]:
store_ios = SwiftStore(container='pangeo-test', prefix='random-data-iops', storage_options=auth)
store_ios.clear()

This one was not timed, but it took a really really long time.

In [49]:
%%time
# Same write options as every other benchmark in this notebook, so timings
# are comparable and the cell can be re-run over an existing store.
ds_ios.to_zarr(store=store_ios, mode='w', consolidated=True)

CPU times: user 2.2 s, sys: 118 ms, total: 2.32 s
Wall time: 3min 55s


<xarray.backends.zarr.ZarrStore at 0x7f16ccd49740>

About 0.5 s of write time per chunk: this mostly measures latency. It also takes more than 30 s before writing really starts.

In [50]:
30 + 0.8*1250/6

196.66666666666666

## Scale and Write

In [51]:
cluster.scale(12)
client.wait_for_workers(12)
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [52]:
client.register_worker_plugin(plugin)

{'tls://10.244.10.57:40253': {'status': 'OK'},
 'tls://10.244.10.58:45237': {'status': 'OK'},
 'tls://10.244.2.150:34487': {'status': 'OK'},
 'tls://10.244.3.87:33043': {'status': 'OK'},
 'tls://10.244.3.88:33019': {'status': 'OK'},
 'tls://10.244.4.73:38619': {'status': 'OK'},
 'tls://10.244.4.74:44611': {'status': 'OK'},
 'tls://10.244.5.51:34553': {'status': 'OK'},
 'tls://10.244.7.71:45353': {'status': 'OK'},
 'tls://10.244.8.76:34651': {'status': 'OK'},
 'tls://10.244.8.77:44805': {'status': 'OK'},
 'tls://10.244.9.59:39655': {'status': 'OK'}}

In [53]:
ds = timeseries(chunk_size='64 MiB', chunking_scheme='temporal', chunk_per_worker=5)
ds

Unnamed: 0,Array,Chunk
Bytes,15.00 GiB,64.69 MiB
Shape,"(16384, 384, 320)","(69, 384, 320)"
Count,714 Tasks,238 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 15.00 GiB 64.69 MiB Shape (16384, 384, 320) (69, 384, 320) Count 714 Tasks 238 Chunks Type float64 numpy.ndarray",320  384  16384,

Unnamed: 0,Array,Chunk
Bytes,15.00 GiB,64.69 MiB
Shape,"(16384, 384, 320)","(69, 384, 320)"
Count,714 Tasks,238 Chunks
Type,float64,numpy.ndarray


In [54]:
%%time
store = SwiftStore(container='pangeo-test', prefix='random-data', storage_options=auth)
store.clear()

CPU times: user 994 ms, sys: 66.3 ms, total: 1.06 s
Wall time: 1min 51s


In [55]:
%%time
ds.to_zarr(store=store, mode='w', consolidated=True)

CPU times: user 669 ms, sys: 53.3 ms, total: 722 ms
Wall time: 1min 14s


<xarray.backends.zarr.ZarrStore at 0x7f16ccbd7430>

With 12 workers, the chunk writing time is a bit longer, between 1.5 and sometimes close to 2.5s. Considering also the initial incompressible time, speed up is clearly not twice as fast, but still noticeable.

## Write with S3

In [56]:
store_s3 = s3fs.S3Map(root='pangeo-test/random-data',
                   s3=s3,
                   check=False)

In [57]:
%%time
store_s3.clear()

CPU times: user 118 ms, sys: 920 µs, total: 119 ms
Wall time: 8.75 s


Deleting through the S3 interface appears to be much faster.

In [58]:
%%time
ds.to_zarr(store=store_s3, mode='w', consolidated=True)

CPU times: user 873 ms, sys: 66.7 ms, total: 939 ms
Wall time: 1min 28s


<xarray.backends.zarr.ZarrStore at 0x7f16ccbda4a0>

## Read with Zarr-swift

Here we assume mean computation is negligible compared to IOs.

In [59]:
%%time
ds_read = xr.open_zarr(store)
ds_read.sst.mean().compute()

CPU times: user 164 ms, sys: 7.14 ms, total: 171 ms
Wall time: 38.2 s


Chunk read times are between 1 s and 1.6 s.

## Read with S3

In [60]:
store_s3 = s3fs.S3Map(root='pangeo-test/random-data',
                   s3=s3,
                   check=False)

In [61]:
%%time
ds_read_s3 = xr.open_zarr(store_s3)
ds_read_s3.sst.mean().compute()

CPU times: user 160 ms, sys: 12 ms, total: 172 ms
Wall time: 37.4 s


Is reading faster through the S3 interface?
Chunk read times are between 800 ms and 1.6 s.

## Clean resources

In [62]:
cluster.shutdown()

  self.scheduler_comm.close_rpc()


In [63]:
client.close()