# My attempts to save datasets as Zarr stores in the Google Cloud Storage bucket
I am trying this with BedMachine and with PISM output.

One problem was with the BedMachine dataset. I sorted that out and then managed to get a small dataset (initially loaded from a NetCDF file) into and back out of the Google bucket. The issue now is that I am running out of memory and the server crashes.

The next step is to get it running on the Dask cluster, but `.to_zarr` does not run on the cluster as it stands.

```python
import dask
import dask.array as da
import dask.delayed
from dask.distributed import Client
import dask_gateway
import fsspec
import gcsfs
import numpy as np
import xarray as xr

xr.set_options(display_style="html")
```

### 1. Start a cluster

```python
# show the dask-gateway version and the default dask-gateway settings
print(dask_gateway.__version__)
print(dask.config.config['gateway'])

# connect to the default gateway and start a new cluster with the default options
gateway = dask_gateway.Gateway()
cluster = gateway.new_cluster()
print(gateway.list_clusters())

# the dashboard_link property gives the link that can be pasted into the Dask labextension
print(cluster.dashboard_link)

# scale the cluster to 8 workers using the scale() method
cluster.scale(8)

# connect a client: the distributed client is used for running parallel tasks with Dask
client = Client(cluster)
cluster
```


```python
# test that we can access the cluster: this computation is sent to the cluster
arr = da.ones((100,), chunks=(10,))
arr.compute()
```

```
array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])
```

### 2a. Load the NetCDF file
This, I now understand, does not create a dask array, just a lazily loaded xarray Dataset; accessing `.data` then returns a numpy array.
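A quick way to tell the two cases apart is the `.chunks` attribute, which xarray documents as `None` when a variable is not backed by a dask array and as a tuple of block sizes when it is. The sketch below uses a small synthetic dataset (the variable name `thickness` and the 6 × 6 grid are placeholders, not BedMachine) so it runs without network access.

```python
import dask.array as da
import numpy as np
import xarray as xr

# small synthetic stand-in for the BedMachine grid (hypothetical sizes)
ds = xr.Dataset(
    {"thickness": (("y", "x"), np.zeros((6, 6), dtype="float32"))},
    coords={"x": np.arange(6), "y": np.arange(6)},
)

# in-memory numpy data, not dask: .chunks is None
print(ds.thickness.chunks)

# after chunking, the variable is a dask array and .chunks lists the block sizes
ds_dask = ds.chunk({"x": 4, "y": 4})
print(isinstance(ds_dask.thickness.data, da.Array))
print(ds_dask.thickness.chunks)
```

The same check on `bm.thickness` and `bm_dask.thickness` should distinguish the two loads below without printing the arrays.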

```python
url = 'https://storage.googleapis.com/ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'
with fsspec.open(url, mode='rb') as openfile:
    bm = xr.open_dataset(openfile)

# the variable `mapping` was causing an error in the write to zarr, so keep its
# information in the attributes of the whole dataset and then remove the variable
bm.attrs = {**bm.attrs, **bm.mapping.attrs}
bm = bm.drop_vars('mapping')
type(bm.thickness.data)
```

```
numpy.ndarray
```

### 2b. Load the NetCDF file as dask arrays
Specifying a chunk size makes xarray back each variable with a dask array instead.
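An integer such as `chunks=3000` applies the same block size to every dimension, and the last block along each dimension holds whatever is left over. A minimal sketch of that arithmetic with `dask.array` alone (the length 7000 is arbitrary, chosen only to show a remainder block):

```python
import math

import dask.array as da

n = 7000          # hypothetical dimension length
chunk_size = 3000

arr = da.zeros((n,), chunks=chunk_size)
print(arr.chunks)                 # block sizes along the single dimension
print(arr.numblocks)              # number of blocks
print(math.ceil(n / chunk_size))  # the same count, worked out by hand
```

Here the blocks are 3000, 3000 and 1000 points long, so three blocks in total.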

```python
url = 'https://storage.googleapis.com/ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'
with fsspec.open(url, mode='rb') as openfile:
    bm_dask = xr.open_dataset(openfile, chunks=3000)

# the variable `mapping` was causing an error in the write to zarr, so keep its
# information in the attributes of the whole dataset and then remove the variable
bm_dask.attrs = {**bm_dask.attrs, **bm_dask.mapping.attrs}
bm_dask = bm_dask.drop_vars('mapping')
type(bm_dask.thickness.data)
```

```
dask.array.core.Array
```

### 3. Take small subsets of the data (from the non-dask and dask-backed datasets)

```python
# create a very small version of the dataset for testing the upload and download
bm_small = bm.isel(x=slice(0, 20), y=slice(0, 20))
bm_small.nbytes / 1e3   # size in kB: only about 10 kB
```

```
9.76
```

```python
# the same small subset, taken from the dask-backed dataset
bm_dask_small = bm_dask.isel(x=slice(0, 20), y=slice(0, 20))
bm_dask_small.nbytes / 1e3   # size in kB: only about 10 kB
```

```
9.76
```
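Since `nbytes` counts bytes, 9.76 means about 9,760 bytes for the 20 × 20 subset. A back-of-the-envelope check, assuming (the notebook does not show this) that BedMachine stores five float32 fields, two int8 fields and one int16 field on the grid, plus int32 `x` and `y` coordinates:

```python
import numpy as np

n_cells = 20 * 20   # the 20 x 20 subset
n_coord = 20        # length of each of the x and y coordinates

# assumed dtypes for the BedMachine variables (not confirmed by the notebook)
grid_dtypes = {
    "surface": np.float32, "thickness": np.float32, "bed": np.float32,
    "firn": np.float32, "errbed": np.float32,
    "mask": np.int8, "source": np.int8, "geoid": np.int16,
}
coord_dtype = np.int32

grid_bytes = sum(n_cells * np.dtype(d).itemsize for d in grid_dtypes.values())
coord_bytes = 2 * n_coord * np.dtype(coord_dtype).itemsize
total = grid_bytes + coord_bytes
print(total, "bytes =", total / 1e3, "kB")
```

That these assumed dtypes reproduce the measured 9.76 kB suggests they are plausible, but `bm.dtypes` is the authoritative place to check.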

### 4. Write the small subset to the bucket
`bm_small` (the non-dask version) can be written to a Zarr store in our bucket without any problem.

```python
# get a mapper object using the token stored in the ooi environment
bm_small_mapper = fsspec.get_mapper('gs://ldeo-glaciology/temp/bm_small7.zarr', mode='ab',
                                    token='../secrets/ldeo-glaciology-bc97b12df06b.json')
# write the dataset to zarr in the google bucket
bm_small.to_zarr(bm_small_mapper, mode='w');
```

The next cell reads the small subset back from the bucket (`xr.open_zarr` returns dask-backed variables), but it fails with a `KilledWorker` error.

```python
# bm_small_mapper = fsspec.get_mapper('gs://ldeo-glaciology/temp/bm_small4.zarr')  # this also works, which confirms the token is not needed for read access
bm_small_reloaded = xr.open_zarr(bm_small_mapper)  # reload the dataset using the same mapper as before
bm_small_reloaded.identical(bm_small)  # check that what we get back is the same as what we uploaded
```

```
KilledWorker: ('zarr-77e3047971eb1a88492da229d54c270f', <Worker 'tls://10.36.234.226:43439', name: dask-worker-1df6cf19a1494f40b8a35ee1aa2b28a3-sg89z, memory: 0, processing: 1>)
```

### Next, try writing the small subset of the dask-backed dataset to the bucket
This fails with `distributed.protocol.core - CRITICAL - Failed to Serialize` and `CancelledError: store-b2d9d9a2-29ae-11eb-b891-76f16e16af4b`.

```python
# bm_small_mapper = fsspec.get_mapper('gs://ldeo-glaciology/temp/bm_small_dask_1.zarr', mode='ab',
#                                     token='../secrets/ldeo-glaciology-bc97b12df06b.json')  # get a mapper object using the token stored in the ooi environment
# bm_small.to_zarr(bm_small_mapper, mode='w')  # write the dataset to zarr in the google bucket

import json

# read the service-account token from the json file and pass its contents to gcsfs
with open('../secrets/ldeo-glaciology-bc97b12df06b.json') as token_file:
    token = json.load(token_file)
gcs = gcsfs.GCSFileSystem(token=token)

bm_small_mapper = gcs.get_mapper('gs://ldeo-glaciology/temp/bm_small_dask_1.zarr')
bm_dask_small.to_zarr(bm_small_mapper, mode='w');
```
```
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/core.py", line 50, in dumps
    data = {
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/core.py", line 51, in <dictcomp>
    key: serialize(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 277, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type ImplicitToExplicitIndexingAdapter.', 'ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyOuterIndexedArray(array=<xarray.backends.h5netcdf_.H5NetCDFArrayWrapper object at 0x7f8f5c6108b0>, key=BasicIndexer((slice(None, None, None), slice(None, None, None))))))')
distributed.comm.utils - ERROR - ('Could not serialize object of type ImplicitToExplicitIndexingAdapter.', 'ImplicitToExplicitIndexingAdapter(array=CopyOnWrite

CancelledError: store-b2d9d9a2-29ae-11eb-b891-76f16e16af4b
```
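The traceback suggests that the dask graph for `bm_dask_small` still refers to the h5netcdf file wrapper (`H5NetCDFArrayWrapper`) for the file opened through `fsspec`, and that object could not be serialized to send to the workers. Since the non-dask `bm_small` wrote without trouble, one possible workaround for small subsets is to pull the data into memory in the notebook process first, using Dask's local `synchronous` scheduler so the graph is never shipped to the cluster. A minimal sketch on a synthetic stand-in (names and sizes are placeholders; this has not been tested on BedMachine itself):

```python
import numpy as np
import xarray as xr

# synthetic dask-backed dataset standing in for bm_dask_small (hypothetical values)
ds_dask_small = xr.Dataset(
    {"thickness": (("y", "x"), np.ones((20, 20), dtype="float32"))},
    coords={"x": np.arange(20), "y": np.arange(20)},
).chunk({"x": 10, "y": 10})

# evaluate the graph in this process (not on the cluster workers) and return numpy-backed data
ds_small_in_memory = ds_dask_small.compute(scheduler="synchronous")

print(ds_dask_small.thickness.chunks)       # dask-backed: block sizes
print(ds_small_in_memory.thickness.chunks)  # numpy-backed: None
```

The in-memory result could then be written the same way as `bm_small` in section 4, e.g. `ds_small_in_memory.to_zarr(bm_small_mapper, mode='w')`.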

### Writing the full non-dask dataset to Zarr
This fails on the largest server size available on ooi.pangeo.io, where the server runs out of memory, but it works on the larger https://us-central1-b.gcp.pangeo.io/.
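A rough estimate of why the full dataset needs a large server: taking the 9.76 kB measured for the 400 cells of `bm_small` and assuming the full BedMachine Antarctica v1 grid is 13333 × 13333 points (an assumption; the notebook does not print the grid size) gives a few gigabytes for the data alone.

```python
# size of the 20 x 20 subset measured above (bm_small.nbytes)
bytes_small = 9760
cells_small = 20 * 20
# slight overestimate: the subset's x and y coordinates are spread over its cells
bytes_per_cell = bytes_small / cells_small

# assumed size of the full BedMachine Antarctica v1 grid
nx = ny = 13333
full_bytes = bytes_per_cell * nx * ny

print(f"{bytes_per_cell:.1f} bytes per cell, about {full_bytes / 1e9:.1f} GB in total")
```

This counts only the data itself; `bm.nbytes` on the full dataset would give the measured figure directly.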

```python
bm_mapper = fsspec.get_mapper('gs://ldeo-glaciology/bedmachine/bm.zarr', mode='ab',
                              token='../secrets/ldeo-glaciology-bc97b12df06b.json')
bm.to_zarr(bm_mapper, mode='w');
```

```python
bm_reloaded = xr.open_zarr(bm_mapper)
bm_reloaded.identical(bm)
```

```
True
```

```python
cluster.shutdown()
```

```
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
```
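The reconnect error appears to be logged by the client, which is still connected when the cluster goes away. A tidier order (our reading of the log, not tested with dask-gateway here) is to close the client first and then shut the cluster down, i.e. `client.close()` followed by `cluster.shutdown()`. The sketch below shows the same ordering with context managers, using an in-process `LocalCluster` as a stand-in for the `GatewayCluster` so it runs anywhere `distributed` is installed:

```python
import dask.array as da
from dask.distributed import Client, LocalCluster

# in-process local cluster standing in for the dask-gateway cluster
with LocalCluster(n_workers=1, threads_per_worker=1, processes=False) as cluster:
    with Client(cluster) as client:
        total = da.ones((100,), chunks=(10,)).sum().compute()
    # the client is closed here, before the cluster itself is shut down
    client_status = client.status

print(total, client_status)
```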
