In [1]:
import xarray as xr
import numpy as np
from numpy import pi, sin, cos, arccos, clip, deg2rad
import numpy.ma as ma
from datetime import datetime
import dask
import time

#### I have 30 datasets stored in 30 zarr-stores, each dataset is just 5 columns of equal size:
- latitude, shape=(2105319,)
- longitude,   shape=(2105319,)
- time,   shape=(2105319,)
- mean,   shape=(2105319, 1)
- anomalies,   shape=(2105319, 1)

In [105]:
import json
import gcsfs

with open('pangeo-181919-e7bc5bdaf4d5.json') as f:
    token = json.load(f)
gcs = gcsfs.GCSFileSystem(token=token)


plevel = 4 # Choosing an arbitrary dataset out of the 30
dspath='pangeo-argo-eke/global/mean_and_anomalies/with_pressure_coordinate/readable_ws_and_NHarm/mean_and_anomalies_global_ws500_plevel'+str(plevel)+'.zarr'
dsmapper = gcs.get_mapper(dspath)

In [12]:
ds = xr.open_zarr(dsmapper, consolidated=True)

### Showing how it was saved

In [15]:
# Mean and anomalies for pressurelevel is saved to output-path:
ds = xr.Dataset(
    data_vars = dict(
        Mean = ( ["i", "pressure"], ds.Mean.load().values ),
        Anomalies = ( ["i", "pressure"], ds.Anomalies.load().values ),
        time = ( ["i"], ds.time.load().values ),
        latitude = ( ["i"], ds.latitude.load().values ),
        longitude = ( ["i"], ds.longitude.load().values ),
        
    ),
    coords=dict(
        pressure = (["pressure"], ds.pressure.load().values ),
        i = (["i"], np.arange(ds.Mean.size) )

    ),
    attrs=dict(
        description = 'Estimated mean dynamic height on profile-coordinates, and anomalies by subtracting estimated mean from observations',
        pressureindex = ds.pressureindex,
        number_of_harmonics = ds.number_of_harmonics,
        window_size = ds.window_size,
        creation_date = str( datetime.now() )
    )
)   

ds.i.attrs["standard_name"] = 'Observation_index'
ds.time.attrs["standard_name"] = 'time'
#ds.time.attrs["units"] = 'days since 1970-01-01 00:00:00'
ds.latitude.attrs["standard_name"] = 'latitude'
ds.latitude.attrs["units"] = 'degrees_north'
ds.longitude.attrs["standard_name"] = 'longitude'
ds.longitude.attrs["units"] = 'degrees_east'
ds.pressure.attrs["standard_name"] = 'pressure'
ds.pressure.attrs["units"] = 'decibar'
ds.Mean.attrs["standard_name"] = 'Estimated mean dynamic height'
ds.Mean.attrs["units"] = 'm^2/s^2'
ds.Anomalies.attrs["standard_name"] = 'dynamic height anomalies'
ds.Anomalies.attrs["units"] = 'm^2/s^2'

dsc = ds.chunk({"i":ds.Mean.size, "pressure":1 })
outfile = 'pangeo-argo-eke/chunk_alternatives/global_mean_and_anomalies_plevel'+str(plevel)+'.zarr'
mapper = gcs.get_mapper(outfile)
dsc.to_zarr(mapper, consolidated=True)

<xarray.backends.zarr.ZarrStore at 0x7f01b092f3e0>

In [17]:
dsc

Unnamed: 0,Array,Chunk
Bytes,16.06 MiB,16.06 MiB
Shape,"(2105319, 1)","(2105319, 1)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 16.06 MiB 16.06 MiB Shape (2105319, 1) (2105319, 1) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1  2105319,

Unnamed: 0,Array,Chunk
Bytes,16.06 MiB,16.06 MiB
Shape,"(2105319, 1)","(2105319, 1)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,16.06 MiB,16.06 MiB
Shape,"(2105319, 1)","(2105319, 1)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 16.06 MiB 16.06 MiB Shape (2105319, 1) (2105319, 1) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",1  2105319,

