In [17]:
import sys
import os
import json
sys.path.append('/home/jovyan/intake-aodn')

In [7]:
import fsspec
from fsspec.core import url_to_fs
fs = fsspec.filesystem('s3',use_listings_cache=False,anon=True,)

urls = fs.glob('s3::imos-data/IMOS/ANMN/QLD/**/*.nc')


In [8]:
len(urls)

7613

In [9]:
timeseries = list(filter(lambda x: 'aggregated_timeseries' in x, urls))

In [11]:
len(timeseries)

155

In [12]:
def process_single(url):
    import fsspec
    from kerchunk.hdf import SingleHdf5ToZarr
    
    s3_fn = 's3://' + url
    with fsspec.open(s3_fn, 
                     anon=True, 
                     mode='rb', 
                     default_fill_cache=False, 
                     default_cache_type='none') as f:
        zarr_dict = SingleHdf5ToZarr(f, s3_fn, spec=1, inline_threshold=100).translate()
        
    return zarr_dict 

In [13]:
from intake_aodn.utils import get_distributed_cluster
client = get_distributed_cluster()
client

An existing cluster was found. Connected to cluster [1measihub.b139066194c14b4c8777e506f588ff14[0m


VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

0,1
Connection method: Cluster object,Cluster type: dask_gateway.GatewayCluster
Dashboard: https://hub.csiro.easi-eo.solutions/services/dask-gateway/clusters/easihub.b139066194c14b4c8777e506f588ff14/status,


In [14]:
from dask.distributed import PipInstall
plugin = PipInstall(packages=["kerchunk"], pip_options=["--upgrade"])
client.register_worker_plugin(plugin)

# If using a distributed cluster on EASI build eggs using "python setup.py bdist_egg" and upload to workers
# otherwise dask workers wont have code for imports
client.upload_file('/home/jovyan/intake-aodn/dist/intake_aodn-0+untagged.76.g4219cb6.dirty-py3.8.egg')

{'tls://10.0.34.197:46359': {'status': 'OK'}}

In [16]:
print('... using dask ...')
from dask import delayed, compute
d_process_single = delayed(process_single)
futures = [d_process_single(u) for u in timeseries]
ref_dicts = compute(futures)[0]

... using dask ...


In [18]:
for u,ref in zip(urls,ref_dicts):
    output = json.dumps(ref).encode()
    out_file = "./reffs/"+os.path.basename(u).replace('.nc','.json')
    print(out_file)
    with open(out_file,"wb") as outf:
        outf.write(output)

./reffs/IMOS_ANMN-QLD_CSTZ_20140828T090001Z_CAM050_FV01_CAM050-1408-SBE37SM-RS232-21.6_END-20150221T034501Z_C-20170214T052032Z.json
./reffs/IMOS_ANMN-QLD_CSTZ_20140828T090001Z_CAM050_FV01_CAM050-1408-SBE37SM-RS232-49.4_END-20150221T025501Z_C-20170214T052020Z.json
./reffs/IMOS_ANMN-QLD_CSTZ_20150217T013001Z_CAM050_FV01_CAM050-1502-SBE37SM-RS232-25.6_END-20150727T034501Z_C-20170217T004104Z.json
./reffs/IMOS_ANMN-QLD_CSTZ_20150219T120001Z_CAM050_FV01_CAM050-1502-SBE37SM-RS232-53.4_END-20150729T044001Z_C-20170217T004059Z.json
./reffs/IMOS_ANMN-QLD_CTZ_20140828T090001Z_CAM050_FV00_CAM050-1408-SBE37SM-RS232-21.6_END-20150221T034501Z_C-20170214T052032Z.json
./reffs/IMOS_ANMN-QLD_CTZ_20140828T090001Z_CAM050_FV00_CAM050-1408-SBE37SM-RS232-49.4_END-20150221T025501Z_C-20170214T052020Z.json
./reffs/IMOS_ANMN-QLD_CTZ_20150217T013001Z_CAM050_FV00_CAM050-1502-SBE37SM-RS232-25.6_END-20150727T034501Z_C-20170217T004104Z.json
./reffs/IMOS_ANMN-QLD_CTZ_20150219T120001Z_CAM050_FV00_CAM050-1502-SBE37SM-RS23

In [18]:
!zip -r -q nsw_refs.zip *.json

In [1]:
def open_single(fn,preprocess=None,storage_options=dict(anon=True)):
    import fsspec
    import xarray as xr
    
    mapper=fsspec.get_mapper('reference://',
                             fo=fn,
                             remote_protocol='s3',
                             remote_options=storage_options,
                            )
    ds = xr.open_zarr(mapper,chunks={}, consolidated=False, decode_times=False)   
    
    if preprocess is not None:
        ds = preprocess(ds)
    
    return ds

In [5]:
import json
with open('./IMOS_ANMN-NSW_TZ_20090812T02000', "r") as read_file:
    ref = json.load(read_file)


In [6]:
import xarray as xr
ds = xr.open_dataset(
    "reference://", engine="zarr",
    backend_kwargs={
        "storage_options": {
            "fo": ref,
            "remote_protocol": "s3",
            "remote_options": {"anon": True}
        },
        "consolidated": False
    }
)

In [19]:
!zip -r -q aodn_refs.zip reffs

2022-08-17 05:44:25,242 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
2022-08-17 05:44:25,244 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
