In [None]:
!python -V  # 3.8.13

In [None]:
!pip install --upgrade \
    boto3==1.26.69 \
    flask==2.2.3 \
    mlflow==2.1.1 \
    numpy==1.20.3 \
    pandas==1.3.4 \
    sagemaker==2.132.0 \
    scikit-learn==0.24.2

Prerequisites:

- install the AWS CLI (`awscli`)
- set up AWS credentials (e.g. with `aws configure`) for the account and region used below

In [None]:
import datetime
import json
import pathlib
import typing

import mlflow
import mlflow.sagemaker
import numpy as np
import pandas as pd
import sagemaker
from sklearn.ensemble import RandomForestClassifier

# 1. log a model to mlflow

In [None]:
# Local folder layout: <cwd>/mlflow/runs is used as the MLflow tracking location.
current_folder_path = pathlib.Path.cwd().absolute()
mlflow_folder_path = current_folder_path.joinpath('mlflow')
mlflow_runs_folder_path = mlflow_folder_path.joinpath('runs')

# Artifact sub-path within the run, and the name the model is registered under.
mlflow_artifact_path = 'test'
mlflow_registered_model_name = 'test-model'

In [None]:
# Path.as_uri() yields a well-formed file:// URI (percent-encoding, Windows drive letters),
# which plain string interpolation of the path does not guarantee.
mlflow.set_tracking_uri(mlflow_runs_folder_path.as_uri())

In [None]:
# Seeded generator so the synthetic data set and the fitted model are reproducible across runs.
random_seed = 42
rng = np.random.default_rng(random_seed)

training_data_size = 100
features = rng.random((training_data_size, 3))
# Labels are a 1-D vector: scikit-learn raises a DataConversionWarning when it is given
# a column vector of shape (n_samples, 1).
labels = rng.integers(0, 2, training_data_size)

with mlflow.start_run():
    rf_model = RandomForestClassifier(random_state=random_seed)
    rf_model.fit(features, labels)
    # Log the fitted model as a run artifact and register it in the model registry.
    mlflow.sklearn.log_model(
        rf_model,
        artifact_path=mlflow_artifact_path,
        registered_model_name=mlflow_registered_model_name,
    )

---

# 2. build and push an mlflow serving image

In [None]:
mlflow_ecr_repository_name = 'mlflow-sm-serving'
# `mlflow sagemaker build-and-push-container` tags the pushed image with the installed
# MLflow version (not 'latest'), so the tag used to build the image URI must match it.
mlflow_ecr_image_tag = mlflow.__version__

In [None]:
!mlflow sagemaker build-and-push-container --container $mlflow_ecr_repository_name --mlflow-home <path-to-mlflow-repo-folder>

---

# 3. push the logged model to SageMaker

In [None]:
def add_current_datetime_suffix(prefix: str) -> str:
    """Return ``prefix`` followed by ``-YYYYMMDD-HHMMSS`` for the current local time."""
    timestamp = f'{datetime.datetime.now():%Y%m%d-%H%M%S}'
    return '-'.join((prefix, timestamp))

In [None]:
# Template cell: replace every `<>` placeholder with a real value before running.
# AWS account ID and region; both are interpolated into the ECR image URI below.
aws_account_id = <>
aws_region = <>
mlflow_inference_image_uri = f'{aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com/{mlflow_ecr_repository_name}:{mlflow_ecr_image_tag}'
# ARN of the role passed to the SageMaker model-creation calls in later cells.
execution_role_arn = <>
# Name of the S3 bucket used as the session's default bucket and for uploads in later cells.
output_s3_bucket_name = <>

In [None]:
# Session shared by the helper functions and cells below; its default bucket is the output bucket.
sagemaker_session = sagemaker.Session(default_bucket=output_s3_bucket_name)

In [None]:
# Push the latest registered version of the MLflow model to SageMaker under a
# timestamped model name, using the serving image URI built above.
sagemaker_model_name = add_current_datetime_suffix(mlflow_registered_model_name)
mlflow.sagemaker.push_model_to_sagemaker(
    model_name=sagemaker_model_name,
    model_uri=f'models:/{mlflow_registered_model_name}/latest',
    execution_role_arn=execution_role_arn,
    bucket=output_s3_bucket_name,
    image_url=mlflow_inference_image_uri,
    region_name=aws_region,
    flavor='python_function',
)

