In [1]:
import pickle
from datetime import datetime
import traceback
from pathlib import Path

from ecgtools import Builder
from ecgtools.parsers import parse_cmip6
from ecgtools.builder import INVALID_ASSET, TRACEBACK
import fsspec

import xarray as xr
import dask_gateway

In [16]:
gw = dask_gateway.Gateway("https://dask-gateway.jasmin.ac.uk", auth="jupyterhub")

In [18]:
clusters = gw.list_clusters()
if not clusters:
    cluster = gw.new_cluster(options, shutdown_on_close=False)
else:
    cluster = gw.connect(clusters[0].name)


In [20]:
# Create at least one worker, and allow your cluster to scale to three.
cluster.adapt(minimum=1, maximum=3)

# Get a dask client.
client = cluster.get_client()
client

0,1
Connection method: Cluster object,Cluster type: dask_gateway.GatewayCluster
Dashboard: https://dask-gateway.jasmin.ac.uk/clusters/45607653974d4fdc8c40718ead55a600/status,


In [30]:
def data_parser(file):
    file = Path(file)
    #print(file.stem.split('_'))
    info = {}

    try:
        stem = file.stem
        split = stem.split('.')
        #print(split)
        variable = split[-1]

        split = split[0].split('_')
        temporal = split[0]
        source = split[1]
    
        #split = split[-1].split('.')
        #print(split)
        #
    
        split = split[-1].split('-')
        #print(split)
        
        exp = split[1]
        what = split[2]
        ens_name = split[4]
        ens_num = int(ens_name.split('ensemble')[-1])      
         
        #with xr.open_dataset(file, chunks={}, decode_times=False) as ds:
            
        info = {
            'source': source,
            'temporal': temporal,
            'variable': variable,
            'ens': f"{ens_num:02d}",
            'simulation': what,
            'experiment': exp,
            'path': str(file),
        }

        return info

    except Exception:
        return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()}
    

In [29]:
exp = 1
f"{exp:02d}"

'01'

In [104]:
root_path

'/gws/nopw/j04/htmip/'

In [26]:
files = sorted(Path(root_path).rglob('**/**/*.nc'))

In [36]:
files[0]

PosixPath('/gws/nopw/j04/htmip/Exp1and4_coupledocean/waccm6mam/Dailymean_WACCM6MAM_HTHHMOC-Exp1and4-NoVolc-CoupledOcean-ensemble001.chi.nc')

In [38]:
_.as_posix()

'/gws/nopw/j04/htmip/Exp1and4_coupledocean/waccm6mam/Dailymean_WACCM6MAM_HTHHMOC-Exp1and4-NoVolc-CoupledOcean-ensemble001.chi.nc'

In [40]:
files[0].as_posix()

'/gws/nopw/j04/htmip/Exp1and4_coupledocean/waccm6mam/Dailymean_WACCM6MAM_HTHHMOC-Exp1and4-NoVolc-CoupledOcean-ensemble001.chi.nc'

In [86]:
data_parser(files[0].as_posix())

{'source': 'WACCM6MAM',
 'temporal': 'Dailymean',
 'variable': 'chi',
 'ens': 1,
 'simulation': 'NoVolc',
 'experiment': 'Exp1and4',
 'path': '/gws/nopw/j04/htmip/Exp1and4_coupledocean/waccm6mam/Dailymean_WACCM6MAM_HTHHMOC-Exp1and4-NoVolc-CoupledOcean-ensemble001.chi.nc'}

In [36]:
# Location of HTMIP data
root_path = "/gws/nopw/j04/htmip/"

# Set up the ecgtools builder; have to go to a depth of 9
# to get all of the available models. Theoretically, the 
# above root path could be modified to a specific model center
# directory (in which case a depth of 8 would be necessary).
# May need to do this when more SNAPSI data comes in but
# TODO: figure out how to combine catalogs (so that we do not
# have to repeat work)
print(f"Initializing builder object with {root_path}")
builder = Builder(paths=[root_path], extension=".nc", depth=3, njobs=-1)


