In [None]:
!pip install datarobot requests-futures==1.0.1 -qqq

## Custom Model Assembly to Deployment

Create an instance of the DataRobot client.

For MTS, your `endpoint = "https://app.datarobot.com/api/v2"`. You can get your DataRobot API token in the UI: click the avatar in the top right -> Developer Tools -> Create New Key, or use an existing key.

In [None]:
import os
import sys

import datarobot as dr
# Connection details are read from environment variables so that no
# credentials are stored in the notebook itself.
client = dr.Client(
    endpoint=os.environ["DATAROBOT_ENDPOINT"],
    token=os.environ["DATAROBOT_API_TOKEN"],
)

## Grab an appropriate environment

In [2]:
# Search the execution environments for "scikit" and keep the last match.
scikit_environments = dr.ExecutionEnvironment.list("scikit")
environment = scikit_environments[-1]
environment

ExecutionEnvironment('[DataRobot] Python 3.9 Scikit-Learn Drop-In')

## Load the training dataset to DataRobot Registry

In [None]:
# Create a DataRobot dataset from the local CSV; `training_data.id` is passed
# to the custom model version created further down.
training_data = dr.Dataset.create_from_file("training_data.csv")


## Create an entry for the model in the Custom Model Workshop

In [4]:
# Custom Model Workshop entry: a regression model whose target is "charges".
cm = dr.CustomInferenceModel.create(
    name="Python regression",
    target_type=dr.enums.TARGET_TYPE.REGRESSION,
    target_name="charges",
)

## Add a version to the entry 

In [5]:
# Create a version of the custom model from the files in ./python-version,
# tied to the execution environment and the training dataset chosen above.
cmv = dr.CustomModelVersion.create_clean(
    cm.id,
    base_environment_id=environment.id,
    folder_path="./python-version",
    training_dataset_id=training_data.id,
)

## or
# custom_model_version = dr.CustomModelVersion.create_from_previous(..)

## Build the custom model environment if necessary

This step is required only if your custom model includes a `requirements.txt`.


In [6]:
# Start the dependency build for this custom model version.
# max_wait=1200 is presumably a limit in seconds — confirm against the client docs.
build = dr.CustomModelVersionDependencyBuild.start_build(cm.id, cmv.id, max_wait = 1200)

Check the build status

In [8]:
# Display the status recorded on the dependency build object.
build.build_status

'success'

## Load the test dataset we will use to ensure DataRobot can make predictions with your model

In [3]:
# Create a DataRobot dataset from the local test CSV; its id is handed to the
# custom model test below.
cm_test_data = dr.Dataset.create_from_file("test_data.csv")

## Run Tests

In [4]:
# Test this model version against the uploaded test dataset.
custom_model_test = dr.CustomModelTest.create(
    cm.id,
    cmv.id,
    dataset_id=cm_test_data.id,
    network_egress_policy=dr.enums.NETWORK_EGRESS_POLICY.PUBLIC,
)

## Check Test Status

In [5]:
# Per-check results of the custom model test (a status and message per check).
custom_model_test.detailed_status

{'error_check': {'status': 'succeeded', 'message': ''},
  'message': 'Model cannot impute null values for the following columns:\nage, bmi, children'},
 'long_running_service': {'status': 'succeeded', 'message': ''},
 'side_effects': {'status': 'succeeded', 'message': ''},
 'prediction_verification_check': {'status': 'skipped', 'message': ''},
 'performance_check': {'status': 'skipped', 'message': ''},
 'stability_check': {'status': 'skipped', 'message': ''}}

In [6]:
# Overall status of the custom model test.
custom_model_test.overall_status



## Register the Model

In [7]:
## register the custom model version in the dr model registry
registered_model_version = dr.RegisteredModelVersion.create_for_custom_model_version(
    custom_model_version_id=cmv.id,
    name="Python Regression",
    description="some description",
)

In [20]:
# Build status stored on the registered model version object.
registered_model_version.build_status

Wait for the model package to be built