# Display the generated model name.
sagemaker_model_name

In [None]:
# Fetch the description of the pushed model; its 'PrimaryContainer' entry is reused
# later to build the container definitions.
sagemaker_model_metadata = sagemaker_session.sagemaker_client.describe_model(ModelName=sagemaker_model_name)

sagemaker_model_metadata

---

# 4. create & test a multi-container model

In [None]:
def create_multi_container_sagemaker_model(
        model_name: str,
        containers: typing.List[dict],
        inference_execution_mode: str,
        role_arn: str,
        sm_session: sagemaker.Session,
) -> dict:
    """Create a SageMaker model composed of several containers.

    Args:
        model_name: Name of the model to create.
        containers: Container definitions passed as ``Containers``.
        inference_execution_mode: Either ``'Direct'`` or ``'Serial'``.
        role_arn: Execution role ARN for the model.
        sm_session: Session whose ``sagemaker_client`` performs the call.

    Returns:
        The response of the ``create_model`` call.

    Raises:
        AssertionError: If ``inference_execution_mode`` is not a supported value.
    """
    assert inference_execution_mode in {'Direct', 'Serial'}

    create_model_args = {
        'ModelName': model_name,
        'Containers': containers,
        'InferenceExecutionConfig': {'Mode': inference_execution_mode},
        'ExecutionRoleArn': role_arn,
    }
    return sm_session.sagemaker_client.create_model(**create_model_args)


def create_endpoint_config(
        endpoint_config_name: str,
        sm_model_name: str,
        sm_session: sagemaker.Session,
        instance_type: str = 'ml.m5.large',
        instance_count: int = 1,
) -> dict:
    """Create an endpoint configuration with a single production variant named ``prod``.

    Args:
        endpoint_config_name: Name of the endpoint configuration to create.
        sm_model_name: Name of the SageMaker model served by the variant.
        sm_session: Session whose ``sagemaker_client`` performs the call.
        instance_type: Instance type of the variant.
        instance_count: Initial number of instances of the variant.

    Returns:
        The response of the ``create_endpoint_config`` call.
    """
    production_variant = {
        'VariantName': 'prod',
        'ModelName': sm_model_name,
        'InitialInstanceCount': instance_count,
        'InstanceType': instance_type,
    }
    client = sm_session.sagemaker_client
    return client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[production_variant],
    )


def create_endpoint(
        endpoint_name: str,
        endpoint_config_name: str,
        sm_session: typing.Optional[sagemaker.Session] = None,
) -> dict:
    """Create a SageMaker endpoint from an existing endpoint configuration.

    Args:
        endpoint_name: Name of the endpoint to create.
        endpoint_config_name: Name of the endpoint configuration to deploy.
        sm_session: Session whose ``sagemaker_client`` performs the call. When omitted,
            the notebook-level ``sagemaker_session`` is used, which keeps existing
            two-argument calls working.

    Returns:
        The response of the ``create_endpoint`` call.
    """
    if sm_session is None:
        # Backward-compatible fallback: earlier callers relied on the global session.
        sm_session = sagemaker_session

    return sm_session.sagemaker_client.create_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=endpoint_config_name,
    )


def invoke_endpoint(
        endpoint_name: str,
        inference_data: str,
        sm_session: sagemaker.Session,
        container_host_name: typing.Optional[str] = None,
        request_content_type: str = 'application/json',
        response_accept: str = 'application/json',
) -> str:
    """Invoke an endpoint and return the decoded response body.

    Args:
        endpoint_name: Name of the endpoint to invoke.
        inference_data: Request payload sent as ``Body``.
        sm_session: Session whose ``sagemaker_runtime_client`` performs the call.
        container_host_name: When given, sent as ``TargetContainerHostname``;
            omitted from the request otherwise.
        request_content_type: Value of ``ContentType``.
        response_accept: Value of ``Accept``.

    Returns:
        The response ``Body`` read in full and decoded to ``str``.
    """
    request_args = {
        'EndpointName': endpoint_name,
        'ContentType': request_content_type,
        'Accept': response_accept,
        'Body': inference_data,
    }
    # The target hostname is only added when the caller asked for a specific container.
    if container_host_name is not None:
        request_args['TargetContainerHostname'] = container_host_name

    runtime_client = sm_session.sagemaker_runtime_client
    response_body = runtime_client.invoke_endpoint(**request_args)['Body']
    return response_body.read().decode()


