# Pre-process wflow_sbm forcing for CAMELS-GB in parallel
## CEH-GEAR: pr, CHESS-PE: pet, CHESS-met: tas

In [1]:
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

In [2]:
from glob import glob
from pathlib import Path

import os
import iris
import xarray as xr
import pandas as pd
from esmvalcore.preprocessor import regrid
from pathos.threading import ThreadPool as Pool
from dask.diagnostics import ProgressBar

## Set Paths

In [3]:
# Snellius cluster paths
ROOT = Path('/gpfs/work1/0/wtrcycle/users/jaerts/model_refinement_pub/')
AUXDATA = Path(f"{ROOT}/aux_data")
FORCING = Path(f'{ROOT}/model_forcing/')
MODELS = Path(f'{ROOT}/model_parameters/wflow_sbm/')

# Config

In [4]:
#Time Period
start_year = "2000"
end_year = "2017"

# Get available basin IDs wflow_sbm
basin_dirs = glob(f'{MODELS}/*')
basin_ids = [s.split('/')[-1] for s in basin_dirs]
basin_ids.sort()

# Amount of available cores
cores_available = 30

# Preprocessor Function

In [7]:
# Prepare forcing function
def prep_forcing(basin_id, start_year, end_year):
    print(basin_id)
    # Create lists
    output = []
    output_da = []
    
    # Set basin directory
    BASINDIR = f'{MODELS}/{basin_id}/'
       
    # Open netCDF file as an example grid from the model directory
    cube_example = iris.load(f'{BASINDIR}/staticmaps.nc')[1]

    # Guess bounds   
    cube_example.coord('y').guess_bounds()
    cube_example.coord('x').guess_bounds()
    
    # Rename Coords
    cube_example.coord('y').rename('latitude')
    cube_example.coord('x').rename('longitude')
    
    cube_example.coord('latitude').units = 'degrees'
    cube_example.coord('longitude').units = 'degrees'
    
    # Loop forcing variables
    for variable in ['pr','tas','pet']:

        # Load forcing file
        cube_forcing = iris.load(glob(f'{FORCING}/*{variable}*')[0])[0]
        
        # Guess bounds
        cube_forcing.coord('latitude').guess_bounds()
        cube_forcing.coord('longitude').guess_bounds()

        # Regrid forcing file to example grid using conservative method
        cube_out = regrid(cube_forcing, cube_example, scheme='area_weighted')
        
        # Rename Coords
        cube_out.coord('latitude').rename('lat')
        cube_out.coord('longitude').rename('lon')
        
        # Convert to xarray and append to list
        da = xr.DataArray.from_iris(cube_out)
        output_da.append(da)
    
    # Change annoying long_name time
    for da in output_da:
        da.time.attrs = {'standard_name': 'time',
                         'long_name': 'time in days since 1961-01-01 00:00:00 UTC'}
        output.append(da)
    
    # Merge output variables
    ds = xr.merge(output)#, dim='time')
    ds = ds.rename({'lat': 'y', 'lon':'x', 'pr':'precip','tas': 'temp'})
    
    # Output filename
    output_fname = f'{BASINDIR}/ceh-gear_chess_camels-gb_{basin_id}_2000_2017.nc'
    
    # Remove existing file
    if output_fname:
        OUTPUT = Path(output_fname)
        OUTPUT.unlink(output_fname)
        
    # Save to netcdf
    write_job = ds.to_netcdf(output_fname, compute=False)
    with ProgressBar():
        write_job.compute()

    # ds.to_netcdf(output_fname)
    return print(f'{basin_id} finished: {output_fname}')

# Parallel Run function

In [8]:
def parallel_run(
    basin_ids,
    start_years,
    end_years, 
    threads=cores_available,
    ):
    
    # Set number of threads (cores) used for parallel run and map threads
    if threads is None:
        pool = Pool()
    else:
        pool = Pool(nodes=threads)
    # Run parallel models
    pool.map(
        prep_forcing,
        basin_ids,
        start_years,
        end_years,
        )
    return

## Sort basins by size for lazy parallel

In [9]:
# Sort by basin size
def sort_basin_ids_by_size(basin_ids):
    sizes = []
    for basin_id in basin_ids:
        size = os.path.getsize(f'{MODELS}/{basin_id}/staticmaps.nc')
        sizes.append(size)

    df = pd.DataFrame()
    df['basin_id'] = basin_ids
    df['size'] = sizes
    df = df.sort_values('size')

    basin_ids = df.basin_id.to_list()
    
    return basin_ids

basin_ids_sorted = sort_basin_ids_by_size(basin_ids)

## Create lists for lazy parallel

In [10]:
# Create lists for parallel runs
start_years = [start_year] * len(basin_ids_sorted)
end_years = [end_year] * len(basin_ids_sorted)

# Run parallel function

In [None]:
# Run function
parallel_run(basin_ids_sorted, start_years, end_years)

4700118011

25001
21008
28093
28074
27087
39006
54008
55007
40003
4001
27007
67015
62001
27080
84004
27021
67025
56001
71001
28012
27071
54029
8005
33039
27002
85001
39008
21006
[                                        ] | 0% Completed | 20.22 mss
[                                        ] | 0% Completed | 77.23 sss
[                                        ] | 0% Completed | 148.95 ss
[#                                       ] | 4% Completed | 194.97 ss
[                                        ] | 0% Completed | 180.91 ss
[                                        ] | 0% Completed | 24.51 s s
[########################################] | 100% Completed | 13.19 s
[#######                                 ] | 17% Completed | 260.39 s
[                                        ] | 0% Completed | 17.20 s s

# Check output completed runs

In [11]:
df = pd.DataFrame()
basins = []
exists = []

for basin_id in basin_ids_sorted:
    basins.append(basin_id)
   
    # check if file exists
    file = Path(f'{MODELS}/{basin_id}/ceh-gear_chess_camels-gb_{basin_id}_2000_2017.nc')
    exists.append(file.is_file())
    
df['basin_id'] = basins
df['completed'] = exists
df = df[df['completed'] == False]    
df 

Unnamed: 0,basin_id,completed
