## Drought variables across CMIP6 
Research Computing for Earth Sciences Final Project <br>
December 2020 <br>
Miriam Nielsen

### Research Question
What is the relationship between drought-associated variables across CMIP6 Historical Model Experiments?



In [6]:
# Needed Packages
import cartopy.crs as ccrs
import cartopy
import zarr
import fsspec
import intake

import numpy as np
import pandas as pd
import xarray as xr
import dask 
import xesmf as xe
xr.set_options(display_style='html')

from tqdm.autonotebook import tqdm  # progress bars for loops

from dask_gateway import Gateway
from dask.distributed import Client

from matplotlib import pyplot as plt
%matplotlib inline
plt.rcParams['figure.figsize'] = (12, 6)

from IPython import display
display.set_matplotlib_formats('retina')


### Create Dask Cluster

TK description of CMIP6 size

using a dask cluster that (ideally) scales adaptively to parellelize the analysis

In [7]:
gateway = Gateway()
cluster = gateway.new_cluster()
cluster.adapt(minimum=1, maximum=20)
client = Client(cluster)
cluster

ValueError: Cluster ooi-prod.4fe30a2261ab40f3853b80a77bd9c6ce not found

### Data Loading & Organizing

In [None]:
# open pangeo catalogue of CMIP6 experiments
cat_url = "https://storage.googleapis.com/cmip6/pangeo-cmip6.json"

# open using intake
col = intake.open_esm_datastore(cat_url)
#Summary of collection structure
# col

Contains many unique experiments

In [None]:
# print(col.df.experiment_id.unique())
# print(len(col.df.experiment_id.unique()))

I only want to look at models and experiments that include all three variables of interest: `tas`, `tasmax`, and `mrsos`

TK EXPLAIN BELOW

In [None]:
uni_dict = col.unique(['source_id', 'experiment_id', 'table_id'])
models = set(uni_dict['source_id']['values']) # all the models

for variable_id in ['tas', 'tasmax','mrsos']:
    query = dict(variable_id = variable_id)
    cat = col.search(**query)
    models = models.intersection({model for model in cat.df.source_id.unique().tolist()})

models = list(models)

experiments = set(uni_dict['experiment_id']['values'])

for model in models:
    query = dict(variable_id = variable_id)
    cat = col.search(**query)
    experiments = experiments.intersection({experiment for experiment in cat.df.experiment_id.unique().tolist()})

experiments = list(experiments)
print(len(experiments))
print(experiments)
    

In [None]:
query = dict(
    # WHICH EXPERIMENTS SHOULD I USE??? 
    # options: 'historical', 'ssp245', 'ssp585' <- lots for tas, idk about other variables
    
    experiment_id=['1pctCO2', 'piControl'],       # pick the `piControl` and TK TK forcing experiments
    variable_id=['tas', 'tasmax','mrsos'],        # near-surface air temperature (tas), maximum surface air temperature (tasmax), and surface soil moisture (mrsos)
    # WHICH REALIZATION SHOULD I USE?? 
    member_id = 'r1i1p1f1',                       # arbitrarily pick one realization for each model (i.e. just one set of initial conditions)
)

col_subset = col.search(require_all_on=["source_id"], **query)
col_subset.df.groupby("source_id")[
    ["experiment_id", "variable_id", "table_id"]
].nunique()


In [None]:
def drop_all_bounds(ds):
    """Drop coordinates like 'time_bounds' from datasets,
    which can lead to issues when merging."""
    drop_vars = [vname for vname in ds.coords
                 if (('_bounds') in vname ) or ('_bnds') in vname]
    return ds.drop(drop_vars)

def open_dsets(df):
    """Open datasets from cloud storage and return xarray dataset."""
    dsets = [xr.open_zarr(fsspec.get_mapper(ds_url), consolidated=True)
             .pipe(drop_all_bounds)
             for ds_url in df.zstore]
    try:
        ds = xr.merge(dsets, join='exact')
        return ds
    except ValueError:
        return None

def open_delayed(df):
    """A dask.delayed wrapper around `open_dsets`.
    Allows us to open many datasets in parallel."""
    return dask.delayed(open_dsets)(df)


In [None]:
from collections import defaultdict

dsets = defaultdict(dict)
for group, df in col_subset.df.groupby(by=['source_id', 'experiment_id']):
    dsets[group[0]][group[1]] = open_delayed(df)

In [None]:
dsets_ = dask.compute(dict(dsets))[0]