Initializing builder object with /gws/nopw/j04/htmip/


In [37]:
builder.get_assets()


Builder(paths=['/gws/nopw/j04/htmip/'], storage_options={}, depth=3, exclude_patterns=[], include_patterns=[], joblib_parallel_kwargs={})

In [7]:
assets_start = datetime.now()
print(f"Getting target assets, starting {assets_start}")
builder.get_assets()
assets_end = datetime.now()
print(f"Finished obtaining assets at {assets_end}; duration {assets_end-assets_start}")

Getting target assets, starting 2024-03-27 10:15:30.923505
Finished obtaining assets at 2024-03-27 10:15:31.007555; duration 0:00:00.084050


In [38]:
builder.build(parsing_func=data_parser)

Builder(paths=['/gws/nopw/j04/htmip/'], storage_options={}, depth=3, exclude_patterns=[], include_patterns=[], joblib_parallel_kwargs={})

In [74]:
temp_file_name = builder.invalid_assets['INVALID_ASSET'].iloc[0].as_posix()

In [39]:
builder.invalid_assets#['TRACEBACK'].iloc[0]

In [10]:
builder.invalid_assets['TRACEBACK'].iloc[0]

'Traceback (most recent call last):\n  File "/tmp/ipykernel_516/3630927177.py", line 28, in data_parser\n    with xr.open_dataset(file, chunks={}, decode_times=False) as ds:\nNameError: name \'xr\' is not defined\n'

In [89]:
 builder.invalid_assets['INVALID_ASSET'].iloc[-1].as_posix()

'/gws/nopw/j04/htmip/Exp1and4_coupledocean/waccm6mam/Monthlymean_WACCM6MAM_HTHHMOC-Exp1and4-SO2andH2O-CoupledOcean-ensemble010.wtpct_mode3.nc'

In [40]:
builder.df.head()

Unnamed: 0,source,temporal,variable,ens,simulation,experiment,path
0,WACCM6MAM,Dailymean,chi,1,NoVolc,Exp1and4,/gws/nopw/j04/htmip/Exp1and4_coupledocean/wacc...
1,WACCM6MAM,Dailymean,epfy,1,NoVolc,Exp1and4,/gws/nopw/j04/htmip/Exp1and4_coupledocean/wacc...
2,WACCM6MAM,Dailymean,fyy,1,NoVolc,Exp1and4,/gws/nopw/j04/htmip/Exp1and4_coupledocean/wacc...
3,WACCM6MAM,Dailymean,utendepfd,1,NoVolc,Exp1and4,/gws/nopw/j04/htmip/Exp1and4_coupledocean/wacc...
4,WACCM6MAM,Dailymean,vtem,1,NoVolc,Exp1and4,/gws/nopw/j04/htmip/Exp1and4_coupledocean/wacc...


In [None]:
builder.

In [8]:
builder.df.experiment.unique()

array(['Exp1and4'], dtype=object)

In [None]:
xr.combine_by_coords()

In [41]:
builder.save(
    # File path - could save as .csv (uncompressed csv) or .csv.gz (compressed csv)
    name = "htmip_catalog",
    #directory=root_path,
    #catalog_type = 'dict',
    # Column name including filepath
    path_column_name='path',
    # Column name including variables
    variable_column_name='variable',
    # Data file format - could be netcdf or zarr (in this case, netcdf)
    data_format="netcdf",
    # Which attributes to groupby when reading in variables using intake-esm
    groupby_attrs=["source", "temporal", "simulation", "ens", "experiment"],
    # Aggregations which are fed into xarray when reading in data using intake
    aggregations=[
        {
            "type": "join_new",
            "attribute_name": "ens",
            "options": {"dim": "time", "coords": "minimal", "compat": "override"},
        }
    ],
)

Successfully wrote ESM catalog json file to: file:///home/users/akuchar/htmip_catalog.json
