# 20 km WRF post-processing pipeline

This pipeline restructures raw WRF outputs into more user-friendly files that can be easily imported into popular GIS software. Specifically, the raw WRF outputs processed here are the 20 km outputs produced for Alaska and surrounding regions using ERA-Interim, GFDL-CM3, and NCAR-CCSM4 data. This pipeline is designed to be executed entirely from this notebook.

This is a fairly complex SNAP data pipeline codebase. It works on a large amount of data, creates a large number of files, and makes use of SLURM, a specific directory structure / file management scheme, and asynchronous execution (i.e. re-running certain steps, running steps for only certain variables, etc.). The "Setup" section below explains how to run it.

# 0 - Setup

This step provides instructions for setting up and running the pipeline.

## 0.1 - Pipeline execution

The pipeline can be executed to restructure all WRF files of interest, a single year's worth for a single variable / model / scenario combination, and other combinations.

The input data are grouped by model name. Thus, everything applies at the model level. 

**Notes to clean up**

Given the large size and number of files, this pipeline is best run asynchronously, with memory management tasks, regular printouts of progress, and a record of which files are where for which groups. It should be possible to supply a list of groups / years to work on and to check what has been submitted and when.

To do: add a note about cells being idempotent, about the options / toggles for running or skipping certain groups, and about the fact that, because so much data is processed, some cells exist to process all of the data and others to re-process failed subsets.

To do: add a note that each cell may have its own number of CPUs, years, and variables / models specified.
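
One possible way to keep track of what has been submitted and when is a small JSON log, sketched below. This is only an illustration: `record_submission`, `pending_work`, and the log file layout are hypothetical and are not part of the current codebase.

```python
import json
import datetime as dt
from pathlib import Path


def _read_log(log_fp):
    """Return the list of logged submissions (empty if no log exists yet)."""
    log_fp = Path(log_fp)
    return json.loads(log_fp.read_text()) if log_fp.exists() else []


def record_submission(log_fp, group, year, job_id):
    """Append one submitted job to the JSON log (hypothetical helper)."""
    entries = _read_log(log_fp)
    entries.append({
        "group": group,
        "year": year,
        "job_id": job_id,
        # UTC timestamp of when the job was handed to sbatch
        "submitted": dt.datetime.now(dt.timezone.utc).isoformat(timespec="seconds"),
    })
    Path(log_fp).write_text(json.dumps(entries, indent=2))
    return entries


def pending_work(log_fp, requested):
    """Return the (group, year) pairs in `requested` that are not in the log."""
    done = {(e["group"], e["year"]) for e in _read_log(log_fp)}
    return [pair for pair in requested if pair not in done]
```

With a log like this, re-running a submission cell could skip every group / year pair that has already been submitted, which is one way to make such cells idempotent.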


## 0.2 - Environment variables / global arguments

Instead of relying on environment variables, this pipeline utilizes user-supplied parameters specified in the cells of this notebook by simply assigning values to variables.

The following variables apply to the whole pipeline and are set in the code cell below:

* `base_dir` - Full path to the directory that will contain all ancillary and intermediate files that will be kept, such as scripts for slurm / `sbatch`
* `output_dir` - Full path to the directory that will contain the final output data (will be the same as `base_dir` here but specified separately for consistency with other SNAP pipelines)
* `scratch_dir` - Full path to the scratch directory that raw WRF outputs will be copied to prior to processing them
    * This pipeline works with WRF outputs that are on a mounted file system, so they can be copied over to scratch space and removed when done. This improves IO and avoids the need to keep them in the `base_dir`.
* `slurm_email` - String containing email address to use for failed slurm notifications

In [None]:
# User-parameters
base_dir = "/rcs/project_data/wrf_data"
output_dir = "/rcs/project_data/wrf_data"
scratch_dir = "/atlas_scratch/kmredilla/WRF/restack-20km-wrf"
slurm_email = "kmredilla@alaska.edu"

## Arguments

The following arguments are required:

- `wrf_dir`: This is the directory containing the WRF files. This codebase is designed for use with hourly output, so this needs to be the `hourly/` directory if there are multiple options (e.g. `daily/`, `monthly/`, etc.).
- `group`: code specifying the WRF group being worked on, one of `erain_hist`, `gfdl_hist`, `ccsm_hist`, `gfdl_rcp85`, `ccsm_rcp85`.
- `years`: a list of integer years to work on, such as `[1979, 1980]`, or `None` to work on all years of a given group.

This pipeline is designed to be run for a single model/scenario combination, or "WRF group." The WRF outputs of interest from different runs of model/scenario may be in separate places, but there is consistency in file structure across all groups - all `hourly` directories have annual subgroups consisting of the WRF outputs to be restacked.

This codebase is currently hard-coded to only work on 5 specific WRF "groups": ERA-Interim, and the historical / RCP 8.5 scenarios for the NCAR-CCSM4 and the GFDL-CM3 models.
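
Because only five groups are supported, the arguments can be checked before any job is submitted. The sketch below is a minimal illustration: `VALID_GROUPS` and `check_arguments` are hypothetical names, the group codes are the ones that section 1.1 lists and `validate_group` expects, and `years` is assumed to be a list of integers or `None`, as in the code cells of this notebook.

```python
# Group codes as listed in section 1.1 (assumed to be the complete set)
VALID_GROUPS = ("erain_hist", "gfdl_hist", "ccsm_hist", "gfdl_rcp85", "ccsm_rcp85")


def check_arguments(group, years=None):
    """Raise ValueError if group or years are not in the expected form."""
    if group not in VALID_GROUPS:
        raise ValueError(f"group must be one of {VALID_GROUPS}, got {group!r}")
    if years is not None and not all(isinstance(year, int) for year in years):
        raise ValueError("years must be a list of integers or None")
    return True


check_arguments("ccsm_hist", [2004])
```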

## 0.3 - Global imports and filepaths

In the cell below, set up the file paths used throughout the pipeline and import the packages used in multiple sections.

In [3]:
import os
import time
from pathlib import Path
# codebase
import luts

base_dir = Path(base_dir)
output_dir = Path(output_dir)
scratch_dir = Path(scratch_dir)

Environment variable $BASE_DIR set to: /rcs/project_data/wrf_data
Environment variable $SCRATCH_DIR set to: /atlas_scratch/kmredilla/WRF/restack-20km-wrf
Environment variable $SLURM_EMAIL set to: kmredilla@alaska.edu


# 1: Restack the data

This first step is the heaviest lift of the pipeline. It restacks the WRF outputs, which means extracting the data for all variables in a single WRF file and combining them into new files grouped by variable and year.
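
Since each restacked file covers one variable and one year, a natural first step is to gather the hourly files for each year in chronological order. The sketch below shows one way to do that, assuming file names follow the `WRFDS_d01.2000-01-01_00.nc` pattern seen in the hourly directories. `parse_wrf_timestamp` and `group_by_year` are hypothetical helpers, not functions from the codebase.

```python
import datetime as dt
from collections import defaultdict
from pathlib import Path


def parse_wrf_timestamp(fp):
    """Parse the timestamp from a name like WRFDS_d01.2000-01-01_00.nc."""
    stamp = Path(fp).name.split(".")[1]  # e.g. "2000-01-01_00"
    return dt.datetime.strptime(stamp, "%Y-%m-%d_%H")


def group_by_year(fps):
    """Map each year to its hourly WRF filepaths, sorted by timestamp."""
    by_year = defaultdict(list)
    for fp in sorted(fps, key=parse_wrf_timestamp):
        by_year[parse_wrf_timestamp(fp).year].append(Path(fp))
    return dict(by_year)
```

The actual extraction of each variable and the writing of the per-variable files is left to the restacking script and is not sketched here.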

In [4]:
# Arguments
wrf_dir = Path("/storage01/pbieniek/ccsm/hist/hourly")
group = "ccsm_hist"
years = [2004]
variable = "lh"
clobber = False

## 1.1: Copy WRF data to scratch space 

If the WRF data are not present on the filesystem (as is the case at the time of developing the current code), then they need to be copied over.

Copy the annual subdirectory (or directories) containing the WRF outputs for all specified years to scratch space for efficient reading. 

Prior to executing this cell, specify the following arguments at the top:
- `wrf_dir`: This is the directory containing the annual subdirectories of WRF output files. This codebase is designed for use with hourly output, so this needs to be the `hourly/` directory if there are multiple options (e.g. `daily/`, `monthly/`, etc.).
- `group`: code specifying the WRF group being worked on, one of `erain_hist`, `gfdl_hist`, `ccsm_hist`, `gfdl_rcp85`, `ccsm_rcp85`.
- `years`: a list of integer years to work on, or omit to work on all years of a given group.
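
The copy itself is delegated to a helper script, `mp_cp.py`, which the batch job calls with `-fp` (path to a pickled list of source / destination pairs) and `-n` (number of CPUs). That script is not shown in this notebook, so the following is only a sketch of what such a script might look like. The function names and the use of `shutil.copy2` are assumptions.

```python
import argparse
import pickle
import shutil
from multiprocessing import Pool


def copy_pair(pair):
    """Copy one (source, destination) pair, preserving file metadata."""
    src, dst = pair
    shutil.copy2(src, dst)
    return dst


def parse_args(argv=None):
    """Parse the -fp and -n options passed by the sbatch command."""
    parser = argparse.ArgumentParser(description="Copy file pairs in parallel")
    parser.add_argument("-fp", dest="pairs_fp", required=True,
                        help="path to pickled list of (source, destination) pairs")
    parser.add_argument("-n", dest="ncpus", type=int, default=1,
                        help="number of worker processes")
    return parser.parse_args(argv)


def main(argv=None):
    """Load the pickled pairs and copy them with a pool of workers."""
    args = parse_args(argv)
    with open(args.pairs_fp, "rb") as f:
        fp_pairs = pickle.load(f)
    # each worker process copies one file at a time
    with Pool(args.ncpus) as pool:
        pool.map(copy_pair, fp_pairs)
```

A standalone script would end with `if __name__ == "__main__": main()` so that it runs when invoked from the batch job.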


In [4]:
def validate_group(wrf_dir, group):
    """Check that the WRF directory matches the group
    """
    parent = wrf_dir.parent
    model, scenario = group.split("_")
    
    #   a bit wonky because in the current setup,
    # ERA-Interim is the only model that does not 
    # have a "scenario" subfolder
    if model == "erain":
        if parent.name == model:
            return True
        else:
            return False
    elif scenario != parent.name:
        return False
    elif model != parent.parent.name:
        return False
    else:
        return True

def get_wrf_fps(wrf_dir, group, years):
    """Make the WRF filepaths based on the WRF dir, group,
    and years. If years is None, use all years of the group.
    """
    if years is None:
        years = luts.groups[group].years
    
    wrf_fps = []
    for year in years:
        wrf_fps.extend(wrf_dir.joinpath(str(year)).glob("*.nc"))
    
    return wrf_fps


def test_file_equivalence(fp1, fp2, heads=False, detail=False):
    """Check that fp1 and fp2 have the same 
    size (and "header" info if heads), 
    return bool (or dict of bools if detail)
    """
    if fp1 == fp2:
        raise ValueError("fp1 and fp2 must be different")
        
    # check file sizes
    try:
        sizes_equal = fp1.stat().st_size == fp2.stat().st_size
    # except error for non-existing file error
    except FileNotFoundError:
        return False
    
    if heads:
        # check file heads
        with xr.open_dataset(fp1) as ds1:
            with xr.open_dataset(fp2) as ds2:
                heads_equal = str(ds1) == str(ds2)
                
        if not detail:
            return np.all([sizes_equal, heads_equal])
        else:
            return {"sizes_equal": sizes_equal, "heads_equal": heads_equal}

    else:
        return sizes_equal


def sbatch_copyto_scratch(fp_pairs, group, years, base_dir, scratch_dir, slurm_email, ncpus=5):
    """Submit a batch job to copy the required 
    WRF output files from wrf_dir to $SCRATCH_DIR
    
    Args:
        fp_pairs (list): list of 2-tuples of source 
            and destination filepaths for cp command
        group (str): name of WRF group being worked on
        years (list): list of years to be copied
        base_dir (PosixPath): path for $BASE_DIR, where 
            slurm scripts will be written
        scratch_dir (PosixPath): path for $SCRATCH_DIR, where 
            WRF files will be copied
        slurm_email (str): email address for Slurm failures
        ncpus (int): number of CPUS to use for multiprocessing
            
    Returns:
        job number for submitted sbatch job
    """
    # pickle the list of fp_pairs for SBATCHing
    # use years to give meaningful name
    years = sorted(years)
    if len(years) == 1:
        years_fn = years[0]
    else:
        years_fn = f"{years[0]}-{years[-1]}"
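    pairs_fp = scratch_dir.joinpath(f"tmp/cp_scratch_{group}_{years_fn}_fp_pairs.pickle")
    # make sure the tmp directory exists before writing to it
    pairs_fp.parent.mkdir(exist_ok=True, parents=True)
    with open(pairs_fp, "wb") as f:
        pickle.dump(fp_pairs, f)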
    
    cp_script = Path(os.getcwd()).joinpath("mp_cp.py")
    
    slurm_dir = base_dir.joinpath("slurm/cp_scratch")
    slurm_dir.mkdir(exist_ok=True, parents=True)
    
    # setup command
    sbatch_out_str = f"{str(slurm_dir)}/slurm_cp_%j_{group}_{years_fn}.out"
    head = (
        "#!/bin/sh\n"
        "#SBATCH --nodes=1\n"
        f"#SBATCH --cpus-per-task={ncpus}\n"
        "#SBATCH --account=snap\n"
        "#SBATCH --mail-type=FAIL\n"
        f"#SBATCH --mail-user={slurm_email}\n"
        "#SBATCH -p main\n"
        f'#SBATCH --output {sbatch_out_str}\n'
        'eval "$(conda shell.bash hook)"\n'
        # base conda env has anaconda-project installed
        "conda activate\n"
    )
    command = f"anaconda-project run python {cp_script} -fp {pairs_fp} -n {ncpus}\n"
    
    # write to .slurm script
    sbatch_fp = slurm_dir.joinpath(pairs_fp.name.replace(".pickle", ".slurm"))
    with open(sbatch_fp, "w") as f:
        f.write(head + command)
        
    out = subprocess.check_output(["sbatch", sbatch_fp])
    
    return {"subprocess_out": out, "sbatch_out_str": sbatch_out_str}

In [26]:
# Arguments
wrf_dir = Path("/storage01/pbieniek/ccsm/rcp85/hourly")
group = "ccsm_rcp85"
years = [2089]
variable = "lh"
clobber = False

In [55]:
# Copy an annual subdirectory containing the WRF outputs to scratch disk space

ncpus = 5
clobber = False


import pickle
# the functions above call subprocess.check_output, so import the module itself
import subprocess
import datetime as dt
import numpy as np
# needed by test_file_equivalence when heads=True
import xarray as xr


group_is_valid = validate_group(wrf_dir, group)
if not group_is_valid:
    raise SystemExit("Invalid group / directory combination")

# get all WRF files needed for supplied parameters
wrf_fps = get_wrf_fps(wrf_dir, group, years)

# make yearly directories in SCRATCH_DIR as needed
group_dir = scratch_dir.joinpath(group)
_ = [
    group_dir.joinpath(str(year)).mkdir(exist_ok=True, parents=True) 
    for year in years
]

# build args for Pooling
fp_pairs = [(fp, group_dir.joinpath(fp.parent.name, fp.name)) for fp in wrf_fps]

# If clobber is not set, skip all files that have already been copied over,
# based on matching file size
if not clobber:
    fp_pairs = [pair for pair in fp_pairs if not test_file_equivalence(pair[0], pair[1])]

if len(fp_pairs) != 0:
    print((
        f"Submitting job to copy files from {wrf_dir} to {group_dir} at "
        f"{dt.datetime.utcfromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')}"
    ))
    sbatch_copy_out = sbatch_copyto_scratch(
        fp_pairs, 
        group, 
        years, 
        base_dir, 
        scratch_dir, 
        slurm_email, 
        ncpus=ncpus
    )
    # print a helpful message
    subprocess_str = sbatch_copy_out["subprocess_out"].decode().replace("\n", "")
    sbatch_job_id = subprocess_str.split(" ")[-1]
    sbatch_copy_fp = sbatch_copy_out["sbatch_out_str"].replace("%j", sbatch_job_id)
    print(subprocess_str)
    print(f"SBATCH output file: {sbatch_copy_fp}")


Submitting job to copy files from /storage01/pbieniek/ccsm/rcp85/hourly to /atlas_scratch/kmredilla/WRF/restack-20km-wrf/ccsm_rcp85 at 2021-10-28 03:34:57
Submitted batch job 3873684
SBATCH output file: /rcs/project_data/wrf_data/slurm/slurm_3873684_ccsm_rcp85_2089.out


### Check progress of copying to scratch

Run the cell below to check the progress of the copy for the current arguments:

In [57]:
# Check how many of the required files for the supplied parameters are present in scratch space
wrf_fps = get_wrf_fps(wrf_dir, group, years)
existing_scratch_fps = []
for fp in wrf_fps:
    scratch_fp = scratch_dir.joinpath(group, fp.parent.name, fp.name)
    if scratch_fp.exists():
        if test_file_equivalence(fp, scratch_fp):
            existing_scratch_fps.append(scratch_fp)
        
print(f"{len(existing_scratch_fps)} of {len(wrf_fps)} required WRF output files found in $SCRATCH_DIR.")

6742 of 8760 required WRF output files found in $SCRATCH_DIR.


## 1.2: Restack the data

Now that the WRF outputs are available on the filesystem for faster access, run the restacking scripts.

This is done by creating and submitting `sbatch` jobs as well.
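
Both batch-submission functions in this notebook build the same `#SBATCH` header, so it could be factored into a shared helper. The sketch below reuses the header lines exactly as they appear in those functions. `make_sbatch_head` is a hypothetical name, and the email address and output path in the example call are placeholders.

```python
def make_sbatch_head(ncpus, slurm_email, sbatch_out_str):
    """Build the #SBATCH header shared by the copy and restack jobs."""
    return (
        "#!/bin/sh\n"
        "#SBATCH --nodes=1\n"
        f"#SBATCH --cpus-per-task={ncpus}\n"
        "#SBATCH --account=snap\n"
        "#SBATCH --mail-type=FAIL\n"
        f"#SBATCH --mail-user={slurm_email}\n"
        "#SBATCH -p main\n"
        f"#SBATCH --output {sbatch_out_str}\n"
        'eval "$(conda shell.bash hook)"\n'
        # base conda env has anaconda-project installed
        "conda activate\n"
    )


# placeholder values for illustration only
head = make_sbatch_head(5, "user@example.com", "/tmp/slurm_restack_%j.out")
print(head)
```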

In [None]:
# how to ensure files aren't opened at same time by different processes? Is that bad anyway? Is there handling for that currently?

In [None]:
def sbatch_restack(group, year, variable, base_dir, slurm_email, ncpus=5):
    """Restack the WRF outputs by creating and 
    submitting a sbatch job for a single group,
    year, and variable
    
    Args:
        group (str): name of WRF group being worked on
        year (int): year to restack
        variable (str): name of variable to restack
        base_dir (PosixPath): path for $BASE_DIR, where 
            slurm scripts will be written
        slurm_email (str): email address for Slurm failures
        ncpus (int): number of CPUs to request
            
    Returns:
        dict with the sbatch output and the output file template
    """
    restack_script = Path(os.getcwd()).joinpath("restack.py")
    
    slurm_dir = base_dir.joinpath("slurm/restack")
    slurm_dir.mkdir(exist_ok=True, parents=True)
    
    # setup command
    sbatch_out_str = f"{str(slurm_dir)}/slurm_restack_%j_{group}_{year}.out"
    head = (
        "#!/bin/sh\n"
        "#SBATCH --nodes=1\n"
        f"#SBATCH --cpus-per-task={ncpus}\n"
        "#SBATCH --account=snap\n"
        "#SBATCH --mail-type=FAIL\n"
        f"#SBATCH --mail-user={slurm_email}\n"
        "#SBATCH -p main\n"
        f'#SBATCH --output {sbatch_out_str}\n'
        'eval "$(conda shell.bash hook)"\n'
        # base conda env has anaconda-project installed
        "conda activate\n"
    )
    # NOTE: the command-line options of restack.py are not defined here;
    # the -g / -y / -v / -n flags below are placeholders (assumptions)
    command = (
        f"anaconda-project run python {restack_script} "
        f"-g {group} -y {year} -v {variable} -n {ncpus}\n"
    )
    
    # write to .slurm script, named after the group, year, and variable
    sbatch_fp = slurm_dir.joinpath(f"restack_{group}_{year}_{variable}.slurm")
    with open(sbatch_fp, "w") as f:
        f.write(head + command)
        
    out = subprocess.check_output(["sbatch", sbatch_fp])
    
    return {"subprocess_out": out, "sbatch_out_str": sbatch_out_str}

In [12]:
index(y)

array([1, 2, 3, 4, 5])

In [14]:
np.interp(index(nans), index(~nans), y[~nans])

array([5.])

In [15]:
test = lambda z: z.nonzero()[0]

<function __main__.<lambda>(z)>

## 1.3: Move restacked data from scratch

Open question: should this step be kept? The improve step could also be run while the restacked data are still on scratch space.

## 1.4: Remove the WRF outputs from scratch

To clear up space in a $SCRATCH_DIR with limited capacity, remove the WRF outputs that have been completed.
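
A minimal sketch of this cleanup is given below, assuming the scratch layout created in section 1.1 (`scratch_dir/<group>/<year>/*.nc`). `remove_scratch_wrf` is a hypothetical helper. It defaults to a dry run that only lists the files, so nothing is deleted until `dry_run=False` is passed.

```python
from pathlib import Path


def remove_scratch_wrf(scratch_dir, group, years, dry_run=True):
    """List (and optionally delete) copied WRF files for a group and years."""
    removed = []
    for year in years:
        year_dir = Path(scratch_dir).joinpath(group, str(year))
        for fp in sorted(year_dir.glob("*.nc")):
            if not dry_run:
                fp.unlink()
            removed.append(fp)
    return removed
```

Running it first with the default `dry_run=True` gives a chance to confirm that only files for completed years are listed before anything is removed.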

# 2: Improve the data

In [65]:
group_dir.joinpath(str(year)).mkdir(exist_ok=True, parents=True) 

In [51]:
wrf_dir.parent.name

'hist'

In [48]:
test = []
test

True

In [49]:
test

[PosixPath('/storage01/pbieniek/ccsm/hist/hourly/2000/WRFDS_d01.2000-01-01_00.nc'),
 PosixPath('/storage01/pbieniek/ccsm/hist/hourly/2000/WRFDS_d01.2000-01-01_01.nc'),
 PosixPath('/storage01/pbieniek/ccsm/hist/hourly/2000/WRFDS_d01.2000-01-01_02.nc'),
 PosixPath('/storage01/pbieniek/ccsm/hist/hourly/2000/WRFDS_d01.2000-01-01_03.nc'),
 PosixPath('/storage01/pbieniek/ccsm/hist/hourly/2000/WRFDS_d01.2000-01-01_04.nc'),
 PosixPath('/storage01/pbieniek/ccsm/hist/hourly/2000/WRFDS_d01.2000-01-01_05.nc'),
 PosixPath('/storage01/pbieniek/ccsm/hist/hourly/2000/WRFDS_d01.2000-01-01_06.nc'),
 PosixPath('/storage01/pbieniek/ccsm/hist/hourly/2000/WRFDS_d01.2000-01-01_07.nc'),
 PosixPath('/storage01/pbieniek/ccsm/hist/hourly/2000/WRFDS_d01.2000-01-01_08.nc'),
 PosixPath('/storage01/pbieniek/ccsm/hist/hourly/2000/WRFDS_d01.2000-01-01_09.nc'),
 PosixPath('/storage01/pbieniek/ccsm/hist/hourly/2000/WRFDS_d01.2000-01-01_10.nc'),
 PosixPath('/storage01/pbieniek/ccsm/hist/hourly/2000/WRFDS_d01.2000-01-01_1