### Global Mean Comparison
Reduce data with global mean, weighted by a factor proportional to cos(lat)

In [None]:
def get_lat_name(ds):
    """Figure out what is the latitude coordinate for each dataset."""
    for lat_name in ['lat', 'latitude']:
        if lat_name in ds.coords:
            return lat_name
    raise RuntimeError("Couldn't find a latitude coordinate")

def global_mean(ds):
    """Return global mean of a whole dataset."""
    lat = ds[get_lat_name(ds)]
    weight = np.cos(np.deg2rad(lat))
    weight /= weight.mean()
    other_dims = set(ds.dims) - {'time'}
    return (ds * weight).mean(other_dims)

In [None]:
expts = ['piControl', '1pctCO2']
expt_da = xr.DataArray(expts, dims='experiment_id',
                       coords={'experiment_id': expts})

dsets_aligned = {}

for k, v in tqdm(dsets_.items()):
    expt_dsets = v.values()
    if any([d is None for d in expt_dsets]):
        print(f"Missing experiment for {k}")
        continue

    for ds in expt_dsets:
        ds.coords['year'] = ds.time.dt.year - ds.time.dt.year[0]

    # workaround for
    # https://github.com/pydata/xarray/issues/2237#issuecomment-620961663
    dsets_ann_mean = [v[expt].pipe(global_mean)
                             .swap_dims({'time': 'year'})
                             .drop('time')
                             .coarsen(year=12).mean()
                      for expt in expts]

    # align everything
    dsets_aligned[k] = xr.concat(dsets_ann_mean, join='right',
                                 dim=expt_da)

In [None]:
dsets_aligned_ = dask.compute(dsets_aligned)[0]

In [None]:
source_ids = list(dsets_aligned_.keys())
source_da = xr.DataArray(source_ids, dims='source_id',
                         coords={'source_id': source_ids})

big_ds = xr.concat([ds.reset_coords(drop=True)
                    for ds in dsets_aligned_.values()],
                   dim=source_da)
big_ds

In [None]:
source_ids = list(dsets_.keys())
source_da = xr.DataArray(source_ids, dims='source_id',
                         coords={'source_id': source_ids})

big_ds = xr.concat([ds.reset_coords(drop=True)
                    for ds in dsets_.values()],
                   dim=source_da)
big_ds

### TK SOME ANALYSIS

gridpoint correlations xr.corr(ds1, ds2, dim=‘time’)
pearsons corr coefficients


In [None]:
ds_mean = big_ds[['tas','tasmax','mrsos']].sel(experiment_id='piControl').mean(dim='year')
ds_anom = big_ds[['tas','tasmax','mrsos']] - ds_mean

# add some metadata
ds_anom.tas.attrs['long_name'] = 'Global Mean Surface Temp Anom'
ds_anom.tas.attrs['units'] = 'K'
ds_anom.tasmax.attrs['long_name'] = 'Global Mean Maximum Surface Temperature Anom'
ds_anom.tasmax.attrs['units'] = 'K'
ds_anom.mrsos.attrs['long_name'] = 'Global Mean Surface Soil Moisture Anom'
# figure out what units mrsos is in 

ds_anom

In [None]:
drought_vars = ['tas','tasmax','mrsos']
var_names = ['Temperature', 'Maxiumum Temperature', 'Surface Soil Moisture']

# TO DO - MAKE THIS ALL ONE FIGURE WITH A SINGLE LEGEND 
count = 0

for i in drought_vars:
    ds_anom[i].plot.line(col='source_id', x='year', col_wrap = 4) # expand col_wrap with more experiments
    plt.suptitle(var_names[count])
    count += 1
    plt.tight_layout()

### correlation


In [None]:
correlation of time series

In [None]:
# overall pearsons r 
ds_anom

In [None]:
ds_corr = ds_anom
ds_corr['corr'] = xr.corr(ds_anom['tasmax'], ds_anom['mrsos'], dim= 'year')
ds_corr

In [None]:
ds_corr.plot()

### Discussion

**TK**

TK

**TK**

TK

In [None]:
# Gracefully destroy/close our cluster
client.close()
cluster.close()

In [None]:
df_drought = df[(df.table_id == 'Lmon') & (df.variable_id == 'mrsos') & (df.activity_id == 'CMIP')]
len(df_drought)

In [None]:
run_counts = df_drought.groupby(['source_id', 'experiment_id'])['zstore'].count()
run_counts

