# SageMaker Pipeline for MLOps
---

This notebook demonstrates how to build an MLOps workflow using SageMaker Pipelines. Instead of choosing, training, and tuning an algorithm myself, I will use **AutoML** (Amazon SageMaker Autopilot).

<div class="alert alert-block alert-info">
    This notebook runs on <i>ml.t3.medium</i> instance with <b>Data Science 3.0</b> Kernel.
</div>

#### Processing jobs may take some time to start
---

During the processing step, you will experience a short **cold start**. This comes from the transient nature of the compute clusters: every job execution runs on a newly provisioned EC2 cluster (the number of instances depends on your configuration). The startup delay is the trade-off for better security, scalability, and fault tolerance.


In [2]:
import pandas as pd
import numpy as np
import json

s3_bucket_name = 'ml-ai-demo-th'
s3_init_prefix = 'mlops'

## A bit on sample dataset
---

In this demonstration, I will use a dataset from the UCI Machine Learning Repository, which you can find [here](https://archive.ics.uci.edu/dataset/887/national+health+and+nutrition+health+survey+2013-2014+(nhanes)+age+prediction+subset). The objective is to predict the respondent's age group (`Adult` or `Senior`), a binary classification problem.

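This dataset is already downloaded and provided in the `../data` directory. Note that `RIDAGEYR` is the respondent's age in years and `age_group` is derived from it, so keeping `RIDAGEYR` as a feature (as the processing script below does) leaks the target into the model. Drop it if you want a realistic estimate of how well the other health measurements predict the age group.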

**Dataset citation**:
```
NA,NA. (2023). National Health and Nutrition Health Survey 2013-2014 (NHANES) Age Prediction Subset. UCI Machine Learning Repository. https://doi.org/10.24432/C5BS66.
```

In [3]:
df = pd.read_csv('../data/NHANES_age_prediction.csv', header=0)
print('Data set shape: {}'.format(df.shape))
display(df.head())
print('Target distribution: {}'.format(np.unique(df.age_group, return_counts=True)))

Data set shape: (2278, 10)


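|   | SEQN | age_group | RIDAGEYR | RIAGENDR | PAQ605 | BMXBMI | LBXGLU | DIQ010 | LBXGLT | LBXIN |
|---|------|-----------|----------|----------|--------|--------|--------|--------|--------|-------|
| 0 | 73564.0 | Adult | 61.0 | 2.0 | 2.0 | 35.7 | 110.0 | 2.0 | 150.0 | 14.91 |
| 1 | 73568.0 | Adult | 26.0 | 2.0 | 2.0 | 20.3 | 89.0 | 2.0 | 80.0 | 3.85 |
| 2 | 73576.0 | Adult | 16.0 | 1.0 | 2.0 | 23.2 | 89.0 | 2.0 | 68.0 | 6.14 |
| 3 | 73577.0 | Adult | 32.0 | 1.0 | 2.0 | 28.9 | 104.0 | 2.0 | 84.0 | 16.15 |
| 4 | 73580.0 | Adult | 38.0 | 2.0 | 1.0 | 35.9 | 103.0 | 2.0 | 81.0 | 10.92 |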


Target distribution: (array(['Adult', 'Senior'], dtype=object), array([1914,  364]))


### Upload full dataset to S3 bucket
---

The SageMaker pipeline steps read their input data from Amazon S3, so I will upload the dataset to a designated S3 bucket.

In [4]:
df.to_csv(
    's3://{0}/{1}/nhanes-dataset/dataset.csv'.format(s3_bucket_name, s3_init_prefix), 
    header=True, index=False
)
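Writing to an `s3://` path with `df.to_csv`, as above, relies on pandas' optional `s3fs` dependency being installed. Because the same `'s3://{0}/{1}/...'.format(...)` pattern recurs throughout this notebook, a small helper can keep the prefixes consistent. `s3_uri` below is a hypothetical convenience function (not part of the SageMaker SDK); note that it drops trailing slashes.

```python
def s3_uri(bucket: str, *parts: str) -> str:
    """Build an s3:// URI from a bucket name and key parts (hypothetical helper)."""
    # Strip stray slashes so 'a/' + '/b' does not produce 'a//b'; skip empty parts
    key = '/'.join(p.strip('/') for p in parts if p.strip('/'))
    return 's3://{}/{}'.format(bucket, key)


# Reproduces the dataset location used above
print(s3_uri('ml-ai-demo-th', 'mlops', 'nhanes-dataset', 'dataset.csv'))
# s3://ml-ai-demo-th/mlops/nhanes-dataset/dataset.csv
```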

## Define common parameters

In [5]:
import sagemaker
import boto3
from sagemaker.workflow.pipeline_context import PipelineSession

role_arn = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sm_client = boto3.client('sagemaker')

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


## Define Pipeline Parameters

In [6]:
from sagemaker.workflow.parameters import ParameterFloat, ParameterInteger, ParameterString

pipeline_automl_instance_type = ParameterString(
    name='AutoMLInstanceType',
    default_value='ml.c5.xlarge',
)
pipeline_instance_count = ParameterInteger(
    name='InstanceCount',
    default_value=1,
)
pipeline_max_automl_runtime = ParameterInteger(
    name='MaxAutoMLRuntime',
    default_value=3600*1,
)
pipeline_model_approval_status = ParameterString(
    name='ModelApprovalStatus', 
    default_value='Approved'
)

pipeline_target_column_name = ParameterString(
    name='TargetColumnName',
    default_value='age_group',
)

## Data preprocessing
---

When building an ML model, we first construct the dataset, e.g., through feature engineering and train/validation/test splitting. For this step, I will demonstrate how to use a **ProcessingStep**.
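The processing script calls `train_test_split` twice with `test_size=0.2`: first to hold out the test set, then to carve a validation set out of the remainder, giving roughly a 64% / 16% / 20% split. The sketch below mirrors scikit-learn's sizing rule for a float `test_size` (test count rounded up, the rest goes to training) to predict the row counts for the 2,278-row dataset; `split_sizes` is a hypothetical helper for illustration only.

```python
import math


def split_sizes(n_samples: int, test_size: float) -> tuple:
    """Return (n_train, n_test) the way scikit-learn sizes a float test_size split."""
    n_test = math.ceil(test_size * n_samples)  # test count is rounded up
    return n_samples - n_test, n_test


n_total = 2278  # rows in NHANES_age_prediction.csv
train_val, test = split_sizes(n_total, 0.2)  # first split: hold out the test set
train, val = split_sizes(train_val, 0.2)     # second split: validation from the rest
print(train, val, test)  # 1457 365 456
```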

In [7]:
script_path = 'script'
%mkdir -p {script_path}

### Define data processing script

In [8]:
%%writefile {script_path}/data-processing.py

''' 
Name: data-processing.py
Description: This file is intended to conduct simple data processing before running an AutoML job
'''
import pandas as pd
import numpy as np
import logging
import argparse
import pathlib
import os
import json
from sklearn.model_selection import train_test_split
from typing import Tuple, Optional, Dict

id_column_name = 'seqn'
target_column_name = 'age_group'

def train_val_test_split(
    df: pd.DataFrame,
    target_column_name: str,
    args: argparse.Namespace
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    
    train_df, test_df = train_test_split(
        df, test_size=0.2, shuffle=True, stratify=df[target_column_name], random_state=args.random_state,
    )
    
    train_df, val_df = train_test_split(
        train_df, test_size=0.2, shuffle=True, 
        stratify=train_df[target_column_name], random_state=args.random_state,
    )
    logger.info('Train shape: {}'.format(train_df.shape))
    logger.info('Val shape: {}'.format(val_df.shape))
    logger.info('Test shape: {}'.format(test_df.shape))
    
    return train_df, val_df, test_df


def create_features_spec(
    df: pd.DataFrame,
    id_column_name: Optional[str],
    target_column_name: Optional[str]
) -> Dict:
    features_to_use = [x for x in df.columns.tolist() if (x != id_column_name) & (x != target_column_name)]
    logger.info('The features specification: {}'.format(features_to_use))
    features_dict = {
        "FeatureAttributeNames": features_to_use
    }
    return features_dict


if __name__ == '__main__':
    # define logger
    logger = logging.getLogger(__name__)
    logging.basicConfig(
        format='%(asctime)s %(levelname)s:%(message)s', 
        level=logging.INFO, 
        datefmt='%Y-%m-%d %I:%M:%S'
    )
    logger.setLevel(logging.INFO)
    
    # define argparse
    parser = argparse.ArgumentParser(
        description='Argument for processing job'
    )
    parser.add_argument(
        '--source-file-name', type=str, default='dataset.csv',
        help='File name to read (default: dataset.csv)'
    )
    parser.add_argument(
        '--random-state', type=int, default=1234,
        help='Random state during data shuffle / split (default: 1234)'
    )
    args = parser.parse_args()
    
    # main starts
    input_data_path = os.path.join('/opt/ml/processing/input', args.source_file_name)
    logger.info('Reading data from {}'.format(input_data_path))
    df = pd.read_csv(input_data_path, header=0)
    
    # convert all column names to lower case
    df.columns = [x.lower() for x in df.columns]

    logger.info('Full data shape: {}'.format(df.shape))
    train_df, val_df, test_df = train_val_test_split(
        df=df.drop(id_column_name, axis=1, errors='ignore'), 
        target_column_name=target_column_name,
        args=args,
    )
    logger.info('Writing to csv files')
    train_df.to_csv('/opt/ml/processing/train/train.csv', header=True, index=False)
    val_df.to_csv('/opt/ml/processing/validation/validation.csv', header=True, index=False)
    test_df.to_csv('/opt/ml/processing/test/test.csv', header=True, index=False)
    
    test_df.drop(target_column_name, axis=1)\
      .to_csv('/opt/ml/processing/x_test/test-features-only.csv', header=False, index=False)
    
    test_df[target_column_name]\
      .to_csv('/opt/ml/processing/y_true/test-y-true.csv', header=False, index=False)
    
    feature_dict = create_features_spec(
        df=train_df,
        id_column_name=id_column_name,
        target_column_name=target_column_name
    )
    feature_path = '/opt/ml/processing/feature-spec'
    pathlib.Path(feature_path).mkdir(parents=True, exist_ok=True)
    feature_file_name = os.path.join(feature_path, "feature-specification.json")
    logger.info('Writing the features spec to: {}'.format(feature_file_name))
    with open(feature_file_name, "w") as f:
        f.write(json.dumps(feature_dict))
    
    logger.info('COMPLETED !!! ')

Overwriting script/data-processing.py
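Because `data-processing.py` lower-cases the column names and drops `seqn` before splitting, the `feature-specification.json` handed to Autopilot lists every remaining column except the target. The sketch below replays that logic on the column names from the dataset preview, without SageMaker, so you can check what the file will contain.

```python
import json

# Column names from the dataset preview above
columns = ['SEQN', 'age_group', 'RIDAGEYR', 'RIAGENDR', 'PAQ605',
           'BMXBMI', 'LBXGLU', 'DIQ010', 'LBXGLT', 'LBXIN']
id_column_name = 'seqn'
target_column_name = 'age_group'

# Same steps as data-processing.py: lower-case the names, then exclude ID and target
lowered = [c.lower() for c in columns]
features_to_use = [c for c in lowered if c not in (id_column_name, target_column_name)]
feature_spec = {"FeatureAttributeNames": features_to_use}
print(json.dumps(feature_spec))
```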


### Define Processing Step using SKLearnProcessor

In [9]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sklearn_processor = SKLearnProcessor(
    framework_version='1.0-1',
    role=role_arn,
    instance_type='ml.m5.xlarge',
    instance_count=pipeline_instance_count,
    sagemaker_session=pipeline_session,
)

inputs = [
    ProcessingInput(
        source='s3://{0}/{1}/nhanes-dataset/dataset.csv'.format(s3_bucket_name, s3_init_prefix), 
        destination="/opt/ml/processing/input"),
]
outputs = [
    ProcessingOutput(
        output_name='train-dataset', 
        source='/opt/ml/processing/train',
        destination='s3://{0}/{1}/nhanes-train-dataset/'.format(s3_bucket_name, s3_init_prefix), 
    ),
    ProcessingOutput(
        output_name='validation-dataset', 
        source='/opt/ml/processing/validation',
        destination='s3://{0}/{1}/nhanes-validation-dataset/'.format(s3_bucket_name, s3_init_prefix), 
    ),
    ProcessingOutput(
        output_name='test-dataset', 
        source='/opt/ml/processing/test',
        destination='s3://{0}/{1}/nhanes-test-dataset/'.format(s3_bucket_name, s3_init_prefix), 
    ),
    ProcessingOutput(
        output_name='test-features-only', 
        source="/opt/ml/processing/x_test",
        destination='s3://{0}/{1}/nhanes-test-features-only/'.format(s3_bucket_name, s3_init_prefix), 
    ),
    ProcessingOutput(
        output_name='test-label-only',
        source='/opt/ml/processing/y_true',
        destination='s3://{0}/{1}/nhanes-test-label-only/'.format(s3_bucket_name, s3_init_prefix), 
    ),
    ProcessingOutput(
        output_name='features-specification',
        source='/opt/ml/processing/feature-spec',
        destination='s3://{0}/{1}/nhanes-features-to-use/'.format(s3_bucket_name, s3_init_prefix), 
    ),
]

data_processing_step = ProcessingStep(
    name="DataProcessingStep",
    processor=sklearn_processor,
    inputs=inputs,
    outputs=outputs,
    code='script/data-processing.py',
    job_arguments=[
        '--random-state', '515'
    ]
)


## AutoML Training Step
---

An **AutoML object** is used to define the Autopilot training job run and can be added to the SageMaker pipeline by using the `AutoMLStep` class, as shown in the following code. 

`AutoMLStep` only supports the **Ensembling** training mode, so `mode='ENSEMBLING'` must be specified; the other parameters can be adjusted as needed. For example, instead of letting the AutoML job infer the ML problem type and objective metric automatically, you can fix them by passing the `problem_type` and `job_objective` parameters to the `AutoML` object, as done below.

In [10]:
from sagemaker.workflow.automl_step import AutoMLStep
from sagemaker.automl.automl import AutoMLInput

automl = sagemaker.AutoML(
    role=role_arn,
    output_path='s3://{0}/{1}/nhanes-automl-model/'.format(s3_bucket_name, s3_init_prefix),
    target_attribute_name=pipeline_target_column_name,
    sagemaker_session=pipeline_session,
    total_job_runtime_in_seconds=pipeline_max_automl_runtime,
    mode='ENSEMBLING',  
    problem_type='BinaryClassification',
    job_objective={
        'MetricName': 'F1'
    },
    feature_specification_s3_uri='s3://{0}/{1}/nhanes-features-to-use/feature-specification.json'.format(s3_bucket_name, s3_init_prefix),
    generate_candidate_definitions_only=False,
)

train_args = automl.fit(
    inputs=[
        AutoMLInput(
            inputs=data_processing_step.properties.ProcessingOutputConfig.Outputs[
                'train-dataset'
            ].S3Output.S3Uri,
            target_attribute_name=pipeline_target_column_name,
            channel_type='training',
        ),
        AutoMLInput(
            inputs=data_processing_step.properties.ProcessingOutputConfig.Outputs[
                'validation-dataset'
            ].S3Output.S3Uri,
            target_attribute_name=pipeline_target_column_name,
            channel_type='validation',
        )
    ],
    job_name='mlops-automl',
)

automl_step = AutoMLStep(
    name="AutoMLTrainingStep",
    step_args=train_args,
)



## Create the best model from pipeline
---

Once the **AutoML** step finishes, Autopilot has trained and ensembled various candidate models and identified the best-performing candidate. Its model artifacts and metadata are stored automatically and can be retrieved by calling the `get_best_auto_ml_model()` method on the AutoML training step. These can then be used to create a SageMaker model in a Model step.

In [11]:
from sagemaker.workflow.model_step import ModelStep

best_automl_model = automl_step.get_best_auto_ml_model(
    role_arn, 
    sagemaker_session=pipeline_session,
)
create_model_step_args = best_automl_model.create(
    instance_type=pipeline_automl_instance_type
)
create_best_model_step = ModelStep(
    name='CreateBestAutoMLStep',
    step_args=create_model_step_args
)

## Batch transform step
---
Once we have the model, we can use a **Transformer object** to run <u>batch inference</u> on the test dataset; the predictions are then used for evaluation.
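The transformer's output path is built with `Join(on='/', values=[...])`, which is resolved at execution time but, for static strings, behaves like Python's `str.join`. Starting the list with `'s3:/'` is what yields the double slash of the URI scheme, as this local equivalent shows:

```python
s3_bucket_name = 'ml-ai-demo-th'
s3_init_prefix = 'mlops'

# Local equivalent of the Join(on='/', values=[...]) used for the transform output path
values = ['s3:/', s3_bucket_name, s3_init_prefix, 'nhanes-output', 'batch-transform']
output_path = '/'.join(values)
print(output_path)  # s3://ml-ai-demo-th/mlops/nhanes-output/batch-transform
```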

In [12]:
from sagemaker.transformer import Transformer
from sagemaker.workflow.functions import Join
from sagemaker.workflow.steps import TransformStep

batch_transformer = Transformer(
    model_name=create_best_model_step.properties.ModelName,
    instance_count=pipeline_instance_count,
    instance_type=pipeline_automl_instance_type,
    output_path=Join(
        on='/', 
        values=['s3:/', s3_bucket_name, s3_init_prefix, 'nhanes-output', 'batch-transform']
    ),
    sagemaker_session=pipeline_session,
)
batch_transform_step = TransformStep(
    name="BatchTransformStep",
    step_args=batch_transformer.transform(
        data=data_processing_step.properties.ProcessingOutputConfig.Outputs['test-features-only'].S3Output.S3Uri,
        content_type="text/csv"
    ),
)

## Model evaluation
---

The previous step runs batch predictions on the test features and saves the output to the specified S3 location.

Next, we set up a **processing step** that computes the model metrics with **sklearn.metrics**, e.g., F1 score, accuracy, or other metrics that match your business needs.
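In a 2×2 `sklearn.metrics.confusion_matrix`, rows are true labels and columns are predicted labels. With `labels=[negative, positive]` the layout is `[[tn, fp], [fn, tp]]`, so `cm.ravel()` unpacks as `tn, fp, fn, tp`. The worked example below uses made-up counts (not results from this pipeline) to show how precision, recall, F1, and specificity follow from those four cells.

```python
def metrics_from_counts(tn: int, fp: int, fn: int, tp: int) -> dict:
    """Binary classification metrics from the four confusion-matrix cells."""
    precision = tp / (tp + fp)    # of predicted positives, the share that is correct
    recall = tp / (tp + fn)       # of actual positives, the share that is found
    f1 = 2 * precision * recall / (precision + recall)
    specificity = tn / (tn + fp)  # of actual negatives, the share that is found
    return {'precision': precision, 'recall': recall, 'f1': f1, 'specificity': specificity}


# Hypothetical counts in confusion_matrix layout: [[tn, fp], [fn, tp]]
cm = [[90, 10],
      [20, 80]]
(tn, fp), (fn, tp) = cm
print(metrics_from_counts(tn, fp, fn, tp))
```

Here precision is 80/90, recall 80/100, and F1 works out to 16/19 (about 0.842).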

### Define Evaluation Script

In [13]:
%%writefile {script_path}/model-evaluation.py
import json
import os
import pathlib
import pandas as pd
import logging
from typing import Dict
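from sklearn.metrics import confusion_matrix

# 'Adult' is scored as the positive class and 'Senior' as the negative class;
# swap these constants to evaluate the model from the 'Senior' point of view
POSITIVE_LABEL = 'Adult'
NEGATIVE_LABEL = 'Senior'

def get_binary_classification_metrics(
    y_true: pd.DataFrame, 
    y_pred: pd.DataFrame
) -> Dict:
    # confusion_matrix puts true labels in rows and predicted labels in columns;
    # with labels=[negative, positive] the layout is [[tn, fp], [fn, tp]]
    cm = confusion_matrix(
        y_true.iloc[:, 0], y_pred.iloc[:, 0],
        labels=[NEGATIVE_LABEL, POSITIVE_LABEL],
    )
    tn, fp, fn, tp = cm.ravel()
    precision_scor = tp / (tp + fp)
    recall_scor = tp / (tp + fn)
    f1_scor = (2*precision_scor*recall_scor) / (precision_scor + recall_scor)
    specificity_scor = tn / (tn + fp)
    metrics_dict = {
        'classification_metrics': {
            "f1_score": {
                "value": float(f1_scor),
            },
            "recall_score": {
                "value": float(recall_scor),
            },
            "precision_score": {
                "value": float(precision_scor),
            },
            'specificity': {
                'value': float(specificity_scor)
            },
        },
    }
    return metrics_dict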
    

if __name__ == "__main__":
    logger = logging.getLogger(__name__)
    logging.basicConfig(
        format='%(asctime)s %(levelname)s:%(message)s', 
        level=logging.INFO, 
        datefmt='%Y-%m-%d %I:%M:%S'
    )
    logger.setLevel(logging.INFO)
    pred_path = '/opt/ml/processing/input/predictions/'
    true_path = '/opt/ml/processing/input/true_labels/'
    pred_file_nm = os.listdir(pred_path)[0]
    true_file_nm = os.listdir(true_path)[0]
    y_pred = pd.read_csv(os.path.join(pred_path, pred_file_nm), header=None)
    y_true = pd.read_csv(os.path.join(true_path, true_file_nm), header=None)
    logger.info('Pred: {}'.format(y_pred.shape))
    logger.info('True: {}'.format(y_true.shape))
    assert y_pred.shape[0] == y_true.shape[0], 'Prediction and label counts differ; please review the data sources'
    
    report_dict = get_binary_classification_metrics(y_true, y_pred)
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = os.path.join(output_dir, "evaluation_metrics.json")
    logger.info('Writing the output to: {}'.format(evaluation_path))
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))
        
    logger.info('Completed!!!')

Overwriting script/model-evaluation.py


### Define Processing Step
---

The evaluation step writes the model performance as a JSON evaluation report, uploads it to S3, and makes it available to other pipeline steps as a property file:
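Later, `JsonGet` reads a value from this report using the JSON path `classification_metrics.f1_score.value`, and `ConditionGreaterThanOrEqualTo` compares it with the threshold. The sketch below resolves the same dotted path on a locally built report with the nesting written by `model-evaluation.py`; the F1 value is made up, and `get_by_path` is a hypothetical helper, not the SDK's implementation.

```python
import json
from functools import reduce

# Report with the same nesting that model-evaluation.py writes (value is made up)
report_json = json.dumps({'classification_metrics': {'f1_score': {'value': 0.85}}})


def get_by_path(document: dict, dotted_path: str):
    """Follow a dotted key path such as 'a.b.c' through nested dicts (hypothetical helper)."""
    return reduce(lambda node, key: node[key], dotted_path.split('.'), document)


f1 = get_by_path(json.loads(report_json), 'classification_metrics.f1_score.value')
threshold = 0.8  # default of ModelRegistrationMetricThreshold
print(f1, f1 >= threshold)  # 0.85 True
```

Because the condition is "greater than or equal to", a model scoring exactly at the threshold is still registered.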



In [14]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name='evaluation-file', 
    output_name='model-evaluation', 
    path='evaluation_metrics.json'
)

eval_processor = SKLearnProcessor(
    role=role_arn,
    framework_version='1.0-1',
    instance_count=pipeline_instance_count,
    instance_type='ml.m5.xlarge',
    sagemaker_session=pipeline_session,
)

eval_processor_step_args = eval_processor.run(
    inputs=[
        ProcessingInput(
            source=batch_transform_step.properties.TransformOutput.S3OutputPath,
            destination="/opt/ml/processing/input/predictions",
        ),
        ProcessingInput(
            source=data_processing_step.properties.ProcessingOutputConfig.Outputs[
                'test-label-only'
            ].S3Output.S3Uri,
            destination='/opt/ml/processing/input/true_labels'
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='model-evaluation',
            source="/opt/ml/processing/evaluation",
            destination='s3://{0}/{1}/nhanes-output/evaluation-metrics/'.format(s3_bucket_name, s3_init_prefix),
        ),
    ],
    code='script/model-evaluation.py',
)
eval_step = ProcessingStep(
    name="ModelEvaluationStep",
    step_args=eval_processor_step_args,
    property_files=[evaluation_report],
)

## Conditional Model Registration Step
---

As part of MLOps, we should use the **SageMaker model registry** to register the AutoML model, and we can make the registration conditional on a metric threshold.

In this example, I will register the model only if its `F1 score >= 0.8`.

### Define pipeline parameters related to model registration
---

Define additional pipeline parameters for the registration step.

In [15]:
pipeline_model_package_group_name = ParameterString(
    name='ModelPackageName', 
    default_value='AutoMLForNHANESdataset',
)

pipeline_model_metric_threshold = ParameterFloat(
    name='ModelRegistrationMetricThreshold', default_value=0.8
)

### Define register step

In [16]:
from sagemaker import ModelMetrics, ModelPackage, MetricsSource

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=automl_step.properties.BestCandidateProperties.ModelInsightsJsonReportPath,
        content_type='application/json',
    ),
    explainability=MetricsSource(
        s3_uri=automl_step.properties.BestCandidateProperties.ExplainabilityJsonReportPath,
        content_type='application/json',
    ),
)

model_register_step_args = best_automl_model.register(
    content_types=['text/csv'],
    response_types=['text/csv'],
    inference_instances=[pipeline_automl_instance_type],
    transform_instances=[pipeline_automl_instance_type],
    model_package_group_name=pipeline_model_package_group_name,
    approval_status=pipeline_model_approval_status,
    model_metrics=model_metrics,
)

model_register_step = ModelStep(
    name='ModelRegistrationStep', 
    step_args=model_register_step_args,
)

### Define conditional step

In [17]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

conditional_registration_step = ConditionStep(
    name='ModelConditionalRegistrationStep',
    conditions=[
        ConditionGreaterThanOrEqualTo(
            left=JsonGet(
                step_name=eval_step.name,
                property_file=evaluation_report,
                json_path='classification_metrics.f1_score.value',
            ),
            right=pipeline_model_metric_threshold,
        )
    ],
    if_steps=[model_register_step],
    else_steps=[],
)

## SageMaker Pipeline
---
Once all the steps are defined, we create the **Pipeline** object that chains them together.

A SageMaker pipeline is a series of interconnected steps defined with the **SageMaker Pipelines SDK**. The pipeline definition encodes the steps as a *directed acyclic graph (DAG)* that can be exported as JSON. This DAG captures the requirements for, and relationships between, each step of the pipeline.
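The edges of this DAG are not declared explicitly in this notebook: SageMaker derives them from property references, such as the AutoML step reading `data_processing_step.properties.ProcessingOutputConfig...` or the condition step's `JsonGet` naming the evaluation step. The sketch below writes those dependencies down by hand (read from the step definitions above) and uses Python 3.9+'s `graphlib` to produce an execution order. It only illustrates the DAG idea; it is not how SageMaker schedules steps internally.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose properties it references
dependencies = {
    'DataProcessingStep': set(),
    'AutoMLTrainingStep': {'DataProcessingStep'},
    'CreateBestAutoMLStep': {'AutoMLTrainingStep'},
    'BatchTransformStep': {'CreateBestAutoMLStep', 'DataProcessingStep'},
    'ModelEvaluationStep': {'BatchTransformStep', 'DataProcessingStep'},
    'ModelConditionalRegistrationStep': {'ModelEvaluationStep'},
}

# static_order() yields every step after all of its dependencies
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

In this pipeline the steps form a single chain, so there is only one valid order; branches without mutual dependencies could run in parallel.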


In [18]:
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
from sagemaker.workflow.pipeline import Pipeline

nhanes_mlops_pipeline = Pipeline(
    name='nhanes-pipelines-automl',
    parameters=[
        pipeline_automl_instance_type,
        pipeline_instance_count,
        pipeline_max_automl_runtime,
        pipeline_model_approval_status,
        pipeline_target_column_name,
        pipeline_model_metric_threshold,
        pipeline_model_package_group_name,
    ],
    steps=[
        data_processing_step,
        automl_step,
        create_best_model_step,
        batch_transform_step,
        eval_step,
        conditional_registration_step,
    ],
    sagemaker_session=pipeline_session,
    pipeline_definition_config=PipelineDefinitionConfig(use_custom_job_prefix=False),
)

You can inspect the generated pipeline definition by calling the `definition()` method of the **Pipeline** class.

In [19]:
json.loads(nhanes_mlops_pipeline.definition())



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'AutoMLInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.xlarge'},
  {'Name': 'InstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'MaxAutoMLRuntime', 'Type': 'Integer', 'DefaultValue': 3600},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'Approved'},
  {'Name': 'TargetColumnName', 'Type': 'String', 'DefaultValue': 'age_group'},
  {'Name': 'ModelRegistrationMetricThreshold',
   'Type': 'Float',
   'DefaultValue': 0.8},
  {'Name': 'ModelPackageName',
   'Type': 'String',
   'DefaultValue': 'AutoMLForNHANESdataset'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'DataProcessingStep',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.InstanceCount'

### Create or update the pipeline

In [20]:
nhanes_mlops_pipeline.upsert(role_arn=role_arn)



{'PipelineArn': 'arn:aws:sagemaker:us-west-2:587892818306:pipeline/nhanes-pipelines-automl',
 'ResponseMetadata': {'RequestId': 'e8441b02-e885-442d-ac58-0b845fc6f1f0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e8441b02-e885-442d-ac58-0b845fc6f1f0',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '91',
   'date': 'Sat, 06 Jan 2024 14:03:22 GMT'},
  'RetryAttempts': 0}}

### Execute the pipeline

In [21]:
import datetime

ml_pipeline_execution = nhanes_mlops_pipeline.start(
    execution_display_name='nhanes-execution-{}'.format(datetime.datetime.now().strftime('%Y%m%d-%H%M%S')),
)

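`start()` returns immediately, so the pipeline executes in the background and you can keep working in the same notebook. The next cell calls `wait()` to block until the execution finishes. With `delay=30` and `max_attempts=120`, it waits at most 30 × 120 = 3,600 seconds, so increase `max_attempts` if the AutoML step may use its full `MaxAutoMLRuntime` of 3,600 seconds.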
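If a fixed waiter budget is too short, you can poll the execution status yourself. The execution's `describe()` method returns the DescribePipelineExecution response, whose `PipelineExecutionStatus` is one of `Executing`, `Stopping`, `Stopped`, `Failed`, or `Succeeded`. The hypothetical `wait_for_pipeline` helper below accepts any zero-argument status callable, so it can be tried offline with a stub.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {'Succeeded', 'Failed', 'Stopped'}


def wait_for_pipeline(get_status: Callable[[], str], delay: float = 30,
                      timeout: float = 3 * 3600) -> str:
    """Poll get_status() until a terminal status or the timeout (hypothetical helper)."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError('Pipeline still {} after {} s'.format(status, timeout))
        time.sleep(delay)


# Assumed usage in this notebook: poll the real execution every 60 seconds
# status = wait_for_pipeline(
#     lambda: ml_pipeline_execution.describe()['PipelineExecutionStatus'], delay=60)
```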

In [22]:
ml_pipeline_execution.wait(delay=30, max_attempts=120) 
ml_pipeline_execution.list_steps()

[{'StepName': 'ModelRegistrationStep-RegisterModel',
  'StartTime': datetime.datetime(2024, 1, 6, 14, 26, 58, 376000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 1, 6, 14, 26, 59, 414000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-west-2:587892818306:model-package/AutoMLForNHANESdataset/2'}},
  'AttemptCount': 1},
 {'StepName': 'ModelConditionalRegistrationStep',
  'StartTime': datetime.datetime(2024, 1, 6, 14, 26, 57, 120000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 1, 6, 14, 26, 57, 586000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Condition': {'Outcome': 'True'}},
  'AttemptCount': 1},
 {'StepName': 'ModelEvaluationStep',
  'StartTime': datetime.datetime(2024, 1, 6, 14, 22, 26, 198000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 1, 6, 14, 26, 56, 539000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemak

## Model deployment
---

In the previous section, I created and executed the pipeline that trains and registers the ML model. In this section, I will create a model endpoint from the latest approved model package in the **SageMaker model registry** model package group.

This is for <u>demonstration purposes</u> only; in a production environment, you may automate the endpoint deployment or use another CI/CD approach that fits your business objectives.
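The cell below takes the first entry of `ModelPackageSummaryList`, which raises an `IndexError` if no package has been approved yet (for example, when every run failed the F1 condition). A defensive variant, sketched with a hypothetical `latest_approved_arn` helper that works on the `list_model_packages` response dictionary, returns `None` instead:

```python
from typing import Optional


def latest_approved_arn(response: dict) -> Optional[str]:
    """Newest approved package ARN from a list_model_packages response, or None."""
    approved = [
        s for s in response.get('ModelPackageSummaryList', [])
        if s.get('ModelApprovalStatus') == 'Approved'
    ]
    if not approved:
        return None
    # CreationTime is a datetime in boto3 responses, so max() picks the newest
    return max(approved, key=lambda s: s['CreationTime'])['ModelPackageArn']


# Assumed usage with the notebook's client:
# arn = latest_approved_arn(sm_client.list_model_packages(
#     ModelPackageGroupName='AutoMLForNHANESdataset', ModelApprovalStatus='Approved'))
```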


In [23]:
model_package = sm_client.list_model_packages(
    MaxResults=2,
    ModelApprovalStatus="Approved",
    ModelPackageGroupName=pipeline_model_package_group_name.default_value,
    SortBy="CreationTime",
    SortOrder="Descending",
)
model_package_arn = model_package["ModelPackageSummaryList"][0]["ModelPackageArn"]

sm_client.describe_model_package(
    ModelPackageName=model_package_arn,
)


{'ModelPackageGroupName': 'AutoMLForNHANESdataset',
 'ModelPackageVersion': 2,
 'ModelPackageArn': 'arn:aws:sagemaker:us-west-2:587892818306:model-package/AutoMLForNHANESdataset/2',
 'CreationTime': datetime.datetime(2024, 1, 6, 14, 26, 59, 311000, tzinfo=tzlocal()),
 'InferenceSpecification': {'Containers': [{'Image': '763104351884.dkr.ecr.us-west-2.amazonaws.com/autogluon-inference:0.4.3-cpu-py38-ubuntu20.04',
    'ImageDigest': 'sha256:f461d5f846f9e2f30db9ee57ff6fa6c8ba2263f8cf343f031e1c178a6b6af864',
    'ModelDataUrl': 's3://ml-ai-demo-th/mlops/nhanes-automl-model/tojt3glonlal-AutoMLTr-4mjMUlPsZF/sagemaker-automl-candidates/model/LightGBM-BAG-L1-FULL-t1/model.tar.gz',
    'Environment': {'MODEL_NAME': 'LightGBM-BAG-L1-FULL',
     'SAGEMAKER_DEFAULT_INVOCATIONS_ACCEPT': 'text/csv',
     'SAGEMAKER_INFERENCE_OUTPUT': 'predicted_label',
     'SAGEMAKER_INFERENCE_SUPPORTED': 'predicted_label,probability,probabilities,labels',
     'SAGEMAKER_PROGRAM': 'tabular_serve.py',
     'SAGEMAK

In [24]:
nhanes_model = ModelPackage(
    role=role_arn, 
    model_package_arn=model_package_arn
)

nhanes_model.deploy(
    initial_instance_count=pipeline_instance_count.default_value,
    instance_type=pipeline_automl_instance_type.default_value,
)

INFO:sagemaker:Creating model with name: AutoMLForNHANESdataset-2024-01-06-14-27-30-090
INFO:sagemaker:Creating endpoint-config with name AutoMLForNHANESdataset-2024-01-06-14-27-30-891
INFO:sagemaker:Creating endpoint with name AutoMLForNHANESdataset-2024-01-06-14-27-30-891


----!

### Option 1: Invoke endpoint with SageMaker SDK

In [25]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer

nhanes_predictor = Predictor(
    endpoint_name=nhanes_model.endpoint_name,
    sagemaker_session=pipeline_session,
    serializer=CSVSerializer(),
)
print(
    nhanes_predictor.predict(
        df.drop(['SEQN', 'age_group'], axis=1).head(10).values
    ).decode("utf-8")
)

Adult
Adult
Adult
Adult
Adult
Adult
Adult
Adult
Senior
Adult



### Option 2: Invoke endpoint using boto3 SDK
---

For usage details, see the `invoke_endpoint` [documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker-runtime/client/invoke_endpoint.html).
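Both options send the same payload: headerless CSV rows with the columns in training order, `SEQN` and the `age_group` target removed. The endpoint replies with one predicted label per line (its `SAGEMAKER_INFERENCE_OUTPUT` is `predicted_label`). The sketch below builds such a payload from the first two rows of the dataset preview and parses a reply string that mirrors the first two predictions shown above, all locally with pandas.

```python
import pandas as pd

# First two rows of the dataset preview
sample = pd.DataFrame({
    'SEQN': [73564.0, 73568.0], 'age_group': ['Adult', 'Adult'],
    'RIDAGEYR': [61.0, 26.0], 'RIAGENDR': [2.0, 2.0], 'PAQ605': [2.0, 2.0],
    'BMXBMI': [35.7, 20.3], 'LBXGLU': [110.0, 89.0], 'DIQ010': [2.0, 2.0],
    'LBXGLT': [150.0, 80.0], 'LBXIN': [14.91, 3.85],
})

# Drop the ID and target columns, then serialize without header or index
payload = sample.drop(['SEQN', 'age_group'], axis=1).to_csv(header=False, index=False)

# A text/csv response body holds one predicted label per line
reply = 'Adult\nAdult\n'
labels = reply.splitlines()
print(payload.splitlines(), labels)
```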

In [26]:
sm_rt_client = boto3.client('sagemaker-runtime')
resp = sm_rt_client.invoke_endpoint(
    EndpointName=nhanes_model.endpoint_name,
    Body=df.drop(['SEQN', 'age_group'], axis=1).head(10).to_csv(header=False, index=False).encode('utf-8'),
    ContentType='text/csv',
    Accept='text/csv',
)

print(resp['Body'].read().decode('utf-8'))

Adult
Adult
Adult
Adult
Adult
Adult
Adult
Adult
Senior
Adult



## Clean up
---

If you no longer need the endpoint, delete it to avoid recurring charges.
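The `delete_endpoint` call in the next cell removes only the endpoint; the endpoint configuration and the model created by `deploy()` remain in the account. As an alternative to that cell, this sketch looks both up first and then deletes everything, using the boto3 SageMaker client calls `describe_endpoint`, `describe_endpoint_config`, `delete_endpoint`, `delete_endpoint_config`, and `delete_model`. The helper name is hypothetical.

```python
def delete_endpoint_resources(sm_client, endpoint_name: str) -> None:
    """Delete an endpoint plus its endpoint config and models (hypothetical helper)."""
    # Look up the config and model names before anything is deleted
    config_name = sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointConfigName']
    variants = sm_client.describe_endpoint_config(
        EndpointConfigName=config_name)['ProductionVariants']
    model_names = [v['ModelName'] for v in variants]

    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    for model_name in model_names:
        sm_client.delete_model(ModelName=model_name)


# Assumed usage in this notebook, instead of the next cell:
# delete_endpoint_resources(sm_client, nhanes_model.endpoint_name)
```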

In [27]:
sm_client.delete_endpoint(EndpointName=nhanes_model.endpoint_name)

{'ResponseMetadata': {'RequestId': 'db895b20-cbcb-465b-903e-dac090cbacb4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'db895b20-cbcb-465b-903e-dac090cbacb4',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Sat, 06 Jan 2024 14:30:02 GMT'},
  'RetryAttempts': 0}}