# Generate references for the dataset

These functions aggregate references based on glob strings that are configurable at various levels of the URL path (e.g. daily or monthly). The work can be farmed out to Dask workers, and the results are finally combined using the kerchunk `MultiZarrToZarr` function.

An example for Australian datasets is shown [here](https://github.com/IOMRC/intake-aodn/blob/main/notebooks/kerchunk_extraction/create_and_aggregate.ipynb)

In [2]:
from dask import delayed, compute
from distributed import Client, LocalCluster
# Start a local Dask cluster with 4 workers and 6 threads per worker.
cluster = LocalCluster(n_workers=4,threads_per_worker=6)
# Connect a client to the cluster so subsequent dask computations run on it.
client = Client(cluster)
# Last expression of the cell: display the client summary (dashboard, workers, memory).
client

2022-08-17 06:10:13,224 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-idb4fscn', purging
2022-08-17 06:10:13,225 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-4enhwsh8', purging


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: /user/pbranson/proxy/8787/status,

0,1
Dashboard: /user/pbranson/proxy/8787/status,Workers: 4
Total threads: 24,Total memory: 8.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:43585,Workers: 4
Dashboard: /user/pbranson/proxy/8787/status,Total threads: 24
Started: Just now,Total memory: 8.00 GiB

0,1
Comm: tcp://127.0.0.1:39431,Total threads: 6
Dashboard: /user/pbranson/proxy/46851/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:42535,
Local directory: /tmp/dask-worker-space/worker-apykgz6y,Local directory: /tmp/dask-worker-space/worker-apykgz6y

0,1
Comm: tcp://127.0.0.1:42157,Total threads: 6
Dashboard: /user/pbranson/proxy/33163/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:37715,
Local directory: /tmp/dask-worker-space/worker-srfpxhay,Local directory: /tmp/dask-worker-space/worker-srfpxhay

0,1
Comm: tcp://127.0.0.1:34917,Total threads: 6
Dashboard: /user/pbranson/proxy/39863/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:44319,
Local directory: /tmp/dask-worker-space/worker-8y7e7g38,Local directory: /tmp/dask-worker-space/worker-8y7e7g38

0,1
Comm: tcp://127.0.0.1:34499,Total threads: 6
Dashboard: /user/pbranson/proxy/45097/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:33111,
Local directory: /tmp/dask-worker-space/worker-5f6qv3am,Local directory: /tmp/dask-worker-space/worker-5f6qv3am


# Himawari-SST

You need to break the URL into sections, similar to how we did in Step 1.

Example URLs on S3:

```
['noaa-himawari8/AHI-L2-FLDK-SST/2022/01/13/0000/20220113000000-NCCF-L3C_GHRSST-SSTsubskin-AHI_H08-ACSPO_V2.80-v02.0-fv01.0.nc',
 'noaa-himawari8/AHI-L2-FLDK-SST/2022/01/13/0100/20220113010000-NCCF-L3C_GHRSST-SSTsubskin-AHI_H08-ACSPO_V2.80-v02.0-fv01.0.nc',
 'noaa-himawari8/AHI-L2-FLDK-SST/2022/01/13/0200/20220113020000-NCCF-L3C_GHRSST-SSTsubskin-AHI_H08-ACSPO_V2.80-v02.0-fv01.0.nc',
 'noaa-himawari8/AHI-L2-FLDK-SST/2022/01/13/0300/20220113030000-NCCF-L3C_GHRSST-SSTsubskin-AHI_H08-ACSPO_V2.80-v02.0-fv01.0.nc',
 'noaa-himawari8/AHI-L2-FLDK-SST/2022/01/13/0400/20220113040000-NCCF-L3C_GHRSST-SSTsubskin-AHI_H08-ACSPO_V2.80-v02.0-fv01.0.nc',
 'noaa-himawari8/AHI-L2-FLDK-SST/2022/01/13/0500/20220113050000-NCCF-L3C_GHRSST-SSTsubskin-AHI_H08-ACSPO_V2.80-v02.0-fv01.0.nc',
 'noaa-himawari8/AHI-L2-FLDK-SST/2022/01/13/0600/20220113060000-NCCF-L3C_GHRSST-SSTsubskin-AHI_H08-ACSPO_V2.80-v02.0-fv01.0.nc']
 ```

In [24]:
# Helper functions to process a glob string into an aggregate of references

def process_single(url):
    """Build a kerchunk reference dict for a single HDF5 file on S3.

    Parameters
    ----------
    url : str
        Object path without the protocol prefix; ``'s3://'`` is prepended
        before opening.

    Returns
    -------
    The result of ``SingleHdf5ToZarr(...).translate()`` for that file.
    """
    import fsspec
    from kerchunk.hdf import SingleHdf5ToZarr

    remote_path = 's3://' + url

    # Anonymous, binary read with fsspec block caching switched off.
    open_kwargs = dict(anon=True,
                       mode='rb',
                       default_fill_cache=False,
                       default_cache_type='none')

    with fsspec.open(remote_path, **open_kwargs) as remote_file:
        translator = SingleHdf5ToZarr(remote_file, remote_path, spec=1, inline_threshold=100)
        references = translator.translate()

    return references

def process_aggregate(root='noaa-himawari8/AHI-L2-FLDK-SST/',
                       mask_dict=dict(year='2022',month='01',day='01'),
                       mask='{year}/{month}/{day}/*/{year}{month}{day}',
                       suffix='-NCCF-L3C_GHRSST-SSTsubskin-AHI_H08-ACSPO_V2.80-v02.0-fv01.0',
                       extension='nc',
                       storage_options=dict(anon=True),
                       dest='/home/jovyan/kerchunk_indexes/',
                       dask=True,
                       preprocess=None):
    """Aggregate the kerchunk references of all files matching a glob into one JSON file.

    The glob is ``s3://{root}{mask}*{suffix}.{extension}`` where ``mask`` has been
    formatted with ``mask_dict``. Each matching file is translated with
    ``process_single`` and, when more than one file matches, the references are
    combined along ``time`` with ``MultiZarrToZarr``. The result is written to
    ``os.path.join(dest, f"{root}{<joined mask_dict values>}{suffix}.json")``.
    Nothing is recomputed if that file already exists.

    Parameters
    ----------
    root : str
        Bucket/prefix (without ``s3://``) under which files are searched; also
        used as the sub-directory of ``dest`` for the output file.
    mask_dict : dict
        Values substituted into ``mask``; their concatenation (in dict order)
        also forms the start of the output file name.
    mask : str
        ``str.format`` template for the variable part of the glob.
    suffix : str
        File-name part that follows the masked section (without extension).
    extension : str
        Extension of the source files.
    storage_options : dict
        Passed to the S3 filesystem used for globbing and as ``remote_options``
        to ``MultiZarrToZarr``.
        NOTE(review): ``process_single`` is called with the URL only, so these
        options are not forwarded to the per-file reads - confirm this is intended.
    dest : str
        Local directory under which the aggregate JSON is written.
    dask : bool
        If true, translate the individual files through ``dask.delayed`` /
        ``dask.compute``; otherwise loop serially with a ``tqdm`` progress bar.
    preprocess : callable or None
        Forwarded to ``MultiZarrToZarr(preprocess=...)``.

    Returns
    -------
    dict
        ``{formatted_mask: status}`` where ``status`` is the aggregate file path
        relative to ``dest`` on success, ``'existing'`` if the output file was
        already present, or ``'ERROR(NOFILES)'`` if the glob matched nothing.

    Raises
    ------
    Any exception raised while translating or combining the references
    propagates to the caller; no output file is written in that case.
    """
    import fsspec
    import json
    import os
    from kerchunk.combine import MultiZarrToZarr

    fs = fsspec.filesystem('s3', use_listings_cache=False, **storage_options)

    mask = mask.format(**mask_dict)
    agg_file = f"{root}{''.join(mask_dict.values())}{suffix}.json"
    out_file = os.path.join(dest, agg_file)

    # Derive the directory from out_file itself so the directory that gets
    # created is always the one the file is written into, even when `dest`
    # has no trailing separator.
    out_dir = os.path.dirname(out_file)
    print(out_dir)
    if out_dir and not os.path.isdir(out_dir):
        print(f'Creating {out_dir}')
        # exist_ok avoids a failure when parallel tasks create the directory
        # at the same time.
        os.makedirs(out_dir, exist_ok=True)

    if os.path.exists(out_file):
        return {mask: 'existing'}

    globstr = f"s3://{root}{mask}*{suffix}.{extension}"
    urls = fs.glob(globstr)

    print(f'Aggregating {globstr} - {len(urls)} found.')

    if not urls:
        return {mask: 'ERROR(NOFILES)'}

    print('Loading references...')
    if dask:
        print('... using dask ...')
        from dask import delayed, compute
        d_process_single = delayed(process_single)
        # compute() returns a tuple with one entry per argument; the single
        # argument here is the list of delayed results.
        ref_dicts = compute([d_process_single(u) for u in urls])[0]
    else:
        from tqdm import tqdm
        ref_dicts = [process_single(u) for u in tqdm(urls)]

    print(f'Aggregating into {out_file}')

    if len(ref_dicts) == 1:
        # Only one reference in this set: use the source reference as is.
        combined = ref_dicts[0]
    else:
        # Otherwise join the references into one set, concatenated along time.
        mzz = MultiZarrToZarr(ref_dicts,
                              remote_protocol="s3",
                              remote_options=storage_options,
                              concat_dims=["time"], coo_map={"time": "data:time"},
                              preprocess=preprocess)
        combined = mzz.translate()

    # Serialise before opening the file so a failure here leaves no empty file.
    output = json.dumps(combined).encode()

    with open(out_file, "wb") as outf:
        print('writing')
        outf.write(output)

    return {mask: agg_file}

In [27]:
# Run with the default arguments, i.e. aggregate the references for 2022-01-01.
process_aggregate()

/home/jovyan/kerchunk_indexes/noaa-himawari8/AHI-L2-FLDK-SST/
Aggregating s3://noaa-himawari8/AHI-L2-FLDK-SST/2022/01/01/*/20220101*-NCCF-L3C_GHRSST-SSTsubskin-AHI_H08-ACSPO_V2.80-v02.0-fv01.0.nc - 24 found.
Loading references...
... using dask ...
Aggregating into /home/jovyan/kerchunk_indexes/noaa-himawari8/AHI-L2-FLDK-SST/20220101-NCCF-L3C_GHRSST-SSTsubskin-AHI_H08-ACSPO_V2.80-v02.0-fv01.0.json
writing


{'2022/01/01/*/20220101': 'noaa-himawari8/AHI-L2-FLDK-SST/20220101-NCCF-L3C_GHRSST-SSTsubskin-AHI_H08-ACSPO_V2.80-v02.0-fv01.0.json'}

In [28]:
def open_single(fn,preprocess=None,storage_options=dict(anon=True)):
    """Open a kerchunk reference file as an xarray dataset.

    Parameters
    ----------
    fn
        Reference file handed to the ``reference://`` filesystem as ``fo``.
    preprocess : callable or None
        If given, it is called with the opened dataset and its return value
        is returned instead.
    storage_options : dict
        Passed as ``remote_options`` for the S3 data the references point to.

    Returns
    -------
    The dataset from ``xr.open_zarr`` (times left undecoded), after the
    optional ``preprocess`` step.
    """
    import fsspec
    import xarray as xr

    reference_store = fsspec.get_mapper(
        'reference://',
        fo=fn,
        remote_protocol='s3',
        remote_options=storage_options,
    )
    dataset = xr.open_zarr(reference_store, chunks={}, consolidated=False, decode_times=False)

    return dataset if preprocess is None else preprocess(dataset)

# Open the aggregated reference JSON for 2022-01-01 as an xarray dataset.
ds = open_single('/home/jovyan/kerchunk_indexes/noaa-himawari8/AHI-L2-FLDK-SST/20220101-NCCF-L3C_GHRSST-SSTsubskin-AHI_H08-ACSPO_V2.80-v02.0-fv01.0.json')
# Last expression of the cell: display the dataset summary.
ds

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 24.72 MiB Shape (24, 9000, 18000) (1, 1800, 3600) Count 601 Tasks 600 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 24.72 MiB Shape (24, 9000, 18000) (1, 1800, 3600) Count 601 Tasks 600 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 24.72 MiB Shape (24, 9000, 18000) (1, 1800, 3600) Count 601 Tasks 600 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 24.72 MiB Shape (24, 9000, 18000) (1, 1800, 3600) Count 601 Tasks 600 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24,)","(1,)"
Count,25 Tasks,24 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 192 B 8 B Shape (24,) (1,) Count 25 Tasks 24 Chunks Type float64 numpy.ndarray",24  1,

