In [None]:
import pickle, os, io, time
from ftplib import FTP
import numpy as np
import xarray as xr
import geopandas as gpd
import xagg as xa
import re
from multiprocessing import Pool, cpu_count
import itertools

# read in the ADM1-levle shapefile - NOTE: the augmented shapefile is created in src/prepare_weights_and_shapefiles.R and includes ADM0 territories w/o ADM1 sub-regions
ds_gdf = gpd.read_file("../../sharepoint/Data/GADM/gadm36_levels_simple_augmented/adm1_augmented.shp")

# define an aggregation function
def aggregate(inpath, outpath, weightmap=None, gdf_regions=None):
    if os.path.exists(outpath):
        print('No aggregation carried out as ' + outpath + 'already exists')
        return None
    
    if weightmap is None:
        if gdf_regions is None:
            gdf_regions = gpd.read_file("../../sharepoint/Data/GADM/gadm36_levels_simple_augmented/adm1_augmented.shp")
        
        ds_imp = xr.open_dataset(inpath)
        weightmap = xa.pixel_overlaps(ds_imp, gdf_regions, subset_bbox=False)
                
    try:
        ds_imp = xr.open_dataset(inpath)
        aggregated = xa.aggregate(ds_imp, weightmap)
        aggregated.to_netcdf(outpath)
    except Exception as ex:
        print(f"Could not convert {filepath}.")
        print(ex)
        
    # Test it!
    ds_test = xr.open_dataset(outpath)
    if np.max(getattr(ds_test, list(ds_test.keys())[-1])) > 1e100:
        print("Failed check.")
        exit() # Check this
        os.unlink(outpath)

In [None]:
## create the weightmap - COMMENTED OUT TO AVOID EXPENSIVE RE-RUNS
#ds_example = xr.open_dataset('/net/argon/landclim/fbatibeniz/clim_indices/data/cmip6-ng/pr/ann_totd_regrid/ann_totd_regrid_pr_UKESM1-0-LL_ssp370_r1i1p1f2_g025.nc')
#weightmap_era5_to_adm1 = xa.pixel_overlaps(ds_example, ds_gdf, subset_bbox=False)
#print("weightmap calculated!")
## for saving out
#file_weightmap = open('../../sharepoint/Data/GADM/gadm36_levels_simple_augmented/weightmap_era5_to_adm1.p', 'wb') 
#pickle.dump(weightmap_era5_to_adm1, file_weightmap)

In [None]:
# for loading
file_weightmap = '../../sharepoint/Data/GADM/gadm36_levels_simple_augmented/weightmap_era5_to_adm1.p'
weightmap_era5_to_adm1 = pickle.load(open(file_weightmap, 'rb'))
weightmap_era5_to_adm1

In [None]:
# define a function to find all file names for a specific climate variable
def get_filepaths_per_variable(varname, indir='/net/argon/landclim/fbatibeniz/clim_indices/data/cmip6-ng'):
    # define the variable prefix (pr for precip, tas for temperature) - this is needed to reproduce Fulden's directory system
    if bool(re.search(r'ann_mean_regrid|dtd_var_regrid|bc_ann_mean_regrid|bc_dtd_var_regrid', varname)):
        varname_prefix = 'tas'
    elif bool(re.search(r'ann_totd_regrid|btstrp_pr999_regrid|mon_totd_regrid|wet_days_regrid|bc_ann_totd_regrid|bc_btstrp_pr999_regrid|bc_mon_totd_regrid|bc_wet_days_regrid', varname)):
        varname_prefix = 'pr'
    else:
        raise ValueError('varname did not match any of the reg ex fo climate index variable names')

    # read in all filenames and throw an error if target folder is empty
    indir_varspecific = indir + '/' + varname_prefix + '/' + varname
    filenames = os.listdir(indir_varspecific)
    if len(filenames) == 0:
        raise Exception('The directory ' + indir_varspecific + 'is empty. Please inspect')
        
    # subset to netCDF files based on the ".nc" ending to avoid matching temporary files etc.
    nc_files = [filename for filename in filenames if filename.endswith(".nc")]

    return([indir_varspecific + '/' + x for x in nc_files])

In [None]:
# peak at the files - COMMENTED OUT, ONLY FOR MANUAL INSPECTION
# print(os.listdir('/net/argon/landclim/fbatibeniz/clim_indices/data/cmip6-ng/pr'))
# get_filepaths_per_variable("mon_totd_regrid")

