## Generate some test data

This is an example of scoring data and monitoring with datarobot

In [103]:
import json
import pandas as pd 
import yaml
import datarobot as dr
from datarobot_mlops.mlops import MLOps
import time

example1 = { 
    "claimNumber": 12341234,
    "exposureType": "exposure type a",
    "modelConfidence": "low",  ## high, low, medium
    "claimantInfo": " [\{\}] "
 }

import numpy as np 
def generate_data(n = 100):
    arr = ["low", "medium", "high"]
    modelConfidence = np.random.choice(["low", "medium", "high"], size=n, p = [0.6, 0.3, 0.1], replace=True)
    exposureType = np.random.choice(["exposure type a", "exposure type b", "exposure type c"], size=n, p = [1/3 ,1/3, 1/3], replace=True)
    claimNumber = np.random.randint(11111, 99999, size = n)
    
    payload = pd.DataFrame( dict( claimNumber = claimNumber, exposureType = exposureType, modelConfidence = modelConfidence))
    payload["claimantInfo"] = json.dumps( dict( field1 = "field1", field2 = "field2"))
    return payload

def ohe_prediction(x):
    if x == "low":
        return [1.0, 0.0, 0.0]
    elif x == "medium":
        return [0.0, 1.0, 0.0]
    elif x == "high":
        return [0.0, 0.0, 1.0]
    else:
        return [1/3, 1/3, 1/3]






## Recommended Approach 

1.  Register Payload to DataRobot Dataset Register
2.  Create and trigger a monitoring job - DataRobot will use the uploaded dataset to monitoring inputs and predictions

In [124]:
with open("deployment.yaml", "r") as f:
    deployment_conf = yaml.load(f, Loader = yaml.SafeLoader)
deployment = dr.Deployment.get( deployment_conf.get("deployment_id"))

In [116]:
import datarobot as dr 
import datetime

client = dr.Client() 
payload = generate_data(n = 100)
preds = payload["modelConfidence"].apply(ohe_prediction).values
preds_df = pd.DataFrame(list(preds), columns = ["PREDICITION_LOW", "PREDICTION_MEDIUM", "PREDICTION_HIGH"])
ts = datetime.datetime.now().isoformat()
payload.join(preds_df)
dataset =  dr.Dataset.create_from_in_memory_data(payload.join(preds_df), fname = f"SUBRO monitoring dataset {ts}.csv")

monitoring_job_payload = {
    "deploymentId":deployment.id,
    "intakeSettings":{"type":"dataset","datasetId":dataset.id},
    "name":f"Subro model Job {ts}1 (UTC)",
    "enabled":False,
    "monitoringColumns": {
        "predictionsColumns":[
            {"className":"high","columnName":"PREDICTION_HIGH"},
            {"className":"medium","columnName":"PREDICTION_MEDIUM"},
            {"className":"low","columnName":"PREDICITION_LOW"}
        ]}}

monitoring_job_response = client.post("batchMonitoringJobDefinitions/", data = monitoring_job_payload)
monitoring_job_response.raise_for_status()

job_run_payload = {"jobDefinitionId":monitoring_job_response.json()["id"]}
job_run_response = client.post("batchJobs/fromJobDefinition/", data = job_run_payload)
job_run_response.raise_for_status()

'67e595f68e68d7369dc6fa2e'

## Approach 2: Use the Filesystem as Spooler



In [0]:
service_stats = deployment.get_service_stats()
prediction_count = service_stats.metrics.get("totalPredictions")
print(prediction_count)

In [5]:
from pathlib import Path
from datarobot_mlops.mlops import MLOps
import os 
import glob 
import subprocess   
import time 

## thia is the spooler director that we are creating on the fly
spooler_dir = Path("/tmp/ta")
spooler_dir.mkdir(exist_ok = True)
## the environment variables are a must for the client created on line 20 of this cell
## and for the agent that gets started in a few cells
os.environ["MLOPS_SERVICE_URL"] = "https://app.datarobot.com"
os.environ['MLOPS_API_TOKEN'] = os.environ["DATAROBOT_API_TOKEN"]
os.environ['MLOPS_AGENT_VERIFY_SSL'] = "true"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk/"
os.environ["MLOPS_SPOOLER_TYPE"]="FILESYSTEM"
os.environ["MLOPS_FILESYSTEM_DIRECTORY"] = "/tmp/ta"
os.environ["MLOPS_DEPLOYMENT_ID"] = deployment.id
os.environ["MLOPS_MODEL_ID"] = deployment.model.get("id")