Unnamed: 0,Array,Chunk
Bytes,16.06 MiB,16.06 MiB
Shape,"(2105319, 1)","(2105319, 1)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,16.06 MiB,16.06 MiB
Shape,"(2105319,)","(2105319,)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,datetime64[ns] numpy.ndarray,datetime64[ns] numpy.ndarray
"Array Chunk Bytes 16.06 MiB 16.06 MiB Shape (2105319,) (2105319,) Dask graph 1 chunks in 1 graph layer Data type datetime64[ns] numpy.ndarray",2105319  1,

Unnamed: 0,Array,Chunk
Bytes,16.06 MiB,16.06 MiB
Shape,"(2105319,)","(2105319,)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,datetime64[ns] numpy.ndarray,datetime64[ns] numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,16.06 MiB,16.06 MiB
Shape,"(2105319,)","(2105319,)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 16.06 MiB 16.06 MiB Shape (2105319,) (2105319,) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",2105319  1,

Unnamed: 0,Array,Chunk
Bytes,16.06 MiB,16.06 MiB
Shape,"(2105319,)","(2105319,)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,16.06 MiB,16.06 MiB
Shape,"(2105319,)","(2105319,)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 16.06 MiB 16.06 MiB Shape (2105319,) (2105319,) Dask graph 1 chunks in 1 graph layer Data type float64 numpy.ndarray",2105319  1,

Unnamed: 0,Array,Chunk
Bytes,16.06 MiB,16.06 MiB
Shape,"(2105319,)","(2105319,)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


#### Size of each dataset in Megabytes

In [16]:
dsc.nbytes/1e6

101.05532

### Connect to a dask-cluster and set cluster-options

In [19]:
from dask_gateway import GatewayCluster, Gateway
from distributed import Client

g = Gateway()
g.list_clusters()

[]

In [20]:
#cluster = g.connect(g.list_clusters()[0].name)

In [21]:
options = g.cluster_options()
options.worker_cores = 2; options.worker_memory = 4
# Create a cluster with those options
cluster = g.new_cluster(options)

In [22]:
g.list_clusters()

[ClusterReport<name=prod.63e5c54136a24eaeae399cde7e97060b, status=RUNNING>]

In [23]:
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_gateway.GatewayCluster
Dashboard: /services/dask-gateway/clusters/prod.63e5c54136a24eaeae399cde7e97060b/status,


In [24]:
cluster.scale(1)

### Loading of data from cloud-storage

Loading data only:

In [100]:
@dask.delayed
def load(mapper):
    
    """
    Load data from zarr-store
    """
    ds = xr.open_dataset(mapper, engine="zarr",consolidated=True, chunks={})
    data0 = ds.Anomalies.load().values[:,0]
    #ii = ~xr.apply_ufunc(np.isnan, data0)
    #data, lat, lon, time = data0[ii], ds.latitude.load().values[ii], ds.longitude.load().values[ii], ds.time.load().values[ii]
    
    ### Calculation using data-, lat-, lon-, and time-arrays
    ###
    ### Returns one array
    #return data, lat, lon, time
    return data0

In [25]:
mapper = gcs.get_mapper(outfile)

#### Alternatives:

<br>
ds = xr.open_zarr(mapper, consolidated=True, chunks='auto') : 

In [97]:
%timeit dask.compute( load(mapper) )[0]

1.82 s ± 46.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


<br>
ds = xr.open_zarr(mapper, consolidated=True, chunks=None) :

In [99]:
%timeit dask.compute( load(mapper) )[0]

1.8 s ± 60 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


<br>
ds = xr.open_dataset(mapper, engine="zarr",consolidated=True, chunks={}) :

In [101]:
%timeit dask.compute( load(mapper) )[0]

1.83 s ± 60.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


#### Scaling down and closing cluster

In [102]:
cluster.scale(0)

In [103]:
cluster.close()

In [104]:
cluster.shutdown()

2023-07-25 12:33:42,776 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