In [26]:
import time

# Poll the registered model version until its package build is no longer
# "inProgress". The wait is bounded so a stuck build cannot hang the notebook,
# and we only sleep while the build is still running.
BUILD_POLL_SECONDS = 5
BUILD_TIMEOUT_SECONDS = 30 * 60
build_status_route = (
    f"registeredModels/{registered_model_version.registered_model_id}"
    f"/versions/{registered_model_version.id}"
)
build_deadline = time.monotonic() + BUILD_TIMEOUT_SECONDS
buildStatus = client.get(build_status_route).json()["buildStatus"]
while buildStatus == "inProgress":
    if time.monotonic() > build_deadline:
        raise TimeoutError(
            f"model package build still in progress after {BUILD_TIMEOUT_SECONDS} seconds"
        )
    time.sleep(BUILD_POLL_SECONDS)
    buildStatus = client.get(build_status_route).json()["buildStatus"]

# NOTE(review): "failed" is assumed to be the API's failure status — confirm.
if buildStatus == "failed":
    raise RuntimeError("model package build failed; the model cannot be deployed")

## Deploy the model

In [13]:
## this grabs the last serverless prediction environment in the returned list
serverless_environments = [
    pe for pe in dr.PredictionEnvironment.list() if pe.platform == "datarobotServerless"
]
if not serverless_environments:
    # Fail with an actionable message instead of "pop from empty list".
    raise RuntimeError(
        "no 'datarobotServerless' prediction environment found; create one before deploying"
    )
prediction_environment = serverless_environments[-1]

In [67]:
# Deploy the registered model version to the prediction environment selected above.
deployment = dr.Deployment.create_from_registered_model_version(
    registered_model_version.id,
    prediction_environment_id=prediction_environment.id,
    label = "Python Regression",
)

## Update Accuracy Tracking by defined association id

In our case, the association id will be a field in the data called `ASSOCIATION_ID`.

In [68]:
# Use the ASSOCIATION_ID column as the association id; it is not required in
# prediction requests.
deployment.update_association_id_settings(["ASSOCIATION_ID"], required_in_prediction_requests=False)

## Update Drift Tracking

In [69]:
# Enable both target drift and feature drift tracking on the deployment.
deployment.update_drift_tracking_settings(target_drift_enabled=True, feature_drift_enabled=True)

## Realtime Scoring

In [None]:
import requests 
import uuid 
import pandas as pd 
import os
from io import StringIO

In [None]:
# Real-time prediction route; `{deployment_id}` is filled in by `score`.
DEPLOYMENT_URL = client.endpoint + '/deployments/{deployment_id}/predictions'    # noqa
API_KEY = os.environ["DATAROBOT_API_TOKEN"]
# Fixed seed so the same 100 rows are scored on every run of the notebook.
SAMPLE_SEED = 42
df = pd.read_csv("test_data.csv").sample(n=100, random_state=SAMPLE_SEED)
# One unique id per row, stored in the column configured as the association id.
df["ASSOCIATION_ID"] = [str(uuid.uuid1()) for _ in range(df.shape[0])]

In [62]:
def score(data, DEPLOYMENT_ID, max_explanations=3, timeout=600):
    """Score a DataFrame against a deployment with the real-time prediction API.

    Parameters
    ----------
    data : pandas.DataFrame
        Rows to score. Serialized to CSV without the index.
    DEPLOYMENT_ID : str
        Deployment id substituted into ``DEPLOYMENT_URL``.
    max_explanations : int, optional
        Value sent as the ``maxExplanations`` query parameter (default 3).
    timeout : float or None, optional
        Seconds passed to ``requests.post`` as ``timeout``; ``None`` waits
        indefinitely.

    Returns
    -------
    requests.Response
        The raw response. The caller is responsible for checking the status
        code and parsing the CSV body.
    """
    headers = {
        'Content-Type': 'text/plain; charset=UTF-8',
        'Accept': "text/csv",
        'Authorization': 'Bearer {}'.format(os.environ["DATAROBOT_API_TOKEN"]),
    }

    url = DEPLOYMENT_URL.format(deployment_id=DEPLOYMENT_ID)
    params = {
        'maxExplanations': max_explanations,
    }
    # Encode explicitly: the header declares UTF-8, while a str body would be
    # encoded as latin-1 by the HTTP layer and fail on other characters.
    payload = data.to_csv(index=False).encode("utf-8")
    # Make API request for predictions
    predictions_response = requests.post(
        url,
        data=payload,
        headers=headers,
        params=params,
        timeout=timeout,
    )
    return predictions_response

