# Creation of `storage-benchmark` Datasets
Datasets from the LLC4320 MITgcm implementation are stored on Columbia-LDEO servers. This notebook subsets that data so the subsets can be made available on various platforms.

In [6]:
from dask.distributed import Client, LocalCluster
from matplotlib import pyplot as plt
import gcsfs
import xarray as xr
import zarr
import os
import glob

import os.path as op
import xmitgcm
import xrsigproc
from dask.diagnostics import ProgressBar

In [7]:
# Start a local Dask cluster with 16 workers and attach a client to it.
# NOTE(review): the `ip` address is hard-coded for one specific host — change it
# before running this notebook on another machine.
cluster = LocalCluster(n_workers=16, ip='129.236.21.48')
client = Client(cluster)
client  # last expression: display the client/cluster summary

0,1
Client  Scheduler: tcp://129.236.21.48:45526  Dashboard: http://129.236.21.48:8787/status,Cluster  Workers: 16  Cores: 80  Memory: 1.20 TB


In [56]:
# Connect to the Dask scheduler at gyre.ldeo.columbia.edu (alternative to the
# local cluster). `Client` comes from the notebook's import cell, so it is not
# re-imported here.
c = Client("tcp://gyre.ldeo.columbia.edu:8786")
c  # last expression: display the client/cluster summary

0,1
Client  Scheduler: tcp://gyre.ldeo.columbia.edu:8786  Dashboard: http://gyre.ldeo.columbia.edu:8787/status,Cluster  Workers: 32  Cores: 64  Memory: 512.00 GB


In [38]:
# Close the `c` client connection.
c.close()

# Create datasets
Tidy up the LLC4320 datasets (put them in the right coordinates, keep only the temperature measurements, etc.) and cull them down to approximately the size we want for our benchmarks.

In [19]:
# Open several time slices of the LLC4320 Agulhas data as one dataset and
# merge them with the grid file.
ddir     = '/swot/SUM01/LLC/llc_4320_agulhas/'
gridfile = op.join(ddir, 'llc_4320_agulhas_grid.nc')

# Glob pattern (relative to `ddir`) selecting which time-slice files to load.
# The pattern controls the approximate size of the resulting dataset:
#   ~1 TB   : 'llc_4320_agulhas.00001[0-9]*.nc'
#   ~100 GB : 'llc_4320_agulhas.000010[0-9]*.nc'
#   ~10 GB  : 'llc_4320_agulhas.0000100*.nc'
bf_pattern = 'llc_4320_agulhas.000010[0-9]*.nc'  # ~100 GB

# Build the path from `ddir` so the data location is defined in one place.
# CF decoding is disabled, and the 'k' and 'k_l' dimensions are chunked with
# a chunk size of 1.
bfdata   = xr.open_mfdataset(op.join(ddir, bf_pattern),
                             decode_cf=False, autoclose=True, chunks={'k': 1, 'k_l': 1})

grid     = xr.open_dataset(gridfile)  # .chunk() intentionally left disabled
# 'face' is dropped from both inputs before merging.
bfds     = xr.merge([grid.drop('face'), bfdata.drop('face')],)
# NOTE(review): `_swap_dimensions` is a private xmitgcm helper (leading
# underscore) — it may change between xmitgcm releases.
bfds     = xmitgcm.mds_store._swap_dimensions(bfds, geometry='sphericalpolar')
bfds  # last expression: display the merged dataset