In [None]:
source_ids = []
experiment_ids = ['historical', 'ssp585']
for name, group in df_drought.groupby('source_id'):
    if all([expt in group.experiment_id.values
            for expt in experiment_ids]):
        source_ids.append(name)
source_ids

In [None]:
uni_dict = col.unique(['source_id', 'experiment_id', 'table_id'])

models = set(uni_dict['source_id']['values']) # all the models

for variable_id in ['tas', 'tasmax', 'mrsos']:
    query = dict(table_id=['Amon','Lmon'], 
                 variable_id=variable_id, activity_id ='CMIP')  
    cat = col.search(**query)
    models = models.intersection({model for model in cat.df.source_id.unique().tolist()})

models = list(models)
print(models)



In [None]:
# open pangeo catalogue of CMIP6 experiments
cat_url = "https://storage.googleapis.com/cmip6/pangeo-cmip6.json"

# open using intake
col = intake.open_esm_datastore(cat_url)
print(col) #Summary of collection structure


# create a dictionary of source, experiment, and variable information for easy access later
uni_dict = col.unique(['source_id', 'experiment_id', 'table_id'])

# Find all CMIP6 historical experiments that include all three variables of interest
models = set(uni_dict['source_id']['values']) # all the models

for variable_id in ['tas', 'tasmax', 'mrsos']:
    query = dict(experiment_id='historical', table_id=['Amon','Lmon'], 
                 variable_id=variable_id, activity_id ='CMIP')  
    cat = col.search(**query)
    models = models.intersection({model for model in cat.df.source_id.unique().tolist()})

models = list(models)
print("Relevant historical experiments:", models)

In [None]:
# Create a catalogue of all relevant models 
#cat = col.search(experiment_id='historical', table_id=['Amon','Lmon'], 
#                 variable_id=['tas', 'tasmax', 'mrsos'], activity_id ='CMIP', source_id=models)
#cat.df

# create a catalogue of a smaller list of models 
cat = col.search(experiment_id = 'historical', table_id = ['Amon', 'Lmon'], 
                 variable_id = ['tas', 'tasmax', 'mrsos'], 
                 activity_id = 'CMIP', 
                 source_id = ['GFDL-CM4', 'GISS-E2-1-H', 'GISS-E2-1-G'])

In [None]:
# avoid creating the large chunks when creating a dictionary of experiments
dask.config.set({"array.slicing.split_large_chunks": False})

# create dictionary of experiments
dset_dict = cat.to_dataset_dict(zarr_kwargs={'consolidated': True}, 
                                cdf_kwargs={'chunks': {}})

dset_dict.keys()

In [None]:
print(len(dset_dict.keys()))

In [None]:
cat

In [None]:
# mask out land areas to compare soil moisture to temperature data
# mask = regionmask.defined_regions.natural_earth.land_110.mask(ds)

In [None]:
contour_levels = [-6, -5, -4, -3, -2, -1.5, -1, -0.5, 0, 0.5, 1, 1.5, 2, 3, 4, 5, 6]

def make_map_plot(nplot_rows, nplot_cols, plot_index, data, plot_label):
    """ Create a single map subplot. """
    ax = plt.subplot(nplot_rows, nplot_cols, plot_index, projection = ccrs.Robinson(central_longitude = 180))
    cplot = plt.contourf(lons, lats, data, 
                         levels = contour_levels, 
                         cmap = color_map, 
                         extend = 'both', 
                         transform = ccrs.PlateCarree())
    ax.coastlines(color = 'grey')
    ax.text(0.01, 0.01, plot_label, fontSize = 14, transform = ax.transAxes)
    return cplot, ax


In [None]:
# Generate plot (may take a while as many individual maps are generated)
numPlotRows = 8
numPlotCols = 4
figWidth = 20 
figHeight = 30 
 
fig, axs = plt.subplots(numPlotRows, numPlotCols, figsize=(figWidth,figHeight))


#### References

This project uses code from several Pangeo CMIP Tutorials including: <br>
[Drake and Abernathey, Estimating Equilibrium Climate Sensitivity (ECS) in CMIP6 models](http://gallery.pangeo.io/repos/pangeo-gallery/cmip6/ECS_Gregory_method.html)

In [None]:
# i think i can get rid of this
import warnings
warnings.filterwarnings("ignore")

# Silence dask.distributed logs
import logging
logger = logging.getLogger("distributed.utils_perf")
logger.setLevel(logging.ERROR)
import dask.config
dask.config.set({'distributed.logging.distributed': 'error'})