In [1]:
#****************************************************************************
# (C) Cloudera, Inc. 2020-2023
#  All rights reserved.
#
#  Applicable Open Source License: GNU Affero General Public License v3.0
#
#  NOTE: Cloudera open source products are modular software products
#  made up of hundreds of individual components, each of which was
#  individually copyrighted.  Each Cloudera open source product is a
#  collective work under U.S. Copyright Law. Your license to use the
#  collective work is as provided in your written agreement with
#  Cloudera.  Used apart from the collective work, this file is
#  licensed for your use pursuant to the open source license
#  identified above.
#
#  This code is provided to you pursuant a written agreement with
#  (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
#  this code. If you do not have a written agreement with Cloudera nor
#  with an authorized and properly licensed third party, you do not
#  have any rights to access nor to use this code.
#
#  Absent a written agreement with Cloudera, Inc. (“Cloudera”) to the
#  contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
#  KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
#  WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
#  IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
#  FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
#  AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
#  ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
#  OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
#  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
#  CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
#  RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
#  BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
#  DATA.
#
# #  Author(s): Paul de Fusco
#***************************************************************************/

In [5]:
!pip3 install open-inference-openapi 
#!pip3 install httpx uvicorn



### Deploy the Model to CAI Inference Service

In [6]:
#cdp iam generate-workload-auth-token --workload-name DE

In [7]:
import os
myToken = os.environ["CDP_TOKEN"]
modelId = "tnlc-1e37-szwj-cz1j"

In [8]:
import json
import httpx

#JWT = json.load(open("/tmp/jwt"))["access_token"]
JWT = myToken
headers = {'Authorization': 'Bearer ' + JWT,
           'Content-Type': 'application/json'}

httpx_client = httpx.Client(headers=headers)

In [9]:
CAII_DOMAIN_URL = 'https://ml-684f8ec6-5d9.pdf-jul.a465-9q4k.cloudera.site'

deploy_payload = {
 "namespace": "serving-default",
 "name": "fraud-model-onnx-xgboost",
 "source": {
   "registry_source": {
      "model_id": modelId,
      "version": 1
    }
  },
 "resources": {
    "req_cpu": "2",
    "req_memory": "4Gi"
  },
  "api_standard": "oip",
  "has_chat_template": False,
  "metric_format": "triton",
  "task": "INFERENCE",
  "instance_type": "m6a.8xlarge",
 "autoscaling": {
    "min_replicas": "1",
    "max_replicas": "2"
  }
}

r = httpx_client.post(CAII_DOMAIN_URL+'/api/v1alpha1/deployEndpoint', json=deploy_payload)
r

<Response [200 OK]>

In [10]:
CAII_DOMAIN_URL = 'https://ml-684f8ec6-5d9.pdf-jul.a465-9q4k.cloudera.site'
# Wait for the endpoint to be ready
describe_payload = {
    "namespace": "serving-default",
    "name": "fraud-model-onnx-xgboost"
}

r = httpx_client.post(CAII_DOMAIN_URL+'/api/v1alpha1/describeEndpoint', json=describe_payload)
status = r.json()['status']
response = json.dumps(r.json(), indent=2)
print(response)

# Want target_model_state to get to 'Loaded'
status['target_model_state']