<xarray.Dataset>
Dimensions:   (XC: 2160, XG: 2160, YC: 2160, YG: 2160, Z: 90, Zl: 90, Zp1: 91, Zu: 90, time: 69)
Coordinates:
  * YC        (YC) float32 -57.001026 -56.989952 -56.978878 -56.9678 ...
  * YG        (YG) float32 -57.00656 -56.995487 -56.984413 -56.97334 ...
  * XC        (XC) float32 -15.489583 -15.46875 -15.447917 -15.427083 ...
  * XG        (XG) float32 -15.5 -15.479167 -15.458333 -15.4375 -15.416667 ...
  * Zp1       (Zp1) float32 0.0 -1.0 -2.14 -3.44 -4.93 -6.63 -8.56 -10.76 ...
  * Z         (Z) float32 -0.5 -1.57 -2.79 -4.185 -5.78 -7.595 -9.66 -12.01 ...
  * Zl        (Zl) float32 0.0 -1.0 -2.14 -3.44 -4.93 -6.63 -8.56 -10.76 ...
  * Zu        (Zu) float32 -1.0 -2.14 -3.44 -4.93 -6.63 -8.56 -10.76 -13.26 ...
  * time      (time) float64 2.502e+06 2.506e+06 2.509e+06 2.513e+06 ...
Data variables:
    rA        (YC, XC) float32 ...
    rAw       (YC, XG) float32 ...
    rAs       (YG, XC) float32 ...
    rAz       (YG, XG) float32 ...
    dxG       (YG, XC) float32

In [20]:
# Keep only the `Theta` variable, wrapped in its own Dataset named 'Theta'.
ds_Theta = bfds.Theta.to_dataset(name='Theta')
ds_Theta  # last expression: display the dataset summary

<xarray.Dataset>
Dimensions:  (XC: 2160, YC: 2160, Z: 90, time: 69)
Coordinates:
  * YC       (YC) float32 -57.001026 -56.989952 -56.978878 -56.9678 ...
  * XC       (XC) float32 -15.489583 -15.46875 -15.447917 -15.427083 ...
  * Z        (Z) float32 -0.5 -1.57 -2.79 -4.185 -5.78 -7.595 -9.66 -12.01 ...
  * time     (time) float64 2.502e+06 2.506e+06 2.509e+06 2.513e+06 ...
Data variables:
    Theta    (time, Z, YC, XC) float32 dask.array<shape=(69, 90, 2160, 2160), chunksize=(1, 1, 2160, 2160)>

In [21]:
# Size of the Theta dataset in GiB (bytes / 2**30).
ds_Theta.nbytes / 2**30

107.93425346910954

In [None]:
# Open the Agulhas grid from a Zarr store in a GCS bucket.
# Note: this rebinds `grid`, which other cells also assign from the local
# NetCDF grid file.
llc_grid = gcsfs.mapping.GCSMap('pangeo-data/kai-llc4320-vertical-fluxes/'
                                'llc_4320_agulhas_grid')
grid = xr.open_zarr(llc_grid, auto_chunk=False)

In [None]:
# Display the grid dataset summary.
grid

In [67]:
# Get a controlled list of NetCDF files culled and copied over.
#
# The time-slice files are collected into an explicit list so that the chosen
# slices can be inspected and controlled, rather than handing a glob pattern
# straight to xarray.
ddir     = '/swot/SUM01/LLC/llc_4320_agulhas/'
gridfile = op.join(ddir, 'llc_4320_agulhas_grid.nc')
grid = xr.open_dataset(gridfile)

# Glob patterns (relative to `ddir`) keyed by approximate dataset size.
llc_patterns = {
    '1000GB': 'llc_4320_agulhas.00001[0-9]*.nc',
    '100GB':  'llc_4320_agulhas.000010[0-9]*.nc',
    '10GB':   'llc_4320_agulhas.0000100*.nc',
}
subset = '10GB'
# Destination for the Theta-only copies. Keep this in step with `subset`:
# the `_10` suffix presumably denotes the ~10 GB subset.
outdir = '/home/kyp/fusemnt/storage-benchmarks/llc4320_netcdf_10/'

# glob returns files in arbitrary order; sort so runs are deterministic.
llc_files = sorted(os.path.basename(x)
                   for x in glob.glob(op.join(ddir, llc_patterns[subset])))

# The grid without 'face' is the same for every file, so compute it once.
grid_noface = grid.drop('face')