In [63]:
# Score the sampled rows against the deployment created above.
python_preds = score(df, deployment.id)

In [64]:
# Parse the CSV response body into a DataFrame for display.
pd.read_csv(StringIO(python_preds.content.decode()))

Unnamed: 0,charges_PREDICTION,EXPLANATION_1_FEATURE_NAME,EXPLANATION_1_STRENGTH,EXPLANATION_1_ACTUAL_VALUE,EXPLANATION_1_QUALITATIVE_STRENGTH,EXPLANATION_2_FEATURE_NAME,EXPLANATION_2_STRENGTH,EXPLANATION_2_ACTUAL_VALUE,EXPLANATION_2_QUALITATIVE_STRENGTH,EXPLANATION_3_FEATURE_NAME,EXPLANATION_3_STRENGTH,EXPLANATION_3_ACTUAL_VALUE,EXPLANATION_3_QUALITATIVE_STRENGTH,DEPLOYMENT_APPROVAL_STATUS
0,12379.470418,age,5273.522395,57,+++,smoker,-2771.419939,no,--,children,-688.398586,0,-,APPROVED
1,13634.823174,smoker,-7163.552877,no,---,age,3228.725241,52,++,children,1375.622394,3,++,APPROVED
2,3341.578530,smoker,-7986.832376,no,---,age,-3860.499073,22,--,children,-1935.580462,0,--,APPROVED
3,14022.648730,smoker,-6858.183988,no,---,age,4479.145863,54,+++,children,1235.436226,3,++,APPROVED
4,3054.940903,age,-4052.632783,21,---,smoker,-3129.920263,no,--,children,-2008.962518,0,--,APPROVED
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,7460.149538,smoker,-6056.440237,no,---,age,-2796.579113,22,--,children,2365.391561,3,++,APPROVED
96,3193.682712,smoker,-7848.451982,no,---,age,-4061.200152,18,--,children,-1991.432483,0,--,APPROVED
97,12407.629910,smoker,-7575.006055,no,---,age,5205.552307,55,+++,children,-706.759040,0,-,APPROVED
98,15257.219585,age,7256.454740,63,+++,smoker,-2160.300449,no,--,children,420.268642,1,+,APPROVED


In [65]:
import datarobot as dr

# Use the same endpoint as the rest of the notebook rather than a hard-coded
# host; fall back to the managed cloud URL when the variable is not set.
client = dr.Client(
    token=API_KEY,
    endpoint=os.environ.get("DATAROBOT_ENDPOINT", "https://app.datarobot.com/api/v2"),
)


def report_actuals(df, DEPLOYMENT_ID):
    """Submit actual target values for scored rows to a deployment.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``charges`` column (the actual value) and an
        ``ASSOCIATION_ID`` column (the id sent with the prediction).
    DEPLOYMENT_ID : str
        Id of the deployment that receives the actuals.

    Raises
    ------
    KeyError
        If ``df`` lacks one of the required columns. Checked before any call
        to DataRobot is made.
    """
    col_mapping = {"charges": "actual_value", "ASSOCIATION_ID": "association_id"}
    missing = [column for column in col_mapping if column not in df.columns]
    if missing:
        raise KeyError(f"cannot report actuals, missing column(s): {missing}")

    deployment = dr.Deployment.get(DEPLOYMENT_ID)
    df_actuals = df.rename(columns=col_mapping)
    # Select with an explicit list; a dict view is not a documented indexer.
    deployment.submit_actuals(df_actuals[list(col_mapping.values())])

