# Iris Classification with PyTorch and Cloudera AI Inference

This notebook demonstrates a complete machine learning pipeline:
1. **Model Training**: Train an iris classifier using PyTorch with MLflow tracking
2. **Model Export**: Convert PyTorch model to ONNX format for deployment
3. **Model Registry**: Log trained model to AI Registry
4. **CDP CLI & Auth Setup**: Configure the CDP CLI (assumed to be already installed in the project) with your credentials so that control plane commands can be run
5. **AI Registry & Inference APIs**: Explore some AI Registry API v2 and AI Inference management APIs 
6. **Deploy Model**: Deploy the trained model to AI Inference as an endpoint
7. **Batch Inference**: Run inference against deployed models on Cloudera AI Inference

## Prerequisites
- Cloudera AI Workbench
- Cloudera AI Registry
- Cloudera AI Inference service
- Required Python packages installed

## Part 1: Model Training and Export

In [None]:
# Import required libraries
import os
import subprocess
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report
import mlflow
import mlflow.pytorch
import mlflow.onnx
import numpy as np
import pandas as pd
import onnx
import onnxruntime as ort
import matplotlib.pyplot as plt
import seaborn as sns

# Import inference-related libraries
from open_inference.openapi.client import OpenInferenceClient, InferenceRequest
import httpx
import json
import time
from urllib.parse import urlparse, urlunparse
from typing import Optional, Dict, Any, List

# Used while configuring CDP credentials config
import getpass

# Set random seeds for reproducibility
# Seeds the global PyTorch and NumPy random number generators; the
# scikit-learn split used later passes its own random_state explicitly.
torch.manual_seed(42)
np.random.seed(42)

print("Libraries imported successfully!")

### Define the Neural Network Architecture