for fname in llc_files:
    # The context manager closes each source file once its copy is written,
    # so file handles do not accumulate over the loop.
    with xr.open_dataset(op.join(ddir, fname), decode_cf=False, autoclose=True) as src:
        merged = xr.merge([grid_noface, src.drop('face')],)
        # NOTE(review): `_swap_dimensions` is a private xmitgcm helper.
        swapped = xmitgcm.mds_store._swap_dimensions(merged, geometry='sphericalpolar')
        theta = swapped.Theta.to_dataset(name='Theta')
        theta.to_netcdf(path=op.join(outdir, 'Theta_' + fname))

In [None]:
# Re-open the exported Theta NetCDF files as a single multi-file dataset.
# NOTE(review): this reads from `llc4320_netcdf/`, whereas the export cell in
# this notebook writes to `llc4320_netcdf_10/` — confirm which directory is intended.
ds = xr.open_mfdataset('/home/kyp/fusemnt/storage-benchmarks/llc4320_netcdf/Theta*')
ds  # last expression: display the dataset summary

## Convert NetCDF Datasets to Zarr and Save via GCSFS

In [None]:
# Build a GCS-backed mapping to use as the Zarr store.
# A first filesystem object is created with token=None (presumably letting
# gcsfs choose its default authentication — TODO confirm for the installed
# version); its session credentials are then passed explicitly to a second
# filesystem object, which backs the mapping.
fs1 = gcsfs.GCSFileSystem(project='pangeo-181919', token=None)
token = fs1.session.credentials
fs2 = gcsfs.GCSFileSystem(project='pangeo-181919', token=token)
gcsmap_zarr = gcsfs.GCSMap('pangeo-data/storage-benchmarks/llc4320_zarr_10', gcs=fs2)

In [None]:
# Write the Theta dataset to the GCS-backed Zarr store and time the write.
# NOTE(review): the store path ends in `llc4320_zarr_10` — confirm `ds_Theta`
# currently holds the matching subset before running.
%time ds_Theta.to_zarr(gcsmap_zarr)

### Verification
The copy process can be a little finicky for large datasets, so run some basic checks to make sure the dataset is intact.

In [None]:
# Re-open the uploaded Zarr store through xarray for verification.
zarr_read = xr.open_zarr(gcsmap_zarr)

In [None]:
# Open the store directly with zarr and show the metadata of the Theta array.
# NOTE(review): `gcs` is created here but not used — the group is opened from
# `gcsmap_zarr` instead. Confirm which mapping was intended.
gcs = gcsfs.GCSMap('pangeo-data/storage-benchmarks/llc4320_zarr_10')
za = zarr.open_group(gcsmap_zarr)
za['Theta'].info

In [None]:
# Size of the re-opened Theta variable in GiB (bytes / 1024**3).
zarr_read.Theta.nbytes / 1024**3

# Local Disk Copies
For local tests, just copy the datasets to a local drive.

### Zarr Copy

In [14]:
# Directory on local disk that holds the Zarr copy of the dataset.
dataset_zarr = "/swot/SUM01/pangeo/storage-benchmarks/llc4320_zarr_1000"

In [None]:
# Write the Theta dataset to the local Zarr directory and time the write.
# NOTE(review): the target path ends in `llc4320_zarr_1000` — confirm
# `ds_Theta` currently holds the matching subset before running.
%time ds_Theta.to_zarr(dataset_zarr)

### Verification
The copy process can be a little finicky for large datasets, so run some basic checks to make sure the dataset is intact.

In [17]:
# Re-open the local Zarr copy two ways: through xarray (used for the size
# check) and directly with zarr, to show the Theta array's metadata.
zarr_local_read = xr.open_zarr(dataset_zarr)
za = zarr.open_group(dataset_zarr)
za['Theta'].info

0,1
Name,/Theta
Type,zarr.core.Array
Data type,float32
Shape,"(694, 90, 2160, 2160)"
Chunk shape,"(1, 1, 2160, 2160)"
Order,C
Read-only,False
Compressor,"Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)"
Store type,zarr.storage.DirectoryStore
No. bytes,1165653504000 (1.1T)


In [18]:
# Total size of the re-opened local dataset in GiB (bytes / 2**30).
zarr_local_read.nbytes / 2**30

1085.5994440540671