In [2]:
#****************************************************************************
# (C) Cloudera, Inc. 2020-2023
#  All rights reserved.
#
#  Applicable Open Source License: GNU Affero General Public License v3.0
#
#  NOTE: Cloudera open source products are modular software products
#  made up of hundreds of individual components, each of which was
#  individually copyrighted.  Each Cloudera open source product is a
#  collective work under U.S. Copyright Law. Your license to use the
#  collective work is as provided in your written agreement with
#  Cloudera.  Used apart from the collective work, this file is
#  licensed for your use pursuant to the open source license
#  identified above.
#
#  This code is provided to you pursuant a written agreement with
#  (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
#  this code. If you do not have a written agreement with Cloudera nor
#  with an authorized and properly licensed third party, you do not
#  have any rights to access nor to use this code.
#
#  Absent a written agreement with Cloudera, Inc. (“Cloudera”) to the
#  contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
#  KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
#  WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
#  IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
#  FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
#  AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
#  ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
#  OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
#  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
#  CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
#  RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
#  BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
#  DATA.
#
# #  Author(s): Paul de Fusco
#***************************************************************************/


In [3]:
!pip3 install open-inference-openapi
!pip3 install openai



## Deploy Model Endpoint from AI Registry

In [4]:
import os
import httpx
import ssl

# Connection settings for the AI Inference service and the AI Registry model
# that the cells below deploy and query.
CAII_DOMAIN = os.environ["INFERENCE_SERVICE_DOMAIN"]  # Obtain from the Inference Service UI.
MODEL_ID = "zdr1-6xe2-ark0-uvic"  # Obtain from the AI Registry UI.
MODEL_VERSION = 1  # Obtain from the AI Registry UI. Default to 1 if you've only registered the model once.
ENDPOINT_NAME = "my-llama3b-instruct-endpoint"  # Arbitrarily assigned by user.
CDP_TOKEN = os.environ["CDP_TOKEN"]  # Read from the environment; never hardcode the token.

In [3]:
def deploy_model_to_caii(caii_domain, cdp_token, model_id, model_version, endpoint_name):
    """Deploy an AI Registry model as an endpoint on the AI Inference service.

    Sends a POST to the ``deployEndpoint`` API on ``caii_domain`` and prints the
    outcome. HTTP error responses and transport errors are printed, not raised.

    Args:
        caii_domain: Host name of the inference service (without scheme).
        cdp_token: Token sent as a Bearer credential in the Authorization header.
        model_id: AI Registry ID of the model to deploy.
        model_version: AI Registry version of the model; sent as a string.
        endpoint_name: Name given to the new endpoint.

    Returns:
        None.
    """
    headers = {
        'Authorization': 'Bearer ' + cdp_token,
        'Content-Type': 'application/json',
    }

    # Verify TLS against the CA bundle named by REQUESTS_CA_BUNDLE when it is
    # set; with cafile=None the system default trust store is loaded instead.
    ctx = ssl.create_default_context(cafile=os.environ.get("REQUESTS_CA_BUNDLE"))

    # Same API root as the describeEndpoint call used to check readiness.
    deploy_url = f"https://{caii_domain}/api/v1alpha1/deployEndpoint"

    # Deploy the model endpoint. Note that "serving-default" is the only valid
    # namespace. Adjust resources and autoscaling parameters as you need. This
    # payload requests one GPU; remove "num_gpus" if your model does not need
    # one, or raise the values for larger models, e.g.
    # "resources": {
    #     "num_gpus": "2",
    #     "req_cpu": "4",
    #     "req_memory": "8Gi"
    #  }
    deploy_payload = {
        "namespace": "serving-default",
        "name": f"{endpoint_name}",
        "source": {
            "registry_source": {
                "model_id": f"{model_id}",
                "version": f"{model_version}"
            }
        },
        "resources": {
            "req_cpu": "2",
            "req_memory": "4Gi",
            "num_gpus": "1"
        },
        "autoscaling": {
            "min_replicas": "1",
            "max_replicas": "2"
        }
    }

    try:
        # The context manager closes the connection pool when the call is done.
        with httpx.Client(headers=headers, verify=ctx) as client:
            response = client.post(deploy_url, json=deploy_payload)
            response.raise_for_status()
        print(f"Deployed {endpoint_name} successfully!")
    except httpx.HTTPStatusError as e:
        print(f"HTTP {e.response.status_code}: {e.response.text}")
    except httpx.RequestError as e:
        print(f"Error deploying {endpoint_name}: {e}")

