# `fsspec-reference-maker` tutorial

Created June 8th, 2020 by [Lucas Sterzinger](mailto:lsterzinger@ucdavis.edu) ([Twitter](https://twitter.com/lucassterzinger)) as part of the NCAR [Summer Internship in Parallel Computational Science (SIParCS)](https://www2.cisl.ucar.edu/siparcs)

If any part of this tutorial is now out of date, please feel free to open a pull request with a fix

In [2]:
from fsspec_reference_maker.hdf import SingleHdf5ToZarr 
from fsspec_reference_maker.combine import MultiZarrToZarr

## Create metadata JSONs

### This function returns a list of S3 files for a given satellite, year, and day

In [1]:
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt
import s3fs
import datetime as dt
import zipfile
import logging
import fsspec
import ujson
from tqdm import tqdm
from glob import glob
import os

In [3]:
import dask
from dask.distributed import Client

client = Client(n_workers=8)
client

distributed.diskutils - INFO - Found stale lock file and directory '/Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-j3y5bfd1', purging
distributed.diskutils - INFO - Found stale lock file and directory '/Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-mtx2f4rb', purging
distributed.diskutils - INFO - Found stale lock file and directory '/Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-57p8sc_n', purging
distributed.diskutils - INFO - Found stale lock file and directory '/Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-jidkze5q', purging
distributed.diskutils - INFO - Found stale lock file and directory '/Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-7l3tfgve', purging
distributed.diskutils - INFO - Found stale lock file and directory '/Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-nkcg9l_n', 

0,1
Connection method: Cluster object,Cluster type: LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Status: running,Using processes: True
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads:  8,Total memory:  16.00 GiB

0,1
Comm: tcp://127.0.0.1:51609,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads:  8
Started:  Just now,Total memory:  16.00 GiB

0,1
Comm: tcp://127.0.0.1:51631,Total threads: 1
Dashboard: http://127.0.0.1:51632/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:51615,
Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-japd4n4m,Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-japd4n4m

0,1
Comm: tcp://127.0.0.1:51625,Total threads: 1
Dashboard: http://127.0.0.1:51626/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:51612,
Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-bchdh2nw,Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-bchdh2nw

0,1
Comm: tcp://127.0.0.1:51628,Total threads: 1
Dashboard: http://127.0.0.1:51629/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:51616,
Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-reahpsbp,Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-reahpsbp

0,1
Comm: tcp://127.0.0.1:51637,Total threads: 1
Dashboard: http://127.0.0.1:51638/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:51614,
Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-zczcx8ua,Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-zczcx8ua

0,1
Comm: tcp://127.0.0.1:51622,Total threads: 1
Dashboard: http://127.0.0.1:51623/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:51613,
Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-h0pron_2,Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-h0pron_2

0,1
Comm: tcp://127.0.0.1:51619,Total threads: 1
Dashboard: http://127.0.0.1:51620/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:51611,
Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-1xfmnr5x,Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-1xfmnr5x

0,1
Comm: tcp://127.0.0.1:51640,Total threads: 1
Dashboard: http://127.0.0.1:51641/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:51618,
Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-4okgn7r9,Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-4okgn7r9

0,1
Comm: tcp://127.0.0.1:51634,Total threads: 1
Dashboard: http://127.0.0.1:51635/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:51617,
Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-hw6id_fy,Local directory: /Users/lucass/Documents/fsspec-reference-maker-tutorial/dask-worker-space/worker-hw6id_fy


In [4]:
def get_file_list(sat,lyr,idyjl):
    # arguments
    # sat   goes-east,goes-west,himawari
    # lyr   year
    # idyjl day of year
    
    d = dt.datetime(lyr,1,1) + dt.timedelta(days=idyjl)
    fs = s3fs.S3FileSystem(anon=True) #connect to s3 bucket!

    #create strings for the year and julian day
    imon,idym=d.month,d.day
    syr,sjdy,smon,sdym = str(lyr).zfill(4),str(idyjl).zfill(3),str(imon).zfill(2),str(idym).zfill(2)
    
    #use glob to list all the files in the directory
    if sat=='goes-east':
        file_location,var = fs.glob('s3://noaa-goes16/ABI-L2-SSTF/'+syr+'/'+sjdy+'/*/*.nc'),'SST'
    if sat=='goes-west':
        file_location,var = fs.glob('s3://noaa-goes17/ABI-L2-SSTF/'+syr+'/'+sjdy+'/*/*.nc'),'SST'
    
    return file_location

In [5]:
flist = get_file_list("goes-east", 2020, 210)
urls = ["s3://" + f for f in flist]

### This function creates JSON metadata files for each of the S3 files in the local `jsons/` directory

These files point to the S3 location of the netCDF files, and only need to be created once. Tihs process took me about 10 minutes to generate the JSONs for 24 files. This function could easily be made to run in parallel for faster performance

In [6]:
def gen_json(u):
    so = dict(
        mode="rb", anon=True, default_fill_cache=False, default_cache_type="none"
    )
    with fsspec.open(u, **so) as inf:
        h5chunks = SingleHdf5ToZarr(inf, u, inline_threshold=300)
        with open(f"jsons/{u.split('/')[-1]}.json", 'wb') as outf:
            outf.write(ujson.dumps(h5chunks.translate()).encode())


In [7]:
dask.compute(*[dask.delayed(gen_json)(u) for u in urls]);

# If not using dask, use
# for u in tqdm(urls):
#     gen_json(u)

(None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None)

***
## Read remote netCDF files with xarray and fsspec

### First, create a list of JSON files

In [8]:
json_list = sorted(glob("./jsons/*.json"))

### Then, loop over the files and use `fsspec.get_mapper()` to create mappers for each file object, creating a list of mappers

In [10]:
m_list = []
for j in tqdm(json_list):
    with open(j) as f:
        m_list.append(fsspec.get_mapper("reference://", 
                        fo=ujson.load(f),
                        remote_protocol='s3',
                        remote_options={'anon':True}))

100%|██████████| 24/24 [00:00<00:00, 43.42it/s]


### Now, the mapper list can be passed directly to xarray.open_mfdataset() as long as the engine is specified as "zarr"


In [11]:
%%time
ds = xr.open_mfdataset(m_list, engine='zarr', combine='nested', concat_dim='t', 
                        coords='minimal', data_vars='minimal', compat='override',
                        parallel=True)
ds

CPU times: user 3.06 s, sys: 363 ms, total: 3.43 s
Wall time: 9.63 s


Unnamed: 0,Array,Chunk
Bytes,16 B,16 B
Shape,"(4,)","(4,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 16 B 16 B Shape (4,) (4,) Count 2 Tasks 1 Chunks Type float32 numpy.ndarray",4  1,

Unnamed: 0,Array,Chunk
Bytes,16 B,16 B
Shape,"(4,)","(4,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,16 B,16 B
Shape,"(4,)","(4,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 16 B 16 B Shape (4,) (4,) Count 2 Tasks 1 Chunks Type float32 numpy.ndarray",4  1,

Unnamed: 0,Array,Chunk
Bytes,16 B,16 B
Shape,"(4,)","(4,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,"(1,)","(1,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4 B 4 B Shape (1,) (1,) Count 2 Tasks 1 Chunks Type float32 numpy.ndarray",1  1,

Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,"(1,)","(1,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,"(1,)","(1,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4 B 4 B Shape (1,) (1,) Count 2 Tasks 1 Chunks Type float32 numpy.ndarray",1  1,

Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,"(1,)","(1,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,2.63 GiB,199.52 kiB
Shape,"(24, 5424, 5424)","(1, 226, 226)"
Count,41496 Tasks,13824 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 2.63 GiB 199.52 kiB Shape (24, 5424, 5424) (1, 226, 226) Count 41496 Tasks 13824 Chunks Type float32 numpy.ndarray",5424  5424  24,

Unnamed: 0,Array,Chunk
Bytes,2.63 GiB,199.52 kiB
Shape,"(24, 5424, 5424)","(1, 226, 226)"
Count,41496 Tasks,13824 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,2.63 GiB,199.52 kiB
Shape,"(24, 5424, 5424)","(1, 226, 226)"
Count,41496 Tasks,13824 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 2.63 GiB 199.52 kiB Shape (24, 5424, 5424) (1, 226, 226) Count 41496 Tasks 13824 Chunks Type float32 numpy.ndarray",5424  5424  24,

Unnamed: 0,Array,Chunk
Bytes,2.63 GiB,199.52 kiB
Shape,"(24, 5424, 5424)","(1, 226, 226)"
Count,41496 Tasks,13824 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 192 B 8 B Shape (24, 2) (1, 2) Count 96 Tasks 24 Chunks Type float32 numpy.ndarray",2  24,

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 192 B 8 B Shape (24, 2) (1, 2) Count 96 Tasks 24 Chunks Type float32 numpy.ndarray",2  24,

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 192 B 8 B Shape (24, 2) (1, 2) Count 96 Tasks 24 Chunks Type float32 numpy.ndarray",2  24,

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 192 B 8 B Shape (24, 2) (1, 2) Count 96 Tasks 24 Chunks Type float32 numpy.ndarray",2  24,

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 192 B 8 B Shape (24, 2) (1, 2) Count 96 Tasks 24 Chunks Type float32 numpy.ndarray",2  24,

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,384 B,16 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,datetime64[ns],numpy.ndarray
"Array Chunk Bytes 384 B 16 B Shape (24, 2) (1, 2) Count 96 Tasks 24 Chunks Type datetime64[ns] numpy.ndarray",2  24,

Unnamed: 0,Array,Chunk
Bytes,384 B,16 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,datetime64[ns],numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 192 B 8 B Shape (24, 2) (1, 2) Count 96 Tasks 24 Chunks Type float32 numpy.ndarray",2  24,

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 192 B 8 B Shape (24, 2) (1, 2) Count 96 Tasks 24 Chunks Type float32 numpy.ndarray",2  24,

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 192 B 8 B Shape (24, 2) (1, 2) Count 96 Tasks 24 Chunks Type float32 numpy.ndarray",2  24,

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24, 2)","(1, 2)"
Count,96 Tasks,24 Chunks
Type,float32,numpy.ndarray


## However, this doesn't scale well for larger datasets.
### Instead, we can combine multiple JSONS into a single JSON describing the whole dataset

In [None]:
mzz = MultiZarrToZarr(
    json_list,
    remote_protocol="s3",
    remote_options={'anon':True},
    xarray_kwargs={
        "decode_cf" : False,
        "mask_and_scale" : False,
        "decode_times" : False,
        "decode_timedelta" : False,
        "use_cftime" : False,
        "decode_coords" : False
    },
    xarray_concat_args={
        'data_vars' : 'minimal',
        'coords' : 'minimal',
        'compat' : 'override',
        'join' : 'override', 
        'combine_attrs' : 'override',
        'dim' : 't'
    }
)

In [None]:
%%time
mzz.translate("combined.json")

%%time
fs = fsspec.filesystem("reference", fo="./combined.json", remote_protocol="s3", 
                        remote_options={"anon":True}, skip_instance_cache=True)
m = fs.get_mapper("")
ds = xr.open_dataset(m, engine='zarr')

In [None]:
ds

In [None]:
import hvplot.xarray

In [None]:
ds.SST.where(ds.DQF==0).isel(t=0).hvplot.image(x='x', y='y', rasterize=True, aspect='equal', cmap='turbo')

### Take a subset of the data (in this case, the Gulf Stream)

### Select a single time with `.isel(t=14)`

In [None]:
%%time
subset = ds.sel(x=slice(-0.01,0.07215601),y=slice(0.12,0.09))  #reduce to GS region

masked = subset.SST.where(subset.DQF==0)

masked.isel(t=14).plot(vmin=14+273.15,vmax=30+273.15,cmap='inferno')

### Plot a mean along the time axis (1-day average)

In [None]:
%%time
subset = ds.sel(x=slice(-0.01,0.07215601),y=slice(0.12,0.09))  #reduce to GS region

masked = subset.SST.where(subset.DQF==0)

masked.mean("t", skipna=True).plot(vmin=14+273.15,vmax=30+273.15,cmap='inferno')