## Creating LiveOcean reference files using Kerchunk

See https://github.com/fsspec/kerchunk for latest issues/changes and https://fsspec.github.io/kerchunk/ for documentation

In [1]:
import fsspec
import ujson
from kerchunk.hdf import SingleHdf5ToZarr 
from kerchunk.combine import MultiZarrToZarr

Open existing reference file and check if new dates have been added to the Azure data store.

In [2]:
# Remote (S3) destinations, kept for reference. The esip-qhub-public bucket can be
# read without credentials, but writing to it requires credentials.
#reference_file = 's3://esip-qhub-public/LiveOcean/LiveOcean_reference.json'
#json_dir = 's3://esip-qhub-public/LiveOcean/individual/'

# Local destinations used by this run.
local_root = '/Users/pm8/Documents/LPM/pangeo/'
reference_file = local_root + 'LiveOcean_reference.json'  # where the final combined reference is stored
json_dir = local_root + 'individual/'  # folder for the individual reference files (keep the trailing slash)

In [3]:
# Filesystem the references are saved to (local disk for this run).
# To save to the S3 bucket instead: fsspec.filesystem(protocol='s3', anon=False)
fs = fsspec.filesystem(protocol='file')
# Filesystem used to open the NetCDF files (abfs protocol, storage account 'pm2').
fs_data = fsspec.filesystem(protocol='abfs', account_name='pm2')

In [4]:
# Collect every available NetCDF file, drop the ones numbered 0001,
# prepend the azure protocol and sort the resulting URLs.
nc_files = sorted(
    'abfs://' + path
    for path in fs_data.glob('cas6-v0-u0kb/*/*.nc')
    if path.split('-')[-1].split('.')[0] != '0001'
)

In [6]:
# preview the first five and last five files, separated by a blank line
for nc_fn in [*nc_files[:5], '', *nc_files[-5:]]:
    print(nc_fn)

abfs://cas6-v0-u0kb/f2022-07-18/ocean-his-0002.nc
abfs://cas6-v0-u0kb/f2022-07-18/ocean-his-0003.nc
abfs://cas6-v0-u0kb/f2022-07-18/ocean-his-0004.nc
abfs://cas6-v0-u0kb/f2022-07-18/ocean-his-0005.nc
abfs://cas6-v0-u0kb/f2022-07-18/ocean-his-0006.nc

abfs://cas6-v0-u0kb/f2022-08-03/ocean-his-0021.nc
abfs://cas6-v0-u0kb/f2022-08-03/ocean-his-0022.nc
abfs://cas6-v0-u0kb/f2022-08-03/ocean-his-0023.nc
abfs://cas6-v0-u0kb/f2022-08-03/ocean-his-0024.nc
abfs://cas6-v0-u0kb/f2022-08-03/ocean-his-0025.nc


In [7]:
# the below is my attempt at updating the references daily by running this script as a cronjob. Creating the references locally 
# before the netcdf files are uploaded to azure would probably be a neater workflow

from datetime import datetime, timezone, timedelta

# anything modified after this instant (one day ago, UTC) counts as newly uploaded
last_update = datetime.now(timezone.utc) - timedelta(days=1)

# nc_files is sorted, so scan backwards from the end and stop at the first file
# that was not modified after last_update; the files behind that point are the new ones
n_new = 0
while n_new < len(nc_files) and fs_data.info(nc_files[-1 - n_new])['last_modified'] > last_update:
    n_new += 1

updated_files = nc_files[len(nc_files) - n_new:]

In [8]:
# preview the first five and last five updated files, separated by a blank line
for up_fn in [*updated_files[:5], '', *updated_files[-5:]]:
    print(up_fn)

abfs://cas6-v0-u0kb/f2022-08-01/ocean-his-0002.nc
abfs://cas6-v0-u0kb/f2022-08-01/ocean-his-0003.nc
abfs://cas6-v0-u0kb/f2022-08-01/ocean-his-0004.nc
abfs://cas6-v0-u0kb/f2022-08-01/ocean-his-0005.nc
abfs://cas6-v0-u0kb/f2022-08-01/ocean-his-0006.nc

abfs://cas6-v0-u0kb/f2022-08-03/ocean-his-0021.nc
abfs://cas6-v0-u0kb/f2022-08-03/ocean-his-0022.nc
abfs://cas6-v0-u0kb/f2022-08-03/ocean-his-0023.nc
abfs://cas6-v0-u0kb/f2022-08-03/ocean-his-0024.nc
abfs://cas6-v0-u0kb/f2022-08-03/ocean-his-0025.nc


Generate individual reference files for this new data

In [9]:
# keyword arguments passed to fs_data.open
so = {'anon': False, 'skip_instance_cache': True}