In [5]:
from pathlib import Path
from datarobot_mlops.mlops import MLOps
import os 
import glob 
import subprocess   
import time 
import json

feature_types = [{ "name": "claimNumber", "feature_type": "number"}, {"name":"exposureType", "feature_type":"categorical"}, {"name": "modelConfidence", "feature_type": "categorical"}, {"name": "claimantInfo", "feature_type": "text"}]
## thia is the spooler director that we are creating on the fly
spooler_dir = Path("/tmp/ta")
spooler_dir.mkdir(exist_ok = True)
## the environment variables are a must for the client created on line 20 of this cell
## and for the agent that gets started in a few cells
os.environ["MLOPS_SERVICE_URL"] = "https://app.datarobot.com"
os.environ['MLOPS_API_TOKEN'] = os.environ["DATAROBOT_API_TOKEN"]
os.environ['MLOPS_AGENT_VERIFY_SSL'] = "true"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk/"
os.environ["MLOPS_SPOOLER_TYPE"]="FILESYSTEM"
os.environ["MLOPS_FILESYSTEM_DIRECTORY"] = "/tmp/ta"
os.environ["MLOPS_FEATURE_TYPES_JSON"] = json.dumps(feature_types)
os.environ["MLOPS_DEPLOYMENT_ID"] = deployment.id
os.environ["MLOPS_MODEL_ID"] = deployment.model.get("id")

mlops = MLOps().init()
start = time.time() 
payload = generate_data(10000)
predictions = payload["modelConfidence"].apply(ohe_prediction).tolist()
time.sleep(5)
end = time.time()
## score date 
mlops.report_deployment_stats(payload.shape[0], (end - start)*1000) 
mlops.report_predictions_data(predictions = predictions, class_names = ["low", "medium", "high"])
mlops.shutdown()




In [8]:
mlops.report_predictions_data??