def delete_endpoint(
        endpoint_name: str,
        sm_session: sagemaker.Session,
        endpoint_config_name: typing.Optional[str] = None,
        sm_model_name: typing.Optional[str] = None,
):
    """Delete an endpoint and, optionally, its endpoint configuration and model.

    Args:
        endpoint_name: Name of the endpoint to delete.
        sm_session: Session whose ``sagemaker_client`` performs the calls.
        endpoint_config_name: When given, this endpoint configuration is deleted too.
        sm_model_name: When given, this model is deleted too.
    """
    client = sm_session.sagemaker_client

    # The endpoint goes first, then the optional resources in the original order.
    client.delete_endpoint(EndpointName=endpoint_name)
    if endpoint_config_name is not None:
        client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    if sm_model_name is not None:
        client.delete_model(ModelName=sm_model_name)

In [None]:
# Both containers reuse the image and model data of the model pushed earlier;
# only the hostname differs.
first_container_definition = dict(
    ContainerHostname='model-1',
    Image=sagemaker_model_metadata['PrimaryContainer']['Image'],
    ModelDataUrl=sagemaker_model_metadata['PrimaryContainer']['ModelDataUrl'],
)

second_container_definition = {**first_container_definition, 'ContainerHostname': 'model-2'}

In [None]:
# Create a two-container model in 'Direct' mode under a timestamped name.
multi_container_model_name = add_current_datetime_suffix('mlflow-multi-container')
create_model_response = create_multi_container_sagemaker_model(
    model_name=multi_container_model_name,
    containers=[first_container_definition, second_container_definition],
    inference_execution_mode='Direct',
    role_arn=execution_role_arn,
    sm_session=sagemaker_session,
)

create_model_response

In [None]:
# Create the endpoint configuration for the multi-container model (default instance settings).
multi_container_endpoint_config_name = add_current_datetime_suffix('mlflow-multi-container-config')
endpoint_config_response = create_endpoint_config(
    endpoint_config_name=multi_container_endpoint_config_name,
    sm_model_name=multi_container_model_name,
    sm_session=sagemaker_session,
)

endpoint_config_response

In [None]:
multi_container_endpoint_name = add_current_datetime_suffix('mlflow-multi-container-endpoint')
endpoint_response = create_endpoint(
    endpoint_name=multi_container_endpoint_name,
    endpoint_config_name=multi_container_endpoint_config_name,
)

# Endpoint creation is asynchronous: block until the endpoint is InService so the
# invocation cells that follow also succeed under "Restart Kernel -> Run All".
sagemaker_session.sagemaker_client.get_waiter('endpoint_in_service').wait(
    EndpointName=multi_container_endpoint_name,
)

endpoint_response

In [None]:
# Send the training features (JSON, 'dataframe_split' layout) to the first container,
# addressed by its hostname.
invoke_response = invoke_endpoint(
    endpoint_name=multi_container_endpoint_name,
    inference_data=json.dumps({'dataframe_split': pd.DataFrame(features).to_dict(orient='split')}),
    container_host_name=first_container_definition['ContainerHostname'],
    sm_session=sagemaker_session,
)

invoke_response

In [None]:
# Same request as above, this time addressed to the second container.
invoke_response = invoke_endpoint(
    endpoint_name=multi_container_endpoint_name,
    inference_data=json.dumps({'dataframe_split': pd.DataFrame(features).to_dict(orient='split')}),
    container_host_name=second_container_definition['ContainerHostname'],
    sm_session=sagemaker_session,
)

invoke_response

In [None]:
# Clean up: delete the multi-container endpoint, its configuration and its model.
delete_endpoint(
    endpoint_name=multi_container_endpoint_name,
    endpoint_config_name=multi_container_endpoint_config_name,
    sm_model_name=multi_container_model_name,
    sm_session=sagemaker_session,
)

---

# 5. create preprocessing "model" (script only) for an inference pipeline endpoint