{
  "namespace": "serving-default",
  "name": "fraud-model-onnx-xgboost",
  "url": "",
  "conditions": [
    {
      "type": "LatestDeploymentReady",
      "status": "Unknown",
      "severity": "Info",
      "last_transition_time": "1754008840",
      "reason": "PredictorConfigurationReady not ready",
      "message": ""
    },
    {
      "type": "PredictorConfigurationReady",
      "status": "Unknown",
      "severity": "Info",
      "last_transition_time": "1754008840",
      "reason": "",
      "message": ""
    },
    {
      "type": "PredictorReady",
      "status": "Unknown",
      "severity": "",
      "last_transition_time": "1754008840",
      "reason": "RevisionMissing",
      "message": "Configuration \"fraud-model-onnx-xgboost-predictor\" is waiting for a Revision to become ready."
    },
    {
      "type": "PredictorRouteReady",
      "status": "Unknown",
      "severity": "Info",
      "last_transition_time": "1754008840",
      "reason": "RevisionMissing",
      "mess

'Pending'

In [11]:
#!pip3 install open-inference-openapi tritonclient[all]

In [16]:
# Get BASE_URL of the model endpoint. For predictive models, this is the endpoint URL preceding '/v2'
from urllib.parse import urlparse

r = httpx_client.post(CAII_DOMAIN_URL+'/api/v1alpha1/describeEndpoint', json=describe_payload)
url = r.json()['url']

# Remove last 4 path components to get the base url
parsed_url = urlparse(url)
path_parts = parsed_url.path.split("/")
new_path = "/".join(path_parts[:-4])
modified_url = parsed_url._replace(path=new_path).geturl()
modified_url

'https://ml-684f8ec6-5d9.pdf-jul.a465-9q4k.cloudera.site/namespaces/serving-default/endpoints/fraud-model-onnx-xgboost'

In [17]:
from open_inference.openapi.client import OpenInferenceClient, InferenceRequest
import httpx
import requests
import json

httpx_client = httpx.Client(headers=headers)
client = OpenInferenceClient(base_url=modified_url, httpx_client=httpx_client)
client.check_server_readiness()
metadata = client.read_model_metadata(modelId)
metadata_str = json.dumps(json.loads(metadata.json()), indent=2)
print(metadata_str)

{
  "name": "tnlc-1e37-szwj-cz1j",
  "versions": [
    "1"
  ],
  "platform": "onnxruntime_onnx",
  "inputs": [
    {
      "name": "input",
      "datatype": "FP32",
      "shape": [
        -1,
        14
      ]
    }
  ],
  "outputs": [
    {
      "name": "probabilities",
      "datatype": "FP32",
      "shape": [
        -1,
        2
      ]
    },
    {
      "name": "label",
      "datatype": "INT64",
      "shape": [
        -1,
        1
      ]
    }
  ]
}


In [18]:
import os, warnings, sys, logging
import mlflow
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, recall_score
import mlflow.sklearn
from xgboost import XGBClassifier
from datetime import date
import cml.data_v1 as cmldata
import pyspark.pandas as ps

USERNAME = os.environ["PROJECT_OWNER"]
DBNAME = os.environ["DBNAME_PREFIX"]+"_"+USERNAME
CONNECTION_NAME = os.environ["SPARK_CONNECTION_NAME"]

DATE = date.today()
EXPERIMENT_NAME = "xgb-cc-fraud-{0}".format(USERNAME)

mlflow.set_experiment(EXPERIMENT_NAME)

conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

df_from_sql = ps.read_table('{0}.transactions_{1}'.format(DBNAME, USERNAME))
df = df_from_sql.to_pandas()
#df = df.drop(columns=["job"])

test_size = 0.3
X_train, X_test, y_train, y_test = train_test_split(df.drop("fraud_trx", axis=1), df["fraud_trx"], test_size=test_size)

Spark Application Id:spark-1eb74276fed94c438efa1c670bfed719


                                                                                

In [21]:
import time

data = X_train.iloc[0].values.tolist()
payload = {
    "parameters": {
        "content_type": "pd"
    },
    "inputs": [
        {
            "name": "input",
            "datatype": "FP32",
            "shape": [1, 14],
            "data": data
        }
    ]
}
start = time.time()
pred = client.model_infer(
    modelId,
    request=InferenceRequest(
        inputs=payload["inputs"]
    ),
)

end = time.time()

json_resp_str = json.dumps(json.loads(pred.json()), indent=2)
print(json_resp_str)
print(f"latency={end-start}")

{
  "model_name": "tnlc-1e37-szwj-cz1j",
  "model_version": "1",
  "outputs": [
    {
      "name": "label",
      "shape": [
        1,
        1
      ],
      "datatype": "INT64",
      "data": [
        0.0
      ]
    },
    {
      "name": "probabilities",
      "shape": [
        1,
        2
      ],
      "datatype": "FP32",
      "data": [
        0.9340843558311462,
        0.06591564416885376
      ]
    }
  ]
}
latency=0.04808354377746582