[31mSignature:[39m
mlops.report_predictions_data(
    features_df=[38;5;28;01mNone[39;00m,
    predictions=[38;5;28;01mNone[39;00m,
    association_ids=[38;5;28;01mNone[39;00m,
    class_names=[38;5;28;01mNone[39;00m,
    deployment_id=[38;5;28;01mNone[39;00m,
    model_id=[38;5;28;01mNone[39;00m,
    skip_drift_tracking=[38;5;28;01mFalse[39;00m,
    skip_accuracy_tracking=[38;5;28;01mFalse[39;00m,
    batch_id=[38;5;28;01mNone[39;00m,
)
[31mSource:[39m   
    [38;5;28;01mdef[39;00m report_predictions_data(
        self,
        features_df=[38;5;28;01mNone[39;00m,
        predictions=[38;5;28;01mNone[39;00m,
        association_ids=[38;5;28;01mNone[39;00m,
        class_names=[38;5;28;01mNone[39;00m,
        deployment_id=[38;5;28;01mNone[39;00m,
        model_id=[38;5;28;01mNone[39;00m,
        skip_drift_tracking=[38;5;28;01mFalse[39;00m,
        skip_accuracy_tracking=[38;5;28;01mFalse[39;00m,
        batch_id=[38;5;28;01mNone[39;00m,
    )

In [6]:
mlops = MLOps().init() 
start = time.time() 
payload = generate_data(10000)
predictions = payload["modelConfidence"].apply(ohe_prediction).tolist()
time.sleep(5)
end = time.time()
## score date 

mlops.report_deployment_stats(payload.shape[0], (end - start)*1000) 
mlops.report_predictions_data(features_df = payload, predictions = predictions, class_names = ["low", "medium", "high"])
mlops.shutdown()

In [6]:
## start the agent!  this will use the environments variables set up above
agents_dir = glob.glob("./datarobot_mlops*").pop(0)
try: 
    os.remove(os.path.join(agents_dir, "bin", "PID.agent"))
except Exception as e:
    print(e)

subprocess.call("{}/bin/start-agent.sh".format(agents_dir))

INFO: MLOPS_AGENT_CONFIG_YAML=/home/notebooks/storage/datarobot_mlops_package-11.0.1/conf/mlops.agent.conf.yaml
INFO: MLOPS_AGENT_LOG_PROPERTIES=/home/notebooks/storage/datarobot_mlops_package-11.0.1/conf/mlops.log4j2.properties
INFO: MLOPS_AGENT_JVM_OPT=-Xmx1G
INFO: AGENT_CLASSPATH='/home/notebooks/storage/datarobot_mlops_package-11.0.1/lib/spooler-kafka-11.0.1.jar:/home/notebooks/storage/datarobot_mlops_package-11.0.1/lib/spooler-pubsub-11.0.1.jar:/home/notebooks/storage/datarobot_mlops_package-11.0.1/lib/spooler-rabbitmq-11.0.1.jar:/home/notebooks/storage/datarobot_mlops_package-11.0.1/lib/spooler-sqs-11.0.1.jar:/home/notebooks/storage/datarobot_mlops_package-11.0.1/lib/mlops-agent-11.0.1.jar'
INFO: AGENT_LOG_PATH=/home/notebooks/storage/datarobot_mlops_package-11.0.1/logs/mlops.agent.log

Running MLOps-Agent as a service


DataRobot MLOps-Agent is running.


0

## Give it some time to report back predictions

in a scheduled run, we need to block to make sure all predictions are reported back, otherwise the job will terminate before the spooler flushes all records to datarobot

In [8]:
predictions_reported = service_stats.metrics.get("totalPredictions") - prediction_count 
# predictions_reported != payload.shape[0]
while predictions_reported != payload.shape[0]:
    service_stats = deployment.get_service_stats()
    predictions_reported = service_stats.metrics.get("totalPredictions") - prediction_count 
print("all predictions reported")
print(prediction_count)
print(service_stats.metrics.get("totalPredictions"))


all predictions reported
140200
150200


## Using API as spooler

Recommended when you must want to aggregate statistcs and report.  not meant for sending through big payloads

In [4]:
!pip install datarobot-mlops-stats-aggregator

Collecting datarobot-mlops-stats-aggregator
  Downloading datarobot_mlops_stats_aggregator-10.1.2-py3-none-any.whl.metadata (2.2 kB)
Collecting drfaster~=9.0.0 (from datarobot-mlops-stats-aggregator)
  Downloading drfaster-9.0.0.tar.gz (108 kB)
  Installing build dependencies ... [?25l- \ | / done
[?25h  Getting requirements to build wheel ... [?25l- \ | / done
[?25h  Preparing metadata (pyproject.toml) ... [?25l- done
Downloading datarobot_mlops_stats_aggregator-10.1.2-py3-none-any.whl (42 kB)
Building wheels for collected packages: drfaster
  Building wheel for drfaster (pyproject.toml) ... [?25l- \ | / - \ | done
[?25h  Created wheel for drfaster: filename=drfaster-9.0.0-cp311-cp311-linux_x86_64.whl size=565558 sha256=6637bc4e9ef0e63aa0ef067f79467ce687dea404b48328cb67c2a77e93849f9b
  Stored in directory: /tmp/pip-ephem-wheel-cache-ln6gmjs5/wheels/3a/b9/24/3ec6904bd61e4661590d38a78c95dfd39b7d2d8273c64746a6
Successfully built drfaster

In [33]:
from pathlib import Path
import vcr

from datarobot_mlops.mlops import MLOps
import os 
import glob 
import subprocess   
import time 
import json

feature_types = [{ "name": "claimNumber", "feature_type": "number"}, {"name":"exposureType", "feature_type":"categorical"}, {"name": "modelConfidence", "feature_type": "categorical"}, {"name": "claimantInfo", "feature_type": "text"}]
## thia is the spooler director that we are creating on the fly
spooler_dir = Path("/tmp/ta")
spooler_dir.mkdir(exist_ok = True)
## the environment variables are a must for the client created on line 20 of this cell
## and for the agent that gets started in a few cells
os.environ["MLOPS_SERVICE_URL"] = "https://app.datarobot.com"
os.environ['MLOPS_API_TOKEN'] = os.environ["DATAROBOT_API_TOKEN"]
os.environ['MLOPS_AGENT_VERIFY_SSL'] = "true"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk/"
os.environ["MLOPS_SPOOLER_TYPE"]="API"
os.environ["MLOPS_FILESYSTEM_DIRECTORY"] = ""
os.environ["MLOPS_FEATURE_TYPES_JSON"] = json.dumps(feature_types)
os.environ["MLOPS_DEPLOYMENT_ID"] = deployment.id
os.environ["MLOPS_MODEL_ID"] = deployment.model.get("id")

mlops = MLOps().init()
start = time.time() 
payload = generate_data(5)
predictions = payload["modelConfidence"].apply(ohe_prediction).tolist()
time.sleep(5)
end = time.time()
## score date 
with vcr.use_cassette('fixtures/vcr_cassettes/report_deployment_stats.yaml'):
    mlops.report_deployment_stats(payload.shape[0], (end - start)*1000) 

with vcr.use_cassette('fixtures/vcr_cassettes/report_prediction_data.yaml'):
    mlops.report_predictions_data(features_df = payload, predictions = predictions, association_ids = ["uid1", "uid2", "uid3", "uid4", "uid5"], class_names = ["low", "medium", "high"],     skip_drift_tracking=False,
    skip_accuracy_tracking=False)

mlops.shutdown()




In [16]:
mlops.report_predictions_data?

[31mSignature:[39m
mlops.report_predictions_data(
    features_df=[38;5;28;01mNone[39;00m,
    predictions=[38;5;28;01mNone[39;00m,
    association_ids=[38;5;28;01mNone[39;00m,
    class_names=[38;5;28;01mNone[39;00m,
    deployment_id=[38;5;28;01mNone[39;00m,
    model_id=[38;5;28;01mNone[39;00m,
    skip_drift_tracking=[38;5;28;01mFalse[39;00m,
    skip_accuracy_tracking=[38;5;28;01mFalse[39;00m,
    batch_id=[38;5;28;01mNone[39;00m,
)
[31mDocstring:[39m
Report features and predictions to DataRobot MLOps for tracking and monitoring.

:param features_df: Dataframe containing features to track and monitor.  All the features
    in the dataframe are reported.  Omit the features from the dataframe that do not need
    reporting.
:type features_df: pandas dataframe, optional
:param predictions: List of predictions.  For Regression deployments, this is a 1D list
    containing prediction values.  For Classification deployments, this is a 2D list, in
    which the inne

In [34]:
import yaml 
import json
with open("/home/notebooks/storage/fixtures/vcr_cassettes/report_prediction_data.yaml", "r") as f:
    report_preds = yaml.load(f, Loader = yaml.SafeLoader)
with open("/home/notebooks/storage/fixtures/vcr_cassettes/report_deployment_stats.yaml") as f:
    report_stats = yaml.load(f, Loader = yaml.SafeLoader)

In [35]:
json.loads(report_stats["interactions"][0]["request"]["body"])

{'data': [{'timestamp': '2025-03-27 17:11:09.171+0000',
   'modelId': '67e32c08babf26d876865a84',
   'numPredictions': 5,
   'executionTime': 5002.68292427063,
   'userError': False,
   'systemError': False}]}

In [0]:


predictions_data_dict = json.loads(report_preds["interactions"][0]["request"]["body"])

In [45]:
predictions_data_dict

{'data': [{'timestamp': '2025-03-27 17:13:29.362+0000',
   'modelId': '67e32c08babf26d876865a84',
   'features': [{'name': 'claimNumber',
     'values': [58928, 81888, 33617, 63454, 25663]},
    {'name': 'exposureType',
     'values': ['exposure type a',
      'exposure type b',
      'exposure type a',
      'exposure type c',
      'exposure type a']},
    {'name': 'modelConfidence', 'values': ['low', 'low', 'low', 'low', 'low']},
    {'name': 'claimantInfo',
     'values': ['{"field1": "field1", "field2": "field2"}',
      '{"field1": "field1", "field2": "field2"}',
      '{"field1": "field1", "field2": "field2"}',
      '{"field1": "field1", "field2": "field2"}',
      '{"field1": "field1", "field2": "field2"}']}],
   'predictions': [[1.0, 0.0, 0.0],
    [1.0, 0.0, 0.0],
    [1.0, 0.0, 0.0],
    [1.0, 0.0, 0.0],
    [1.0, 0.0, 0.0]],
   'associationIds': ['uid1', 'uid2', 'uid3', 'uid4', 'uid5'],
   'classNames': ['low', 'medium', 'high']}]}

In [47]:
data = """
{"data":[{"timestamp":"2025-03-27 17:13:29.362+0000","modelId":"67e32c08babf26d876865a84","features":[{"name":"claimNumber","values":[58928,81888,33617,63454,25663]},{"name":"exposureType","values":["exposure
      type a","exposure type b","exposure type a","exposure type c","exposure type
      a"]},{"name":"modelConfidence","values":["low","low","low","low","low"]},{"name":"claimantInfo","values":["{\"field1\":
      \"field1\", \"field2\": \"field2\"}","{\"field1\": \"field1\", \"field2\": \"field2\"}","{\"field1\":
      \"field1\", \"field2\": \"field2\"}","{\"field1\": \"field1\", \"field2\": \"field2\"}","{\"field1\":
      \"field1\", \"field2\": \"field2\"}"]}],"predictions":[[1.0,0.0,0.0],[1.0,0.0,0.0],[1.0,0.0,0.0],[1.0,0.0,0.0],[1.0,0.0,0.0]],"associationIds":["uid1","uid2","uid3","uid4","uid5"],"classNames":["low","medium","high"]}]}"""
   

In [80]:
import datetime 
now = datetime.datetime.now()
print(now.isoformat())
print('2025-03-27 17:13:29.362+0000\n')
print(now.strftime("%Y-%m-%d %H:%M:%t.%s+0000"))
print(now.strftime("%Y-%m-%d %H:%M:%S.%fZ"))

2025-03-27T17:33:36.423549
2025-03-27 17:13:29.362+0000

2025-03-27 17:33:	.1743096816+0000
2025-03-27 17:33:36.423549Z


In [58]:
import pprint
pprint.pprint(yaml.dump(predictions_data_dict))

('data:\n'
 '- associationIds:\n'
 '  - uid1\n'
 '  - uid2\n'
 '  - uid3\n'
 '  - uid4\n'
 '  - uid5\n'
 '  classNames:\n'
 '  - low\n'
 '  - medium\n'
 '  - high\n'
 '  features:\n'
 '  - name: claimNumber\n'
 '    values:\n'
 '    - 58928\n'
 '    - 81888\n'
 '    - 33617\n'
 '    - 63454\n'
 '    - 25663\n'
 '  - name: exposureType\n'
 '    values:\n'
 '    - exposure type a\n'
 '    - exposure type b\n'
 '    - exposure type a\n'
 '    - exposure type c\n'
 '    - exposure type a\n'
 '  - name: modelConfidence\n'
 '    values:\n'
 '    - low\n'
 '    - low\n'
 '    - low\n'
 '    - low\n'
 '    - low\n'
 '  - name: claimantInfo\n'
 '    values:\n'
 '    - \'{"field1": "field1", "field2": "field2"}\'\n'
 '    - \'{"field1": "field1", "field2": "field2"}\'\n'
 '    - \'{"field1": "field1", "field2": "field2"}\'\n'
 '    - \'{"field1": "field1", "field2": "field2"}\'\n'
 '    - \'{"field1": "field1", "field2": "field2"}\'\n'
 '  modelId: 67e32c08babf26d876865a84\n'
 '  predictions:\n'

In [52]:
import requests 
import os

ENDPOINT = "https://app.datarobot.com/api/v2/deployments"

PREDICTION_INPUT_FROM_JSON = "predictionInputs/fromJSON"

response = requests.post( os.path.join(ENDPOINT, deployment.id, PREDICTION_INPUT_FROM_JSON), 
            headers = {
                "Authorization": f"Bearer {os.environ['DATAROBOT_API_TOKEN']}",
                "Content-Type": "application/json"
            },
            data = json.dumps(predictions_data_dict))


In [53]:
import requests 
import os
ENDPOINT = "https://app.datarobot.com/api/v2/deployments"
PREDICTION_REQUESTS_FROM_JSON = "predictionRequests/fromJSON"   

response = requests.post( os.path.join(ENDPOINT, deployment.id, PREDICTION_INPUT_FROM_JSON), 
            headers = {
                "Authorization": f"Bearer {os.environ['DATAROBOT_API_TOKEN']}",
                "Content-Type": "application/json"
            },
            data = json.dumps(predictions_data_dict))

{'message': 'ok'}

In [26]:
mlops.report_predictions_data?

[31mSignature:[39m
mlops.report_predictions_data(
    features_df=[38;5;28;01mNone[39;00m,
    predictions=[38;5;28;01mNone[39;00m,
    association_ids=[38;5;28;01mNone[39;00m,
    class_names=[38;5;28;01mNone[39;00m,
    deployment_id=[38;5;28;01mNone[39;00m,
    model_id=[38;5;28;01mNone[39;00m,
    skip_drift_tracking=[38;5;28;01mFalse[39;00m,
    skip_accuracy_tracking=[38;5;28;01mFalse[39;00m,
    batch_id=[38;5;28;01mNone[39;00m,
)
[31mDocstring:[39m
Report features and predictions to DataRobot MLOps for tracking and monitoring.

:param features_df: Dataframe containing features to track and monitor.  All the features
    in the dataframe are reported.  Omit the features from the dataframe that do not need
    reporting.
:type features_df: pandas dataframe, optional
:param predictions: List of predictions.  For Regression deployments, this is a 1D list
    containing prediction values.  For Classification deployments, this is a 2D list, in
    which the inne