# Send the actuals for the rows scored above to the deployment.
report_actuals(df, deployment.id)

## For Datasets larger than 10 MB there are two approaches

* Use the batch prediction API (preferred when your dataset is on the order of 500 MB or more).
* Use Futures: chunk the data locally and call the realtime prediction API (a version of this can be used when the dataset is between 10 MB and 500 MB).

### Using Batch

In [26]:
df = pd.read_csv("test_data.csv")
# Re-fetch the deployment created earlier in this notebook instead of a
# hard-coded id, so the cell works for whoever runs the notebook.
deployment = dr.Deployment.get(deployment.id)
## make the dataframe larger than 10 MB to demonstrate
# 2**13 stacked copies, the same result as doubling the frame 13 times.
df = pd.concat([df] * 2**13)
df["ASSOCIATION_ID"] = [str(uuid.uuid1()) for _ in range(df.shape[0])]
# Approximate size of the CSV that will be uploaded.
used_bytes = sys.getsizeof(df.to_csv(index = False))
used_megabytes = (used_bytes / 1e6)
print(used_megabytes)

84.975723


In [16]:
# The same generous limit (60 * 60 * 24, presumably seconds, i.e. one day) is
# used for upload, download and waiting on the result.
ONE_DAY = 60 * 60 * 24
job = dr.BatchPredictionJob.score(
    deployment=deployment.id,
    intake_settings={'type': 'localFile', 'file': df},
    output_settings={
        'type': 'localFile',
        #'path': output_file,
    },
    ## set passthrough_columns = "all" if you would like all columns returned
    passthrough_columns=['region'],
    download_timeout=ONE_DAY,
    download_read_timeout=ONE_DAY,
    upload_read_timeout=ONE_DAY,
    # If explanations are required, uncomment the line below
    # max_explanations=3,
    # If text explanations are required, uncomment the line below.
    # max_ngram_explanations='all',
    # Uncomment this for Prediction Warnings, if enabled for your deployment.
    # prediction_warning_enabled=True
)
# The result comes back as bytes; decode it and parse the CSV.
raw_result = job.get_result_when_complete(max_wait=ONE_DAY)
result = raw_result.decode()
df_pred = pd.read_csv(StringIO(result))

In [22]:
# Preview the first rows of the batch predictions.
df_pred.head()

Unnamed: 0,charges_PREDICTION,DEPLOYMENT_APPROVAL_STATUS,region,ASSOCIATION_ID
0,15271.252333,APPROVED,northwest,76e4025a-e571-11ef-9977-aa34fbca8d65
1,13041.333337,APPROVED,northeast,76e4035e-e571-11ef-9977-aa34fbca8d65
2,6910.410814,APPROVED,southeast,76e4037c-e571-11ef-9977-aa34fbca8d65
3,6726.029534,APPROVED,northwest,76e403ae-e571-11ef-9977-aa34fbca8d65
4,4672.434832,APPROVED,northwest,76e403c2-e571-11ef-9977-aa34fbca8d65


### Using Futures

This is an illustrative example and is not production ready.

In [27]:
import logging
import sys
from requests_futures.sessions import FuturesSession
from io import BytesIO, StringIO
import numpy as np
import pandas as pd
import datarobot as dr
import uuid
import time 
import os
# Send INFO-level (and above) log records to stdout so they show up under the cell.
logging.basicConfig(
    level=logging.INFO,
    stream=sys.stdout,
    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s',
)
logger = logging.getLogger(__name__)
API_KEY = os.environ["DATAROBOT_API_TOKEN"]
# Build the prediction route from the configured endpoint so it matches the
# rest of the notebook; fall back to the managed cloud URL when it is not set.
DATAROBOT_ENDPOINT = os.environ.get(
    "DATAROBOT_ENDPOINT", "https://app.datarobot.com/api/v2"
).rstrip("/")
DEPLOYMENT_URL = DATAROBOT_ENDPOINT + '/deployments/{deployment_id}/predictions'    # noqa