#create unique name for each individual reference json
def file_name(file):
    """Return the path of the individual reference JSON for one NetCDF file.

    The name is built from the last two '/'-separated components of ``file``
    (parent folder and file name) joined by an underscore, with ``.json``
    appended, and is placed directly under ``json_dir``.
    """
    parts = file.split('/')
    folder, basename = parts[-2], parts[-1]
    return f'{json_dir}{folder}_{basename}.json'
    

#generate the individual reference jsons. 
def gen_json(nc_file_url):
    """Generate and store the individual reference JSON for one NetCDF file.

    Parameters
    ----------
    nc_file_url : str
        URL of the NetCDF file; it is opened through ``fs_data`` with the
        options in ``so``.

    Returns
    -------
    None
        The references are written through ``fs`` to ``file_name(nc_file_url)``.
    """
    with fs_data.open(nc_file_url, **so) as infile:
        # inline_threshold adjusts the size below which binary blocks are included
        # directly in the output; a higher threshold can result in a larger json
        # file but faster loading time
        h5chunks = SingleHdf5ToZarr(infile, nc_file_url, inline_threshold=300)
        # Translate BEFORE opening the output file: if translation fails, no empty
        # or truncated json is left behind in json_dir to be picked up later when
        # the individual references are globbed and combined.
        refs = ujson.dumps(h5chunks.translate()).encode()

    # write the individual references as json
    with fs.open(file_name(nc_file_url), 'wb') as f:
        f.write(refs)

In [None]:
# help(SingleHdf5ToZarr)

In [9]:
# this (commented-out) cell spins up a cluster on qhub; if running locally, use the LocalCluster cell below instead
# import os
# import sys
# sys.path.append(os.path.join(os.environ['HOME'],'shared','users','lib'))
# import ebdpy as ebd

# ebd.set_credentials(profile='esip-qhub')

# profile = 'esip-qhub'
# region = 'us-west-2'
# endpoint = f's3.{region}.amazonaws.com'
# ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
# worker_max = 50
# client,cluster = ebd.start_dask_cluster(profile=profile,worker_max=worker_max, 
#                                       region=region, use_existing_cluster=True,
#                                       adaptive_scaling=True, wait_for_cluster=False, 
#                                       worker_profile='Medium Worker', 
#                                       propagate_env=True)

In [10]:
# Local alternative to the qhub cluster: run the dask scheduler and workers on this machine.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()  # Launches a scheduler and workers locally
client = Client(cluster)  # Connect to distributed cluster and override default

In [11]:
# from dask.distributed import Client

# client = Client("tcp://127.0.0.1:59236")
# client

In [14]:
# Here we use dask bag to queue up the parallel computation. Similarly, dask delayed could be used.
import dask.bag

# NOTE(review): only the last three entries of updated_files are queued here. This looks
# like a test-sized subset; confirm whether all of updated_files should be processed.
b = dask.bag.from_sequence(updated_files[-3:])
b1 = b.map(gen_json)  # nothing is executed yet; the work runs when b1.compute() is called

In [None]:
# run the gen_json computations (gen_json writes one reference json per queued file)
b1.compute()

In [None]:
# Shut down dask. Close the client first, while the scheduler it is connected to is
# still alive, and only then stop the cluster; closing the cluster first leaves the
# client talking to a scheduler that no longer exists.
client.close()
cluster.close()

Now we take these individual reference files and consolidate them into a single reference file. At present, MultiZarrToZarr can only combine the individual JSON reference files; it cannot append to the existing combined reference JSON.

In [None]:
# List the paths of all available individual reference jsons under json_dir
# (glob only returns the matching paths; nothing is opened here).
json_list = fs.glob(f'{json_dir}*.json') #list all the available individual reference jsons.
# json_list = sorted(['s3://'+f for f in json_list]) #prepend the AWS S3 protocol

In [None]:
# target options: used for opening the reference jsons
topts = {'anon': True, 'skip_instance_cache': True}
# remote options: used for opening the netcdf files
ropts = {'account_name': 'pm2', 'skip_instance_cache': True}

In [None]:
# Create the combined json: the individual references are concatenated along
# ocean_time, and the lat/lon variables of each grid are passed as identical_dims.
grid_coords = [f'{axis}_{grid}' for axis in ('lat', 'lon') for grid in ('psi', 'rho', 'u', 'v')]
mzz = MultiZarrToZarr(
    json_list,
    remote_protocol='abfs',
    remote_options=ropts,
    target_options=topts,
    concat_dims=['ocean_time'],
    identical_dims=grid_coords,
)

In [None]:
# Compute the combined json and write it to storage. At present this cannot be run in parallel.
# The references are computed BEFORE reference_file is opened for writing, so that a
# failure while combining does not truncate an existing combined reference file.
combined_refs = ujson.dumps(mzz.translate()).encode()
with fs.open(reference_file, 'wb') as f:
    f.write(combined_refs)