prepare model inference code:

In [None]:
# Local staging area: the inference script is written to <scripts>/code/ and the
# scripts folder is later packed into the archive named below.
scripts_folder_path = './scripts'
processing_code_folder_path = scripts_folder_path + '/code'
processing_script_name = 'inference.py'
processing_script_path = '/'.join((processing_code_folder_path, processing_script_name))
processing_model_archive_name = 'test-processing-model.tar.gz'
processing_model_archive_path = './' + processing_model_archive_name

In [None]:
!mkdir -p $processing_code_folder_path

In [None]:
%%writefile $processing_script_path


import json


def model_fn(model_dir):
    """Return ``None``: this script loads no model, and ``model_dir`` is ignored."""


def input_fn(request_body, request_content_type):
    """Parse a JSON request body.

    Raises:
        ValueError: If the content type is anything other than ``application/json``.
    """
    if request_content_type != 'application/json':
        raise ValueError(f'Unsupported request content type: "{request_content_type}"')

    return json.loads(request_body)


def predict_fn(input_data, model):
    """Pass the parsed input through unchanged; ``model`` is not used."""
    return input_data


def output_fn(prediction, content_type):
    """Serialize the prediction to JSON and return it together with the content type.

    Raises:
        ValueError: If the content type is anything other than ``application/json``.
    """
    if content_type != 'application/json':
        raise ValueError(f'Unsupported response content type: "{content_type}"')

    serialized_prediction = json.dumps(prediction)
    return serialized_prediction, content_type

In [None]:
!tar -czvf $processing_model_archive_path -C $scripts_folder_path .

In [None]:
!aws s3 cp $processing_model_archive_path s3://$output_s3_bucket_name/

In [None]:
!rm -rf $scripts_folder_path $processing_model_archive_path

create a processing container definition:

In [None]:
processing_container_definition = {
    'ContainerHostname': 'processing-model',
    # Resolve the AWS-managed scikit-learn inference image for the configured region
    # instead of hard-coding the us-east-1 registry, so the notebook works in other
    # regions too. In us-east-1 this resolves to the URI that used to be hard-coded.
    'Image': sagemaker.image_uris.retrieve(
        framework='sklearn',
        region=aws_region,
        version='1.0-1',
        py_version='py3',
        instance_type='ml.m5.large',
        image_scope='inference',
    ),
    # Archive uploaded to the output bucket by the `aws s3 cp` cell.
    'ModelDataUrl': f's3://{output_s3_bucket_name}/{processing_model_archive_name}',
    'Environment': {
        # Entry-point script and the directory it lives in once the archive is unpacked.
        'SAGEMAKER_PROGRAM': processing_script_name,
        'SAGEMAKER_SUBMIT_DIRECTORY': '/opt/ml/model/code',
    },
}

---

# 6. create an inference-pipeline model

In [None]:
# Create a 'Serial' model: the processing container is listed first, followed by the
# MLflow model container.
inference_pipeline_model_name = add_current_datetime_suffix('mlflow-inference-pipeline')
create_model_response = create_multi_container_sagemaker_model(
    model_name=inference_pipeline_model_name,
    containers=[processing_container_definition, second_container_definition],
    inference_execution_mode='Serial',
    role_arn=execution_role_arn,
    sm_session=sagemaker_session,
)

create_model_response

In [None]:
# Create the endpoint configuration for the inference-pipeline model (default instance settings).
inference_pipeline_endpoint_config_name = add_current_datetime_suffix('mlflow-inference-pipeline-config')
endpoint_config_response = create_endpoint_config(
    endpoint_config_name=inference_pipeline_endpoint_config_name,
    sm_model_name=inference_pipeline_model_name,
    sm_session=sagemaker_session,
)

endpoint_config_response

In [None]:
inference_pipeline_endpoint_name = add_current_datetime_suffix('mlflow-inference-pipeline-endpoint')
endpoint_response = create_endpoint(
    endpoint_name=inference_pipeline_endpoint_name,
    endpoint_config_name=inference_pipeline_endpoint_config_name,
)

