# Predict carbon with quantile random forest models

In [None]:
# Libraries
import os,  shutil, time
import numpy as np
import pandas as pd
import dask.dataframe as dd
from quantile_forest import RandomForestQuantileRegressor

In [None]:
# Directories (relative to the notebook's working directory)
# dir02:  database output; df_dbase.parquet is read from here
# dir03:  random forest output; files_params/ is read from here
# dir03p: predicted files and the temporary interm/ directory are written here
dir02 = '../paper_deficit/output/02_dbase/'
dir03 = '../paper_deficit/output/03_rf/'
dir03p = '../paper_deficit/output/03_rf/files_predicted/'

In [None]:
# Silence only Dask's "UserWarning: Sending large graph ..." message.
# A blanket filterwarnings('ignore') would also hide unrelated warnings
# (e.g. from pandas or the regressor) that may point to real problems.
import warnings
warnings.filterwarnings(
    'ignore',
    message='Sending large graph',  # regex matched at the start of the message
    category=UserWarning,
)

---

In [None]:
# Libraries
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
import dask

# Initialize dask: workers are started as SLURM batch jobs
cluster = SLURMCluster(
    queue='compute',                      # SLURM queue to use
    cores=24,                             # Number of CPU cores per job
    memory='256 GB',                      # Memory per job
    account='bm0891',                     # Account allocation
    interface="ib0",                      # Network interface for communication
    walltime='08:00:00',                  # Maximum runtime per job
    local_directory='../dask/',           # Directory for local storage
    job_extra_directives=[                # Additional SLURM directives for logging
        '-o ../dask/LOG_worker_%j.o',     # Output log
        '-e ../dask/LOG_worker_%j.e'      # Error log
    ]
)

# Scale dask cluster to two SLURM jobs
cluster.scale(jobs=2)

# Configure dashboard url. Use the public configuration API instead of
# mutating the internal dask.config.config dictionary in place.
dask.config.set(
    {'distributed.dashboard.link':
         '{JUPYTERHUB_SERVICE_PREFIX}/proxy/{port}/status'}
)

# Create client
client = Client(cluster)

# Display the client summary as the cell output
client

In [None]:
# Load the database, repartition it into chunks of about 1000 MiB and
# persist the result
df_dbase = (
    dd.read_parquet(dir02 + 'df_dbase.parquet')
    .repartition(partition_size='1000 MiB')
    .persist()
)

# Explanatory variables (predictor columns), listed by data source prefix
vars_exp = [
    'geom90m_convergence',
    'geom90m_cti',
    'geom90m_eastness',
    'geom90m_northness',
    'geom90m_slope',
    'geom90m_spi',
    'soilgrids2017_bdricm',
    'soilgrids2017_bdrlog',
    'soilgrids2017_bdticm',
    'soilgrids2020_cec',
    'soilgrids2020_cfvo',
    'soilgrids2020_clay',
    'soilgrids2020_phh2o',
    'soilgrids2020_sand',
    'soilgrids2020_silt',
    'worldclim_bio1',
    'worldclim_bio3',
    'worldclim_bio4',
    'worldclim_bio5',
    'worldclim_bio6',
    'worldclim_bio12',
    'worldclim_bio13',
    'worldclim_bio14',
    'worldclim_bio15',
    'worldclim_elev',
]

In [None]:
def predict_qrfr(var_tar, scen):
    """Compute the prediction output file name for a target and scenario.

    NOTE(review): this looks like an unfinished draft of
    quantile_random_forest_predict. The file name is built but never used,
    and the function returns None.

    Args:
        var_tar: Target variable name inserted into the file name.
        scen: Scenario name inserted into the file name.
    """
    # Name of the parquet file that would hold the predictions
    out_name = f'df_rfqpred_{var_tar}_{scen}.parquet'