Unnamed: 0,Array,Chunk
Bytes,192 B,8 B
Shape,"(24,)","(1,)"
Count,25 Tasks,24 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 38.62 MiB Shape (24, 9000, 18000) (1, 2250, 4500) Count 385 Tasks 384 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 24.72 MiB Shape (24, 9000, 18000) (1, 1800, 3600) Count 601 Tasks 600 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 24.72 MiB Shape (24, 9000, 18000) (1, 1800, 3600) Count 601 Tasks 600 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 38.62 MiB Shape (24, 9000, 18000) (1, 2250, 4500) Count 385 Tasks 384 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 38.62 MiB Shape (24, 9000, 18000) (1, 2250, 4500) Count 385 Tasks 384 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 24.72 MiB Shape (24, 9000, 18000) (1, 1800, 3600) Count 601 Tasks 600 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 38.62 MiB Shape (24, 9000, 18000) (1, 2250, 4500) Count 385 Tasks 384 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 38.62 MiB Shape (24, 9000, 18000) (1, 2250, 4500) Count 385 Tasks 384 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,28.97 GiB,34.33 MiB
Shape,"(24, 9000, 18000)","(1, 1500, 3000)"
Count,865 Tasks,864 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 28.97 GiB 34.33 MiB Shape (24, 9000, 18000) (1, 1500, 3000) Count 865 Tasks 864 Chunks Type float64 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,28.97 GiB,34.33 MiB
Shape,"(24, 9000, 18000)","(1, 1500, 3000)"
Count,865 Tasks,864 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 38.62 MiB Shape (24, 9000, 18000) (1, 2250, 4500) Count 385 Tasks 384 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 24.72 MiB Shape (24, 9000, 18000) (1, 1800, 3600) Count 601 Tasks 600 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,24.72 MiB
Shape,"(24, 9000, 18000)","(1, 1800, 3600)"
Count,601 Tasks,600 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 14.48 GiB 38.62 MiB Shape (24, 9000, 18000) (1, 2250, 4500) Count 385 Tasks 384 Chunks Type float32 numpy.ndarray",18000  9000  24,

Unnamed: 0,Array,Chunk
Bytes,14.48 GiB,38.62 MiB
Shape,"(24, 9000, 18000)","(1, 2250, 4500)"
Count,385 Tasks,384 Chunks
Type,float32,numpy.ndarray


In [29]:
# Size reported by ds.nbytes, converted from bytes to GB (divide by 1e9).
ds.nbytes / 1E9

264.384108384

In [30]:
# TODO: Map out across full dataset