In [None]:
from glob import glob
import xarray as xr
import cf_xarray # use cf-xarray so that we can use CF attributes
import pandas as pd
import matplotlib.pyplot as plt
import regionmask
import cartopy.crs as ccrs
import nc_time_axis
import numpy as np
import warnings
# To access collection
import dask
import intake
from dask_jobqueue import PBSCluster
from dask.distributed import Client, LocalCluster, futures_of
from dask.diagnostics import ProgressBar
from tqdm import tqdm 
import regionmask

## Spin up Dask cluster

In [None]:
# Build the PBS-backed Dask cluster for NCAR (one worker process per PBS job)
num_jobs = 10
cluster = PBSCluster(
    # Scheduler-side settings
    job_name='valencig-dask-hpc',
    queue='casper',
    walltime='02:00:00',  # Change wall time if needed
    resource_spec='select=1:ncpus=1:mem=15GB',
    # Per-worker settings
    cores=1,
    processes=1,
    memory='10GiB',
    interface='ext',
    # Where workers spill data and write their logs
    local_directory='/glade/u/home/valencig/spilled/',
    log_directory='/glade/u/home/valencig/worker-logs/',
)

# Request the worker jobs, then attach a client to the cluster
cluster.scale(jobs=num_jobs)
client = Client(cluster)

# Do not continue until the requested number of workers has connected
client.wait_for_workers(num_jobs)

In [None]:
# Display the client as the cell output
client

In [None]:
# Show whatever cluster.get_logs() returns as the cell output (useful when workers fail to start)
cluster.get_logs()

### Commands for managing dask workers

https://arc.ucar.edu/knowledge_base/68878389

In [None]:
# See the workers in the job scheduler (queries jobs for the current $USER)
!qstat -u $USER

# Kill all running or pending jobs (left disabled on purpose; uncomment to run)
# !qdel `qselect -u $USER`

## Read in the catalog

In [None]:
# Open the CMIP6 intake-esm catalog and display it as the cell output.
# Disabled alternative: 'cesm2.json', a local copy of
# '/glade/collections/cmip/catalog/intake-esm-datastore/catalogs/glade-cesm2-le.json'
# (NOTE(review): this note originally called the copy 'cesm.json' — confirm the filename).
# In that copy, comment out "options": null in aggregation_controls.aggregations.0 in order to get intake-esm to work
# cat = intake.open_esm_datastore('cesm2.json')
cat = intake.open_esm_datastore('/glade/collections/cmip/catalog/intake-esm-datastore/catalogs/glade-cmip6.json')
cat

## Querying for desired variable

https://www.cesm.ucar.edu/community-projects/lens/data-sets

CMIP6 variable list --> https://na-cordex.org/variable-list.html

Also --> https://wcrp-cmip.github.io/CMIP6_CVs/docs/CMIP6_experiment_id.html

In [None]:
# List the distinct variable_id values among catalog entries matching the wind patterns.
# Disabled alternative that queries by component / long_name instead:
# cat.search(component='atm', long_name=['wind*', 'Wind*']).df.long_name.unique()
cat.search(variable_id=['wind*', 'Wind*']).df.variable_id.unique()

## Query and subset data catalog

Overview found [here](https://www2.cesm.ucar.edu/projects/CMIP6/):

ScenarioMIP: "Will provide multi-model climate projections based on alternative scenarios of future emissions and land use changes produced with integrated assessment models. The design consists of eight alternative 21st century scenarios plus one large initial condition ensemble and a set of long-term extensions. Climate model projections will facilitate integrated studies of climate change as well as address targeted scientific questions."

Citation: O'Neill, B. C., Tebaldi, C., van Vuuren, D.P., Eyring, V., Friedlingstein, P., Hurtt, G., Knutti, R., Kriegler, E., Lamarque, J.-F., Lowe, J., Meehl, G.A., Moss, R., Riahi, K., and Sanderson, B. M. 2016. The Scenario Model Intercomparison Project (ScenarioMIP) for CMIP6. Geosci. Model Dev., 9, 3461-3482.

In [None]:
# Narrow the catalog to the CESM2 ScenarioMIP daily near surface wind entries
cesm2 = cat.search(
    activity_id='ScenarioMIP',
    source_id='CESM2',
    experiment_id='ssp*',
    # experiment_id='historical', # all historical forcings
    table_id='day',  # day is highest resolution
    variable_id='sfcWind',  # near surface wind
)

In [None]:
# Display the key information for the selected catalog subset
cesm2.keys_info()

## Read in using ```.to_dataset_dict()```

https://stackoverflow.com/questions/67813208/xarray-open-mfdataset-doesnt-work-if-dask-distributed-client-has-been-created

In [None]:
# Load the selected catalog entries into a dict of datasets while the
# 'array.slicing.split_large_chunks' dask option is switched on
with dask.config.set({'array.slicing.split_large_chunks': True}):
    dsets = cesm2.to_dataset_dict()

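Let’s take a look at the keys - these are defined by the `groupby` attributes in the catalog. **Note:** the key layout described below is that of the CESM2 Large Ensemble catalog; the CMIP6 catalog opened above may group on different attributes, so check the output of `cesm2.keys_info()` for the actual keys. For the CESM2 Large Ensemble catalog, the groupby attributes are: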

https://www2.cesm.ucar.edu/experiments/cesm1.2/GLE/GLENS_output_fields/

`component.experiment.stream.forcing_variant.control_branch_year.variable`

- Component - which component this output is from (ex. atm represents the atmosphere)
- Experiment - which experiment this is from, in this case, this is `ssp370` which is one of the CMIP6 future experiments
- Stream - which stream this output is from, in this case, this is `cam.h1`, which represents daily output
- Control Branch Year - which year the ensemble branched off from, these are described within the [CESM2-LE documentation page](https://www.cesm.ucar.edu/community-projects/lens2)
- Variable - which variable you are working with


component = atm (atmosphere), lnd (land), ocn (ocean), ice

frequency = monthly, daily, or hourly6

experiment = historical (1850 to 2015) or ssp370 (2015 to 2100)

forcing_variant = the biomass forcing variant, cmip6 (the default in the cmip6 runs) or smbb (smoothed biomass burning)

variable = one of the variable names listed in the tables below

## Process Data

The time period for historical data is `1978` to `2014`.

In [None]:
def subset_ds(ds, task):
    """Mask `sfcWind` to US states (minus Hawaii and Alaska) and reduce it per year.

    Parameters
    ----------
    ds : dataset-like
        Must expose `lon` and `lat`, a `time` dimension and an `sfcWind`
        variable, since those are the attributes accessed below.
    task : str
        'mean'    -> yearly mean of `sfcWind`.
        'anomaly' -> yearly mean of (`sfcWind` minus its mean over the whole
                     selected time range).

    Returns
    -------
    The yearly-resampled `sfcWind` data for the requested task.

    Raises
    ------
    ValueError
        If `task` is neither 'mean' nor 'anomaly'.
    """
    # Validate first: previously an unknown task fell through both branches and
    # failed with an UnboundLocalError on `result` only after the masking work.
    if task not in ('mean', 'anomaly'):
        raise ValueError(
            f"Unknown task {task!r}; expected 'mean' or 'anomaly'"
        )

    states = regionmask.defined_regions.natural_earth_v5_0_0.us_states_50
    # Hawaii and Alaska are not included in the mask
    excluded_keys = states.map_keys(['Hawaii', 'Alaska'])
    good_keys = [k for k in states.regions.keys() if k not in excluded_keys]
    mask = states.mask(ds.lon, ds.lat).isin(good_keys)

    # Original note: "Last time (2100) is wonky".
    # NOTE(review): the slice below still includes 2100 — confirm whether the
    # final year should be excluded.
    da = ds.where(mask, drop=True).sfcWind.sel(time=slice('1978', '2100'))

    if task == 'mean':
        return da.resample(time='1Y').mean(dim='time')

    # task == 'anomaly': anomaly is x - x_mean, then averaged on a yearly basis
    return (da - da.mean('time')).resample(time='1Y').mean('time')

Tasks are `mean` or `anomaly`

In [None]:
task = 'mean'
print(f'Running task: {task}')
for key in tqdm(list(dsets), desc='Processing Data...'):
    # Re-chunk along time so chunks are approx 100 mb each
    ds = dsets[key].chunk({'time': 365})
    da = subset_ds(ds, task=task).persist()
    # One output file per dataset key and task
    out_path = '/glade/u/home/valencig/wind-trend-analysis/data/' + key + '.' + task + '.nc'
    da.compute().to_netcdf(out_path)

## Restart dask cluster

In [None]:
# Restart the Dask cluster through the client
client.restart()

## Close dask cluster

In [None]:
# Shut down the Dask cluster through the client
client.shutdown()