In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os
from glob import glob
from subprocess import check_output

import yaml
import dask

import util

# Login name of the person running the notebook; raises KeyError if $USER is unset.
USER = os.environ['USER']

# Fail fast when the campaign filesystem is not visible from this host.
campaign_mounted = os.path.exists('/glade/campaign')
assert campaign_mounted, 'campaign is not accessible; run on Casper'

  from distributed.utils import tmpfile


## Get info on cases to process

In [16]:
# Staging roots are defined in the local `util` module; the cells below unpack
# CPLHIST archives and restart packages beneath them.
restart_stage_root = util.restart_stage_root
cplhist_stage_root = util.cplhist_stage_root

# Source locations on campaign storage: the tree that is globbed for CPLHIST
# archives (per case and stream) and the tree that is searched for restart tarballs.
cplhist_campaign = '/glade/campaign/collections/cmip/CMIP6/cpl_hist'
restart_campaign = '/glade/campaign/collections/cmip/CMIP6/restarts'


def find_restart_tar(refcase, refdate, root=None):
    """Locate the restart tarball for a reference case and date.

    Runs ``find`` beneath `root` looking for a file named
    ``{refcase}.rest.{refdate}.tar``.

    Parameters
    ----------
    refcase : str
        Name of the reference case.
    refdate : str
        Reference date string as it appears in the tarball name.
    root : str, optional
        Directory tree to search. Defaults to the notebook-level
        `restart_campaign` path.

    Returns
    -------
    str or None
        Path of the matching tarball. When several files match, the first in
        sorted order is returned (with a warning) so the result is always a
        single usable path. When nothing matches, a warning is printed and
        None is returned.

    Raises
    ------
    subprocess.CalledProcessError
        If ``find`` fails and reports no match at all.
    """
    from subprocess import CalledProcessError

    if root is None:
        root = restart_campaign

    tarname = f'{refcase}.rest.{refdate}.tar'
    try:
        output = check_output(['find', root, '-name', tarname])
    except CalledProcessError as err:
        # `find` exits non-zero if any directory could not be traversed, even
        # when it still printed matches from the readable part of the tree.
        # Keep those matches; re-raise only if nothing was found at all.
        if not err.output:
            raise
        output = err.output

    # `find` prints one path per line; never hand back a multi-line string.
    matches = sorted(line for line in output.decode("UTF-8").splitlines() if line)
    if not matches:
        print(f'[WARNING]: restart package not found: {refcase}.rest.{refdate}.tar')
        return None
    if len(matches) > 1:
        print(f'[WARNING]: {len(matches)} copies of {tarname} found; using {matches[0]}')
    return matches[0]


# Read the case table: a mapping of experiment name -> case metadata.
with open('cplhist-cases.yml') as fid:
    cplhist_cases = yaml.safe_load(fid)

# Parallel lists, one entry per experiment, plus the restart packages to stage.
experiments, cplhist_case_list, restart_cases, year_range = [], [], [], []

for exp, case_info in cplhist_cases.items():
    experiments.append(exp)
    cplhist_case_list.append(case_info['case'])
    year_range.append((case_info['yr_lo'], case_info['yr_hi']))

    # Restart information is only collected for the 'historical' experiment.
    if exp != 'historical':
        continue

    refcase = case_info['parent_experiment']
    refdate = f"{case_info['parent_branch_year']:04d}-01-01-00000"
    restart_cases.append(
        dict(
            refcase=refcase,
            refdate=refdate,
            tarfile=find_restart_tar(refcase, refdate),
        )
    )

# Filled by the unpack/concatenate cell; that cell refuses to run again while
# this list is non-empty.
concat_jobs = []

## Spin up dask cluster

In [4]:
# Target handed to `cluster.scale`; presumably a worker count (the cell output
# reports a dask_jobqueue.PBSCluster) -- confirm against util.get_ClusterClient.
n_workers = 32

cluster, client = util.get_ClusterClient(walltime='24:00:00')
cluster.scale(n_workers)

# Display the client summary (dashboard link, worker counts) as the cell output.
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mclong/casper/proxy/8787/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mclong/casper/proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.12.206.48:36967,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mclong/casper/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


