![alt text](https://whylabs-public.s3.us-west-2.amazonaws.com/assets/whylabs-logo-night-blue.svg)

*Run AI with Certainty*

# **Using WhyLabs with Sagemaker** 

In [1]:
# scikit-learn provides the iris dataset and metrics used below
%pip install sagemaker xgboost scikit-learn python-dotenv ipywidgets


Note: you may need to restart the kernel to use updated packages.


In [2]:
# Keep credentials and settings in a local .env file rather than in the notebook.
from dotenv import load_dotenv
import os

# Create a sagemaker.env file with these variables:
# SAGEMAKER_ROLE=
# WHYLABS_API_KEY=
# WHYLABS_DEFAULT_DATASET_ID=
# BUCKET_NAME=

load_dotenv(dotenv_path='sagemaker.env')

True
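Since `load_dotenv` returns `True` even when some variables are blank, a quick check can catch a missing value before anything is deployed. This is a minimal sketch; `missing_env_vars` is a hypothetical helper, and the list of required names is an assumption based on the template above. `BUCKET_NAME` is left out because `session.upload_data` falls back to the session's default bucket when no bucket is given.

```python
import os

# Names from the sagemaker.env template above (assumed to be the required ones).
REQUIRED_VARS = [
    "SAGEMAKER_ROLE",
    "WHYLABS_API_KEY",
    "WHYLABS_DEFAULT_DATASET_ID",
]


def missing_env_vars(names, environ=None):
    """Return the names that are unset or empty in `environ` (defaults to os.environ)."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print(f"Missing from sagemaker.env: {', '.join(missing)}")
```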

## AWS Authentication

Set up AWS authentication by preparing an execution role for Sagemaker and making sure the AWS CLI is configured with credentials for your account.

In [3]:
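import boto3
import sagemaker

# A Sagemaker execution role that you previously created
aws_role = os.getenv("SAGEMAKER_ROLE")
aws_region = "us-west-2"
# Bind the session to aws_region rather than whatever default region the AWS CLI uses
session = sagemaker.Session(boto_session=boto3.Session(region_name=aws_region))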

## Train a model

In [4]:
import xgboost as xgb
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Load dataset
iris = datasets.load_iris()
X = iris.data
y = iris.target

# Split the dataset into a training set and a test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


# Convert the dataset into the DMatrix format used by XGBoost
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

# Set up the parameters for XGBoost
# Objective is multi:softmax for multi-class classification problems
# num_class should be set to the number of classes to predict
# There are many other parameters you can set to customize the boosting process
param = {
    'max_depth': 3,  # Maximum depth of a tree
    'eta': 0.3,      # Learning rate
    'objective': 'multi:softmax',  # Multiclass classification problem
    'num_class': 3}  # Number of classes in objective
num_round = 20  # Number of boosting rounds

# Train the model
bst = xgb.train(param, dtrain, num_round)


print(y_test)
# Make predictions
preds = bst.predict(dtest)

# Evaluate the predictions
accuracy = accuracy_score(y_test, preds)
print(f"Accuracy: {accuracy * 100:.2f}%")

[1 0 2 1 1 0 1 2 1 1 2 0 0 0 0 1 2 1 1 2 0 2 0 2 2 2 2 2 0 0]
Accuracy: 100.00%


## Prepare model
For this example we'll package the XGBoost model we just trained into a `model.tar.gz` archive and upload it to S3, where Sagemaker can load it.

In [5]:
import tarfile

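# Save the trained booster; model_fn in inference.py loads it by this file name
model_file_name = "xgboost-model"
bst.save_model(model_file_name)

# S3 destination. If BUCKET_NAME is unset, upload_data uses the session's default bucket
bucket = os.getenv("BUCKET_NAME")
key_prefix = 'sagemaker_models/xgboost-iris'

# Compress the model file into the tar.gz archive Sagemaker expects
model_archive_name = "model.tar.gz"
with tarfile.open(model_archive_name, "w:gz") as tar:
    tar.add(model_file_name)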

upload_path = session.upload_data(path=model_archive_name, bucket=bucket, key_prefix=key_prefix)
print(f"Model artifact uploaded to: {upload_path}")



Model artifact uploaded to: s3://sagemaker-us-west-2-207285235248/sagemaker_models/xgboost-iris/model.tar.gz
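Sagemaker extracts `model.tar.gz` into the model directory it passes to `model_fn`, and `model_fn` loads `os.path.join(model_dir, "xgboost-model")`, so the booster has to sit at the root of the archive rather than in a subfolder. The sketch below checks that before uploading; `archive_has_root_file` is a hypothetical helper, and the demo builds a throwaway archive with a placeholder file instead of a real booster.

```python
import os
import tarfile
import tempfile


def archive_has_root_file(archive_path, file_name):
    """True if `file_name` is a top-level member of the tar.gz at `archive_path`."""
    with tarfile.open(archive_path, "r:gz") as tar:
        names = [os.path.normpath(m.name) for m in tar.getmembers()]
    return file_name in names


# Self-contained demo: build an archive the same way the cell above does.
with tempfile.TemporaryDirectory() as tmp:
    model_path = os.path.join(tmp, "xgboost-model")
    with open(model_path, "w") as f:
        f.write("placeholder, not a real booster")
    archive_path = os.path.join(tmp, "model.tar.gz")
    with tarfile.open(archive_path, "w:gz") as tar:
        # arcname keeps the file at the archive root regardless of the working directory
        tar.add(model_path, arcname="xgboost-model")
    demo_ok = archive_has_root_file(archive_path, "xgboost-model")

print(demo_ok)  # True
```

In the notebook itself you would call `archive_has_root_file("model.tar.gz", "xgboost-model")` before `session.upload_data`.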


## Prepare the requirements file
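You'll need to install whylogs on the Sagemaker host. You do this by shipping a `requirements.txt` file in the `code` source directory, which the Sagemaker framework container installs when the endpoint starts. The commented-out commands below show how you could generate that file from a throwaway virtual env; this example simply bundles a prebuilt `code/requirements.txt` instead.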

In [6]:
# Just bundled with the requirements.txt file instead of creating dynamically

# ! mkdir -p code 
# ! bash -c "virtualenv ./code/.venv && source ./code/.venv/bin/activate && pip install xgboost==1.7.6 whylogs[proc]==1.3.8 && pip freeze > code/requirements.txt"
# ! rm -rf ./code/.venv
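If you'd rather not create a virtual env at all, you could write a minimal requirements file directly. This sketch pins only the two versions named in the commented-out command above (`xgboost==1.7.6`, `whylogs[proc]==1.3.8`); unlike `pip freeze`, it does not pin transitive dependencies. `write_requirements` is a hypothetical helper, and the demo writes to a temporary directory so the bundled `code/requirements.txt` is left untouched.

```python
import tempfile
from pathlib import Path

# Pins copied from the commented-out virtualenv command above; adjust as needed.
PINNED_REQUIREMENTS = ["xgboost==1.7.6", "whylogs[proc]==1.3.8"]


def write_requirements(source_dir, requirements):
    """Write one requirement per line to <source_dir>/requirements.txt and return the path."""
    path = Path(source_dir) / "requirements.txt"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(requirements) + "\n")
    return path


# Demo in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    demo_text = write_requirements(tmp, PINNED_REQUIREMENTS).read_text()
print(demo_text)
```

To actually use it, you would call `write_requirements("code", PINNED_REQUIREMENTS)`, which overwrites the bundled file.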

## Create an inference.py file
The integration happens in the custom inference logic for the Sagemaker container. The cell below is written to `code/inference.py` and deployed along with the model further down. It logs the tabular iris features and the model's predictions, but the same approach works for other kinds of data as well.

In [7]:
%%writefile code/inference.py
import multiprocessing
import os
import traceback
import json
import xgboost as xgb
import pandas as pd
from typing import List

import whylogs as why
from whylogs.api.writer import Writer, Writers
from whylogs.api.logger.experimental.logger.actor.thread_rolling_logger import ThreadRollingLogger
from whylogs.api.logger.experimental.logger.actor.time_util import Schedule, TimeGranularity

# Initialize whylogs with your WhyLabs API key and target dataset ID. You can get an API key from the
# settings menu of your WhyLabs account.
why.init() # This loads credentials from the env directly

def create_logger():
    logger = ThreadRollingLogger(
        # This should match the model type in WhyLabs. We're using a daily model here.
        aggregate_by=TimeGranularity.Day,
        # The rolling logger uploads profiles to WhyLabs every 5 minutes; data
        # accumulates in memory between uploads.
        write_schedule=Schedule(cadence=TimeGranularity.Minute, interval=5),
        writers=[Writers.get('whylabs')]
    )

    return logger

logger = create_logger()

def model_fn(model_dir):
    model_file = "xgboost-model"
    booster = xgb.Booster()
    booster.load_model(os.path.join(model_dir, model_file))
    return booster


def input_fn(request_body, request_content_type):
    print(f'Logger closed: {logger.is_closed()}, alive : {logger.is_alive()}')

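    if request_content_type != 'application/json':
        raise ValueError(f"Unsupported content type: {request_content_type}")
    # Body should be a list of lists of length 4, or a control payload such as {"flush": true}
    body = json.loads(request_body)

    if isinstance(body, dict) and body.get('flush'):
        # Utility for flushing the logger, which forces it to upload any pending profiles synchronously.
        print("Flushing logger...")
        logger.flush()
        print("Done flushing logger")
        return None

    if isinstance(body, dict) and body.get('close'):
        # Closing also uploads pending profiles, but the logger can't be used afterwards.
        logger.close()
        return None

    if not isinstance(body, list):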
        raise ValueError(f"Expected a list of lists, got {type(body)}")

    if len(body) == 0:
        raise ValueError("Expected a list of lists, got an empty list")

    if len(body[0]) != 4:
        raise ValueError(f"Expected a list of lists of length 4, got a list of lists of length {len(body[0])}")

    return body


column_names = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']

def predict_fn(input_data: List[List[float]], model):
    if input_data is None:
        return ""

    test = xgb.DMatrix(input_data)
    predictions = model.predict(test)

    df = pd.DataFrame(input_data, columns=column_names)
    df['prediction'] = predictions.tolist()

    try:
        print("Logging prediction...")
        logger.log(df)
        print("Done logging prediction")
    except Exception as e:
        print(f"Failed to log prediction: {e}")
        print(traceback.format_exc())

    return predictions.tolist()


def output_fn(prediction, content_type):
    return json.dumps(prediction)



Overwriting code/inference.py
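The serving container calls these hooks in a fixed order: `model_fn` when the model is loaded, then `input_fn`, `predict_fn` and `output_fn` for every request. The sketch below is a stripped-down, hypothetical stand-in (`handle_request` and the fake model are illustrations, and whylogs and xgboost are left out) that shows how a `{"flush": true}` control payload short-circuits the chain: `input_fn` returns `None`, `predict_fn` returns `""`, and `output_fn` serializes that to `'""'`.

```python
import json


# Stand-ins mirroring the control flow of inference.py.
def input_fn(request_body, request_content_type):
    body = json.loads(request_body)
    if isinstance(body, dict) and body.get("flush"):
        return None  # control payload: the real handler flushes the logger here
    return body


def predict_fn(input_data, model):
    if input_data is None:
        return ""
    return [model(row) for row in input_data]


def output_fn(prediction, content_type):
    return json.dumps(prediction)


def handle_request(model, request_body, content_type="application/json"):
    """Hypothetical driver that runs the hooks in the order the container does."""
    data = input_fn(request_body, content_type)
    prediction = predict_fn(data, model)
    return output_fn(prediction, content_type)


def fake_model(row):
    """Toy classifier: class 2.0 when petal length > 4.5, else 0.0."""
    return 2.0 if row[2] > 4.5 else 0.0


print(handle_request(fake_model, json.dumps([[5.1, 3.5, 1.4, 0.2], [6.7, 3.0, 5.2, 2.3]])))  # [0.0, 2.0]
print(handle_request(fake_model, json.dumps({"flush": True})))  # ""
```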


# Create an XGBoost deployment

In [8]:
from sagemaker.xgboost import XGBoostModel

sagemaker_model = XGBoostModel(
    source_dir='code',
    entry_point='inference.py',
    model_data=upload_path,
    framework_version='1.7-1',
    role=aws_role,
    # Forwarded to the container so why.init() in inference.py can read the WhyLabs credentials
    env={
        'WHYLABS_API_KEY': os.environ['WHYLABS_API_KEY'],
        'WHYLABS_DEFAULT_DATASET_ID': os.environ['WHYLABS_DEFAULT_DATASET_ID']
    },
)

In [9]:
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import StringDeserializer

predictor = sagemaker_model.deploy(initial_instance_count=1, instance_type='ml.m5.large')
predictor.serializer = JSONSerializer()
predictor.deserializer = StringDeserializer()


# Make predictions

In [None]:
def predict(data):
    return predictor.predict(data, initial_args={'ContentType': 'application/json'})

In [None]:
# Predict with our test data

predict(X_test)

'[1.0, 0.0, 2.0, 1.0, 1.0, 0.0, 1.0, 2.0, 1.0, 1.0, 2.0, 0.0, 0.0, 0.0, 0.0, 1.0, 2.0, 1.0, 1.0, 2.0, 0.0, 2.0, 0.0, 2.0, 2.0, 2.0, 2.0, 2.0, 0.0, 0.0]'
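Validation errors raised by `input_fn` only show up after a round trip to the endpoint, so it can help to check a payload locally first. This is a sketch: `validate_rows` is a hypothetical helper that mirrors the checks in `inference.py` (a non-empty list of 4-value rows) but checks every row rather than only the first. The JSON it returns is roughly what the serializer sends for `X_test.tolist()`.

```python
import json

FEATURES = ["sepal_length", "sepal_width", "petal_length", "petal_width"]


def validate_rows(rows):
    """Raise ValueError unless `rows` is a non-empty list of 4-number lists; return its JSON."""
    if not isinstance(rows, list) or not rows:
        raise ValueError("Expected a non-empty list of lists")
    for i, row in enumerate(rows):
        if not isinstance(row, list) or len(row) != len(FEATURES):
            raise ValueError(f"Row {i}: expected {len(FEATURES)} values, got {row!r}")
        if not all(isinstance(v, (int, float)) for v in row):
            raise ValueError(f"Row {i}: non-numeric value in {row!r}")
    return json.dumps(rows)


payload = validate_rows([[5.1, 3.5, 1.4, 0.2]])
print(payload)  # [[5.1, 3.5, 1.4, 0.2]]
```

In the notebook you would run `validate_rows(X_test.tolist())` before calling `predict`.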

## Force the logger to upload

> ⚠️ These control requests only work reliably if a single instance sits behind your prediction endpoint. With several instances, each request reaches only one of them, so you'd have to make sure the request gets to every instance individually.

Sending `{'flush': True}` forces the logger to upload any data it is still holding (see `input_fn` in `inference.py`), so you can run it before shutting down the Sagemaker endpoint. The rolling logger normally uploads on a fixed schedule (every 5 minutes in this example), and flushing makes sure you don't cut off the last profile upload when you tear things down. Sagemaker doesn't provide an "on close" hook that would make this transparent.


In [None]:
# predictor.predict({'flush':True}, initial_args={'ContentType': 'application/json'})

'""'
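To see how much data a flush can rescue, it helps to picture the two clocks the rolling logger in `inference.py` uses: a daily aggregation bucket and a 5-minute write cadence. This sketch assumes UTC day buckets and writes aligned to 5-minute boundaries. Both assumptions are simplifications, whylogs' actual alignment may differ, and `day_bucket` / `next_write` are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone


def day_bucket(ts):
    """Start of the UTC day that `ts` falls into (assumed daily profile boundary)."""
    ts = ts.astimezone(timezone.utc)
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)


def next_write(ts, interval_minutes=5):
    """Next write time, assuming writes land on fixed `interval_minutes` boundaries."""
    ts = ts.astimezone(timezone.utc).replace(second=0, microsecond=0)
    remainder = ts.minute % interval_minutes
    return ts + timedelta(minutes=interval_minutes - remainder)


now = datetime(2023, 11, 20, 14, 53, 12, tzinfo=timezone.utc)
print(day_bucket(now))  # 2023-11-20 00:00:00+00:00
print(next_write(now))  # 2023-11-20 14:55:00+00:00
```

Under these assumptions, tearing down the endpoint at 14:53 without a flush would drop whatever was logged since the 14:50 write.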

In addition, you can close the logger. This also forces an upload, but it leaves the logger inactive, so only do it right before you tear down the endpoint. The benefit is that it lets you synchronously wait for any pending uploads to finish.

Remember, neither of these works out of the box. They only work because `inference.py` checks for these payloads and calls the corresponding methods on the logger.

In [None]:
# predictor.predict({'close':True}, initial_args={'ContentType': 'application/json'})

# Clean up endpoint

In [None]:
# predictor.delete_endpoint()
