In [1]:
from xmip.utils import google_cmip_col
from xmip.preprocessing import combined_preprocessing
col = google_cmip_col()

In [2]:
#Other catalog
import intake
col = intake.open_esm_datastore(
    "https://storage.googleapis.com/leap-persistent-ro/data-library/catalogs/cmip6-test/leap-pangeo-cmip6-test.json"
)
cat = col.search(variable_id=['tos', 'sos', 'chl', 'mlotst', 'spco2'], table_id='Omon', source_id=['MRI-ESM2-0'],
                 experiment_id=['historical', 'ssp245'])

In [2]:
# filter the full catalog for data we could use
cat = col.search(
    variable_id=['tos', 'sos', 'chl', 'mlotst', 'spco2'],
    table_id='Omon', # monthly ocean output only
    experiment_id=['historical', 'ssp245'],
    source_id=['CanESM5'],
    member_id=['r8i1p2f1'],
    # I used ssp245 as example but we should probably use the one that is closest to the data from 2014-2023
    require_all_on=['source_id', 'member_id', 'grid_label'] # this ensures that results will have all variables and experiments available
)

In [3]:
cat.df.groupby(['source_id', 'grid_label'])[['member_id']].nunique()

Unnamed: 0_level_0,Unnamed: 1_level_0,member_id
source_id,grid_label,Unnamed: 2_level_1
CanESM5,gn,1


In [4]:
#check which members are there
cat.df['member_id'].unique()

array(['r8i1p2f1'], dtype=object)

In [5]:
from xmip.preprocessing import combined_preprocessing
from xmip.postprocessing import concat_experiments, merge_variables

ddict = cat.to_dataset_dict(
    preprocess=combined_preprocessing,
    xarray_open_kwargs=dict(use_cftime=True),
    aggregate=False
)


--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.institution_id.source_id.experiment_id.member_id.table_id.variable_id.grid_label.zstore.dcpp_init_year.version'


    invalid units for variable 'sos': 0.001 (attribute) (reason: Unit expression cannot have a scaling factor.)
    invalid units for variable 'sos': 0.001 (attribute) (reason: Unit expression cannot have a scaling factor.)


In [10]:
ds = ddict_final['CanESM5.gn.Omon.r10i1p2f1']

NameError: name 'ddict_final' is not defined

In [8]:
ds = ddict

### Regrid

In [6]:
import xesmf as xe
import xarray as xr
from xmip.utils import cmip6_dataset_id


def replace_calendar(ds:xr.Dataset) -> xr.Dataset:
    year = ds.time.data[0].year
    month = ds.time.data[0].month
    start_date = f'{year}-{month:0>2}-01'
    new_monthly_time = xr.cftime_range(start_date, periods=len(ds.time), freq='1MS')
    ds = ds.assign_coords(time=new_monthly_time)
    return ds

#TODO:  create a regridder dict per source_id (faster)


target_grid = xe.util.grid_global(1,1)
def regrid(ds:xr.Dataset) -> xr.Dataset:
    #FIXME: This should not be done for every dataset
    regridder = xe.Regridder(ds, target_grid, 'bilinear', ignore_degenerate=True, periodic=True) #TODO: Check if this should be conservative?
    ds_regridded = regridder(ds, keep_attrs=True)
    return ds_regridded

def full_testbed_processing(ds: xr.Dataset) -> xr.Dataset:
    ds = ds.squeeze(drop=True)
    # select surface depth (for chl, TODO: Check if surface chlorophyll is available)
    ds = ds.isel(lev=0).drop('lev')
    
    ds = ds.sel(time=slice('1850', '2100'))
    
    # testing
    assert len(ds.time) == 3012
    assert ds.time.data[0].year == 1850
    
    # Processing
    ds_regridded = regrid(ds)
    ds_new_cal = replace_calendar(ds_regridded)

    return ds_new_cal