## Unpack `CPLHIST` forcing and concatenate daily `CPLHIST` files into monthly files 

In [17]:
# Coupler-history streams to stage; a stream with no archives for a case is skipped.
streams = ['ha2x', 'ha2x1hi', 'ha2x1h', 'ha2x3h', 'ha2x1d', 'hr2x']

# Re-run guard: a non-empty `concat_jobs` means this cell already launched
# concatenation in the current kernel session.
if len(concat_jobs) > 0:
    raise ValueError('confirm that monthly concatenation is complete')

for case, (yr_lo, yr_hi) in zip(cplhist_case_list, year_range):
    print(f'{case}')

    # Extracted daily files go to `orig`; concatenated monthly files to `monthly`.
    dir_daily = f"{cplhist_stage_root}/cpl_hist/{case}/orig"
    dir_monthly = f"{cplhist_stage_root}/cpl_hist/{case}/monthly"
    for stage_dir in (dir_daily, dir_monthly):
        os.makedirs(stage_dir, exist_ok=True)

    for stream in streams:
        tarfiles = sorted(glob(f'{cplhist_campaign}/{case}/*.{stream}.*'))
        if len(tarfiles) == 0:
            continue

        # The year is parsed from the second-to-last dot-separated token of
        # each archive path; keep only archives inside [yr_lo, yr_hi].
        dated = [(int(path.split('.')[-2]), path) for path in tarfiles]
        dated = [(y, path) for y, path in dated if yr_lo <= y <= yr_hi]
        years = [y for y, _ in dated]
        tarfiles = [path for _, path in dated]

        # Twelve expected monthly files per selected year.
        monthly_files = [
            f'{dir_monthly}/{case}.cpl.{stream}.{y:04d}-{m:02d}.nc'
            for y in years
            for m in range(1, 13)
        ]
        print(f'- {stream}', end=': ')

        if not monthly_files:
            print('no data found in year range')
            continue

        missing = [f for f in monthly_files if not os.path.exists(f)]
        if not missing:
            print('all monthly files present')
            continue

        # One delayed extraction task per archive, all computed together.
        delayed_objs = [
            dask.delayed(util.extract_tar)(tarfile, dir_daily)
            for tarfile in tarfiles
        ]
        if delayed_objs:
            print(f'extracting {len(delayed_objs)} files')
            computed_objs = dask.compute(*delayed_objs)

        # Hand the case/stream to the concatenator and remember what it returned.
        concat_jobs.append(util.concat_cplhist_mon(case, stream, yr_lo, yr_hi))

b.e21.BHIST.f09_g17.CMIP6-historical.011
- ha2x1hi: all monthly files present
- ha2x1h: all monthly files present
- ha2x3h: all monthly files present
- ha2x1d: all monthly files present
- hr2x: all monthly files present
b.e21.BSSP585cmip6.f09_g17.CMIP6-SSP5-8.5.102
- ha2x1hi: all monthly files present
- ha2x1h: extracting 86 files
- ha2x3h: extracting 86 files
- hr2x: extracting 86 files


In [18]:
# Shut down in the same order as before: the client first, then the cluster.
for handle in (client, cluster):
    handle.close()

## Copy restarts

In [19]:
for rest_info in restart_cases:
    refcase, refdate, tarfile = (
        rest_info[key] for key in ('refcase', 'refdate', 'tarfile')
    )

    # No tarball was located for this reference case; nothing to unpack.
    if tarfile is None:
        continue

    dirout = f"{restart_stage_root}/{refcase}"
    os.makedirs(dirout, exist_ok=True)

    # A directory named after the reference date marks an earlier unpack.
    unpacked_dir = f"{dirout}/{refdate}"
    if os.path.exists(unpacked_dir):
        print(f"exists: {unpacked_dir}")
        continue

    print(f"unpacking to: {unpacked_dir}")
    util.extract_tar_pbs(tarfile, dirout)


exists: /glade/scratch/mclong/cplhist_data/restarts/b.e21.B1850.f09_g17.CMIP6-piControl.001/0871-01-01-00000