In [None]:
class IrisClassifier(nn.Module):
    """Small fully connected network for iris classification.

    Layout: Linear -> ReLU -> Dropout -> Linear -> ReLU -> Dropout -> Linear.
    ``forward`` returns raw, unnormalised class scores (no softmax is applied).

    Args:
        input_size: Number of input features per sample.
        hidden_size: Width of both hidden layers.
        num_classes: Number of output scores per sample.
        dropout_rate: Dropout probability applied after each hidden
            activation. Defaults to 0.2.
    """

    def __init__(self, input_size=4, hidden_size=16, num_classes=3, dropout_rate=0.2):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        # A single Dropout module is reused after both hidden layers.
        self.dropout = nn.Dropout(dropout_rate)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return class scores for a batch ``x`` of feature rows."""
        x = self.dropout(self.relu(self.fc1(x)))
        x = self.dropout(self.relu(self.fc2(x)))
        return self.fc3(x)

print("Model architecture defined!")

### Data Preparation

In [None]:
def prepare_data():
    """Load the iris dataset and return scaled train/test tensors.

    The data is split 80/20 (stratified by class, ``random_state=42``) and
    standardised with a ``StandardScaler`` fitted on the training part only.

    Returns:
        A tuple ``(X_train, X_test, y_train, y_test, scaler, iris)`` where the
        feature tensors are ``FloatTensor``, the label tensors are
        ``LongTensor``, ``scaler`` is the fitted scaler and ``iris`` is the
        object returned by ``load_iris()``.
    """
    iris = load_iris()
    features = iris.data
    labels = iris.target

    print(f"Dataset shape: {features.shape}")
    print(f"Classes: {iris.target_names}")
    print(f"Features: {iris.feature_names}")

    # Stratified hold-out split
    train_features, test_features, train_labels, test_labels = train_test_split(
        features, labels, test_size=0.2, random_state=42, stratify=labels
    )

    # Fit the scaler on the training part only, then reuse it for the test part
    scaler = StandardScaler()
    train_scaled = scaler.fit_transform(train_features)
    test_scaled = scaler.transform(test_features)

    return (
        torch.FloatTensor(train_scaled),
        torch.FloatTensor(test_scaled),
        torch.LongTensor(train_labels),
        torch.LongTensor(test_labels),
        scaler,
        iris,
    )

# Prepare the data
X_train, X_test, y_train, y_test, scaler, iris_data = prepare_data()

### Visualize the Dataset

In [None]:
# Create a DataFrame for visualization
iris_df = pd.DataFrame(iris_data.data, columns=iris_data.feature_names)
iris_df['target'] = iris_data.target
# Map the integer class ids to their species names for readable legends
iris_df['species'] = iris_df['target'].map(dict(enumerate(iris_data.target_names)))

# Create pairplot
# sns.pairplot is a figure-level function that builds its own figure, so no
# plt.figure() call precedes it; one would only leave an empty figure behind.
sns.pairplot(iris_df, hue='species', height=2.5)
# The pairplot figure is the current figure here, so the title lands on it.
plt.suptitle('Iris Dataset - Feature Relationships', y=1.02)
plt.show()

# Class distribution
plt.figure(figsize=(8, 6))
sns.countplot(data=iris_df, x='species')
plt.title('Class Distribution')
plt.show()

### Training Functions

In [None]:
def train_model(model, train_loader, criterion, optimizer, num_epochs=100):
    """Run the training loop and collect per-epoch statistics.

    Args:
        model: Module to optimise; it is switched to training mode.
        train_loader: Iterable of ``(inputs, labels)`` batches that also
            supports ``len()``.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimiser bound to the model parameters.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        ``(train_losses, train_accuracies)``: two lists with one entry per
        epoch, holding the mean batch loss and the accuracy in percent.
    """
    model.train()
    loss_history, accuracy_history = [], []

    for epoch in range(num_epochs):
        running_loss = 0.0
        hits = 0
        seen = 0

        for inputs, labels in train_loader:
            optimizer.zero_grad()
            scores = model(inputs)
            batch_loss = criterion(scores, labels)
            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()
            # Predicted class = index of the largest score in each row
            predicted = torch.max(scores.data, 1)[1]
            seen += labels.size(0)
            hits += (predicted == labels).sum().item()

        avg_loss = running_loss / len(train_loader)
        accuracy = 100 * hits / seen
        loss_history.append(avg_loss)
        accuracy_history.append(accuracy)

        # Progress line every 20th epoch
        if (epoch + 1) % 20 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')

    return loss_history, accuracy_history

def evaluate_model(model, test_loader):
    """Score the model on held-out batches.

    Args:
        model: Module to evaluate; it is switched to evaluation mode.
        test_loader: Iterable of ``(inputs, labels)`` batches.

    Returns:
        ``(accuracy, predictions, targets)`` where ``accuracy`` comes from
        ``accuracy_score`` and the two lists hold the predicted and true
        labels in loader order.
    """
    model.eval()
    predicted_labels = []
    true_labels = []

    # Gradients are not needed for scoring
    with torch.no_grad():
        for inputs, labels in test_loader:
            scores = model(inputs)
            batch_predictions = torch.max(scores, 1)[1]
            predicted_labels.extend(batch_predictions.cpu().numpy())
            true_labels.extend(labels.cpu().numpy())

    return accuracy_score(true_labels, predicted_labels), predicted_labels, true_labels

print("Training functions defined!")

### Model Training with MLflow Tracking

In [None]:
# Set MLflow experiment
mlflow.set_experiment("iris_classification_notebook")

# Hyperparameters
# These notebook-level values are read by the training cell that follows,
# which also logs them to MLflow.
hidden_size = 16
learning_rate = 0.01
num_epochs = 100
batch_size = 16

print(f"Training configuration:")
print(f"Hidden size: {hidden_size}")
print(f"Learning rate: {learning_rate}")
print(f"Number of epochs: {num_epochs}")
print(f"Batch size: {batch_size}")

In [None]:
# Train and evaluate the classifier inside a single MLflow run so that the
# hyperparameters, final metrics and per-epoch curves are recorded together.
with mlflow.start_run() as run:
    print("Starting MLflow experiment...")

    # Log hyperparameters
    mlflow.log_param("hidden_size", hidden_size)
    mlflow.log_param("learning_rate", learning_rate)
    mlflow.log_param("num_epochs", num_epochs)
    mlflow.log_param("batch_size", batch_size)

    # Create data loaders
    # Only the training loader is shuffled; the test loader keeps a fixed order.
    train_dataset = TensorDataset(X_train, y_train)
    test_dataset = TensorDataset(X_test, y_test)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Initialize model, loss function, and optimizer
    model = IrisClassifier(input_size=4, hidden_size=hidden_size, num_classes=3)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    print("\nTraining model...")
    # Train the model
    train_losses, train_accuracies = train_model(
        model, train_loader, criterion, optimizer, num_epochs
    )

    # Evaluate the model
    print("\nEvaluating model...")
    test_accuracy, predictions, targets = evaluate_model(model, test_loader)

    # Log metrics
    # NOTE: train accuracy is a percentage (0-100) while test_accuracy is the
    # fraction returned by accuracy_score (0-1).
    mlflow.log_metric("final_train_accuracy", train_accuracies[-1])
    mlflow.log_metric("test_accuracy", test_accuracy)
    mlflow.log_metric("final_train_loss", train_losses[-1])

    # Log training curves
    # One point per epoch, using the epoch index as the MLflow step.
    for epoch, (loss, acc) in enumerate(zip(train_losses, train_accuracies)):
        mlflow.log_metric("train_loss", loss, step=epoch)
        mlflow.log_metric("train_accuracy", acc, step=epoch)

    print(f"\nFinal Test Accuracy: {test_accuracy:.4f}")
    print(f"MLflow Run ID: {run.info.run_id}")

    # Store the run_id for later use
    training_run_id = run.info.run_id

### Visualize Training Progress

In [None]:
# Plot training curves
# Two side-by-side panels built from the per-epoch lists returned by
# train_model: loss on the left, accuracy (in percent) on the right.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# Loss curve
ax1.plot(train_losses)
ax1.set_title('Training Loss')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.grid(True)

# Accuracy curve
ax2.plot(train_accuracies)
ax2.set_title('Training Accuracy')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy (%)')
ax2.grid(True)

plt.tight_layout()
plt.show()

### Model Evaluation and Classification Report

In [None]:
# Create and display classification report
# `targets` and `predictions` are the lists returned by evaluate_model for the
# test set; class_names supplies the human-readable labels.
class_names = iris_data.target_names
report = classification_report(targets, predictions, target_names=class_names)
print("Classification Report:")
print(report)

# Confusion Matrix
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(targets, predictions)

# Rows are true labels and columns are predicted labels, matching the axis
# labels set below.
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names)
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

### Convert Model to ONNX Format

In [None]:
def convert_to_onnx(model, input_size=(1, 4), onnx_path="iris_model.onnx"):
    """Export a PyTorch model to an ONNX file.

    Args:
        model: Module to export; it is switched to evaluation mode first.
        input_size: Shape of the random dummy input handed to the exporter.
        onnx_path: Destination file for the exported model.

    Returns:
        ``onnx_path``, unchanged.
    """
    model.eval()

    # The exporter needs an example input of the right shape
    sample_input = torch.randn(input_size)

    # The first axis of both tensors is declared dynamic so the exported model
    # accepts any batch size, not just the one of the dummy input.
    export_options = {
        'export_params': True,
        'opset_version': 11,
        'do_constant_folding': True,
        'input_names': ['input'],
        'output_names': ['output'],
        'dynamic_axes': {
            'input': {0: 'batch_size'},
            'output': {0: 'batch_size'},
        },
    }
    torch.onnx.export(model, sample_input, onnx_path, **export_options)

    print(f"Model exported to ONNX format: {onnx_path}")
    return onnx_path

def verify_onnx_model(onnx_path, test_data):
    """Check that an exported ONNX file loads and runs.

    The file is validated with the ONNX checker and then executed with ONNX
    Runtime on the first five rows of ``test_data``. The outputs are not
    compared against the PyTorch model.

    Args:
        onnx_path: Path to the ONNX file.
        test_data: Tensor of input rows; ``.numpy()`` is called on a slice.

    Returns:
        ``True`` when no step raised.
    """
    # Structural validation of the serialized graph
    onnx.checker.check_model(onnx.load(onnx_path))

    # Smoke-run the graph on a handful of rows
    session = ort.InferenceSession(onnx_path)
    sample_rows = test_data[:5].numpy()
    feed = {session.get_inputs()[0].name: sample_rows}
    ort_outputs = session.run(None, feed)

    print(f"ONNX model verification successful. Output shape: {ort_outputs[0].shape}")
    return True

# Convert and verify ONNX model
# Exports the trained `model` to a file in the working directory, then runs a
# smoke check on it using rows from the test tensor.
onnx_path = "iris_model.onnx"
convert_to_onnx(model, input_size=(1, 4), onnx_path=onnx_path)
verify_onnx_model(onnx_path, X_test)

### Log Models to Cloudera AI Registry

In [None]:
# Prepare input examples for model logging
# Use a small sample of test data as input example
input_example = X_test[:3].numpy()  # Convert to numpy array for MLflow
print(f"Input example shape: {input_example.shape}")
print(f"Input example (first sample): {input_example[0]}")

# Optionally, we can also create a model signature manually
# NOTE: `schema` is imported for that manual route but is not referenced in
# this cell; the signature below is inferred instead.
import mlflow.types.schema as schema
from mlflow.models.signature import infer_signature

# Create model prediction for signature inference
# Evaluation mode and no_grad give a plain forward pass on the same three rows
# used as the input example.
model.eval()
with torch.no_grad():
    example_output = model(X_test[:3]).numpy()

# Infer signature from input and output
signature = infer_signature(input_example, example_output)
print(f"Model signature: {signature}")
print("✓ Input examples and signature prepared for MLflow logging")

### Prepare Input Examples for Model Logging

To avoid MLflow warnings, we need to provide input examples when logging models. This helps MLflow automatically infer the model signature.

In [None]:
# Name under which the ONNX model is registered; Part 2 looks the model up in
# the registry by this name.
REGISTERED_MODEL_NAME_ONNX = "iris_onnx_classifier_notebook"
# NOTE(review): this opens a new MLflow run rather than reusing the training
# run whose id was stored in `training_run_id` - confirm that is intended.
with mlflow.start_run() as run:
    # Log the PyTorch model to MLflow with input example and signature
    print("Logging PyTorch model to MLflow...")
    mlflow.pytorch.log_model(
        model,
        "iris_classifier_pytorch",
        registered_model_name="iris_pytorch_classifier_notebook",
        input_example=input_example,
        signature=signature
    )

    # Log the ONNX model to MLflow with input example and signature
    print("Logging ONNX model to MLflow...")
    # Reload the exported file so the ONNX model object can be passed to MLflow
    onnx_model = onnx.load(onnx_path)
    mlflow.onnx.log_model(
        onnx_model,
        "iris_classifier_onnx",
        registered_model_name=f"{REGISTERED_MODEL_NAME_ONNX}",
        input_example=input_example,
        signature=signature
    )

    print("Models logged successfully to MLflow!")
    print(f"MLflow Run ID: {run.info.run_id}")
    print("✓ Models logged with input examples and signatures!")

## Part 2: Deploy Model to Cloudera AI Inference

This section demonstrates how to configure CDP credentials and deploy the trained model from part 1 to Cloudera AI Inference

**Note**: The following steps assume that the cdpcli package is already installed, but not configured.

In [None]:
# Create CDP configuration

def configure_cdp(varname, value):
    """Set one CDP CLI configuration value via ``cdp configure set``.

    Args:
        varname: Configuration key, e.g. ``"cdp_access_key_id"``.
        value: Value to store for that key.

    Returns:
        ``None`` in every case. Failures (CLI missing, non-zero exit status)
        are reported on stdout instead of being raised.
    """
    # NOTE(review): the value is passed as a command-line argument, so a secret
    # may be visible in the process list while the command runs.
    command = ["cdp", "configure", "set", varname, value]
    try:
        result = subprocess.run(command, capture_output=True, text=True)
    except OSError as e:
        # Raised when the cdp executable is missing or cannot be started
        print("cdp command failed.")
        print("Error:")
        print(e)
        return None

    if result.returncode != 0:
        print("cdp command failed.")
        print("Error:")
        print(result.stderr)
    return None
print("CDP config function defined!")

**Note**: For the next step you would need your CDP credentials, which you can get from your profile page in the CDP control plane UI.

In [None]:
# Configure CDP control plane credentials


# getpass keeps the typed values from being echoed in the notebook output;
# each value is then written to the CDP CLI configuration.
access_key_id = getpass.getpass("Enter your CDP access key ID: ")
private_key = getpass.getpass("Enter your CDP private key: ")
configure_cdp("cdp_access_key_id", access_key_id)
configure_cdp("cdp_private_key", private_key)

In [None]:
# Get AI Registry URL and obtain details of the model we registered in Part 1
# You can of course set this manually

def get_registry_endpoint():
    """Look up the AI Registry domain with the CDP CLI.

    Runs ``cdp ml list-model-registries`` and reads the ``domain`` of the
    first entry in ``modelRegistries``.

    Returns:
        The domain value, or ``None`` when the CLI cannot be started, exits
        with a non-zero status, or its output does not contain a registry.
        A diagnostic is printed in each failure case.
    """
    command = ["cdp", "ml", "list-model-registries"]
    try:
        result = subprocess.run(command, capture_output=True, text=True)
    except OSError as e:
        # Raised when the cdp executable is missing or cannot be started
        print("cdp command failed.")
        print("Error:")
        print(e)
        return None

    if result.returncode != 0:
        print("cdp command failed.")
        print("Error:")
        print(result.stderr)
        return None
    try:
        # IndexError covers an environment without any registry; TypeError
        # covers JSON that is valid but not shaped as expected.
        return json.loads(result.stdout)['modelRegistries'][0]['domain']
    except (json.JSONDecodeError, KeyError, IndexError, TypeError) as e:
        print(f"Error: {e}")
        return None

# None here means the lookup failed; see the message printed by the function.
REGISTRY_ENDPOINT = get_registry_endpoint()
print(REGISTRY_ENDPOINT)

In [None]:
# Obtain CDP_TOKEN JWT using CDP CLI
def get_ums_jwt_token():
    """Generate a workload auth token (JWT) with the CDP CLI.

    Runs ``cdp iam generate-workload-auth-token --workload-name DE`` and
    extracts the ``token`` field from the JSON the command prints.

    Returns:
        The token string, or ``None`` when the CLI cannot be started, exits
        with a non-zero status, or prints output without a ``token`` field.
        A diagnostic is printed in each failure case.
    """
    command = ["cdp", "iam", "generate-workload-auth-token",
               "--workload-name", "DE"]
    try:
        result = subprocess.run(command, capture_output=True, text=True)
    except OSError as e:
        # Raised when the cdp executable is missing or cannot be started
        print("cdp command failed.")
        print("Error:")
        print(e)
        return None

    if result.returncode != 0:
        print("cdp command failed.")
        print("Error:")
        print(result.stderr)
        return None

    try:
        # TypeError covers JSON that is valid but not an object
        return json.loads(result.stdout)['token']
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        print(f"Error: {e}")
        return None

print("Defined function to get CDP_TOKEN!")

In [None]:
# Functions to retrieve model details. These can be obtained from the AI Registry UI as well
def get_model_details(registry_endpoint, model_name, token):
    """Fetch the registry entry of a model by its exact name.

    Sends ``GET {registry_endpoint}/api/v2/models`` with ``name`` as a query
    parameter and picks the first returned model whose ``name`` equals
    ``model_name``.

    Args:
        registry_endpoint: Base URL of the AI Registry.
        model_name: Registered model name to look for.
        token: Bearer token used for the ``Authorization`` header.

    Returns:
        The matching model dictionary, or ``None`` when no model matches, the
        request fails, or the response cannot be interpreted. A diagnostic is
        printed for request and parsing failures.
    """
    headers = {'Authorization': 'Bearer ' + token,
               'Content-Type': 'application/json'}
    url = registry_endpoint + '/api/v2/models'
    params = {
        'name': model_name,
    }
    try:
        # The context manager releases the connection pool once the response
        # has been read.
        with httpx.Client(headers=headers) as client:
            result = client.get(url, params=params)
            result.raise_for_status()
        models = result.json()['models']
    except httpx.HTTPStatusError as e:
        print(f"HTTP {e.response.status_code}: {e.response.text}")
        return None
    except httpx.RequestError as e:
        print(f"Error: {e}")
        return None
    except (json.JSONDecodeError, KeyError) as e:
        print(f"Error: {e}")
        return None
    # The server-side filter may return more than an exact match, so the name
    # is compared again on the client.
    return next((element for element in models if element.get('name') == model_name), None)

def get_most_recent_model_version(registry_endpoint, model_id, token):
    """Return the version of the last entry in a model's version list.

    Sends ``GET {registry_endpoint}/api/v2/models/{model_id}`` and reads
    ``model_versions[-1]['version']`` from the response.

    Args:
        registry_endpoint: Base URL of the AI Registry.
        model_id: Registry id of the model.
        token: Bearer token used for the ``Authorization`` header.

    Returns:
        The ``version`` value of the last listed model version, or ``None``
        when the request fails or the response has no versions. A diagnostic
        is printed in each failure case.
    """
    headers = {'Authorization': 'Bearer ' + token,
               'Content-Type': 'application/json'}
    url = registry_endpoint + '/api/v2/models/' + model_id
    try:
        with httpx.Client(headers=headers) as client:
            result = client.get(url)
            result.raise_for_status()
        # NOTE(review): assumes the registry lists versions oldest-first, so
        # the last entry is the most recent - confirm against the API.
        return result.json()['model_versions'][-1]['version']
    except httpx.HTTPStatusError as e:
        print(f"HTTP {e.response.status_code}: {e.response.text}")
        return None
    except httpx.RequestError as e:
        print(f"Error: {e}")
        return None
    except (json.JSONDecodeError, KeyError, IndexError) as e:
        # IndexError: the model exists but has no versions
        print(f"Error: {e}")
        return None
# Resolve the registry id and latest version of the ONNX model.
CDP_TOKEN = get_ums_jwt_token()
if CDP_TOKEN is None:
    # Fail here with a clear message instead of a TypeError deeper down
    raise RuntimeError("Could not obtain a CDP token; check the CDP CLI configuration.")

# This must be the name under which the ONNX model was registered in Part 1.
model_details = get_model_details(REGISTRY_ENDPOINT, "iris_onnx_classifier_notebook", CDP_TOKEN)
if model_details is None:
    raise RuntimeError("Model 'iris_onnx_classifier_notebook' was not found in the AI Registry.")
version = get_most_recent_model_version(REGISTRY_ENDPOINT, model_details['id'], CDP_TOKEN)

MODEL_ID = model_details['id']
MODEL_VERSION = version


**Preparing To Deploy The Model**

Now that we have retrieved the model ID and its latest version from the registry, let's prepare to deploy it to a Cloudera AI Inference service in the same environment!

In [None]:
# We assume that there is only one AI Inference service in the environment. Change the code if you have more!
# You can get the domainname from the AI Inference UI too.
def get_caii_domain():
    """Look up the domain name of the AI Inference service with the CDP CLI.

    Runs ``cdp ml list-ml-serving-apps`` and reads
    ``cluster.domainName`` of the first listed app.

    Returns:
        The domain name, or ``None`` when the CLI cannot be started, exits
        with a non-zero status, or its output does not contain an app.
        A diagnostic is printed in each failure case.
    """
    command = ["cdp", "ml", "list-ml-serving-apps"]
    try:
        result = subprocess.run(command, capture_output=True, text=True)
    except OSError as e:
        # Raised when the cdp executable is missing or cannot be started
        print("cdp command failed.")
        print("Error:")
        print(e)
        return None

    if result.returncode != 0:
        print("cdp command failed.")
        print("Error:")
        print(result.stderr)
        return None

    try:
        # With a single service (the stated assumption) it is the first entry;
        # IndexError covers an environment without any serving app.
        return json.loads(result.stdout)['apps'][0]['cluster']['domainName']
    except (json.JSONDecodeError, KeyError, IndexError, TypeError) as e:
        print(f"Error: {e}")
        return None

def deploy_model_to_caii(caii_domain, cdp_token, model_id, model_version, endpoint_name):
    """Request deployment of a registry model as an AI Inference endpoint.

    Posts a deployment payload to
    ``https://{caii_domain}/api/v1alpha1/deployEndpoint``.

    Args:
        caii_domain: Domain name of the AI Inference service.
        cdp_token: Bearer token used for the ``Authorization`` header.
        model_id: Registry id of the model to deploy.
        model_version: Registry version of the model to deploy.
        endpoint_name: Name of the endpoint to create.

    Returns:
        ``None``. Success and failure are reported on stdout.
    """
    # construct url
    deploy_url = f"https://{caii_domain}/api/v1alpha1/deployEndpoint"

    headers = {'Authorization': 'Bearer ' + cdp_token,
               'Content-Type': 'application/json'}

    # Deploy the model endpoint. Note that "serving-default" is the only valid
    # namespace. Adjust resources and autoscaling parameters as you need. Also note
    # that we're not requesting a GPU for the model deployment. If your model requires GPUs,
    # you can add it to the "resources" section, e.g.
    # "resources": {
    #     "num_gpus": "2",
    #     "req_cpu": "4",
    #     "req_memory": "8Gi"
    #  }
    #
    deploy_payload = {
        "namespace": "serving-default",
        "name": f"{endpoint_name}",
        "source": {
            "registry_source": {
                "model_id": f"{model_id}",
                "version": f"{model_version}"
            }
        },
        "resources": {
            "req_cpu": "2",
            "req_memory": "4Gi"
        },
        "autoscaling": {
            "min_replicas": "1",
            "max_replicas": "2"
        }
    }
    try:
        # The context manager closes the client's connections when done
        with httpx.Client(headers=headers) as client:
            response = client.post(deploy_url, json=deploy_payload)
            response.raise_for_status()
        print(f"Deployed {endpoint_name} successfully!")
    except httpx.HTTPStatusError as e:
        print(f"HTTP {e.response.status_code}: {e.response.text}")
    except httpx.RequestError as e:
        print(f"Error deploying {endpoint_name}: {e}")
# Resolve the service domain, fetch a fresh token and request the deployment
# of the model id/version resolved from the registry.
CAII_DOMAIN = get_caii_domain()
CDP_TOKEN = get_ums_jwt_token()
ENDPOINT_NAME = "iris-onnx-nb"
deploy_model_to_caii(CAII_DOMAIN, CDP_TOKEN, MODEL_ID, MODEL_VERSION, ENDPOINT_NAME)

**Some more steps before we can run inference**
1. Wait for the model endpoint to be ready
2. Fetch the endpoint url

In [None]:
def endpoint_is_ready(caii_domain, cdp_token, endpoint_name):
    """Report whether an endpoint's active model is loaded.

    Posts to ``https://{caii_domain}/api/v1alpha1/describeEndpoint`` and
    checks whether ``status.active_model_state`` equals ``'Loaded'``.

    Args:
        caii_domain: Domain name of the AI Inference service.
        cdp_token: Bearer token used for the ``Authorization`` header.
        endpoint_name: Name of the endpoint to describe.

    Returns:
        ``True`` when the state is ``'Loaded'``; ``False`` otherwise,
        including when the request fails or the response lacks the state
        (a diagnostic is printed in those cases).
    """
    headers = {'Authorization': 'Bearer ' + cdp_token,
               'Content-Type': 'application/json'}
    url = f"https://{caii_domain}/api/v1alpha1/describeEndpoint"
    payload = {"namespace": "serving-default", "name": f"{endpoint_name}"}

    try:
        with httpx.Client(headers=headers) as client:
            response = client.post(url, json=payload)
            response.raise_for_status()
        return response.json()['status']['active_model_state'] == 'Loaded'
    except httpx.HTTPStatusError as e:
        print(f"HTTP {e.response.status_code}: {e.response.text}")
    except httpx.RequestError as e:
        print(f"Error describing {endpoint_name}: {e}")
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        print(f"Unexpected describeEndpoint response for {endpoint_name}: {e}")
    # Every failure path counts as "not ready"
    return False

def base_url(url, target):
    """Cut the path of ``url`` off right after the first ``target``.

    Only the path component is searched and shortened; scheme, host, query
    and fragment are carried over unchanged.

    Args:
        url: URL to shorten.
        target: Substring to look for in the URL path.

    Returns:
        The URL with its path ending at the first occurrence of ``target``,
        or ``url`` itself when the path does not contain ``target``.
    """
    parts = urlparse(url)
    try:
        cut = parts.path.index(target) + len(target)
    except ValueError:
        # target does not occur in the path: hand the URL back untouched
        return url
    return urlunparse(parts._replace(path=parts.path[:cut]))

def get_endpoint_base_url(caii_domain, cdp_token, endpoint_name):
    """Return an endpoint's URL shortened to end at the endpoint name.

    Posts to ``https://{caii_domain}/api/v1alpha1/describeEndpoint`` and
    passes the ``url`` field of the response through ``base_url`` so that
    everything after ``endpoint_name`` in the path is dropped.

    Args:
        caii_domain: Domain name of the AI Inference service.
        cdp_token: Bearer token used for the ``Authorization`` header.
        endpoint_name: Name of the endpoint to describe.

    Returns:
        The shortened URL, or ``None`` when the request fails or the response
        has no ``url`` (a diagnostic is printed in those cases).
    """
    headers = {'Authorization': 'Bearer ' + cdp_token,
               'Content-Type': 'application/json'}
    url = f"https://{caii_domain}/api/v1alpha1/describeEndpoint"
    payload = {"namespace": "serving-default", "name": f"{endpoint_name}"}

    try:
        with httpx.Client(headers=headers) as client:
            response = client.post(url, json=payload)
            response.raise_for_status()
        return base_url(response.json()['url'], endpoint_name)
    except httpx.HTTPStatusError as e:
        print(f"HTTP {e.response.status_code}: {e.response.text}")
    except httpx.RequestError as e:
        print(f"Error describing {endpoint_name}: {e}")
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        print(f"Unexpected describeEndpoint response for {endpoint_name}: {e}")
    return None
CAII_DOMAIN = get_caii_domain()
CDP_TOKEN = get_ums_jwt_token()
ENDPOINT_NAME = "iris-onnx-nb"

# Must return True before we go on to the next step
# Poll until the endpoint reports ready, but give up after a bounded time so
# the cell cannot hang forever on a failed deployment.
READY_TIMEOUT_SECONDS = 600
READY_POLL_SECONDS = 15
ready_deadline = time.monotonic() + READY_TIMEOUT_SECONDS
ready = endpoint_is_ready(CAII_DOMAIN, CDP_TOKEN, ENDPOINT_NAME)
while not ready and time.monotonic() < ready_deadline:
    time.sleep(READY_POLL_SECONDS)
    ready = endpoint_is_ready(CAII_DOMAIN, CDP_TOKEN, ENDPOINT_NAME)
print(ready)
if not ready:
    print(f"Endpoint {ENDPOINT_NAME} is not ready after {READY_TIMEOUT_SECONDS}s; re-run this cell before continuing.")
BASE_URL = get_endpoint_base_url(CAII_DOMAIN, CDP_TOKEN, ENDPOINT_NAME)
print(BASE_URL)

## Part 3: Batch Inference (Demo)

**Note**: This section demonstrates the inference client setup. To actually run inference, you need:
1. A deployed model on Cloudera AI Inference
2. Proper authentication credentials
3. Correct endpoint URLs

### Inference Client Class Definition

In [None]:
class TritonBatchInference:
    """Client helper for batch inference against a served model.

    Holds an ``httpx.Client`` for direct REST calls and an
    ``OpenInferenceClient`` built on top of it, both sending the same
    bearer-token headers. Instances can be used as context managers (or
    closed with ``close()``) to release the HTTP connections.
    """

    def __init__(self, base_url: str, model_name: str, token: str):
        """Create the HTTP clients for ``model_name`` served under ``base_url``."""
        self.base_url = base_url
        self.model_name = model_name
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }
        # One HTTP client is shared by the raw config request and the Open
        # Inference client, so both send the same auth headers.
        self.httpx_client = httpx.Client(headers=self.headers)
        self.client = OpenInferenceClient(base_url=base_url, httpx_client=self.httpx_client)

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self.httpx_client.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def get_triton_model_config(self) -> Optional[Dict[str, Any]]:
        """Get Triton model configuration including dynamic batching settings.

        Fetches ``{base_url}/v2/models/{model_name}/config`` and prints it.

        Returns:
            The ``dynamic_batching`` section of the configuration, or ``None``
            when the section is absent or the request fails (the error is
            printed).
        """
        config_url = f"{self.base_url}/v2/models/{self.model_name}/config"

        try:
            response = self.httpx_client.get(config_url)
            response.raise_for_status()
            config = response.json()
            print("Model Configuration:")
            print(json.dumps(config, indent=2))

            # Extract dynamic batching info
            if 'dynamic_batching' in config:
                return config['dynamic_batching']
            return None

        except Exception as e:
            # Best-effort lookup: callers fall back to a default batch size
            print(f"Error getting model config: {e}")
            return None

    def check_server_status(self) -> bool:
        """Check if the server is ready and print the model metadata.

        Returns:
            ``True`` when both calls succeed, ``False`` when either raises
            (the error is printed).
        """
        try:
            # Check server readiness
            self.client.check_server_readiness()
            print("✓ Server is ready")

            # Get model metadata
            metadata = self.client.read_model_metadata(self.model_name)
            metadata_dict = json.loads(metadata.json())
            print("Model Metadata:")
            print(json.dumps(metadata_dict, indent=2))

            return True
        except Exception as e:
            print(f"Error checking server status: {e}")
            return False

    def prepare_iris_data(self, test_size: float = 0.3, random_state: int = 42) -> tuple:
        """Load and prepare the iris dataset for inference.

        Args:
            test_size: Fraction of the dataset held out for inference.
            random_state: Seed for the stratified split.

        Returns:
            ``(X_test_scaled, y_test, target_names)``: the scaled held-out
            features, their labels and the class names.
        """
        # Load iris dataset
        iris = load_iris()
        X, y = iris.data, iris.target

        # Split data (we'll use test set for batch inference)
        # NOTE(review): the training cell of this notebook splits with
        # test_size=0.2, so with the default of 0.3 the scaler statistics and
        # held-out rows differ from training - pass test_size=0.2 to match.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state, stratify=y
        )

        # Scale the features (assuming model was trained with scaled features)
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        print(f"Dataset prepared: {len(X_test_scaled)} samples for inference")
        print(f"Feature shape: {X_test_scaled.shape}")

        return X_test_scaled, y_test, iris.target_names

    def create_batches(self, data: np.ndarray, batch_size: int) -> List[np.ndarray]:
        """Split ``data`` into consecutive slices of at most ``batch_size`` rows."""
        return [data[i:i + batch_size] for i in range(0, len(data), batch_size)]

    def run_batch_inference_demo(self, data: np.ndarray, batch_size: int) -> np.ndarray:
        """Run batch inference on the data.

        Each batch is sent as one inference request; the predicted class is
        the argmax over the first output of the response.

        Args:
            data: Feature rows to classify.
            batch_size: Maximum number of rows per request.

        Returns:
            Array with one predicted class index per input row. Rows of a
            batch whose request failed are filled with class ``0`` (the error
            is printed), so metrics computed from the result are only
            meaningful when no batch failed.
        """
        batches = self.create_batches(data, batch_size)
        all_predictions = []

        print(f"Running inference on {len(batches)} batches of size {batch_size}")

        for i, batch in enumerate(batches):
            try:
                # Create inference request
                # Note: Adjust input/output names based on your model's specification
                inference_request = InferenceRequest(
                    inputs=[{
                        "name": "input",  # Adjust based on your model's input name
                        "shape": list(batch.shape),
                        "datatype": "FP32",
                        "data": batch.flatten().tolist()
                    }],
                )

                # perf_counter is monotonic, so the measured duration cannot be
                # skewed by wall-clock adjustments
                start_time = time.perf_counter()
                response = self.client.model_infer(self.model_name, request=inference_request)
                inference_time = time.perf_counter() - start_time

                # Extract predictions from response
                response_dict = json.loads(response.json())
                output_data = response_dict['outputs'][0]['data']

                # Reshape output to match batch size and number of classes
                output_array = np.array(output_data).reshape(batch.shape[0], -1)
                predictions = np.argmax(output_array, axis=1)
                all_predictions.extend(predictions)

                print(f"Batch {i+1}/{len(batches)} completed in {inference_time:.3f}s")

            except Exception as e:
                print(f"Error in batch {i+1}: {e}")
                # Fill with dummy predictions to maintain consistency
                dummy_predictions = [0] * len(batch)
                all_predictions.extend(dummy_predictions)

        return np.array(all_predictions)

    def evaluate_predictions(self, predictions: np.ndarray, y_true: np.ndarray,
                           class_names: List[str]) -> Dict[str, Any]:
        """Evaluate the predictions and return metrics.

        Returns:
            Dictionary with ``accuracy``, ``classification_report``,
            ``total_samples`` and ``correct_predictions``.
        """
        accuracy = accuracy_score(y_true, predictions)
        report = classification_report(y_true, predictions, target_names=class_names)

        results = {
            'accuracy': accuracy,
            'classification_report': report,
            'total_samples': len(y_true),
            'correct_predictions': np.sum(predictions == y_true)
        }

        return results

print("Inference client class defined!")

### Demo Inference Pipeline

In [None]:
# Demo configuration (replace with actual values for real inference)
DEMO_BASE_URL = BASE_URL
# The registry model id is used as the served model name.
# NOTE(review): confirm the serving side names the model by its registry id.
DEMO_MODEL_NAME = f"{MODEL_ID}"
CDP_TOKEN = get_ums_jwt_token()

# Initialize demo inference client
demo_client = TritonBatchInference(DEMO_BASE_URL, DEMO_MODEL_NAME, CDP_TOKEN)

print("Demo inference client initialized")
print(f"Base URL: {DEMO_BASE_URL}")
print(f"Model Name: {DEMO_MODEL_NAME}")

In [None]:
# Run demo inference pipeline
print("=" * 50)
print("DEMO BATCH INFERENCE PIPELINE")
print("=" * 50)

# Check server status (demo)
if demo_client.check_server_status():
    print("✓ Demo server ready")

# Get model configuration (demo)
# Use the first preferred batch size when the model advertises one; fall back
# to 8 when dynamic batching is absent or lists no preferred sizes.
config = demo_client.get_triton_model_config()
preferred_sizes = (config or {}).get('preferred_batch_size') or []
batch_size = preferred_sizes[0] if preferred_sizes else 8
print(f"Using batch size: {batch_size}")

# Prepare data
X_inference, y_inference, class_names = demo_client.prepare_iris_data()

# Run demo inference
print("\nRunning demo batch inference...")
start_time = time.time()
demo_predictions = demo_client.run_batch_inference_demo(X_inference, batch_size)
total_time = time.time() - start_time

# Evaluate demo results
# Timing figures cover the whole loop, including request construction and
# response parsing, not just server-side inference.
results = demo_client.evaluate_predictions(demo_predictions, y_inference, class_names)
results['total_inference_time'] = total_time
results['avg_time_per_sample'] = total_time / len(X_inference)
results['throughput_samples_per_second'] = len(X_inference) / total_time

print("\n" + "=" * 50)
print("DEMO INFERENCE RESULTS")
print("=" * 50)
print(f"Total Samples: {results['total_samples']}")
print(f"Correct Predictions: {results['correct_predictions']}")
print(f"Accuracy: {results['accuracy']:.4f}")
print(f"Total Inference Time: {results['total_inference_time']:.3f}s")
print(f"Average Time per Sample: {results['avg_time_per_sample']:.6f}s")
print(f"Throughput: {results['throughput_samples_per_second']:.2f} samples/second")

print("\nDemo Classification Report:")
print(results['classification_report'])

## Summary

This notebook demonstrated:

1. **Complete ML Pipeline**: From data loading to model deployment
2. **PyTorch Model Training**: Neural network with proper validation
3. **MLflow Integration**: Experiment tracking and model registry
4. **ONNX Conversion**: Model format conversion for deployment
5. **CDP CLI Setup**: Configure CDP CLI so it can be used from the notebook
6. **AI Registry API**: Use AI Registry API v2 to get model details so it can be deployed to AI Inference
7. **AI Inference API**: Explore AI Inference management API to deploy a model, and check deployment status
8. **Inference Client**: Framework for batch inference against deployed models

### Next Steps:
- Run production batch inference
- Monitor model performance and retrain as needed