In [7]:
from dask.diagnostics import ProgressBar
for k,ds in ddict.items():
    print(f"Processing {k}")
    ds_out = full_testbed_processing(ds)
    
    ds_id = cmip6_dataset_id(ds_out, id_attrs=[
    'source_id',
    'variant_label',
    'table_id',
    'grid_label',
    'version',
    ])
    save_path = f"gs://leap-scratch/jbusecke/pco2-testing/{ds_id}"
    print(f"Writing to {save_path = }")
    with ProgressBar():
        ds_out.chunk({'time':200}).to_zarr(save_path, mode='w')

Processing ScenarioMIP.CCCma.CanESM5.ssp245.r8i1p2f1.Omon.sos.gn.gs://cmip6/CMIP6/ScenarioMIP/CCCma/CanESM5/ssp245/r8i1p2f1/Omon/sos/gn/v20190429/.20190429


ValueError: Dimensions {'lev'} do not exist. Expected one or more of Frozen({'y': 291, 'x': 360, 'time': 1032, 'vertex': 4, 'bnds': 2})

In [1]:
#Select CanESM5 only
import intake
col = intake.open_esm_datastore(
    "https://storage.googleapis.com/leap-persistent-ro/data-library/catalogs/cmip6-test/leap-pangeo-cmip6-test.json"
)
cat = col.search(variable_id=['tos', 'sos', 'chl', 'mlotst', 'spco2'], table_id='Omon', source_id=['CanESM5'],
                 experiment_id=['historical', 'ssp245'])

In [5]:
#Select all models
import intake
col = intake.open_esm_datastore(
    "https://storage.googleapis.com/leap-persistent-ro/data-library/catalogs/cmip6-test/leap-pangeo-cmip6-test.json"
)
cat = col.search(variable_id=['tos', 'sos', 'chl', 'mlotst', 'spco2'], table_id='Omon',
                 experiment_id=['historical', 'ssp245'])

In [3]:
#check which members are there
cat.df['member_id'].unique()

array(['r37i1p2f1', 'r31i1p2f1', 'r29i1p2f1', 'r36i1p2f1', 'r26i1p2f1',
       'r27i1p2f1', 'r35i1p2f1', 'r33i1p2f1', 'r40i1p2f1', 'r32i1p2f1',
       'r30i1p2f1', 'r34i1p2f1', 'r28i1p2f1', 'r38i1p2f1', 'r39i1p2f1'],
      dtype=object)

In [3]:
cat.df.groupby(['source_id', 'grid_label'])[['member_id']].nunique()

Unnamed: 0_level_0,Unnamed: 1_level_0,member_id
source_id,grid_label,Unnamed: 2_level_1
ACCESS-CM2,gn,2
ACCESS-ESM1-5,gn,37
CAS-ESM2-0,gn,2
CMCC-CM2-HR4,gn,1
CMCC-CM2-SR5,gn,11
CMCC-ESM2,gn,1
CNRM-CM6-1,gn,1
CNRM-ESM2-1,gn,5
CanESM5,gn,15
CanESM5-1,gn,2


xCO2 for each model? Should be the same if the same ssp scenario is used?

For each member dataset
- concatenate historical and ssp245
- homogenize time calendar 
- merge variables (SST, SSS, mld, chl, pCO2)
- regrid 1x1 degree, monthly






In [None]:
dsets = cat.to_dataset_dict(zarr_kwargs={'consolidated': True},
                                   storage_options={'token': 'anon'})
# list all merged datasets
[key for key in dsets.keys()]

In [None]:
ds = dsets['ScenarioMIP.THU.CIESM.ssp585.Amon.gr']
ds

In [None]:
from cmip6_preprocessing.preprocessing import combined_preprocessing

In [None]:
dsets = col_subset.to_dataset_dict(
  zarr_kwargs={'consolidated': True, 'decode_times':False},
  aggregate=True,
  preprocess=combined_preprocessing,
  storage_options={'token': 'anon'}
)