In [None]:
# Import pieces from codeflare-sdk
from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration
from codeflare_sdk.cluster.auth import TokenAuthentication
from codeflare_sdk.job.jobs import DDPJobDefinition

In [None]:
# Create authentication object for oc user permissions.
# The API token is a credential and must not be committed with the notebook:
# it is read from the OPENSHIFT_TOKEN environment variable, falling back to an
# interactive prompt. Any token that was previously inlined here should be revoked.
import getpass
import os

auth = TokenAuthentication(
    token=os.environ.get("OPENSHIFT_TOKEN") or getpass.getpass("OpenShift API token: "),
    # The server URL can be overridden through OPENSHIFT_SERVER; the default is the original value.
    server=os.environ.get("OPENSHIFT_SERVER", "https://api.mini2.mydomain.com:6443"),
    # NOTE(review): presumably disables TLS certificate verification — confirm this is acceptable.
    skip_tls=True,
)
auth.login()

Here, we want to define our cluster by specifying the resources we require for our batch workload. Below, we define our cluster object (which generates a corresponding AppWrapper).

In [None]:
# Define the cluster (this generates the corresponding AppWrapper); reduce specs as desired.
# Nothing is submitted yet: that happens when cluster.up() is called.
# NOTE(review): memory values are presumably in GB — confirm against the SDK.
cluster = Cluster(ClusterConfiguration(
    name='mnisttest',
    namespace='batch-mnist',
    image="quay.io/thinkahead/base:ray2.1.0-py38-gpu-pytorch1.12.0cu117-20230419-1",
    min_worker=2,
    max_worker=3,
    min_cpus=8,
    max_cpus=8,
    min_memory=16,
    max_memory=16,
    gpu=1,
    instascale=False # Scaling is disabled here; set to True only if scaling is needed
))

Next, we want to bring our cluster up, so we call the `up()` function below to submit our cluster AppWrapper yaml onto the MCAD queue, and begin the process of obtaining our resource cluster.

In [None]:
# Bring up the cluster: submits the cluster AppWrapper yaml onto the MCAD queue.
cluster.up()

Now, we want to check on the status of our resource cluster, and wait until it is finally ready for use.

In [None]:
# Show the current status of the resource cluster.
cluster.status()

In [None]:
# Wait until the resource cluster is ready for use.
cluster.wait_ready()

In [None]:
# Check the status again now that the wait has finished.
cluster.status()

Let's quickly verify that the specs of the cluster are as expected.

In [None]:
# Display the cluster's specs so they can be compared with the requested configuration.
cluster.details()

