# Demo 3: Accelerating ML inference via distributed processing and/or Triton inference servers

In this demo we show how analysis workflows can be accelerated by parallelizing processing, or by outsourcing the ML inference to Triton servers with GPUs.

First, let's load the pre-selected dimuon events.

In [None]:
import numpy as np
import pandas as pd
import torch

from python.event_selection import load_events
from python.dnn_model import NeuralNet

sources = ["data", "ttbar", "dy"]
server = "file:/depot/cms/purdue-af/demos/"
model_dir = "/depot/cms/purdue-af/demos/"
dfs = {}

features = ['mu1_pt', 'mu1_eta', 'mu2_pt', 'mu2_eta', 'dimuon_mass', 'met']

# load datasets for inference
for src in sources:
    dfs[src] = load_events(f"{server}/{src}.root")[features]


## 1. Distributed processing with Dask
The `dask.distributed` package provides a quick way to process embarrassingly parallel workflows using multiple local or remote computing nodes. This is done by spawning a *cluster* of workers (remote workers are typically launched through a batch submission system such as SLURM) and then creating a *client* to interact with that cluster.
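The client/worker pattern can be illustrated without Dask using the standard library's `concurrent.futures`, which offers a similar map/gather style of interface on a local pool of threads. This is only an analogy under that assumption (a thread pool on one machine, not a Dask cluster), and `process` below is a hypothetical stand-in for any per-dataset task.

```python
from concurrent.futures import ThreadPoolExecutor

def process(item):
    """Hypothetical per-dataset task: sum the values of one dataset."""
    label, values = item
    return {"label": label, "output": sum(values)}

# Three independent "datasets": no task depends on another task's result,
# which is what makes the workload embarrassingly parallel.
datasets = {"data": [1, 2, 3], "ttbar": [4, 5], "dy": [6]}

with ThreadPoolExecutor(max_workers=3) as pool:
    # pool.map plays the role of client.map: one task per input item.
    # Collecting the results plays the role of client.gather (input order is kept).
    results = list(pool.map(process, datasets.items()))

for res in results:
    print(res["label"], res["output"])
```

With Dask, the same structure holds, except that the tasks run on cluster workers and the inputs can be shipped to them in advance with `client.scatter`.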

Here we demonstrate two ways to use Dask clusters: creating a local cluster within a notebook, or connecting to a cluster created elsewhere.

### Create a local cluster
The local cluster can be scaled up to the number of CPUs selected at session start (at most 32). In this case, all Dask workers run on the same node.
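When choosing `n_workers`, it can help to query how many CPUs the session is actually allowed to use. A minimal sketch, assuming a Linux session where `os.sched_getaffinity` reflects the CPUs allocated at session start (it falls back to `os.cpu_count()` elsewhere); the cap of 32 mirrors the limit mentioned above, and `pick_n_workers` is a hypothetical helper.

```python
import os

def pick_n_workers(requested, max_cpus=32):
    """Return a worker count no larger than the CPUs available to this process."""
    if hasattr(os, "sched_getaffinity"):
        # On Linux, the affinity mask lists the CPUs this process may run on,
        # which can be fewer than the machine's total inside a container or batch job.
        available = len(os.sched_getaffinity(0))
    else:
        available = os.cpu_count() or 1
    # Never go below one worker or above the requested, available, or capped count.
    return max(1, min(requested, available, max_cpus))

n_workers = pick_n_workers(4)
print(f"Starting a local cluster with {n_workers} workers")
# cluster = LocalCluster(n_workers=n_workers)
```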

In [None]:
from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=4)
client = Client(cluster)
client

### Connect to an existing cluster
In this case, the cluster is launched elsewhere (e.g. in a different notebook, in a terminal, or through the Dask JupyterLab extension), and only its address is needed to create the client.

Work is in progress on reliable cluster setups that will make it possible to use more than 32 CPUs and more than one computing node at a time.

In [None]:
# from dask.distributed import Client

# client = Client("tcp://127.0.0.1:42573")
# client

### Example parallelization
To test the distributed processing setup, we run inference with a simple DNN on three small datasets in parallel. DNN inference here is just an example of processing code that can be parallelized.
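To see what each parallel task actually computes, the forward pass of a small fully connected network can be written directly in NumPy. This is a sketch only: the layer sizes mirror `NeuralNet(6, [16, 8], 1)` from the cell below, but the ReLU hidden activations, the sigmoid output, and the random weights are assumptions, since the actual `NeuralNet` definition is not shown here.

```python
import numpy as np

rng = np.random.default_rng(seed=0)

# Hypothetical weights for a 6 -> 16 -> 8 -> 1 network (random, not trained).
sizes = [6, 16, 8, 1]
weights = [rng.normal(size=(n_in, n_out)) for n_in, n_out in zip(sizes[:-1], sizes[1:])]
biases = [np.zeros(n_out) for n_out in sizes[1:]]

def forward(x):
    """Forward pass: ReLU on hidden layers, sigmoid on the output layer (assumed)."""
    for w, b in zip(weights[:-1], biases[:-1]):
        x = np.maximum(x @ w + b, 0.0)
    logits = x @ weights[-1] + biases[-1]
    return 1.0 / (1.0 + np.exp(-logits))

# Six input features per event, matching the `features` list above.
events = rng.normal(size=(5, 6))
scores = forward(events).ravel()
print(scores)
```

Each event is scored independently of the others, so splitting the work across workers (here, one dataset per worker) does not change the result.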

In [None]:
# Check if there are any local GPUs available
if torch.cuda.is_available():
    print("Will use GPU for inference.")
else:
    print("Will use CPUs for inference.")

# The main processing function that will be executed in parallel on multiple datasets
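def inference(inp):
    label, df = inp
    model_path = model_dir + "/model.ckpt"
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    # Rebuild the network and load the trained weights onto the selected device
    model = NeuralNet(6, [16, 8], 1).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    # Convert the feature table to a float32 tensor on the same device as the model
    x = torch.from_numpy(df.values).to(device).float()
    # Disable gradient tracking: it is not needed for inference and costs memory
    with torch.no_grad():
        scores = model(x)
    scores = scores.cpu().numpy()
    return {
        "label": label,
        "output": scores.ravel()
    }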

# Distribute the datasets to workers
scattered_data = client.scatter(list(dfs.items()))

# Process the datasets in parallel and return the results
futures = client.map(inference, scattered_data)
results = client.gather(futures)

print("\nInference outputs:")
for res in results:
    print(res["label"], res["output"])

## 2. Outsourcing ML inference to remote GPUs via Triton servers
Machine learning inference typically runs much faster on GPUs than on CPUs. However, computing clusters usually have a limited number of GPUs, so it is not possible to guarantee full GPU access to all users at all times.

One way to use GPUs to accelerate inference without blocking GPU nodes is to send inference requests to dedicated inference servers that are permanently attached to GPUs.

To be evaluated by a Triton server, a model has to be saved in a specific format: [see this example of how to do that in PyTorch](https://medium.com/@furcifer/deploying-triton-inference-server-in-5-minutes-67aa09a84ca6).

The saved models must be put into a repository visible to the Triton server(s), which in our case is `/depot/cms/purdue-af/triton/models/`.
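Triton expects each model in the repository to live in its own directory, containing a `config.pbtxt` file and numbered version subdirectories. The sketch below writes such a layout into a temporary directory rather than the real repository. The model name `test-model` and the tensor names `INPUT__0`/`OUTPUT__0` come from the code below; the `pytorch_libtorch` platform (which loads a TorchScript file named `model.pt`), the tensor dims, and `max_batch_size: 0` are assumptions, not the actual configuration of the demo model. `make_model_dir` is a hypothetical helper.

```python
import tempfile
from pathlib import Path

# Assumed config: FP64 input of shape [N, 6] and FP64 output of shape [N, 1].
# With max_batch_size: 0, the dims include the batch dimension (-1 = variable size).
CONFIG_PBTXT = """\
name: "test-model"
platform: "pytorch_libtorch"
max_batch_size: 0
input [
  {
    name: "INPUT__0"
    data_type: TYPE_FP64
    dims: [ -1, 6 ]
  }
]
output [
  {
    name: "OUTPUT__0"
    data_type: TYPE_FP64
    dims: [ -1, 1 ]
  }
]
"""

def make_model_dir(repo, model_name, version=1):
    """Create <repo>/<model_name>/config.pbtxt and an empty <repo>/<model_name>/<version>/."""
    model_root = Path(repo) / model_name
    version_dir = model_root / str(version)
    version_dir.mkdir(parents=True, exist_ok=True)
    (model_root / "config.pbtxt").write_text(CONFIG_PBTXT)
    # The TorchScript file would be written here, e.g. with torch.jit.save(traced_model, path)
    return version_dir / "model.pt"

repo = tempfile.mkdtemp()
pt_path = make_model_dir(repo, "test-model")
print(sorted(p.relative_to(repo).as_posix() for p in Path(repo).rglob("*")))
```

The directory name and the `name` field in `config.pbtxt` must agree, since the client refers to the model by that name (`model_name="test-model"` below).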

At the moment, we provide several Triton servers corresponding to different GPUs and GPU partitions. To select a particular server, uncomment the corresponding address (and comment out the others):

In [None]:
triton_address = '128.211.160.154:8001' # Partition of A100 GPU with 5gb RAM
#triton_address = '128.211.160.153:8001' # Partition of A100 GPU with 10gb RAM
#triton_address = '128.211.160.147:8001' # Partition of A100 GPU with 20gb RAM
#triton_address = 'hammer-f000.rcac.purdue.edu:8001' # T4 GPU located at a different cluster
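Before running inference, it can be useful to check that the selected address is reachable at all. The sketch below only tests whether a TCP connection to `host:port` can be opened, using the standard library's `socket` module; it does not confirm that Triton is healthy (once connected, the Triton client's `is_server_live()` and `is_model_ready()` methods are the proper checks). The helper name `port_is_open` is hypothetical.

```python
import socket

def port_is_open(address, timeout=2.0):
    """Return True if a TCP connection to 'host:port' succeeds within `timeout` seconds."""
    host, port = address.rsplit(":", 1)
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, unreachable hosts, DNS failures and timeouts.
        return False

# Example usage with the address selected above:
# print(triton_address, "reachable:", port_is_open(triton_address))
```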

In [None]:
import tritonclient.grpc as grpcclient

print(f"Connecting to Triton inference server at {triton_address}")

keepalive_options = grpcclient.KeepAliveOptions(
    keepalive_time_ms=2**31 - 1,
    keepalive_timeout_ms=20000,
    keepalive_permit_without_calls=False,
    http2_max_pings_without_data=2
)

def inference_triton(inp):
    # Create Triton client
    try:
        triton_client = grpcclient.InferenceServerClient(
            url=triton_address,
            verbose=False,
            keepalive_options=keepalive_options
        )
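    except Exception as e:
        # Raise instead of calling sys.exit(): `sys` was never imported, and exiting
        # would kill the process (or Dask worker) running this function
        raise RuntimeError("Channel creation failed: " + str(e)) from e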
    
    label, df = inp
    
    # Inputs and outputs should be compatible with model metadata
    # stored in /depot/cms/purdue-af/triton/models/test-model/config.pbtxt
    inputs = [grpcclient.InferInput('INPUT__0', df.shape, "FP64")]
    outputs = [grpcclient.InferRequestedOutput('OUTPUT__0')]
    
    # Load input data; cast to float64 so the array matches the "FP64" datatype declared above
    inputs[0].set_data_from_numpy(df.values.astype(np.float64))
    
    # Run inference on Triton server.
    # Models are stored in /depot/cms/purdue-af/triton/models/
    results = triton_client.infer(
        model_name="test-model",
        inputs=inputs,
        outputs=outputs,
        headers={'test': '1'},
    )

    output = results.as_numpy('OUTPUT__0')
    return {
        "label": label,
        "output": output.flatten()
    }

results = []
for label, df in dfs.items():
    results.append(inference_triton([label, df]))


print("\nInference outputs:")
for res in results:
    print(res["label"], res["output"])
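The datatype string passed to `InferInput` has to match both the model configuration and the dtype of the NumPy array given to `set_data_from_numpy`, otherwise the request fails. A minimal sketch of that bookkeeping follows, using a small hand-written mapping; `tritonclient.utils` ships an equivalent helper, `np_to_triton_dtype`, which is preferable in practice. The `prepare_input` helper and its `expected` argument are hypothetical.

```python
import numpy as np

# Subset of NumPy -> Triton datatype names (Triton uses "FP64", "FP32", "INT64", ...).
NP_TO_TRITON = {
    np.dtype(np.float64): "FP64",
    np.dtype(np.float32): "FP32",
    np.dtype(np.int64): "INT64",
    np.dtype(np.int32): "INT32",
}

def prepare_input(array, expected="FP64"):
    """Cast `array` to the dtype the model expects and return (array, shape, datatype)."""
    # Look up the NumPy dtype for the expected Triton datatype
    # (raises StopIteration if `expected` is not in the mapping).
    target = next(dt for dt, name in NP_TO_TRITON.items() if name == expected)
    array = np.ascontiguousarray(array, dtype=target)
    return array, list(array.shape), NP_TO_TRITON[array.dtype]

x = np.ones((3, 6), dtype=np.float32)
arr, shape, datatype = prepare_input(x)
print(arr.dtype, shape, datatype)
```

In `inference_triton`, the returned shape and datatype would go into `grpcclient.InferInput(...)` and the array into `set_data_from_numpy`.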

DNN inference via Triton servers can also be parallelized using Dask:

In [None]:
# scattered_data = client.scatter(list(dfs.items()))
# futures = client.map(inference_triton, scattered_data)
# results = client.gather(futures)

## Plotting DNN outputs
Run this cell after either the Dask parallelization example or the Triton example to plot the DNN outputs (the models differ between the two examples, so the outputs will not look the same). The models are generic and carry no physics meaning.

In [None]:
import matplotlib.pyplot as plt
bins = np.linspace(0, 1, 100)
plt.figure(figsize=(5,4))

dnn = {res["label"]: res["output"] for res in results}

plt.hist(dnn["dy"], bins, alpha=0.3, label='dy', density=True)
plt.hist(dnn["ttbar"], bins, alpha=0.3, label='ttbar', density=True)
plt.hist(dnn["data"], bins, alpha=0.3, label='data', density=True)
plt.xlabel('DNN Score')
plt.ylabel('Normalized density')  # density=True: histograms are normalized to unit area, not event counts
leg = plt.legend(loc='upper left')
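Because `density=True` is passed to `plt.hist`, each histogram is normalized to unit area rather than showing raw event counts, which makes samples of very different sizes comparable on one plot. The NumPy check below illustrates this with random placeholder scores (not the demo outputs).

```python
import numpy as np

rng = np.random.default_rng(seed=1)
bins = np.linspace(0, 1, 100)

# Two placeholder samples of very different size, all values inside [0, 1).
samples = {"small": rng.uniform(size=100), "large": rng.uniform(size=10_000)}

areas = []
for name, sample in samples.items():
    heights, edges = np.histogram(sample, bins=bins, density=True)
    # With density=True, sum(height * bin width) is 1 regardless of sample size.
    area = np.sum(heights * np.diff(edges))
    areas.append(area)
    print(f"{name}: {len(sample)} entries -> area {area:.3f}")
```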