# Endpoint creation is asynchronous: block until the endpoint is InService so the
# invocation cell that follows also succeeds under "Restart Kernel -> Run All".
sagemaker_session.sagemaker_client.get_waiter('endpoint_in_service').wait(
    EndpointName=inference_pipeline_endpoint_name,
)

endpoint_response

In [None]:
# Invoke the pipeline endpoint without a target container hostname.
invoke_response = invoke_endpoint(
    endpoint_name=inference_pipeline_endpoint_name,
    inference_data=json.dumps({'dataframe_split': pd.DataFrame(features).to_dict(orient='split')}),
    sm_session=sagemaker_session,
)

invoke_response

In [None]:
# Clean up: delete the inference-pipeline endpoint, its configuration and its model.
delete_endpoint(
    endpoint_name=inference_pipeline_endpoint_name,
    endpoint_config_name=inference_pipeline_endpoint_config_name,
    sm_model_name=inference_pipeline_model_name,
    sm_session=sagemaker_session,
)

---

# 7. multi-container endpoint with `nginx` disabled

In [None]:
# Copies of the two container definitions with DISABLE_NGINX set in their environment;
# each copy gets its own environment dict.
first_container_non_nginx_definition = {
    **first_container_definition,
    'Environment': {'DISABLE_NGINX': 'true'},
}

second_container_non_nginx_definition = {
    **second_container_definition,
    'Environment': {'DISABLE_NGINX': 'true'},
}

In [None]:
# Create a two-container 'Direct' model from the DISABLE_NGINX container definitions.
multi_container_non_nginx_model_name = add_current_datetime_suffix('mlflow-multi-container-nn')
create_model_response = create_multi_container_sagemaker_model(
    model_name=multi_container_non_nginx_model_name,
    containers=[first_container_non_nginx_definition, second_container_non_nginx_definition],
    inference_execution_mode='Direct',
    role_arn=execution_role_arn,
    sm_session=sagemaker_session,
)

create_model_response

In [None]:
# Create the endpoint configuration for the DISABLE_NGINX model (default instance settings).
multi_container_non_nginx_endpoint_config_name = add_current_datetime_suffix('mlflow-multi-container-nn-config')
endpoint_config_response = create_endpoint_config(
    endpoint_config_name=multi_container_non_nginx_endpoint_config_name,
    sm_model_name=multi_container_non_nginx_model_name,
    sm_session=sagemaker_session,
)

endpoint_config_response

In [None]:
multi_container_non_nginx_endpoint_name = add_current_datetime_suffix('mlflow-multi-container-nn-endpoint')
endpoint_response = create_endpoint(
    endpoint_name=multi_container_non_nginx_endpoint_name,
    endpoint_config_name=multi_container_non_nginx_endpoint_config_name,
)

# Endpoint creation is asynchronous: block until the endpoint is InService so the
# invocation cells that follow also succeed under "Restart Kernel -> Run All".
sagemaker_session.sagemaker_client.get_waiter('endpoint_in_service').wait(
    EndpointName=multi_container_non_nginx_endpoint_name,
)

endpoint_response

In [None]:
# Send the training features to the first DISABLE_NGINX container, addressed by its hostname.
invoke_response = invoke_endpoint(
    endpoint_name=multi_container_non_nginx_endpoint_name,
    inference_data=json.dumps({'dataframe_split': pd.DataFrame(features).to_dict(orient='split')}),
    container_host_name=first_container_non_nginx_definition['ContainerHostname'],
    sm_session=sagemaker_session,
)

invoke_response

In [None]:
# Same request as above, this time addressed to the second DISABLE_NGINX container.
invoke_response = invoke_endpoint(
    endpoint_name=multi_container_non_nginx_endpoint_name,
    inference_data=json.dumps({'dataframe_split': pd.DataFrame(features).to_dict(orient='split')}),
    container_host_name=second_container_non_nginx_definition['ContainerHostname'],
    sm_session=sagemaker_session,
)

invoke_response

In [None]:
# Clean up: delete the DISABLE_NGINX endpoint, its configuration and its model.
delete_endpoint(
    endpoint_name=multi_container_non_nginx_endpoint_name,
    endpoint_config_name=multi_container_non_nginx_endpoint_config_name,
    sm_model_name=multi_container_non_nginx_model_name,
    sm_session=sagemaker_session,
)

---