In [28]:
def retry_any_bad_future(future):
    """Return the future to keep waiting on for one prediction request.

    A request answered with HTTP 503 is posted again through the module-level
    ``session`` (same URL, headers and body) and the new future is returned.
    For every other status code the original future is returned unchanged.
    """
    response = future.result()
    if response.status_code != 503:
        # Successes (< 300) and all other failures are handed back untouched.
        return future

    # 503 is associated with DataRobot spinning up more inference resources.
    logger.warning(response.content.decode("UTF-8"))
    logger.warning("retrying request. hang tight")
    sent = response.request
    resend_args = dict(url=sent.url, headers=sent.headers, data=sent.body)
    return session.post(**resend_args)
    
def wait_for_futures(futures, max_retries=5, poll_seconds=10):
    """Wait for all prediction requests, re-sending those answered with 503.

    Parameters
    ----------
    futures : list
        Futures whose ``result()`` exposes a ``status_code``.
    max_retries : int, optional
        Maximum number of rounds in which 503 responses are re-sent.
    poll_seconds : float, optional
        Pause between checks while requests are still running.

    Returns
    -------
    list
        The futures, all finished. Requests that failed with a status other
        than 503, or that still return 503 after ``max_retries`` rounds, are
        returned as they are so the caller can inspect and report them.
    """
    wait_started = time.time()
    retries_left = max_retries
    while True:
        while not all(f.done() for f in futures):
            time.sleep(poll_seconds)
            logger.info(f"number of completed batches {sum(1 for f in futures if f.done())}")
            logger.info(f"still running. currently {time.time() - wait_started} seconds and counting")

        # Only 503 can be retried; retrying anything else would never succeed
        # and previously led to unbounded recursion.
        needs_retry = [f.result().status_code == 503 for f in futures]
        if not any(needs_retry):
            return futures
        if retries_left <= 0:
            logger.warning(f"giving up on {sum(needs_retry)} request(s) still returning 503 after {max_retries} retries")
            return futures
        retries_left -= 1
        futures = [
            retry_any_bad_future(f) if flagged else f
            for f, flagged in zip(futures, needs_retry)
        ]

In [29]:
MAX_MB_PER_REQUEST = 10
# Size of the encoded CSV, i.e. what is actually sent over the wire.
used_bytes = len(df.to_csv(index = False).encode("utf-8"))
used_megabytes = (used_bytes / 1e6)
# Always plan for at least one request.
realtime_batches = max(1, int(np.ceil(used_megabytes/MAX_MB_PER_REQUEST)))
realtime_concurrency = 4
logger.info(f"dataset size is {used_megabytes:1.3} MB")
headers = {
    'Content-Type': 'text/plain; charset=UTF-8',
        # 'Content-Type': 'application/json; charset=UTF-8',
    'Authorization': 'Bearer {}'.format(API_KEY),
    "Accept": "text/csv"
}
params = {
    "passthroughColumns": ['region']
}
URL = DEPLOYMENT_URL.format(deployment_id = deployment.id)
# Responses of requests that did not succeed, kept for inspection.
bad_requests = []
if realtime_batches == 1:
    # Small enough for a single synchronous request.
    fstart = time.time_ns()
    result = requests.post(
        URL,
        headers = headers,
        # Encode explicitly to match the charset declared in the header.
        data = df.to_csv(index = False).encode("utf-8"),
        params = params
    )
    if result.status_code < 300:
        logger.info(f"prediction request status code: {result.status_code}")
    else:
        logger.error(f"prediction request status code: {result.status_code}")
        logger.error(result.content.decode("UTF-8"))
        raise Exception(result.content.decode("UTF-8"))
    try:
        df_pred = pd.read_csv(BytesIO(result.content))
    except Exception as e:
        logger.error(e)
    fend = time.time_ns()
