# Ethiopia Random Forest Model

## 1. Import Libraries

In [1]:
import os
import gc
import sys
import glob
import random
import logging
import argparse
from tqdm import tqdm

import joblib
import numpy as np
import xarray as xr
import pandas as pd
import rasterio as rio
import rasterio.features as riofeat
from osgeo import gdal, osr

from cuml.model_selection import train_test_split
from sklearn.metrics import accuracy_score, \
    precision_score, recall_score, f1_score

import cupy as cp
import cudf as cf
import cuml as cl

from cupyx.scipy.ndimage import median_filter
from cuml.metrics import accuracy_score
from cuml.dask.common import utils as dask_utils
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import dask_cudf

from cuml.dask.ensemble import RandomForestClassifier as cumlDaskRF
from sklearn.ensemble import RandomForestClassifier as sklRF

## Start Dask Cluster

In [2]:
# This will use all GPUs on the local host by default
cluster = LocalCUDACluster(threads_per_worker=1)
c = Client(cluster)

# Query the client for all connected workers
workers = c.has_what().keys()
n_workers = len(workers)
n_streams = 8 # Performance optimization
print(n_workers)

distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize


4


## Define Parameters

In addition to the number of examples, random forest fitting performance depends heavily on the number of columns in a dataset and (especially) on the maximum depth to which trees are allowed to grow. Lower max_depth values can greatly speed up fitting, though going too low may reduce accuracy.

In [3]:
pipeline_step = ['train', 'predict']

In [4]:
# Data parameters
train_csv = '/att/pubrepo/ILAB/projects/Ethiopia/ethiopia-lcluc/data/random_forest/train_data.csv'
seed = 24
train_size = 0.80
max_feat = 'log2'

# Random Forest building parameters
n_trees = 100
max_feat = 'log2'
max_depth = 24 # 12 - bad
n_bins = 16
n_trees = 10000

## Read data csv file

In [5]:
assert os.path.exists(train_csv), f'{train_csv} not found.'
data_df = dask_cudf.read_csv(train_csv, sep=',')
assert not data_df.isnull().values.any(), f'Na found: {train_csv}'
print(data_df.compute(), type(data_df))

           CB     B     G     Y     R    RE  NIR1  NIR2  CLASS
0        1037   870   966  1049  1084  1540  1810  1627      0
1         992   722   614   626   638   755   825   749      0
2        1050   794   828   807   732  1596  2398  2112      0
3        1089  1029  1239  1385  1526  2216  2913  2684      0
4        1122   886   907   948   904  1498  1997  1762      0
...       ...   ...   ...   ...   ...   ...   ...   ...    ...
1746512  1479  1361  1372  1490  1779  2079  2515  2522      5
1746513  1384  1193  1195  1262  1460  1946  2336  2248      5
1746514  1609  1631  1699  1945  2307  2667  3166  3025      5
1746515  1649  1611  1644  1763  2009  2244  2491  2340      5
1746516  1568  1385  1400  1604  1766  1955  2084  1908      5

[1746517 rows x 9 columns] <class 'dask_cudf.core.DataFrame'>


## Shuffle and Split Dataset

In [6]:
data_df = data_df.sample(frac=1).reset_index(drop=True).compute()

In [7]:
# dask_cudf does not support iloc operations, the objects gett converted to plain cudf
x = data_df.iloc[:, :-1].astype(np.float32)
y = data_df.iloc[:, -1].astype(np.int32)

In [8]:
X_train, X_test, y_train, y_test = train_test_split(x, y, train_size=train_size)
del data_df, x, y
print(f'X_train: {X_train.shape[0]} elements')
print(f'X_test:  {X_test.shape[0]} elements')
print(f'y_train: {y_train.shape[0]} elements')
print(f'y_test:  {y_test.shape[0]} elements')

X_train: 1397213 elements
X_test:  349304 elements
y_train: 1397213 elements
y_test:  349304 elements


## Distribute data to worker GPUs

In [9]:
n_partitions = n_workers

def distribute(X, y):

    # Partition with Dask
    # In this case, each worker will train on 1/n_partitions fraction of the data
    X_dask = dask_cudf.from_cudf(X, npartitions=n_partitions)
    y_dask = dask_cudf.from_cudf(y, npartitions=n_partitions)

    # Persist to cache the data in active memory
    X_dask, y_dask = \
      dask_utils.persist_across_workers(c, [X_dask, y_dask], workers=workers)
    
    return X_dask, y_dask