Now that our resource cluster is ready, we can directly submit our batch job (distributed model training on the cluster's workers, each configured above with one GPU) to the cluster via torchx.

In [None]:
# Define a DDP job named "mnisttest" that runs mnist.py, then submit it to the cluster.
jobdef = DDPJobDefinition(
    name="mnisttest",
    script="mnist.py",
    # Optional (currently disabled): presumably passes a requirements file to the scheduler — confirm before enabling.
    #scheduler_args={"requirements": "requirements.txt"}
)
job = jobdef.submit(cluster)

Now we can go ahead and look at the status and logs of our batch job.

In [None]:
# Check the status of the submitted batch job.
job.status()

In [None]:
# Print the logs of the batch job.
print(job.logs())

Finally, we bring our resource cluster down and release/terminate the associated resources, bringing everything back to the way it was before our cluster was brought up.

In [None]:
# Bring the resource cluster down and release the associated resources.
cluster.down()

In [None]:
# Log out the `auth` object that was logged in at the start of the notebook.
auth.logout()

# Fetch the Mnist_784 dataset

In [None]:
import json

import numpy as np
import requests
from sklearn.datasets import fetch_openml

# Fetch the mnist_784 dataset from OpenML as a (features, labels) pair.
X, y = fetch_openml(
    'mnist_784',
    return_X_y=True,
    parser='auto',
)

In [None]:
# Choose which exported ONNX model to use. Available variants:
#   'mnist1' - torch.onnx.export
#   'mnist2' - Pytorch Lightning model.to_onnx
#   'mnist3' - torch.onnx.export with batch
#   'mnist4' - Pytorch Lightning model.to_onnx with batch
model_name = 'mnist4'

# File name of the exported model, derived from the chosen variant.
model_file_name = f"{model_name}.onnx"

In [None]:
import onnx
import onnxruntime
import torch
import numpy as np

# Load the exported ONNX model into an inference session.
session = onnxruntime.InferenceSession(model_file_name, None)

# Describe the first model input; input_name and input_shape are reused by later cells.
first_input = session.get_inputs()[0]
input_name, input_shape, input_type = first_input.name, first_input.shape, first_input.type
print("input name", input_name)
print("input shape", input_shape)
print("input type", input_type)

# List every output name, then describe the first output.
model_outputs = session.get_outputs()
print([out.name for out in model_outputs])
first_output = model_outputs[0]
output_name, output_shape, output_type = first_output.name, first_output.shape, first_output.type
print("output name", output_name)
print("output shape", output_shape)
print("output type", output_type)

# Batch request shape (batch_size,1,28,28)

In [None]:
# Take the first five samples and their labels as a small batch.
batch = X[0:5]
actual = y[0:5]
arr = np.array(batch)

# Reshape to (batch_size, 1, 28, 28) float32 and request every model output.
output_names = [output.name for output in session.get_outputs()]
batch_input = arr.reshape(len(batch), 1, 28, 28).astype(np.float32)
result = session.run(output_names, {input_name: batch_input})

In [None]:
# Per-sample argmax of the first model output, printed next to the true labels.
predicted = np.argmax(result[0], axis=1)
print(predicted, actual.values)

# Single request shape (1,1,28,28) - Loop through all samples

In [None]:
# Run every sample through the model one at a time and report the error rate.
# Loop-local names (sample, outputs, predicted) are deliberately distinct from the
# batch cell's `arr`, `result` and `actual`, so that cell can still be re-run afterwards.
output_names = [output.name for output in session.get_outputs()]  # loop-invariant
single_shape = (1, *input_shape[1:])  # batch dimension forced to 1
count = 0
for pos in range(len(X)):  # positional indices, matching the .iloc lookups below
    sample = np.array(X.iloc[pos]).reshape(single_shape).astype(np.float32)
    outputs = session.run(output_names, {input_name: sample})
    expected = y.iloc[pos]
    predicted = np.argmax(outputs[0])
    # Labels are compared as strings, so the predicted index is converted before comparing.
    if str(predicted) != expected:
        count += 1
# The label says "percent", so scale the failure fraction accordingly.
print('Failed percent:', 100 * count / len(X))

# Copy onnx model to S3 bucket

In [None]:
import os
import boto3

# Credentials and endpoint are taken from the environment.
key_id = os.environ.get('AWS_ACCESS_KEY_ID')
secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
endpoint_url = os.environ.get('AWS_S3_ENDPOINT')

# Do not bind the name `session` here: earlier cells use it for the ONNX inference
# session, and rebinding it would break them when they are re-run.
# NOTE(review): verify=False disables TLS certificate verification — confirm this is acceptable.
s3_client = boto3.client(
    's3',
    aws_access_key_id=key_id,
    aws_secret_access_key=secret_key,
    endpoint_url=endpoint_url,
    verify=False,
)

# List the buckets visible to these credentials.
bucket_names = [bucket['Name'] for bucket in s3_client.list_buckets()['Buckets']]
for bucket_name in bucket_names:
    print(bucket_name)

# Pick the upload target explicitly: AWS_S3_BUCKET if set, otherwise the last listed
# bucket (the bucket the original loop-variable leak ended up using).
target_bucket = os.environ.get('AWS_S3_BUCKET') or (bucket_names[-1] if bucket_names else None)
if target_bucket is None:
    raise RuntimeError("No S3 bucket available: set AWS_S3_BUCKET or create a bucket first")

s3_client.upload_file(model_file_name, target_bucket, model_file_name)

# Show the keys now present in the target bucket (empty list if the listing has no contents).
[item.get("Key") for item in s3_client.list_objects_v2(Bucket=target_bucket).get("Contents", [])]

Deploy the model in your Data Science project using the RHODS UI, under "Models and model servers".

# Submit HTTP REST request to the ModelMesh for a batch and receive response from model

In [None]:
model_name = "mnist4"

import requests
import json

# ModelMesh REST inference endpoint for the selected model.
# NOTE(review): the original comment here said "underscore characters are removed" —
# presumably this refers to the served model name; confirm.
URL = f'http://modelmesh-serving.huggingface.svc.cluster.local:8008/v2/models/{model_name}/infer'

# Rows labelled 0..4 (five samples) flattened into a single FP32 tensor of shape (5,1,28,28).
batch_values = X.loc[0:4].values.flatten().tolist()
batch_tensor = {"name": "input_0", "shape": (5,1,28,28), "datatype": "FP32", "data": batch_values}
payload = {"inputs": [batch_tensor]}
headers = {"content-type": "application/json"}

# Send the request and show both the HTTP status and the raw response body.
res = requests.post(URL, json=payload, headers=headers)
print(res)
print(res.text)

In [None]:
# Compare the true labels of the five samples with the model's predictions.
# Naming matches the rest of the notebook: `expected` is the label and `actual`
# is what the model returned.
first_output = res.json()['outputs'][0]  # parse the response body once
expected = y[0:5].values.tolist()
# Restore the returned flat data to its declared shape, then take the per-sample argmax.
actual = np.argmax(np.array(first_output['data']).reshape(first_output['shape']), axis=1)
print(expected, actual)

# Submit HTTP REST request to the ModelMesh for single sample and receive response from model

In [None]:
model_name = "mnist1"

import requests
import json

# ModelMesh REST inference endpoint for the selected model.
# NOTE(review): the original comment here said "underscore characters are removed" —
# presumably this refers to the served model name; confirm.
URL = f'http://modelmesh-serving.huggingface.svc.cluster.local:8008/v2/models/{model_name}/infer'

# The first sample only, sent as an FP32 tensor of shape (1,1,28,28).
sample_values = X.iloc[0].values.tolist()
sample_tensor = {"name": "input_0", "shape": (1,1,28,28), "datatype": "FP32", "data": sample_values}
payload = {"inputs": [sample_tensor]}
headers = {"content-type": "application/json"}

# Send the request and show both the HTTP status and the raw response body.
res = requests.post(URL, json=payload, headers=headers)
print(res)
print(res.text)

In [None]:
# Compare the label of sample 0 with the index of the largest value in the returned data.
single_output = res.json()['outputs'][0]
print("Expected", y.iloc[0], "Actual", np.argmax(single_output['data']))