else:
    logger.info(f"making concurrent calls for scoring")
    logger.info(f"batches: {realtime_batches}, max workers: {realtime_concurrency}")
    responses = []
    df_pred = []
    # Split by row position; calling np.array_split on the DataFrame itself
    # is deprecated in recent pandas versions.
    row_groups = np.array_split(np.arange(len(df)), realtime_batches)
    df_batches = [df.iloc[rows] for rows in row_groups]
    session = FuturesSession(max_workers=realtime_concurrency)
    url_payload = dict(url = URL, headers = headers, params = params)
    for b, df_batch in enumerate(df_batches):
        logger.info(f"scoring {df_batch.shape[0]} records in batch {b}")
        responses.append(
            session.post(**url_payload, data=df_batch.to_csv(index=False).encode("utf-8"))
        )
    # `fstart` and `c` are module-level names that the progress logging in
    # `wait_for_futures` may read, so they are set before it is called.
    fstart = time.time_ns()
    c = 0
    responses = wait_for_futures(responses)
    logger.info(f"number of completed batches {sum([1 if r.done() else 0 for r in responses])}")

    for i, future in enumerate(responses):
        result = future.result()
        status_code = result.status_code
        if status_code >= 300:
            logger.warning(f"batch {i} status code: {status_code}")
            logger.warning(result.content.decode("UTF-8"))
            bad_requests.append(result)
            # An error body is not a CSV of predictions; do not parse it.
            continue
        logger.info(f"batch {i} status code: {status_code}")
        # Get results in CSV format
        df_pred.append(pd.read_csv(BytesIO(result.content)))
    fend = time.time_ns()
    if bad_requests:
        logger.error(f"{len(bad_requests)} batch(es) failed; their rows are missing from df_pred")
    try:
        df_pred = pd.concat(df_pred)
    except Exception as e:
        logger.error(e)
logger.info("="*30)
logger.info(f"number of seconds to complete scoring: {(fend - fstart)/1e9}")
logger.info("="*30)

2025-02-07 12:38:56,476 746344414.py:7 INFO dataset size is 85.0 MB
2025-02-07 12:38:56,476 746344414.py:39 INFO making concurrent calls for scoring
2025-02-07 12:38:56,477 746344414.py:40 INFO batches: 9, max workers: 4
2025-02-07 12:38:56,584 746344414.py:47 INFO scoring 121970 records in batch 0
2025-02-07 12:38:56,779 746344414.py:47 INFO scoring 121970 records in batch 1
2025-02-07 12:38:56,976 746344414.py:47 INFO scoring 121970 records in batch 2
2025-02-07 12:38:57,174 746344414.py:47 INFO scoring 121970 records in batch 3
2025-02-07 12:38:57,366 746344414.py:47 INFO scoring 121970 records in batch 4
2025-02-07 12:38:57,569 746344414.py:47 INFO scoring 121970 records in batch 5
2025-02-07 12:38:57,771 746344414.py:47 INFO scoring 121970 records in batch 6
2025-02-07 12:38:57,972 746344414.py:47 INFO scoring 121969 records in batch 7
2025-02-07 12:38:58,171 746344414.py:47 INFO scoring 121969 records in batch 8
2025-02-07 12:39:08,373 1900053571.py:23 INFO number of completed ba

In [30]:
# Preview the first rows of the predictions gathered from the concurrent requests.
df_pred.head()

Unnamed: 0,charges_PREDICTION,DEPLOYMENT_APPROVAL_STATUS,region,ASSOCIATION_ID
0,15271.252333,APPROVED,northwest,421623ec-e57a-11ef-9977-aa34fbca8d65
1,13041.333337,APPROVED,northeast,4216246e-e57a-11ef-9977-aa34fbca8d65
2,6910.410814,APPROVED,southeast,42162482-e57a-11ef-9977-aa34fbca8d65
3,6726.029534,APPROVED,northwest,42162496-e57a-11ef-9977-aa34fbca8d65
4,4672.434832,APPROVED,northwest,421624a0-e57a-11ef-9977-aa34fbca8d65
