In [8]:
! pip install git+https://github.com/rabernat/pystac.git@optional-timezone-check

Collecting git+https://github.com/rabernat/pystac.git@optional-timezone-check
  Cloning https://github.com/rabernat/pystac.git (to revision optional-timezone-check) to /tmp/pip-req-build-ax1z6w3r
Building wheels for collected packages: pystac
  Building wheel for pystac (setup.py) ... [?25ldone
[?25h  Created wheel for pystac: filename=pystac-0.5.3-py3-none-any.whl size=126100 sha256=74781b568b00a1d532e366d1e39d505254c5886220c9ef3c7e7655bbbdf9e5d3
  Stored in directory: /tmp/pip-ephem-wheel-cache-rh_dlngg/wheels/39/c0/c0/1c85781caa1bdc4c2e34e00257ed99702130229c0ce0ae60af
Successfully built pystac
Installing collected packages: pystac
  Attempting uninstall: pystac
    Found existing installation: pystac 0.5.2
    Uninstalling pystac-0.5.2:
      Successfully uninstalled pystac-0.5.2
Successfully installed pystac-0.5.3


In [1]:
from datetime import datetime, timezone
from tqdm.auto import trange, tqdm
import pystac
import cftime
import zarr

import concurrent.futures

In [2]:
from distributed import Client, get_client, secede, rejoin

In [3]:
# Prefer the first cluster listed by Dask Gateway; otherwise fall back to Client().
try:
    from dask_gateway import Gateway
    g = Gateway()
    # Indexing [0] raises IndexError when the gateway reports no clusters.
    c = g.list_clusters()[0]
    cluster = g.connect(c.name)
    client = Client(cluster)
except IndexError:
    # No gateway cluster found: create a client with default arguments instead.
    # NOTE(review): only IndexError is handled; a missing dask_gateway package
    # (ImportError) or a connection failure would still propagate.
    client = Client()

# Last expression of the cell: display the client summary.
client

0,1
Client  Scheduler: gateway://traefik-gcp-uscentral1b-prod-dask-gateway.prod:80/prod.a31755a6ef9c46b09c62dd769a904e40  Dashboard: /services/dask-gateway/clusters/prod.a31755a6ef9c46b09c62dd769a904e40/status,Cluster  Workers: 3  Cores: 6  Memory: 25.77 GB


In [4]:
import subprocess
import sys

def install_on_client():
    """Pip-install the patched pystac branch with the current interpreter.

    Runs ``<sys.executable> -m pip install <branch>`` as a subprocess, so the
    package is installed for whichever Python process executes this function.

    Raises:
        subprocess.CalledProcessError: if pip exits with a non-zero status.
    """
    requirement = 'git+https://github.com/rabernat/pystac.git@optional-timezone-check'
    pip_command = [sys.executable, "-m", "pip", "install", requirement]
    subprocess.check_call(pip_command)

# Execute the installer through client.run; the recorded output is a dict
# keyed by worker address (values are None because the function returns nothing).
results = client.run(install_on_client)
results

{'tls://10.36.174.216:33317': None,
 'tls://10.36.50.2:36953': None,
 'tls://10.36.51.2:37321': None,
 'tls://10.39.162.128:43209': None,
 'tls://10.39.162.129:35283': None}

In [6]:
import gcsfs
# Google Cloud Storage filesystem handle; read as a module-level global by
# get_temporal_extent_from_zarr_group and list_files_and_subdirs.
fs = gcsfs.GCSFileSystem()

In [7]:
# Spatial extent attached to every collection: one bounding box spanning
# -180..180 and -90..90 (presumably [west, south, east, north] in degrees — confirm).
_GLOBAL_EXTENT = pystac.SpatialExtent([[-180, -90, 180, 90]])

In [8]:
# Fallback temporal extent covering the full range `datetime` can represent;
# returned when a Zarr group has no 'time' array.
_DEFAULT_TEMPORAL_EXTENT = pystac.TemporalExtent([[datetime.min, datetime.max]])

def get_temporal_extent_from_zarr_group(path):
    """Derive a ``pystac.TemporalExtent`` from a Zarr group's ``time`` array.

    The group at ``path`` is opened through its consolidated metadata using the
    module-level ``fs`` filesystem. The first and last values of the ``time``
    array are decoded with ``cftime.num2date`` using the array's ``units`` and
    ``calendar`` attributes.

    Returns:
        ``_DEFAULT_TEMPORAL_EXTENT`` when the group has no ``time`` array,
        otherwise a temporal extent spanning the first to the last time value.

    Raises:
        KeyError: if the ``time`` array lacks a ``units`` or ``calendar`` attribute.
    """
    store = fs.get_mapper(path)
    group = zarr.open_consolidated(store)

    try:
        time_var = group['time']
    except KeyError:
        # No time coordinate in this group: use the catch-all extent.
        return _DEFAULT_TEMPORAL_EXTENT

    # Only the two end points are read, not the whole array.
    first_value, last_value = time_var[0], time_var[-1]
    units = time_var.attrs['units']
    calendar = time_var.attrs['calendar']

    start = cftime.num2date(first_value, units, calendar=calendar)
    end = cftime.num2date(last_value, units, calendar=calendar)

    return pystac.TemporalExtent([[start, end]])

In [9]:
# Provider records attached to every generated collection, one per role:
# producer/licensor, processor, and host.
providers = [
    pystac.Provider("World Climate Research Programme",
        roles=["producer", "licensor"],
        url="https://www.wcrp-climate.org/wgcm-cmip/wgcm-cmip6"
    ),
    pystac.Provider("Pangeo",
        roles=["processor"],
        url="https://pangeo.io/"
    ),
    pystac.Provider("Google Cloud",
        roles=["host"],
        url="https://cloud.google.com/storage"
    )
]
# Last expression of the cell: display the list.
providers

[<pystac.collection.Provider at 0x7ff13c73ca60>,
 <pystac.collection.Provider at 0x7ff13c73cac0>,
 <pystac.collection.Provider at 0x7ff13c73cbb0>]

In [10]:
# STAC extension identifiers passed as `stac_extensions` to every collection.
extensions = ["collection-assets"]

# Link to the CMIP6 terms of use; added to every generated collection.
license_link = pystac.Link(
    "license",
    "https://pcmdi.llnl.gov/CMIP6/TermsOfUse/TermsOfUse6-1.html",
    media_type="text/html",
    title="CMIP6: Terms of Use"
)
# Bare expression; it is not the last statement of the cell, so nothing is displayed.
license_link

def make_asset_path(path,
                    prefix="https://storage.googleapis.com",
                    suffix="/.zmetadata"):
    """Build the URL of a metadata file inside a bucket path.

    Args:
        path: Bucket-relative path of the Zarr group, with or without a
            leading slash (listings in this notebook return paths such as
            ``'cmip6/CMIP/...'`` without one).
        prefix: Base URL placed before ``path``.
        suffix: Text appended after ``path``.

    Returns:
        ``prefix + path + suffix``, with a single ``/`` inserted between
        ``prefix`` and ``path`` when neither side provides one.
    """
    # Without this separator a path like 'cmip6/x' produced
    # 'https://storage.googleapis.comcmip6/x/.zmetadata', which is not a valid URL.
    needs_separator = (
        bool(prefix)
        and bool(path)
        and not prefix.endswith("/")
        and not path.startswith("/")
    )
    if needs_separator:
        return prefix + "/" + path + suffix
    # Inputs that already carry a separator (or are empty) are joined unchanged.
    return prefix + path + suffix


def collection_from_zarr_group(path):
    """Create a ``pystac.Collection`` describing the Zarr group at ``path``.

    The collection id is the last ``/``-separated component of ``path``. Its
    extent combines the module-level ``_GLOBAL_EXTENT`` with the temporal
    extent read from the group. A single ``zmetadata`` asset pointing at the
    group's consolidated metadata file is stored under the ``assets`` extra
    field, and the shared ``providers``, ``extensions`` and ``license_link``
    are attached.

    Returns:
        The newly built collection.
    """
    collection_id = path.rsplit('/', 1)[-1]
    summary = "Auto-generated description"

    # Reads the group's time array (remote I/O) to find the covered period.
    time_extent = get_temporal_extent_from_zarr_group(path)
    full_extent = pystac.Extent(_GLOBAL_EXTENT, time_extent)

    zmetadata_asset = {
        "href": make_asset_path(path),
        "description": "Consolidated metadata file for Zarr store",
        "type": "application/json",
        "roles": ["metadata", "zarr-v2-consolidated-metadata"],
    }

    collection = pystac.Collection(
        collection_id,
        summary,
        full_extent,
        stac_extensions=extensions,
        providers=providers,
        extra_fields={"assets": {"zmetadata": zmetadata_asset}},
    )
    collection.add_link(license_link)
    return collection

def catalog(path):
    """Create an empty self-contained ``pystac.Catalog`` for a directory.

    The catalog id is the last ``/``-separated component of ``path``; the
    description is a fixed placeholder string.
    """
    return pystac.Catalog(
        path.rsplit('/', 1)[-1],
        "Auto-generated description",
        catalog_type=pystac.CatalogType.SELF_CONTAINED,
    )

In [11]:
def list_files_and_subdirs(path):
    """List the immediate contents of ``path`` on the module-level ``fs``.

    Returns:
        A ``(files, subdirs)`` tuple of lists. ``files`` holds bare file names
        (an entry whose name equals ``path`` itself is reported as ``""``);
        ``subdirs`` holds the full paths of sub-directories, without a trailing
        slash and excluding ``path`` itself.
    """
    # Dicts are used so that repeated names collapse while listing order is kept.
    file_entries = {}
    dir_entries = {}

    for entry in fs.ls(path, detail=True):
        full_name = entry["name"].rstrip("/")
        is_self = full_name == path

        if entry["type"] == "directory" and not is_self:
            dir_entries[full_name] = entry
            continue

        # Anything else counts as a file; an entry named exactly like the
        # requested path is keyed by the empty string.
        key = "" if is_self else full_name.rsplit("/", 1)[-1]
        file_entries[key] = entry

    return list(file_entries), list(dir_entries)

In [12]:
# Try the listing helper on one directory; the result is (file names, sub-directory paths).
list_files_and_subdirs('cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon')

([],
 ['cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/clivi',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/clt',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/clwvi',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/co2mass',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/evspsbl',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/hfls',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/hur',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/hus',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/pr',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/prc',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/prsn',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/prw',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/ps',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/psl',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/rlut',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon/rlutcs',
  'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon

In [13]:
def walk_recursive(path, depth=0, max_depth=None):
    """Serially walk the directory tree under ``path`` and build a STAC tree.

    Args:
        path: Directory to start from.
        depth: Current recursion depth (0 for the initial call).
        max_depth: Depth at which to stop descending. A falsy value
            (``None`` or ``0``) means no limit.

    Returns:
        A collection when ``path`` contains a ``.zmetadata`` file, ``None``
        when the depth limit has been reached, otherwise a catalog whose
        children are the truthy results for each sub-directory.
    """
    files, subdirs = list_files_and_subdirs(path)

    if '.zmetadata' in files:
        # A consolidated Zarr store: this directory is a leaf of the tree.
        return collection_from_zarr_group(path)

    if max_depth and depth >= max_depth:
        # Fixed: this branch used to `return none`, which raised NameError.
        return None

    cat = catalog(path)
    children = [
        walk_recursive(sd, depth=(depth + 1), max_depth=max_depth)
        for sd in subdirs
    ]
    # Branches cut off by max_depth come back as None and are skipped.
    cat.add_children([child for child in children if child])
    return cat

In [14]:
from dask import delayed, compute

In [15]:
@delayed
def walk_recursive_delayed(path, depth=0, max_depth=None):
    """``dask.delayed`` variant of the recursive catalog walk.

    Calling this function returns a lazy object because of the ``@delayed``
    decorator. When the body runs, it creates one delayed call per
    sub-directory and evaluates them with ``compute`` from inside the task.

    Args:
        path: Directory to start from.
        depth: Current recursion depth (0 for the initial call).
        max_depth: Depth at which to stop descending. A falsy value
            (``None`` or ``0``) means no limit.

    Returns (once computed):
        A collection when ``path`` contains a ``.zmetadata`` file, ``None``
        when the depth limit has been reached, otherwise a catalog whose
        children are the truthy results for each sub-directory.
    """
    files, subdirs = list_files_and_subdirs(path)

    if '.zmetadata' in files:
        return collection_from_zarr_group(path)

    if max_depth and depth >= max_depth:
        # Fixed: this branch used to `return none`, which raised NameError.
        return None

    cat = catalog(path)
    kwargs = dict(depth=(depth + 1), max_depth=max_depth)
    # These calls only build lazy objects; compute() below evaluates them together.
    pending = [walk_recursive_delayed(s, **kwargs) for s in subdirs]
    items = compute(*pending)
    cat.add_children([item for item in items if item])
    return cat

In [16]:
@delayed
def fib_delayed(n):
    """Fibonacci number computed with nested ``dask.delayed`` calls.

    Toy example of launching computation from within a task: each call
    creates two further delayed calls and evaluates them with ``compute``.
    Because of the decorator, calling ``fib_delayed(n)`` returns a lazy object.
    """
    if n < 2:
        return n
    # Fixed: the recursion used to call `fib`, which is not defined anywhere
    # in this notebook and raised NameError for any n >= 2.
    a = fib_delayed(n - 1)  # these calls are delayed
    b = fib_delayed(n - 2)
    a, b = compute(a, b)  # execute both in parallel
    return a + b

def fib_client(n):
    """Fibonacci number computed by submitting sub-tasks through a client.

    Each call obtains a client with ``get_client()``, submits the two smaller
    problems as new tasks and gathers their results. Unlike ``fib_client_sr``,
    it does not call ``secede()`` before waiting on the results.
    """
    if n < 2:
        return n
    task_client = get_client()
    pending = [task_client.submit(fib_client, n - back) for back in (1, 2)]
    first, second = task_client.gather(pending)
    return first + second

def fib_client_sr(n):
    """Fibonacci via client-submitted sub-tasks, using ``secede``/``rejoin``.

    Same scheme as ``fib_client``, except that ``secede()`` is called before
    gathering the sub-results and ``rejoin()`` afterwards.
    """
    if n < 2:
        return n
    task_client = get_client()
    pending = [task_client.submit(fib_client_sr, n - back) for back in (1, 2)]
    secede()
    first, second = task_client.gather(pending)
    rejoin()
    return first + second

In [17]:
import os

def walk_recursive_client(path, depth=0, max_depth=None, client_depth=2,
                          root=None):
    """Recursive catalog walk that fans out through a distributed client.

    Levels with ``depth <= client_depth`` submit one task per sub-directory
    via ``get_client()``; deeper levels recurse serially inside the task.

    Args:
        path: Directory to start from.
        depth: Current recursion depth (0 for the initial call).
        max_depth: Depth at which to stop descending. A falsy value
            (``None`` or ``0``) means no limit.
        client_depth: Deepest level at which sub-directories are still
            submitted as separate tasks.
        root: Path of the initial call; defaults to ``path``. It is only
            passed down the recursion and not otherwise used.

    Returns:
        A collection when ``path`` contains a ``.zmetadata`` file, ``None``
        when the depth limit has been reached, otherwise a catalog whose
        children are the truthy results for each sub-directory.
    """
    if root is None:
        root = path

    files, subdirs = list_files_and_subdirs(path)

    if '.zmetadata' in files:
        return collection_from_zarr_group(path)

    if max_depth and depth >= max_depth:
        # Fixed: this branch used to `return none`, which raised NameError.
        return None

    cat = catalog(path)
    kwargs = dict(
        depth=(depth + 1),
        max_depth=max_depth,
        client_depth=client_depth,
        root=root
    )

    if depth <= client_depth:
        client = get_client()
        futures = [client.submit(walk_recursive_client, s, **kwargs)
                   for s in subdirs]
        # secede() before blocking on the children, rejoin() once they are done.
        secede()
        # gather() already returns concrete results; the former
        # compute(*...) wrapper passed them through unchanged and was dropped.
        items = client.gather(futures)
        rejoin()
    else:
        items = [walk_recursive_client(s, **kwargs)
                 for s in subdirs]

    # Branches cut off by max_depth come back as None and are skipped.
    cat.add_children([item for item in items if item])
    return cat

In [153]:
# Restart via the client before the experiments below
# (presumably to clear worker state — confirm).
client.restart()



0,1
Client  Scheduler: tcp://127.0.0.1:39875  Dashboard: /user/rabernat/proxy/8787/status,Cluster  Workers: 1  Cores: 1  Memory: 4.29 GB


In [18]:
# Exercise nested task submission with secede/rejoin on the toy Fibonacci problem.
futures = client.submit(fib_client_sr, 20)
# gather() returns the plain result; compute() wraps it in a 1-tuple, hence [0].
%time compute(client.gather(futures))[0]

CPU times: user 8.9 ms, sys: 4.18 ms, total: 13.1 ms
Wall time: 535 ms


6765

In [19]:
# Build the catalog for a single 'Amon' directory (default client_depth=2).
futures = client.submit(walk_recursive_client,
                        'cmip6/CMIP/AS-RCEC/TaiESM1/1pctCO2/r1i1p1f1/Amon')
# The result is displayed as a 1-tuple because compute() returns a tuple.
compute(client.gather(futures))

(<Catalog id=Amon>,)

In [20]:
# Same walk, started several levels higher in the tree ('TaiESM1').
futures = client.submit(walk_recursive_client,
                        'cmip6/CMIP/AS-RCEC/TaiESM1')
compute(client.gather(futures))

(<Catalog id=TaiESM1>,)

In [58]:
# Restart via the client before the next, larger run
# (presumably to clear worker state — confirm).
client.restart()



0,1
Client  Scheduler: tcp://127.0.0.1:39875  Dashboard: /user/rabernat/proxy/8787/status,Cluster  Workers: 1  Cores: 1  Memory: 4.29 GB


In [21]:
# Walk the 'AWI' subtree, submitting separate tasks down to depth 5.
futures = client.submit(walk_recursive_client,
                        'cmip6/CMIP/AWI', client_depth=5)
# Time the gather and keep the resulting catalog in `cat`.
%time cat = compute(client.gather(futures))[0]

CPU times: user 102 ms, sys: 17.9 ms, total: 120 ms
Wall time: 9.45 s


In [22]:
# Print the catalog/collection hierarchy that was just built.
cat.describe()

* <Catalog id=AWI>
    * <Catalog id=AWI-CM-1-1-MR>
        * <Catalog id=1pctCO2>
            * <Catalog id=r1i1p1f1>
                * <Catalog id=Amon>
                    * <Catalog id=clivi>
                        * <Collection id=gn>
                    * <Catalog id=clt>
                        * <Collection id=gn>
                    * <Catalog id=clwvi>
                        * <Collection id=gn>
                    * <Catalog id=evspsbl>
                        * <Collection id=gn>
                    * <Catalog id=hfls>
                        * <Collection id=gn>
                    * <Catalog id=hur>
                        * <Collection id=gn>
                    * <Catalog id=hus>
                        * <Collection id=gn>
                    * <Catalog id=huss>
                        * <Collection id=gn>
                    * <Catalog id=pr>
                        * <Collection id=gn>
                    * <Catalog id=prc>
                        * <Collection id=

In [23]:
# Walk the 'NCAR' subtree with the same settings.
# NOTE: the recorded run of this cell ended in KeyboardInterrupt, so `cat`
# was not updated by it.
futures = client.submit(walk_recursive_client,
                        'cmip6/CMIP/NCAR', client_depth=5)
%time cat = compute(client.gather(futures))[0]

KeyboardInterrupt: 

In [24]:
# Restart via the client after the interrupted run
# (presumably to drop the unfinished tasks — confirm).
client.restart()

0,1
Client  Scheduler: gateway://traefik-gcp-uscentral1b-prod-dask-gateway.prod:80/prod.a31755a6ef9c46b09c62dd769a904e40  Dashboard: /services/dask-gateway/clusters/prod.a31755a6ef9c46b09c62dd769a904e40/status,Cluster  Workers: 5  Cores: 10  Memory: 42.95 GB