In [None]:
def quantile_random_forest_predict(var_tar, scen):
    """Predict carbon densities using Quantile Random Forest Regression with
    the best performing parameter combination.

    Fits a ``RandomForestQuantileRegressor`` on the rows of ``df_dbase``
    where ``train_{scen}`` is True, using the parameters of the row with
    ``rank_test_score == 1`` in
    ``files_params/df_params_rank_{var_tar}_{scen}.csv``. It then predicts
    the 0.05, 0.1, 0.5, 0.9 and 0.95 quantiles for every row of ``df_dbase``
    and writes them, together with ``lat`` and ``lon``, to
    ``df_rfqpred_{var_tar}_{scen}.parquet`` in ``dir03p``.

    Relies on the notebook-level names ``df_dbase``, ``vars_exp``, ``dir03``
    and ``dir03p``. Intermediate files are written to ``dir03p/interm``,
    which is emptied at the start of every call.

    Args:
        var_tar: Name of the target column in ``df_dbase``; also part of the
            parameter and output file names.
        scen: Scenario name; selects the ``train_{scen}`` column and is part
            of the parameter and output file names.

    Returns:
        None. The result is written to disk.

    Raises:
        ValueError: If the parameter file contains no row with
            ``rank_test_score == 1``.
    """
    # Paths and constants
    file_out = f'df_rfqpred_{var_tar}_{scen}.parquet'
    path_out = os.path.join(dir03p, file_out)
    dir_interm = os.path.join(dir03p, 'interm')
    path_xpredict = os.path.join(dir_interm, 'df_dbase_qrfr_interm.parquet')
    quantiles = [0.05, 0.1, 0.5, 0.9, 0.95]
    cols_qrfr = ['qrfr_005', 'qrfr_010', 'qrfr_050', 'qrfr_090', 'qrfr_095']
    # Number of partitions of the prediction data, and number of partitions
    # predicted per batch (one intermediate file per batch).
    # For a quick test run, lower npartitions (e.g. to 200) and work on a
    # subsample of df_dbase.
    npartitions = 5000
    npgroups = 50

    # Start from an empty intermediate directory; makedirs also creates
    # missing parent directories
    if os.path.exists(dir_interm):
        shutil.rmtree(dir_interm)
    os.makedirs(dir_interm)
    # Delete output file from a previous run if it exists
    if os.path.exists(path_out):
        os.remove(path_out)

    # Training data: rows flagged for this scenario
    df_train = df_dbase[df_dbase['train_' + scen] == True]  # noqa: E712
    X_train = df_train[vars_exp].to_dask_array(lengths=True).compute()
    y_train = df_train[var_tar].to_dask_array(lengths=True).compute()

    # Extract explanatory variables of all rows, repartition, and export
    (
        df_dbase[vars_exp]
        .repartition(npartitions=npartitions)
        .to_parquet(path_xpredict)
    )

    # Read the extracted variables back as a persisted dask array
    X_predict = (
        dd.read_parquet(path_xpredict)
        .to_dask_array(lengths=True)
        .persist()
    )

    # Get parameters of the model with the best performance
    file_params = f'df_params_rank_{var_tar}_{scen}.csv'
    df_params = pd.read_csv(os.path.join(dir03, 'files_params', file_params))

    # Several combinations may be ranked 1 (ties) and Series.item() only
    # accepts a single value, so keep the first best row. head(1) keeps the
    # dtype of every column, unlike selecting the row as a Series.
    df_params_sel = df_params[df_params.rank_test_score == 1].head(1)
    if df_params_sel.empty:
        raise ValueError(
            f'No parameter combination with rank_test_score == 1 in '
            f'{file_params}')

    # Define random forest regressor
    rfqr = RandomForestQuantileRegressor(
        min_samples_leaf=df_params_sel.min_samples_leaf.item(),
        max_features=df_params_sel.max_features.item(),
        n_estimators=df_params_sel.n_estimators.item(),
        random_state=df_params_sel.random_state.item(),
        n_jobs=-1,
        criterion='squared_error')

    # Fit random forest regressor
    rfqr.fit(X_train, y_train, sparse_pickle=True)

    # One entry per partition, grouped into batches of npgroups partitions
    list_partitions = [X_predict.partitions[i]
                       for i in range(X_predict.npartitions)]
    list_partition_groups = [list_partitions[i:i + npgroups]
                             for i in range(0, len(list_partitions), npgroups)]

    def predict_partition(part):
        """Predict the quantiles for one partition."""
        return rfqr.predict(part, quantiles=quantiles)

    def predict_pgroup(pgroup):
        """Predict one batch of partitions and return the result as dataframe."""
        tasks = [dask.delayed(predict_partition)(part.persist())
                 for part in pgroup]
        # dask.compute returns a tuple with one entry per argument
        (results,) = dask.compute(tasks)
        # Stack the per-partition results row-wise in a single copy
        return pd.DataFrame(np.concatenate(results, axis=0), columns=cols_qrfr)

    # Predict batch by batch and write one intermediate file per batch
    files_interm = []
    for i, pgroup in enumerate(list_partition_groups):
        path_interm = os.path.join(
            dir_interm, f'df_qrfr_xpredict_interm_{i}.parquet')
        predict_pgroup(pgroup).to_parquet(path_interm)
        files_interm.append(path_interm)

    # Import the intermediate files in batch order and concat them once
    df_pred = pd.concat(
        [pd.read_parquet(path) for path in files_interm],
        ignore_index=True)

    # Dataframe with lat and lon and qrfr values; the index is reset so the
    # rows align with the 0..n-1 index of the predictions
    df_result = df_dbase[['lat', 'lon']].compute().reset_index(drop=True)
    for col in cols_qrfr:
        df_result[col] = df_pred[col]

    # Export
    df_result.to_parquet(path_out)

In [None]:
# Predict values of agbc: run the prediction for each target variant
# (min, mean, max) and each scenario ('prim', 'secd').
# The IPython magic %time reports the run time of every call.
for var_tar in ['agbc_min', 'agbc_mean', 'agbc_max']:
    for scen in ['prim', 'secd']:
        %time quantile_random_forest_predict(var_tar, scen)

In [None]:
# Predict values of bgbc: run the prediction for each target variant
# (min, mean, max) and each scenario ('prim', 'secd').
# The IPython magic %time reports the run time of every call.
for var_tar in ['bgbc_min', 'bgbc_mean', 'bgbc_max']:
    for scen in ['prim', 'secd']:
        %time quantile_random_forest_predict(var_tar, scen)

In [None]:
# Predict values of soc: run the prediction for each target variant
# (min, mean, max) and each scenario ('prim', 'secd').
# The IPython magic %time reports the run time of every call.
for var_tar in ['soc_min', 'soc_mean', 'soc_max']:
    for scen in ['prim', 'secd']:
        %time quantile_random_forest_predict(var_tar, scen)

In [None]:
# Disconnect the client first, then shut down the cluster
client.close()
cluster.close()