# Ensembling with Triton-FIL

## Introduction
This notebook walks step by step through training an ensemble of models and deploying it with Triton's FIL (Forest Inference Library) backend. We will also see how to combine the ensemble's predictions inside Triton using the Python backend.

## Pre-Requisites
This notebook assumes that you have Docker with GPU support (the NVIDIA Container Toolkit, which `docker run --gpus` relies on) plus a few Python dependencies. To install the Python dependencies in a conda environment, you can use the following environment file:
```yaml
---
name: triton_ensemble_nb
channels:
  - conda-forge
  - nvidia
  - rapidsai
dependencies:
  - conda-pack
  - cudatoolkit=11.4
  - cudf=21.12
  - cuml=21.12
  - cupy
  - imbalanced-learn
  - jupyter
  - kaggle
  - matplotlib
  - numpy
  - pandas
  - pip
  - python=3.8
  - scikit-learn
  - pip:
      - treelite==2.3.0
      - tritonclient[all]
      - xgboost>=1.5,<1.6
```

In [None]:
TRITON_IMAGE = 'nvcr.io/nvidia/tritonserver:22.05-py3'

In [None]:
!docker pull {TRITON_IMAGE}

## Fetching Training Data
For this example, we will make use of data from the [IEEE-CIS Fraud Detection](https://www.kaggle.com/c/ieee-fraud-detection/overview) Kaggle competition. You can fetch the data with the Kaggle command-line client by running the following commands.

**NOTE**: You will need to make sure that your Kaggle credentials are [available](https://github.com/Kaggle/kaggle-api#api-credentials), either through a kaggle.json file or via environment variables, and that you have accepted the competition's rules on the Kaggle website.

In [None]:
!kaggle competitions download -c ieee-fraud-detection
!unzip -u ieee-fraud-detection.zip
train_csv = 'train_transaction.csv'

## Training Example Models
While the IEEE-CIS Kaggle competition focused on a more sophisticated problem involving analysis of both fraudulent transactions and the users linked to those transactions, we will use a simpler version of that problem (identifying fraudulent transactions only) to build our example models. In the following steps, we use cuML's preprocessing tools to clean the data and then train three example models: two XGBoost classifiers (one trained on the original data and one on data in which the minority fraud class has been randomly oversampled) and a scikit-learn random forest. Because we perform a [label encoding](https://docs.rapids.ai/api/cuml/stable/api.html?highlight=labelencoder#cuml.preprocessing.LabelEncoder.LabelEncoder) on the categorical features with cuML before training, we do not rely on the categorical feature support introduced in XGBoost 1.5 (`enable_categorical=False`).
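The oversampled model below uses imblearn's `RandomOverSampler` with `sampling_strategy=0.5`. For a binary problem, that roughly means duplicating randomly chosen minority-class (fraud) rows until the minority class has half as many rows as the majority class. The NumPy function below is a simplified sketch of that idea for illustration only; it is not imblearn's implementation, handles binary labels only, and the name `random_oversample` is hypothetical.

```python
import numpy as np


def random_oversample(X, y, sampling_strategy=0.5, seed=0):
    """Duplicate random minority-class rows until the minority class has
    about ``sampling_strategy`` times as many rows as the majority class.

    Illustrative binary-only sketch, not imblearn's RandomOverSampler.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    classes, counts = np.unique(y, return_counts=True)
    minority = classes[np.argmin(counts)]
    n_majority = counts.max()
    n_minority = counts.min()

    # Target size of the minority class after resampling
    n_target = int(n_majority * sampling_strategy)
    n_extra = max(n_target - n_minority, 0)

    # Draw minority rows with replacement and append them to the data
    rng = np.random.default_rng(seed)
    minority_idx = np.flatnonzero(y == minority)
    extra_idx = rng.choice(minority_idx, size=n_extra, replace=True)
    keep = np.concatenate([np.arange(len(y)), extra_idx])
    return X[keep], y[keep]
```

For example, with 90 legitimate and 10 fraudulent rows, the minority class grows to `int(90 * 0.5) = 45` rows, for 135 rows in total. Only the training split is resampled; the test split keeps its original class balance.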

In [None]:
import cudf
import cupy as cp
from cuml.preprocessing import SimpleImputer, LabelEncoder
from sklearn.model_selection import train_test_split

SEED=0

In [None]:
# Reading data
data = cudf.read_csv(train_csv)

# Replace NaNs in data
nan_columns = data.columns[data.isna().any().to_pandas()]
float_nan_subset = data[nan_columns].select_dtypes(include='float64')

imputer = SimpleImputer(missing_values=cp.nan, strategy='mean')
data[float_nan_subset.columns] = imputer.fit_transform(float_nan_subset)

obj_nan_subset = data[nan_columns].select_dtypes(include='object')
data[obj_nan_subset.columns] = obj_nan_subset.fillna('UNKNOWN')

In [None]:
# Perform label encoding
cat_columns = data.select_dtypes(include='object')
for col in cat_columns.columns:
    data[col] = LabelEncoder().fit_transform(data[col])

In [None]:
# Split data into training and testing sets
X = data.drop('isFraud', axis=1)
y = data.isFraud.astype(int)
X_train, X_test, y_train, y_test = train_test_split(
    X.to_pandas(), y.to_pandas(), test_size=0.3, stratify=y.to_pandas(), random_state=SEED
)
# Copy data to avoid slowdowns due to fragmentation
X_train = X_train.copy()
X_test = X_test.copy()

In [None]:
import xgboost as xgb

In [None]:
# Define model training functions
def train_model_logistic(num_trees, max_depth):
    model = xgb.XGBClassifier(
        tree_method='gpu_hist',
        enable_categorical=False,
        use_label_encoder=False,
        predictor='gpu_predictor',
        eval_metric='aucpr',
        objective='binary:logistic',
        max_depth=max_depth,
        n_estimators=num_trees
    )
    model.fit(
        X_train,
        y_train,
    )
    return model

In [None]:
from imblearn.over_sampling import RandomOverSampler

In [None]:
def train_model_oversample(num_trees, max_depth):
    model = xgb.XGBClassifier(
        tree_method='gpu_hist',
        enable_categorical=False,
        use_label_encoder=False,
        predictor='gpu_predictor',
        eval_metric='aucpr',
        objective='binary:logistic',
        max_depth=max_depth,
        n_estimators=num_trees
    )
    
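    # Randomly duplicate minority-class (fraud) rows until the minority class
    # is 50% the size of the majority class
    oversample = RandomOverSampler(sampling_strategy=0.5, random_state=SEED)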
    X_over, y_over = oversample.fit_resample(X_train, y_train)
    
    model.fit(
        X_over,
        y_over,
    )
    return model

In [None]:
from sklearn.ensemble import RandomForestClassifier

In [None]:
def train_model_RFC(num_trees, max_depth):
    model = RandomForestClassifier(
        n_estimators=num_trees,
        max_depth=max_depth,
        n_jobs=-1,  # train trees in parallel on all CPU cores
        random_state=SEED,
    )
    model.fit(
        X_train,
        y_train,
    )
    return model

In [None]:
model_logistic = train_model_logistic(1500, 14)

In [None]:
model_oversample = train_model_oversample(500, 14)

In [None]:
model_RFC = train_model_RFC(40, 16)

In [None]:
# Free up some room on the GPU by explicitly deleting dataframes
import gc
del data
del nan_columns
del float_nan_subset
del imputer
del obj_nan_subset
del cat_columns
del X
del y
gc.collect()

## Deploying Models in Triton
Now that we have three example models to work with, let's deploy them for real-time serving with Triton. To do so, we first need to serialize the models into the directory structure that Triton expects and then add configuration files that tell Triton exactly how we wish to use them.

### Model Serialization
Triton models can be stored locally on disk or in S3, Google Cloud Storage, or Azure Storage. For this example, we will stick to local storage, but information about using cloud storage solutions can be found [here](https://github.com/triton-inference-server/server/blob/main/docs/model_repository.md). Each model has a dedicated directory within a main model repository directory. Multiple versions of a model can also be served by Triton, as indicated by numbered directories (see below).
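Concretely, Triton typically expects `<repository>/<model_name>/config.pbtxt` alongside one or more numbered version directories such as `<repository>/<model_name>/1/`. The hypothetical helper below (not part of Triton or its client library) walks a local repository and reports, for each model directory, which numeric versions exist and whether a `config.pbtxt` is present. Running it before starting the server can catch layout mistakes early.

```python
import os


def describe_model_repository(repo_path):
    """Map each model directory to its numeric version subdirectories and
    whether a config.pbtxt is present (hypothetical sanity-check helper)."""
    layout = {}
    for model_name in sorted(os.listdir(repo_path)):
        model_dir = os.path.join(repo_path, model_name)
        if not os.path.isdir(model_dir):
            continue  # ignore stray files at the repository root
        # Version directories are named with integers: 1, 2, ...
        versions = sorted(
            int(entry)
            for entry in os.listdir(model_dir)
            if entry.isdigit() and os.path.isdir(os.path.join(model_dir, entry))
        )
        has_config = os.path.isfile(os.path.join(model_dir, 'config.pbtxt'))
        layout[model_name] = {'versions': versions, 'has_config': has_config}
    return layout
```

Once the models and configuration files below have been written, `describe_model_repository(REPO_PATH)` should list every model with version `[1]` and `has_config: True`.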

In [None]:
import os
import treelite
import pickle

# Create the model repository directory. The name of this directory is arbitrary.
REPO_PATH = os.path.abspath('model_repository')
os.makedirs(REPO_PATH, exist_ok=True)

In [None]:
def serialize_model_xgb(model, model_name):
    # The name of the model directory determines the name of the model as reported
    # by Triton
    model_dir = os.path.join(REPO_PATH, model_name)
    # We can store multiple versions of the model in the same directory. In our
    # case, we have just one version, so we will add a single directory, named '1'.
    version_dir = os.path.join(model_dir, '1')
    os.makedirs(version_dir, exist_ok=True)
    
    # The default filename for XGBoost models saved in json format is 'xgboost.json'.
    # It is recommended that you use this filename to avoid having to specify a
    # name in the configuration file.
    model_file = os.path.join(version_dir, 'xgboost.json')
    model.save_model(model_file)
    
    return model_dir

In [None]:
def serialize_model_skl(model, model_name):
    model_dir = os.path.join(REPO_PATH, model_name)
    version_dir = os.path.join(model_dir, '1')
    os.makedirs(version_dir, exist_ok=True)
    
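    # Keep a pickled copy of the scikit-learn model for local reference;
    # Triton's FIL backend reads the Treelite checkpoint written below.
    archival_path = os.path.join(version_dir, 'model.pkl')
    with open(archival_path, 'wb') as f:
        pickle.dump(model, f)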
    
    # This is the default filename expected for Treelite checkpoint models. It is recommended
    # that you stick with the default to avoid additional configuration.
    model_basename = 'checkpoint.tl'
    model_file = os.path.join(version_dir, model_basename)
        
    tl_model = treelite.sklearn.import_model(model)
    tl_model.serialize(model_file)
    
    return model_dir

We will deploy two copies of each example model: one on CPU and one on GPU. These separate instances make it possible to compare the performance of GPU and CPU execution.

In [None]:
model_logistic_dir = serialize_model_xgb(model_logistic, 'model_logistic')
model_logistic_cpu_dir = serialize_model_xgb(model_logistic, 'model_logistic-cpu')
model_oversample_dir = serialize_model_xgb(model_oversample, 'model_oversample')
model_oversample_cpu_dir = serialize_model_xgb(model_oversample, 'model_oversample-cpu')
model_RFC_dir = serialize_model_skl(model_RFC, 'model_RFC')
model_RFC_cpu_dir = serialize_model_skl(model_RFC, 'model_RFC-cpu')

### The Configuration File
The configuration file associated with a model tells Triton a little bit about the model itself and how you would like to use it. You can read about all generic Triton configuration options [here](https://github.com/triton-inference-server/server/blob/master/docs/model_configuration.md) and about configuration options specific to the FIL backend [here](https://github.com/triton-inference-server/fil_backend#configuration), but we will focus on just a few of the most common and relevant options in this example. Below are general descriptions of these options:
- **max_batch_size**: The maximum batch size that can be passed to this model. In general, the only limit on the size of batches passed to the FIL backend is the memory available to process them. For GPU execution, the available memory is determined by the size of Triton's CUDA memory pool, which can be set via a command-line argument when starting the server.
- **input**: Options in this section tell Triton the number of features to expect for each input sample.
- **output**: Options in this section tell Triton how many output values there will be for each sample. If the "predict_proba" option (described further on) is set to true, then a probability value will be returned for each class. Otherwise, a single value will be returned indicating the class predicted for the given sample.
- **instance_group**: This determines how many instances of this model will be created and whether they will use the GPU or CPU.
- **model_type**: A string indicating the format the model is stored in: "xgboost_json" for our XGBoost models and "treelite_checkpoint" for the random forest. "xgboost" (XGBoost's binary format) and "lightgbm" are valid formats as well.
- **predict_proba**: If set to true, probability values will be returned for each class rather than just a class prediction.
- **output_class**: True for classification models, false for regression models.
- **threshold**: A score threshold for determining classification. When output_class is set to true, this must be provided, although it will not be used if predict_proba is also set to true.
- **storage_type**: In general, "AUTO" should meet most use cases. If "AUTO" storage is selected, FIL will load the model using either a sparse or dense representation based on the approximate size of the model. In some cases, you may want to explicitly set this to "SPARSE" to reduce the memory footprint of large models.
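To make the `predict_proba`, `output_class`, and `threshold` options concrete: with `predict_proba` set to true, a binary classifier returns two probabilities per sample, whereas with it set to false a single class label per sample is returned, determined by `threshold`. The NumPy sketch below reproduces that kind of thresholding on the client side from probability outputs. It assumes a binary classifier with the positive class in column 1 and uses a strict `>` comparison; how FIL treats a score exactly equal to the threshold is not specified here, and `probabilities_to_classes` is a hypothetical helper.

```python
import numpy as np


def probabilities_to_classes(proba, threshold=0.5):
    """Convert (n_samples, 2) class probabilities into 0/1 labels.

    Predicts the positive class (column 1) when its probability
    exceeds ``threshold``.
    """
    proba = np.asarray(proba)
    return (proba[:, 1] > threshold).astype(np.int64)
```

Requesting probabilities and thresholding client-side keeps the choice of threshold flexible without regenerating the model configuration.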

Based on this information, let's set up configuration files for our models.

In [None]:
# Maximum size in bytes for input and output arrays. If you are
# using Triton 21.11 or higher, all memory allocations will make
# use of Triton's memory pool, which has a default size of
# 67_108_864 bytes. This can be increased using the
# `--cuda-memory-pool-byte-size` option when the server is
# started, but this notebook should work fine with default
# settings.
MAX_MEMORY_BYTES = 60_000_000

In [None]:
features = X_test.shape[1]
num_classes = y_test.nunique()  # number of distinct labels (2 for this binary problem)
bytes_per_sample = (features + num_classes) * 4
max_batch_size = MAX_MEMORY_BYTES // bytes_per_sample
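As a worked example of this arithmetic: every sample sends `features` FP32 inputs and receives `num_classes` FP32 outputs, at 4 bytes each. Assuming 393 feature columns (an illustrative value; the real one is `X_test.shape[1]`) and 2 classes, each sample needs (393 + 2) * 4 = 1,580 bytes, so a 60,000,000-byte budget allows 60_000_000 // 1580 = 37,974 samples per batch. The hypothetical function below packages the same calculation as the cell above.

```python
def max_batch_for_budget(memory_bytes, n_features, n_classes, bytes_per_value=4):
    """Largest batch whose input and output arrays fit in ``memory_bytes``.

    Counts only the input (n_features values) and output (n_classes values)
    per sample, as in the notebook; bytes_per_value=4 corresponds to FP32.
    """
    bytes_per_sample = (n_features + n_classes) * bytes_per_value
    return memory_bytes // bytes_per_sample


print(max_batch_for_budget(60_000_000, 393, 2))  # 37974
```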

In [None]:
def generate_config(model_dir, model_format, storage_type, deployment_type='gpu'):
    if deployment_type.lower() == 'cpu':
        instance_kind = 'KIND_CPU'
    else:
        instance_kind = 'KIND_GPU'
        
    config_text = f"""backend: "fil"
max_batch_size: {max_batch_size}
input [
  {{
    name: "input__0"
    data_type: TYPE_FP32
    dims: [ {features} ]
  }}
]
output [
  {{
    name: "output__0"
    data_type: TYPE_FP32
    dims: [ {num_classes} ]
  }}
]
instance_group [{{ kind: {instance_kind} }}]
parameters [
  {{
    key: "model_type"
    value: {{ string_value: "{model_format}" }}
  }},
  {{
    key: "predict_proba"
    value: {{ string_value: "true" }}
  }},
  {{
    key: "output_class"
    value: {{ string_value: "true" }}
  }},
  {{
    key: "threshold"
    value: {{ string_value: "0.5" }}
  }},
  {{
    key: "storage_type"
    value: {{ string_value: "{storage_type}" }}
  }}
]

dynamic_batching {{
  max_queue_delay_microseconds: 100
}}"""
    config_path = os.path.join(model_dir, 'config.pbtxt')
    with open(config_path, 'w') as file_:
        file_.write(config_text)

    return config_path

In [None]:
generate_config(model_logistic_dir, deployment_type='gpu', model_format='xgboost_json', storage_type='AUTO')
generate_config(model_logistic_cpu_dir, deployment_type='cpu', model_format='xgboost_json', storage_type='AUTO')
generate_config(model_oversample_dir, deployment_type='gpu', model_format='xgboost_json', storage_type='SPARSE')
generate_config(model_oversample_cpu_dir, deployment_type='cpu', model_format='xgboost_json', storage_type='SPARSE')
generate_config(model_RFC_dir, deployment_type='gpu', model_format='treelite_checkpoint', storage_type='AUTO')
generate_config(model_RFC_cpu_dir, deployment_type='cpu', model_format='treelite_checkpoint', storage_type='AUTO')

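### Python Backend
To combine the three models inside Triton, we use the Python backend together with its Business Logic Scripting (BLS) feature, which lets a Python model send inference requests to other models on the same server. The `model.py` written below defines a model named `model_ensemble` that forwards each request to the three GPU-deployed FIL models concurrently, waits for their outputs, and returns the average of their class probabilities.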
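The ensemble model in this section averages the class probabilities returned by the three FIL models, a scheme often called soft voting. The NumPy sketch below shows the same computation outside Triton. `soft_vote` is a hypothetical helper, and its optional `weights` argument is an extension that the ensemble model does not use (it weights all three models equally).

```python
import numpy as np


def soft_vote(probabilities, weights=None):
    """Average per-class probabilities from several models.

    probabilities: list of arrays shaped (n_samples, n_classes), one per model.
    weights: optional per-model weights; None gives the plain mean.
    """
    stacked = np.stack([np.asarray(p, dtype=np.float32) for p in probabilities])
    # Average over the model axis (axis 0), keeping samples and classes
    return np.average(stacked, axis=0, weights=weights)
```

Because each input row sums to 1, the averaged rows also sum to 1, so the result can still be read as class probabilities.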

In [None]:
python_text = """import json
import asyncio

import triton_python_backend_utils as pb_utils


def _to_numpy(tensor):
    # BLS outputs can live in CPU or GPU memory, depending on the model
    # that produced them. as_numpy() only works for CPU tensors, so GPU
    # tensors are copied to host memory through DLPack and PyTorch
    # (PyTorch must then be installed in the server's Python environment).
    if tensor.is_cpu():
        return tensor.as_numpy()
    from torch.utils.dlpack import from_dlpack
    return from_dlpack(tensor.to_dlpack()).cpu().numpy()


class TritonPythonModel:
    def initialize(self, args):
        # args['model_config'] is a JSON string and must be parsed here
        self.model_config = json.loads(args['model_config'])

        # Get the configuration of OUTPUT0 and the matching NumPy dtype
        output0_config = pb_utils.get_output_config_by_name(
            self.model_config, "output__0")
        self.output0_dtype = pb_utils.triton_string_to_numpy(
            output0_config['data_type'])

    # `execute` must be declared `async` in order to use `async_exec`.
    async def execute(self, requests):
        responses = []
        # The Python backend must iterate over every request and create
        # one pb_utils.InferenceResponse for each of them.
        for request in requests:
            in_0 = pb_utils.get_input_tensor_by_name(request, "input__0")

            # Send the input to all three FIL models without awaiting each
            # response, so that the sub-models run concurrently.
            inference_response_awaits = []
            for model_name in ['model_logistic', 'model_oversample', 'model_RFC']:
                infer_request = pb_utils.InferenceRequest(
                    model_name=model_name,
                    requested_output_names=["output__0"],
                    inputs=[in_0])
                inference_response_awaits.append(infer_request.async_exec())

            # Block until all sub-model responses have arrived
            inference_responses = await asyncio.gather(
                *inference_response_awaits)

            # Abort this request if any sub-model reported an error
            for infer_response in inference_responses:
                if infer_response.has_error():
                    raise pb_utils.TritonModelException(
                        infer_response.error().message())

            # Average the per-class probabilities of the three models
            probabilities = [
                _to_numpy(pb_utils.get_output_tensor_by_name(
                    infer_response, "output__0"))
                for infer_response in inference_responses
            ]
            ensembled = sum(probabilities) / len(probabilities)
            ensembled_tensor = pb_utils.Tensor(
                "output__0", ensembled.astype(self.output0_dtype))

            responses.append(pb_utils.InferenceResponse(
                output_tensors=[ensembled_tensor]))

        # The length of the returned list must match the length of `requests`
        return responses

    def finalize(self):
        print('Cleaning up...')
"""

python_dir = os.path.join(REPO_PATH, 'model_ensemble')
python_version_dir = os.path.join(python_dir, '1')
os.makedirs(python_version_dir, exist_ok=True)

python_path = os.path.join(python_version_dir, 'model.py')
with open(python_path, 'w') as file_:
    file_.write(python_text)

# The Python backend model also needs a config.pbtxt. Its input and output
# match those of the FIL models it calls.
ensemble_config = f"""backend: "python"
max_batch_size: {max_batch_size}
input [
  {{
    name: "input__0"
    data_type: TYPE_FP32
    dims: [ {features} ]
  }}
]
output [
  {{
    name: "output__0"
    data_type: TYPE_FP32
    dims: [ {num_classes} ]
  }}
]
instance_group [{{ kind: KIND_CPU }}]
"""
with open(os.path.join(python_dir, 'config.pbtxt'), 'w') as file_:
    file_.write(ensemble_config)

### Starting the server
With valid models and configuration files in place, we can now start the server. Below, we start it, use the Python client to wait until it is fully online, and then check the logs to make sure no unexpected warnings or errors occurred while loading the models.

In [None]:
!docker run --gpus all -d -p 8000:8000 -p 8001:8001 -p 8002:8002 -v {REPO_PATH}:/models --name tritonserver {TRITON_IMAGE} tritonserver --model-repository=/models

In [None]:
import tritonclient.grpc as triton_grpc
from tritonclient import utils as triton_utils
HOST = 'localhost'
PORT = 8001

In [None]:
client = triton_grpc.InferenceServerClient(url=f'{HOST}:{PORT}')

In [None]:
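import time

# Poll until the server and the ensemble model report ready (for up to
# about two minutes) instead of sleeping for a fixed amount of time.
for _ in range(120):
    try:
        if client.is_server_ready() and client.is_model_ready('model_ensemble'):
            break
    except triton_utils.InferenceServerException:
        pass  # server not reachable yet
    time.sleep(1)
else:
    raise RuntimeError('Triton did not become ready in time; check `docker logs tritonserver`')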

In [None]:
!docker logs tritonserver

## Submitting inference requests
With our models now deployed on a running Triton server, let's confirm that the deployed ensemble returns the same results as the models do locally. Small differences can occasionally appear because floating-point operations may be executed in a different order during parallel execution; otherwise, the results should match.
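Because of these small floating-point differences, an exact equality check is too strict. The hypothetical helper below compares two prediction arrays within an absolute tolerance and also reports the largest deviation. The default tolerance of `1e-4` is an arbitrary choice for probabilities, not a value prescribed by Triton or FIL.

```python
import numpy as np


def compare_predictions(remote, local, atol=1e-4):
    """Return (largest absolute difference, whether all values agree within atol)."""
    remote = np.asarray(remote, dtype=np.float64)
    local = np.asarray(local, dtype=np.float64)
    if remote.shape != local.shape:
        raise ValueError(f'shape mismatch: {remote.shape} vs {local.shape}')
    max_diff = float(np.max(np.abs(remote - local)))
    # rtol=0 so that only the absolute tolerance applies
    return max_diff, bool(np.allclose(remote, local, rtol=0.0, atol=atol))
```

Once both the Triton and local results have been computed, `compare_predictions(triton_result, local_result)` summarizes how closely they agree.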

### Categorical variables
If you are using a model with categorical features, you must handle them with the same care as when executing the model locally. Both XGBoost and LightGBM rely on the input dataframe to convert categories into numeric codes, so if data is later submitted from a dataframe containing a different subset of categories, those codes will not match the ones seen during training. In this example, every categorical column was label-encoded before the train/test split, so the test data already uses the training encoding and we need not consider this. Otherwise, we would need to record the mapping behind the `.cat.codes` attribute of each categorical feature in the training dataframe and make sure the same codes were used when submitting inference requests.
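One way to record and reuse that mapping with pandas is sketched below. The helper names are hypothetical and the `card4` column with its values is only illustrative. The idea is to store each training column's ordered category list and rebuild new data with `pd.Categorical(..., categories=...)`, so that every value keeps its training-time code and unseen values become `-1`.

```python
import pandas as pd


def fit_category_mappings(train_df):
    """Record the ordered category list of every categorical column."""
    return {
        col: list(train_df[col].cat.categories)
        for col in train_df.select_dtypes('category').columns
    }


def encode_with_mappings(df, mappings):
    """Encode categorical columns with the codes used at training time.

    Values not seen during training are encoded as -1.
    """
    df = df.copy()
    for col, categories in mappings.items():
        df[col] = pd.Categorical(df[col], categories=categories).codes
    return df


train = pd.DataFrame({'card4': pd.Categorical(['visa', 'amex', 'visa'])})
mappings = fit_category_mappings(train)  # {'card4': ['amex', 'visa']}

new = pd.DataFrame({'card4': ['visa', 'discover']})
print(encode_with_mappings(new, mappings)['card4'].tolist())  # [1, -1]
# Re-encoding the new data on its own gives 'discover' code 0, which
# collides with the code 'amex' had during training:
print(new['card4'].astype('category').cat.codes.tolist())  # [1, 0]
```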

In [None]:
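import pandas as pd
import numpy as np

def convert_to_numpy(df):
    """Convert a dataframe into the FP32 array expected by input__0."""
    df = df.copy()
    # Replace any pandas categorical columns with their integer codes
    cat_cols = df.select_dtypes('category').columns
    for col in cat_cols:
        df[col] = df[col].cat.codes
    # Cast every column to float32 so the array matches TYPE_FP32 in config.pbtxt
    return df.to_numpy(dtype=np.float32)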

In [None]:
np_data = convert_to_numpy(X_test)

In [None]:
model_name = "model_ensemble"

input0_data = np_data[0:5]

inputs = [
    triton_grpc.InferInput("input__0", input0_data.shape, 'FP32')
]

inputs[0].set_data_from_numpy(input0_data)

outputs = [
    triton_grpc.InferRequestedOutput("output__0"),
]

response = client.infer(model_name,
                        inputs=inputs,
                        request_id=str(1),
                        outputs=outputs)

triton_result = response.as_numpy("output__0")
print("Result computed on Triton: ")
print(triton_result)

In [None]:
print("Result computed locally: ")
local_input = X_test[0:5]
local_result = (
    model_logistic.predict_proba(local_input)
    + model_oversample.predict_proba(local_input)
    + model_RFC.predict_proba(local_input)
) / 3
print(local_result)

In [None]:
# Shut down the server
!docker rm -f tritonserver

## Conclusion