In [42]:
# Deploy the registered model using the settings from the configuration cell.
deploy_model_to_caii(
    caii_domain=CAII_DOMAIN,
    cdp_token=CDP_TOKEN,
    model_id=MODEL_ID,
    model_version=MODEL_VERSION,
    endpoint_name=ENDPOINT_NAME,
)

Deployed my-llama3b-instruct-endpoint successfully!


## Validate Model Endpoint is Ready

In [2]:
def endpoint_is_ready(caii_domain, cdp_token, endpoint_name):
    """Report whether the named endpoint has its model loaded.

    Posts to the ``describeEndpoint`` API on ``caii_domain`` and inspects
    ``status.active_model_state`` in the JSON response. HTTP error responses
    and transport errors are printed, not raised.

    Args:
        caii_domain: Host name of the inference service (without scheme).
        cdp_token: Token sent as a Bearer credential in the Authorization header.
        endpoint_name: Name of the endpoint in the "serving-default" namespace.

    Returns:
        True when ``active_model_state`` equals ``'Loaded'``; False when it has
        any other value, is missing, or the request failed.
    """
    headers = {
        'Authorization': 'Bearer ' + cdp_token,
        'Content-Type': 'application/json',
    }

    # Verify TLS against the CA bundle named by REQUESTS_CA_BUNDLE when it is
    # set; with cafile=None the system default trust store is loaded instead.
    ctx = ssl.create_default_context(cafile=os.environ.get("REQUESTS_CA_BUNDLE"))

    url = f"https://{caii_domain}/api/v1alpha1/describeEndpoint"
    payload = {"namespace": "serving-default", "name": f"{endpoint_name}"}

    try:
        # The context manager closes the connection pool when the call is done.
        with httpx.Client(headers=headers, verify=ctx) as client:
            response = client.post(url, json=payload)
            response.raise_for_status()
            # Tolerate a response without a status block instead of raising
            # KeyError; such an endpoint is simply not ready.
            status = response.json().get('status') or {}
            return status.get('active_model_state') == 'Loaded'
    except httpx.HTTPStatusError as e:
        print(f"HTTP {e.response.status_code}: {e.response.text}")
    except httpx.RequestError as e:
        print(f"Error describing {endpoint_name}: {e}")

    # A failed request means readiness could not be confirmed.
    return False

In [3]:
# Check whether the endpoint has finished loading its model.
endpoint_is_ready(
    caii_domain=CAII_DOMAIN,
    cdp_token=CDP_TOKEN,
    endpoint_name=ENDPOINT_NAME,
)

True

## Test Inference

In [5]:
from openai import OpenAI
import json

# Name of the model served by the endpoint. It has its own constant so the
# AI Registry MODEL_ID from the configuration cell is not overwritten, which
# would break a re-run of the deployment cell.
INFERENCE_MODEL_NAME = "meta-llama/Llama-3.1-8B-Instruct"

# NOTE(review): this URL is specific to one cluster and endpoint name; it must
# match the inference service domain and the endpoint deployed above.
INFERENCE_BASE_URL = "https://ml-f884fd9f-fe8.apps.cdppvc.ares.olympus.cloudera.com/namespaces/serving-default/endpoints/my-llama3b-instruct-endpoint/openai/v1"

# Verify TLS with the same CA bundle the deployment and readiness calls use.
# NOTE(review): presumably the missing CA bundle is what caused the
# APIConnectionError recorded for this cell -- confirm on the target cluster.
inference_ctx = ssl.create_default_context(cafile=os.environ.get("REQUESTS_CA_BUNDLE"))

client = OpenAI(
    base_url=INFERENCE_BASE_URL,
    api_key=os.environ["CDP_TOKEN"],
    http_client=httpx.Client(verify=inference_ctx),
)

completion = client.chat.completions.create(
    model=INFERENCE_MODEL_NAME,
    messages=[{"role": "user", "content": "Write a one-sentence definition of GenAI."}],
    temperature=0.2,
    top_p=0.7,
    max_tokens=1024,
    stream=True
)

# Print the streamed answer as it arrives. Chunks with no choices or no text
# delta are skipped so they cannot raise IndexError.
for chunk in completion:
    if chunk.choices and chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="")

APIConnectionError: Connection error.