# Create JSONS with fsspec ReferenceMaker
## 24 hours of GOES data
Needed:
- fsspec-reference-maker
    - `pip install git+https://github.com/intake/fsspec-reference-maker`
- adlfs >= 0.7.7
    - `pip install --upgrade "adlfs>=0.7.7"` (keep the quotes, otherwise the shell treats `>` as an output redirection)

In [1]:
from fsspec_reference_maker.hdf import SingleHdf5ToZarr 
from fsspec_reference_maker.combine import MultiZarrToZarr

In [2]:
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt
import s3fs
import datetime as dt
import zipfile
import logging
import fsspec
import json
from tqdm import tqdm
from glob import glob
import os

from azure.storage.blob import ContainerClient
import tempfile

import dask

In [14]:
from dask.distributed import Client
# Start a Dask distributed client with default arguments; the recorded output
# below shows it connected to a local scheduler with 4 workers.
client = Client()
# Bare expression on the last line so the notebook displays the client summary.
client

0,1
Client  Scheduler: tcp://127.0.0.1:42095  Dashboard: /proxy/8787/status,Cluster  Workers: 4  Cores: 4  Memory: 32.00 GiB


## Get urls

In [4]:
# Local scratch directory (<system temp dir>/goes); created if it is missing.
tempdir = os.path.join(tempfile.gettempdir(), 'goes')
os.makedirs(tempdir, exist_ok=True)

# Product name plus start year / day / hour, kept as zero-padded strings.
# product, syear and sday are joined into the blob-name prefix further down.
product = 'ABI-L2-MCMIPF'
syear, sday, shour = '2020', '002', '14'

# Storage account and container that hold the GOES-16 blobs.
storage_account_url = 'https://goes.blob.core.windows.net'
container_name = 'noaa-goes16'
goes_blob_root = f'{storage_account_url}/{container_name}/'

# No credential is passed to the container client.
goes_container_client = ContainerClient(
    account_url=storage_account_url,
    container_name=container_name,
    credential=None,
)

def download_url(url, dest_dir=None):
    """Download ``url`` to a local file and return the local path.

    The local file name is derived from the URL by replacing ``://`` and
    every ``/`` with ``_``.

    Args:
        url: URL to fetch.
        dest_dir: Directory to save the file in. When omitted, the
            notebook-level ``tempdir`` is used.

    Returns:
        Path of the downloaded file.
    """
    # urllib is not imported by any notebook cell, so import it here;
    # without this the call below raises NameError.
    import urllib.request

    if dest_dir is None:
        dest_dir = tempdir
    url_as_filename = url.replace('://', '_').replace('/', '_')
    destination_filename = os.path.join(dest_dir, url_as_filename)
    urllib.request.urlretrieve(url, destination_filename)
    return destination_filename

# Collect the name of every blob in the container that starts with
# "<product>/<syear>/<sday>/".
prefix = '/'.join((product, syear, sday)) + '/'
print(f'Finding blobs matching prefex: {prefix}')
generator = goes_container_client.list_blobs(name_starts_with=prefix)
blobs = [blob.name for blob in generator]

Finding blobs matching prefex: ABI-L2-MCMIPF/2020/002/


In [5]:
# Turn each blob name into an az://<container>/<blob name> URL.
urllist = [f'az://{container_name}/{blob_name}' for blob_name in blobs]

### Generate json function

In [6]:
def gen_json(u):
    """Write a reference JSON for one remote file.

    Opens ``u`` through fsspec, runs ``SingleHdf5ToZarr`` over it and writes
    the translated result to ``jsons/<last path component of u>.json``.

    Args:
        u: URL of the source file (the notebook passes ``az://...`` URLs).

    Returns:
        None.
    """
    so = dict(
        mode="rb", anon=True, default_fill_cache=False, default_cache_type="none"
    )
    # Build the whole payload before touching the output path, so a failure
    # while reading or translating does not leave an empty .json behind for
    # the later glob("jsons/*.json") to pick up.
    with fsspec.open(u, **so, account_name='goeseuwest') as inf:
        references = SingleHdf5ToZarr(inf, u).translate()
    payload = json.dumps(references).encode()

    # No other cell creates the output directory, and open() does not create
    # missing parents. exist_ok=True keeps this safe when several dask tasks
    # get here at the same time.
    os.makedirs("jsons", exist_ok=True)
    with open(f"jsons/{u.split('/')[-1]}.json", 'wb') as outf:
        outf.write(payload)


### Use dask to make jsons

In [7]:
%%time
# Wrap gen_json lazily, build one delayed task per URL, then run them all.
# dask.compute stays the last expression so the cell still displays its result.
lazy_gen_json = dask.delayed(gen_json)
json_tasks = [lazy_gen_json(url) for url in urllist]
dask.compute(*json_tasks)

CPU times: user 1min 17s, sys: 11.7 s, total: 1min 28s
Wall time: 26min 39s


(None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,

In [8]:
# Every per-file reference JSON written to jsons/, in sorted path order.
json_list = glob("jsons/*.json")
json_list.sort()

In [15]:
# Set up the combiner over the per-file reference JSONs.
# Alternative input left by the author: "zip://jsons/*.json::combined.zip".
# Every xarray decoding option below is switched off.
mzz = MultiZarrToZarr(
    json_list,
    remote_protocol='abfs',
    remote_options=dict(account_name='goeseuwest'),
    xarray_kwargs=dict.fromkeys(
        (
            'decode_cf',
            'mask_and_scale',
            'decode_times',
            'use_cftime',
            'decode_coords',
        ),
        False,
    ),
    with_mf='t',
)

In [16]:
%%time
# Run the combine step; presumably writes the merged references to
# 'combined.json' (the "Make combined json" row of the timing table,
# about 55 minutes in the recorded run).
mzz.translate('combined.json')

CPU times: user 45min 35s, sys: 1min 21s, total: 46min 56s
Wall time: 55min 6s


In [11]:
# Shut down the Dask client.
# NOTE(review): this cell's recorded execution count (11) is lower than the
# client-creation cell (14) and the combine cells (15, 16), so the notebook was
# not run top to bottom — presumably the client was restarted after this
# shutdown; confirm before re-running in order.
client.shutdown()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError


***
## Processing times:
|Action | Time | Note |
|-------:|:------| :---|
|Make individual jsons | 26min 39s | 4 workers, faster times can be achieved with more dask workers |
|Make combined json | 55min 6s | probably cannot be sped up with dask |
| __Total__ | __1h 21min__ | 