In [None]:
# write a wrapper around aggregate to facilitate parallel processing
def aggregate_parallel(inpath, outdir='../../sharepoint/Data/Climate projections/cmip6_ng_adm1_new', weightmap=None, gdf_regions=None):
    # extract the variable name based on a regex
    varname = re.findall('(?<=\\/pr\\/|tas\\/).+(?=\\/)', inpath)[0]
    filename = re.findall('(?<=' + varname + '\\/).+$', inpath)[0]
    
    # create a folder named to the value of varname under the outdir directory unless it already exists
    try:
        os.mkdir(outdir + '/' + varname)
    except:
        pass
    
    # aggregate
    aggregate(inpath, outdir + '/' + varname + '/' + filename, weightmap, gdf_regions)

In [None]:
# run the aggregate_parallel() function in parallel on all filenames - NOTE: warnings by the xagg package about NaN values in the time dimension are limited to the CAMS model which features NA values in 2100 only
# NOTE: using parallel processing for monthly precip totals maxes out the memory, hence we omit this indicator here and use a for-loop below
pool = Pool()
files_bc = [*get_filepaths_per_variable('bc_ann_mean_regrid'),
            *get_filepaths_per_variable('bc_dtd_var_regrid'),
            *get_filepaths_per_variable('bc_ann_totd_regrid'),
            #*get_filepaths_per_variable('bc_mon_totd_regrid'),
            *get_filepaths_per_variable('bc_wet_days_regrid'),
            *get_filepaths_per_variable('bc_btstrp_pr999_regrid')]
pool.starmap(aggregate_parallel, zip(files_bc, itertools.repeat('../../sharepoint/Data/Climate projections/cmip6_ng_adm1_new'), itertools.repeat(weightmap_era5_to_adm1)))
pool.close()

In [None]:
# repeat for raw CMIP6 data w/o bias correction - NOTE: warnings by the xagg package about NaN values in the time dimension are limited to the CAMS model which features NA values in 2100 only
# NOTE: using parallel processing for monthly precip totals maxes out the memory, hence we omit this indicator here and use a for-loop below
pool = Pool()
files_original = [*get_filepaths_per_variable('ann_mean_regrid'),
            *get_filepaths_per_variable('dtd_var_regrid'),
            *get_filepaths_per_variable('ann_totd_regrid'),
            #*get_filepaths_per_variable('mon_totd_regrid'),
            *get_filepaths_per_variable('wet_days_regrid'),
            *get_filepaths_per_variable('btstrp_pr999_regrid')]

pool.starmap(aggregate_parallel, zip(files_original, itertools.repeat('../../sharepoint/Data/Climate projections/cmip6_ng_adm1_new'), itertools.repeat(weightmap_era5_to_adm1)))
pool.close()

In [None]:
# run a for loop for monthly totals (raw CMIP6)
for file_jj in [*get_filepaths_per_variable('mon_totd_regrid')]:
    print(file_jj)
    aggregate_parallel(file_jj, outdir='../../sharepoint/Data/Climate projections/cmip6_ng_adm1_new', weightmap = weightmap_era5_to_adm1)

In [None]:
# run a for loop for bias-corrected monthly totals
for file_jj in [*get_filepaths_per_variable('bc_mon_totd_regrid')]:
    print(file_jj)
    aggregate_parallel(file_jj, outdir='../../sharepoint/Data/Climate projections/cmip6_ng_adm1_new', weightmap = weightmap_era5_to_adm1)

In [None]:
# re-run w/o parallel processing to ensure that all files were captured (will skip files that already exist, so runtime is minimal unless some files were missed in the previous steps)
for file_jj in [*get_filepaths_per_variable('ann_mean_regrid'),
            *get_filepaths_per_variable('dtd_var_regrid'),
            *get_filepaths_per_variable('ann_totd_regrid'),
            *get_filepaths_per_variable('mon_totd_regrid'),
            *get_filepaths_per_variable('wet_days_regrid'),
            *get_filepaths_per_variable('btstrp_pr999_regrid'),
            *get_filepaths_per_variable('bc_ann_mean_regrid'),
            *get_filepaths_per_variable('bc_dtd_var_regrid'),
            *get_filepaths_per_variable('bc_ann_totd_regrid'),
            *get_filepaths_per_variable('bc_mon_totd_regrid'),
            *get_filepaths_per_variable('bc_wet_days_regrid'),
            *get_filepaths_per_variable('bc_btstrp_pr999_regrid')]:
    print(file_jj)
    aggregate_parallel(file_jj, outdir='../../sharepoint/Data/Climate projections/cmip6_ng_adm1_new', weightmap = weightmap_era5_to_adm1)