# DRUM - Automated Model Serving Made Easy

We'll get our hands dirty by:

* Building a simple regression model using scikit-learn
* Using DRUM for batch scoring
* Using DRUM to get a REST API endpoint
* Showing a simple example app connected to the REST API
* Scoring models from other frameworks: H2O, Keras, XGBoost, and DataRobot
* Adding a DataRobot remote agent, if you are interested in further model monitoring


## Build a Model

In [1]:
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
import pickle

## load data

df = pd.read_csv('../data/boston_housing.csv')
df.head()

## set features and target

X = df.drop('MEDV', axis=1)
y = df['MEDV']

## train the model
rf = RandomForestRegressor(n_estimators = 20)
rf.fit(X,y)

## serialize the model

with open('../src/custom_model/rf.pkl', 'wb') as pkl:
    pickle.dump(rf, pkl)

# Batch Scoring with DRUM
<a id="setup_complete"></a>

At this point our model has been written to disk and we want to start making predictions with it. To do this, we'll leverage DRUM and its ability to natively handle our scikit-learn model. All we need to do is tell DRUM where the model resides and which data we wish to score.

DRUM supports many frameworks natively. For those it doesn't support off the shelf, we just need to create some custom hooks so DRUM knows how to work with the model. In this example, we'll highlight some very simple custom hooks, and will provide links to more complex examples.
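To make the idea of custom hooks concrete, here is a minimal sketch of what a `custom.py` in the code directory might look like. It assumes DRUM's `load_model` and `score` hook names and that regression output is a DataFrame with a single `Predictions` column; the file name `rf.pkl` matches the model serialized above. Treat it as an illustration under those assumptions, not as the actual contents of this repository's `custom.py`.

```python
import os
import pickle

import pandas as pd


def load_model(code_dir):
    """Hook (assumed name): load the serialized model from the code directory."""
    with open(os.path.join(code_dir, "rf.pkl"), "rb") as pkl:
        return pickle.load(pkl)


def score(data, model, **kwargs):
    """Hook (assumed name): return predictions in a single `Predictions` column."""
    predictions = model.predict(data)
    return pd.DataFrame({"Predictions": predictions})
```

With hooks like these in place, the `drum score` command only needs `--code-dir` to point at the folder that holds both `custom.py` and `rf.pkl`.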

In [2]:
!drum score --code-dir ../src/custom_model --input ../data/boston_housing_inference.csv --output ../data/predictions.csv --target-type regression

Using TensorFlow backend.


In [3]:
pd.read_csv("../data/predictions.csv").head()

Unnamed: 0,Predictions
0,24.475
1,22.285
2,34.935
3,33.865
4,35.685


# Start the inference server locally

Batch scoring can be very useful, but the utility DRUM offers does not stop there. We can also leverage DRUM to serve our model as a RESTful API endpoint. The only thing that changes is the way we structure the command: we use `server` mode instead of `score` mode. We'll also need to provide an address which is NOT in use.

When starting the server, we'll use `subprocess.Popen` so we can interact with the server from this notebook.

In [4]:
import subprocess
import requests
import pandas as pd
from io import BytesIO
import yaml
import time
import os
import datarobot as dr
from pprint import pprint

In [5]:
run_inference_server = ["drum",
              "server",
              "--code-dir","../src/custom_model", 
              "--address", "0.0.0.0:6789", 
              "--show-perf",
              "--target-type", "regression",
              "--logging-level", "info",
              "--show-stacktrace",
#               "--verbose"
              ]

In [6]:
inference_server = subprocess.Popen(run_inference_server, stdout=subprocess.PIPE)

## Ping the Server to make sure it is running

In [7]:
## confirm the server is running
time.sleep(5) ## snoozing before pinging the server to give it time to actually start
print('check status')
requests.request("GET", "http://0.0.0.0:6789/").content

check status


b'{"message":"OK"}\n'
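The fixed `time.sleep(5)` above works, but it either waits too long or not long enough depending on the machine. One possible alternative is to poll the health endpoint until it answers. The helper name `wait_for_server` and its parameters are hypothetical, and the sketch uses only the standard library.

```python
import time
import urllib.request


def wait_for_server(url, timeout=30.0, interval=0.5):
    """Poll `url` until it answers with HTTP 200 or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=interval) as response:
                if response.status == 200:
                    return True
        except OSError:
            pass  # connection refused, timed out, or non-200 response; retry
        time.sleep(interval)
    return False
```

For the server started above, a call such as `wait_for_server("http://localhost:6789/")` would return `True` as soon as the server responds, and `False` if it never comes up within the timeout.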

## Send data to server for inference

The request must provide our dataset as form data. To do so, we'll create a simple Python function to pass the data over appropriately. We'll use the same function in our simple Flask app a little later.

In [8]:
def score(data):
    b_buf = BytesIO()
    b_buf.write(data.to_csv(index=False).encode("utf-8"))
    b_buf.seek(0)
  
    url = "http://localhost:6789/predict/"
    files = [
        ('X', b_buf)
    ]
    response = requests.request("POST", url, files = files, timeout=None, verify=False)
    return response

In [9]:
# %%timeit
scoring_data = pd.read_csv("../data/boston_housing_inference.csv")
predictions = score(scoring_data).json() ## score the entire dataset
pprint(predictions)

{'predictions': [24.475,
                 22.285,
                 34.935,
                 33.865,
                 35.685,
                 27.225,
                 21.735,
                 24.675,
                 15.845]}


## Start the Flask App

Now that we've tested our endpoint, it is time to start up our Flask app. First, we need to set a few environment variables for it.

In [10]:
os.environ["LC_ALL"] = "C.UTF-8"
os.environ["LANG"] = "C.UTF-8"
os.environ["FLASK_APP"] = "server.app"
os.environ["FLASK_ENV"] = "development"

Running the Flask app as follows will lock the interpreter and only return control once you interrupt the kernel. Be advised that interrupting the kernel via the `stop` button will kill the Flask app AND the inference server. Once you execute the following cell, go over to the [app](http://localhost:8080/frontend).

In [11]:
!cd ../src && python -m flask run --host 0.0.0.0 --port 8080

 * Serving Flask app "server.app" (lazy loading)
 * Environment: development
 * Debug mode: on
 * Running on http://0.0.0.0:8080/ (Press CTRL+C to quit)
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 208-528-333
127.0.0.1 - - [26/Oct/2020 12:45:45] "GET /frontend HTTP/1.1" 200 -
      CRIM    ZN  INDUS  CHAS    NOX  ...  RAD    TAX  PTRATIO      B  LSTAT
0  0.00632  18.0   2.31   0.0  0.538  ...  1.0  296.0     15.3  396.9   4.98

[1 rows x 13 columns]
making request
prediciton [24.475]
heylksdfmlsdmsdflklmsdfsdf
127.0.0.1 - - [26/Oct/2020 12:45:51] "POST /frontend HTTP/1.1" 200 -
^C


In [12]:
inference_server.terminate()
# inference_server.stdout.readlines()
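Calling `terminate()` sends the stop request but does not wait for the process to exit. A slightly more defensive shutdown could look like the sketch below; `stop_process` is a hypothetical helper name, and it relies only on the standard `subprocess` API.

```python
import subprocess


def stop_process(proc, timeout=10.0):
    """Terminate `proc`, escalating to kill if it does not exit in time."""
    if proc.poll() is None:      # process is still running
        proc.terminate()         # polite stop request
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()          # force stop
            proc.wait()
    return proc.returncode
```

In this notebook it would be called as `stop_process(inference_server)`, which also frees the address so the server can be restarted on the same port.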

In [14]:
# requests.request("GET", "http://0.0.0.0:6789/").content

## Value Prop

One may ask: what is the benefit here? First off, there is no need for me to write an API to get the model up and running. Second, DRUM allows me to abstract the framework away, provided I'm using one that is natively supported or I can write enough Python for DRUM to understand how to hook up to the model.

For example, I could hot swap models as I see fit (see examples in `./src/other_models`).

While we will run through several other frameworks in `score` mode, they are supported in `server` mode as well!
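Since only the code directory changes between frameworks, the scoring commands can be generated rather than typed out. The sketch below builds the argument lists using only the flags that appear in this notebook; `drum_score_cmd` is a hypothetical helper, and the actual execution is left commented out because it requires `drum` and the model directories to be present.

```python
def drum_score_cmd(code_dir, input_path, target_type="regression", output_path=None):
    """Build the argument list for `drum score` from flags used in this notebook."""
    cmd = [
        "drum", "score",
        "--code-dir", code_dir,
        "--input", input_path,
        "--target-type", target_type,
    ]
    if output_path is not None:
        cmd += ["--output", output_path]
    return cmd


# Model directories scored in the cells that follow
model_dirs = [
    "../src/other_models/h2o_mojo/regression",
    "../src/other_models/python3_keras_joblib",
    "../src/other_models/python3_xgboost",
    "../src/other_models/dr_codegen",
]
commands = [drum_score_cmd(d, "../data/boston_housing_inference.csv") for d in model_dirs]
for cmd in commands:
    print(" ".join(cmd))
    # subprocess.run(cmd, check=True)  # uncomment to execute (needs `import subprocess` and drum)
```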

#### H2O Mojo

In [15]:
!drum score --code-dir ../src/other_models/h2o_mojo/regression --input ../data/boston_housing_inference.csv --target-type regression


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
   Predictions
0    24.504000
1    22.492000
2    34.554001
3    34.420001
4    35.289001
5    28.394001
6    21.936000
7    23.451000
8    17.065000


#### Keras

In [16]:
!drum score --code-dir ../src/other_models/python3_keras_joblib --input ../data/boston_housing_inference.csv --target-type regression


2020-10-26 12:46:32.054222: I tensorflow/core/platform/cpu_feature_guard.cc:143] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2020-10-26 12:46:32.102805: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x7fe33085c970 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-10-26 12:46:32.102939: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
Using TensorFlow backend.
   Predictions
0    23.668932
1    23.421118
2    31.283527
3    33.996525
4    33.757942
5    28.036715
6    20.675852
7    19.578415
8    19.676756


#### XGBoost

Requires XGBoost

In [17]:
!drum score --code-dir ../src/other_models/python3_xgboost --input ../data/boston_housing_inference.csv --target-type regression


Using TensorFlow backend.
   Predictions
0    24.541843
1    21.260277
2    34.018497
3    32.569200
4    34.248066
5    27.282364
6    20.803959
7    19.645220
8    16.968880


#### DataRobot Codegen

In [18]:
!drum score --code-dir ../src/other_models/dr_codegen --input ../data/boston_housing_inference.csv --target-type regression


   Predictions
0    24.258228
1    24.258228
2    32.451515
3    32.451515
4    32.451515
5    24.258228
6    21.078378
7    13.107812
8    13.107812


# Monitoring Deployments

What follows will require a DataRobot account.  You can get a trial account at [https://www.datarobot.com/trial/](https://www.datarobot.com/trial/)

Also, JDK 11 or 12 will be required.

The main idea: we'll start an agent service locally. This agent will monitor a spooler. The spooler could be something as simple as the local file system, or something a little more realistic like a message broker (Pub/Sub, RabbitMQ, SQS).

Once this agent is spun up locally, we'll set a few environment variables to let DRUM know that there is an agent present and that it needs to buffer data to the defined spool.
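As a mental model only, the spooler pattern can be sketched with plain files: the scoring process writes records into a directory, and a separate agent process drains that directory and forwards what it finds. The functions `spool_record` and `drain_spool` are hypothetical, and this is neither DataRobot's implementation nor its on-disk format.

```python
import json
import os
import tempfile
import uuid


def spool_record(spool_dir, record):
    """Producer side: write one record as its own JSON file."""
    path = os.path.join(spool_dir, "{}.json".format(uuid.uuid4().hex))
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(record, f)
    os.replace(tmp_path, path)  # atomic rename so readers never see partial files
    return path


def drain_spool(spool_dir):
    """Agent side: read and delete every complete record in the spool."""
    records = []
    for name in sorted(os.listdir(spool_dir)):
        if not name.endswith(".json"):
            continue  # skip files that are still being written
        path = os.path.join(spool_dir, name)
        with open(path) as f:
            records.append(json.load(f))
        os.remove(path)
    return records


spool_dir = tempfile.mkdtemp()  # stand-in for a directory such as /tmp/ta
spool_record(spool_dir, {"prediction": 24.475})
print(drain_spool(spool_dir))
```

The point of the decoupling is that the scoring process never talks to the monitoring service directly; it only writes to the spool, so a slow or unreachable service does not block predictions.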

## Getting the monitoring agents



Currently, the agents have to be downloaded through the [UI](https://app2.datarobot.com/account/developer-tools).

In [58]:
token = "your-token"
endpoint = "https://app2.datarobot.com"
## connect to DataRobot platform with python client. 
client = dr.Client(token, "{}/api/v2".format(endpoint))
# mlops_agents_tb = client.get("mlopsInstaller")
# with open("../mlops-agent.tar.gz", "wb") as f:
#     f.write(mlops_agents_tb.content)

In [43]:
# !tar -xf ../mlops-agent.tar.gz -C ..
!tar -xf ../datarobot-mlops-agent-6.2.4-399.tar.gz -C ..

tar: Failed to set default locale


## Configuring the Agent

When we configure the agent, we just need to define the DataRobot MLOps location and our API token. By default, the agent will expect the data to be spooled on the local file system. Specifically, the default location will be `/tmp/ta`, so we just need to make sure that location exists.

In [44]:
!mkdir -p /tmp/ta

In [46]:
agents_dir

'datarobot-mlops-agent-6.2.4-399.tar.gz'

In [49]:
agents_dir = next(filter(lambda x: "datarobot-mlops-agent-6.2.4" == x, os.listdir("..")))
with open(r'../{}/conf/mlops.agent.conf.yaml'.format(agents_dir)) as file:
    documents = yaml.load(file, Loader=yaml.FullLoader)
## configure the location of the MLOps instance with which we'll communicate
documents['mlopsUrl'] = endpoint
# Set your API token
documents['apiToken'] = token
## write the configuration back to disk
with open('../{}/conf/mlops.agent.conf.yaml'.format(agents_dir), "w") as f:
    yaml.dump(documents, f)

## Start the Agent Service

Checking to make sure we can start up the agents service.  

This will require a JDK - tested with 11 and 12

In [50]:
## run agents service
subprocess.call("../{}/bin/start-agent.sh".format(agents_dir))

0

In [51]:
## check status
check = subprocess.Popen(["../{}/bin/status-agent.sh".format(agents_dir)], stdout=subprocess.PIPE)
print(check.stdout.readlines())
check.terminate()

[b'DataRobot MLOps-Agent is running as a service.\n']


In [52]:
## check log to see that the agent connected to DR MLOps
check = subprocess.Popen(["cat", "../{}/logs/mlops.agent.log".format(agents_dir)], stdout=subprocess.PIPE)
for line in check.stdout.readlines():
    print(line)
check.terminate()

b'2020-10-26 13:11:05,215 INFO  com.datarobot.mlops.agent.config.channels.YamlBuilder        [] - Found spooler of type FILESYSTEM\n'
b'2020-10-26 13:11:05,218 INFO  com.datarobot.mlops.agent.config.channels.YamlBuilder        [] - Setting directory = /tmp/ta\n'
b'2020-10-26 13:11:05,218 INFO  com.datarobot.mlops.agent.config.channels.YamlBuilder        [] - Setting CHANNEL_NAME = filesystem\n'
b'2020-10-26 13:11:05,904 ERROR com.datarobot.mlops.agent.Agent                              [] - Error during agent execution - Fail to get version from server  - {"message": "Invalid Authorization header"}\n'
b'2020-10-26 13:12:38,272 INFO  com.datarobot.mlops.agent.config.channels.YamlBuilder        [] - Found spooler of type FILESYSTEM\n'
b'2020-10-26 13:12:38,275 INFO  com.datarobot.mlops.agent.config.channels.YamlBuilder        [] - Setting directory = /tmp/ta\n'
b'2020-10-26 13:12:38,275 INFO  com.datarobot.mlops.agent.config.channels.YamlBuilder        [] - Setting CHANNEL_NAME = filesys

## DataRobot MLOps - Deploying External Model 

To communicate with DataRobot MLOps, we need the MLOps Python client installed; it ships in the downloaded tarball.

In [53]:
!pip install ../datarobot-mlops-*/lib/datarobot_mlops-*.whl -q

You should consider upgrading via the 'pip install --upgrade pip' command.


In [54]:
from datarobot.mlops.mlops import MLOps
from datarobot.mlops.common.enums import OutputType
from datarobot.mlops.connected.client import MLOpsClient
from datarobot.mlops.common.exception import DRConnectedException
from datarobot.mlops.constants import Constants

In [55]:
DEPLOYMENT_NAME="Boston Housing Prices ODSC"
TRAINING_DATA = '../data/boston_housing.csv'

In [56]:
model_info = {
        "name": "Boston Housing Pricins",
        "modelDescription": {
            "description": "prediction price of home"
        },
        "target": {
            "type": "Regression",
            "name": "MEDV",
        }
}

In [60]:
# Create connected client
mlops_client = MLOpsClient(endpoint, token)

# Add training_data to model configuration
print("Uploading training data - {}. This may take some time...".format(TRAINING_DATA))
dataset_id = mlops_client.upload_dataset(TRAINING_DATA)
print("Training dataset uploaded. Catalog ID {}.".format(dataset_id))
model_info["datasets"] = {"trainingDataCatalogId": dataset_id}

# Create the model package
print('Create model package')
model_pkg_id = mlops_client.create_model_package(model_info)
model_pkg = mlops_client.get_model_package(model_pkg_id)
model_id = model_pkg["modelId"]

# Deploy the model package
print('Deploy model package')
deployment_id = mlops_client.deploy_model_package(model_pkg["id"], DEPLOYMENT_NAME)

# Enable data drift tracking
print('Enable feature drift')
enable_feature_drift = TRAINING_DATA is not None
mlops_client.update_deployment_settings(
    deployment_id, target_drift=True, feature_drift=enable_feature_drift
)
_ = mlops_client.get_deployment_settings(deployment_id)

print("\nDone.")
print("DEPLOYMENT_ID=%s, MODEL_ID=%s" % (deployment_id, model_id))

DEPLOYMENT_ID = deployment_id
MODEL_ID = model_id

Uploading training data - ../data/boston_housing.csv. This may take some time...
Training dataset uploaded. Catalog ID 5f9703db10582b0047159b04.
Create model package
Deploy model package
Enable feature drift

Done.
DEPLOYMENT_ID=5f9703fe4e13cd0177cb7eda, MODEL_ID=5f9703fc77a58303862423d7


# Adding Monitoring with MLOps Monitoring Agents

## Monitoring With DRUM

There are a few additional parameters we should set for the command line utility. Alternatively, we can create environment variables and allow the `drum` utility to pick up the details from there.

```
  --monitor             Monitor predictions using DataRobot MLOps. True or
                        False. (env: MONITOR).Monitoring can not be used in
                        unstructured mode.
  --deployment-id DEPLOYMENT_ID
                        Deployment id to use for monitoring model predictions
                        (env: DEPLOYMENT_ID)
  --model-id MODEL_ID   MLOps model id to use for monitoring predictions (env:
                        MODEL_ID)
  --monitor-settings MONITOR_SETTINGS
                        MLOps setting to use for connecting with the MLOps
                        Agent (env: MONITOR_SETTINGS)
```
For today, we'll set environment variables to add monitoring. 


In [61]:
os.environ["MONITOR"] = "True"
os.environ["DEPLOYMENT_ID"] = deployment_id
os.environ["MODEL_ID"] = model_id
os.environ["MONITOR_SETTINGS"] = "spooler_type=filesystem;directory=/tmp/ta;max_files=5;file_max_size=1045876000"
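The `MONITOR_SETTINGS` value is a semicolon-separated list of `key=value` pairs, which is easy to mistype by hand. A small helper can assemble it from a dictionary; `build_monitor_settings` is a hypothetical name, and the keys and values are the ones used in the cell above.

```python
def build_monitor_settings(settings):
    """Join key/value pairs into the `key=value;key=value` form used above."""
    return ";".join("{}={}".format(key, value) for key, value in settings.items())


monitor_settings = build_monitor_settings({
    "spooler_type": "filesystem",
    "directory": "/tmp/ta",
    "max_files": 5,
    "file_max_size": 1045876000,
})
print(monitor_settings)
```

The resulting string could then be assigned with `os.environ["MONITOR_SETTINGS"] = monitor_settings`, keeping the spool directory defined in one place.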

In [64]:
"{}/deployments/{}/overview".format(endpoint, deployment_id)

'https://app2.datarobot.com/deployments/5f9703fe4e13cd0177cb7eda/overview'

In [68]:
inference_server_with_monitoring = subprocess.Popen(run_inference_server, stdout=subprocess.PIPE)

In [None]:
!cd ../src && python -m flask run --host 0.0.0.0 --port 8080

In [None]:
subprocess.call("../{}/bin/stop-agent.sh".format(agents_dir))

In [None]:
## check that agent is stopped 
check = subprocess.Popen(["../{}/bin/status-agent.sh".format(agents_dir)], stdout=subprocess.PIPE)
print(check.stdout.readlines())
check.terminate()

In [None]:
deployment = dr.Deployment.get(deployment_id)
deployment
deployment.get_service_stats()

In [None]:
service_stats = deployment.get_service_stats()
service_stats.metrics