X_train_dask, y_train_dask = distribute(X_train, y_train)
X_test_dask, y_test_dask = distribute(X_test, y_test)

## Train the distributed cuML model

In [10]:
%%time

cuml_model = cumlDaskRF(max_depth=max_depth, n_estimators=n_trees, n_bins=n_bins, n_streams=n_streams)
cuml_model.fit(X_train_dask, y_train_dask)
wait(cuml_model.rfs) # Allow asynchronous training tasks to finish

CPU times: user 4.49 s, sys: 2.02 s, total: 6.51 s
Wall time: 1min 13s


DoneAndNotDoneFutures(done={<Future: finished, type: cuml.ensemble.randomforestclassifier.RandomForestClassifier, key: _construct_rf-78ea3495-b750-42b1-97b4-e8153f5b3fb5>, <Future: finished, type: cuml.ensemble.randomforestclassifier.RandomForestClassifier, key: _construct_rf-10dc7cad-abad-4692-965c-3f16af863a93>, <Future: finished, type: cuml.ensemble.randomforestclassifier.RandomForestClassifier, key: _construct_rf-bf153b50-3587-4076-bb15-bcf5a5f66003>, <Future: finished, type: cuml.ensemble.randomforestclassifier.RandomForestClassifier, key: _construct_rf-68c62995-9682-454c-9233-44db633384b5>}, not_done=set())

## Predict and Validate Accuracy

In [None]:
cuml_y_pred = cuml_model.predict(X_test_dask).compute().to_array()

# Due to randomness in the algorithm, you may see slight variation in accuracies
print("CuML accuracy:     ", accuracy_score(y_test, cuml_y_pred))

In [None]:
print(cuml.__version__)

In [None]:
"""
        # ------------------------------------------------------------------
        # 3. Instantiate RandomForest object - FIX this area
        # ------------------------------------------------------------------
        if args.has_gpu:  # run using RAPIDS library

            # initialize cudf data and log into GPU memory
            logging.info('Training model via RAPIDS.')

            # single gpu setup
            x_train = cf.DataFrame.from_pandas(x_train)
            x_test = cf.DataFrame.from_pandas(x_test)
            y_train = cf.Series(y_train.values)
            rf_funct = cumlRFC  # RF Classifier

            # TODO: multi gpu setup
            # https://github.com/rapidsai/cuml/blob/branch-21.12/notebooks/random_forest_mnmg_demo.ipynb
            # cluster = LocalCUDACluster(
            # threads_per_worker=1, n_workers=n_workers)
            # c = Client(cluster)
            # workers = c.has_what().keys()
            # rf_funct = cumlRFC_mg


        # ------------------------------------------------------------------
        # 4. Fit Model
        # ------------------------------------------------------------------
        # fit model to training data and predict for accuracy score
        rf_model.fit(x_train, y_train)

        if args.has_gpu:
            acc_score = accuracy_score(
                y_test, rf_model.predict(x_test).to_array())
            p_score = precision_score(
                y_test, rf_model.predict(x_test).to_array(), average='macro')
            r_score = recall_score(
                y_test, rf_model.predict(x_test).to_array(), average='macro')
            f_score = f1_score(
                y_test, rf_model.predict(x_test).to_array(), average='macro')
        else:
            acc_score = accuracy_score(y_test, rf_model.predict(x_test))
            p_score = precision_score(y_test, rf_model.predict(x_test), average='macro')
            r_score = recall_score(y_test, rf_model.predict(x_test), average='macro')
            f_score = f1_score(y_test, rf_model.predict(x_test), average='macro')

        logging.info(f'Test Accuracy:  {acc_score}')
        logging.info(f'Test Precision: {p_score}')
        logging.info(f'Test Recall:    {r_score}')
        logging.info(f'Test F-Score:   {f_score}')

        # make output directory
        os.makedirs(
            os.path.dirname(os.path.realpath(args.output_pkl)), exist_ok=True)

        # export model to file
        try:
            joblib.dump(rf_model, args.output_pkl)
            logging.info(f'Model has been saved as {args.output_pkl}')
        except Exception as e:
            logging.error(f'ERROR: {e}')
"""