# Step 3: Add an ML pipeline

In this step, you automate your end-to-end ML workflow using [Amazon SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines/) and [Amazon SageMaker Model Registry](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry.html). You make feature engineering reusable, repeatable, and scalable using [Amazon SageMaker Feature Store](https://aws.amazon.com/sagemaker/feature-store/).

![](img/six-steps-3.png)

<div class="alert alert-info"> Make sure you are using the <code>Data Science 3.0</code> image in Studio for this notebook.</div>

In [253]:
import pandas as pd
import json
import boto3
import pathlib
import io
import sagemaker
from time import gmtime, strftime, sleep
from sagemaker.deserializers import CSVDeserializer
from sagemaker.serializers import CSVSerializer

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (
    ProcessingInput, 
    ProcessingOutput, 
    ScriptProcessor
)
from sagemaker.inputs import TrainingInput

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import (
    ProcessingStep, 
    TrainingStep, 
    CreateModelStep
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.parameters import (
    ParameterInteger, 
    ParameterFloat, 
    ParameterString, 
    ParameterBoolean
)
from sagemaker.workflow.clarify_check_step import (
    ModelBiasCheckConfig, 
    ClarifyCheckStep, 
    ModelExplainabilityCheckConfig
)
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.conditions import (
    ConditionGreaterThan,
    ConditionGreaterThanOrEqualTo
)
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import (
    Join,
    JsonGet
)
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda

from sagemaker.model_metrics import (
    MetricsSource, 
    ModelMetrics, 
    FileSource
)
from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig 
from sagemaker.image_uris import retrieve
from sagemaker.workflow.function_step import step
from sagemaker.workflow.step_outputs import get_step

sagemaker.__version__

'2.212.0'

In [254]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [278]:
%store -r 

%store

try:
    initialized
except NameError:
    print("+++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN 00-start-here notebook   ")
    print("+++++++++++++++++++++++++++++++++++++++++++++++++")

Stored variables and their in-db values:
baseline_s3_url                         -> 's3://sagemaker-us-east-1-906545278380/from-idea-t
bucket_name                             -> 'sagemaker-us-east-1-906545278380'
bucket_prefix                           -> 'from-idea-to-prod/xgboost'
dataset_feature_group_name              -> 'from-idea-to-prod-03-11-15-15'
dataset_file_local_path                 -> 'data/bank-additional/bank-additional-full.csv'
domain_id                               -> 'd-9oteoq7mqbpp'
evaluation_s3_url                       -> 's3://sagemaker-us-east-1-906545278380/from-idea-t
experiment_name                         -> 'from-idea-to-prod-experiment-09-18-03-39'
feature_store_bucket_prefix             -> 'from-idea-to-prod/feature-store'
initialized                             -> True
input_s3_url                            -> 's3://sagemaker-us-east-1-906545278380/from-idea-t
model_package_group_name                -> 'from-idea-to-prod-model-group-03-14-21-18'
outp

## Set constants

In [256]:
# Set names of pipeline objects
project = "from-idea-to-prod"

current_timestamp = strftime('%m-%d-%H-%M', gmtime())
pipeline_name = f"{project}-pipeline-{current_timestamp}"
pipeline_model_name = f"{project}-model-xgb"
model_package_group_name = f"{project}-model-group-{current_timestamp}"
endpoint_config_name = f"{project}-endpoint-config"
endpoint_name = f"{project}-endpoint"
model_approval_status = "PendingManualApproval"

In [257]:
# Set instance types and counts
process_instance_type = "ml.c5.xlarge"
process_instance_count = 1
train_instance_count = 1
train_instance_type = "ml.m5.xlarge"

In [258]:
# Set S3 urls for processed data
output_s3_prefix = f"s3://{bucket_name}/{bucket_prefix}"
train_s3_url = f"{output_s3_prefix}/train"
validation_s3_url = f"{output_s3_prefix}/validation"
test_s3_url = f"{output_s3_prefix}/test"
baseline_s3_url = f"{output_s3_prefix}/baseline"

evaluation_s3_url = f"{output_s3_prefix}/evaluation"
prediction_baseline_s3_url = f"{output_s3_prefix}/prediction_baseline"

output_s3_url = f"{output_s3_prefix}/output"

In [259]:
%store train_s3_url
%store validation_s3_url
%store test_s3_url
%store baseline_s3_url
%store pipeline_name
%store model_package_group_name
%store evaluation_s3_url
%store prediction_baseline_s3_url
%store output_s3_url

Stored 'train_s3_url' (str)
Stored 'validation_s3_url' (str)
Stored 'test_s3_url' (str)
Stored 'baseline_s3_url' (str)
Stored 'pipeline_name' (str)
Stored 'model_package_group_name' (str)
Stored 'evaluation_s3_url' (str)
Stored 'prediction_baseline_s3_url' (str)
Stored 'output_s3_url' (str)


In [260]:
print(f"Train S3 url: {train_s3_url}")
print(f"Validation S3 url: {validation_s3_url}")
print(f"Test S3 url: {test_s3_url}")
print(f"Data baseline S3 url: {baseline_s3_url}")
print(f"Evaluation metrics S3 url: {evaluation_s3_url}")
print(f"Model prediction baseline S3 url: {prediction_baseline_s3_url}")

Train S3 url: s3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/train
Validation S3 url: s3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/validation
Test S3 url: s3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/test
Data baseline S3 url: s3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/baseline
Evaluation metrics S3 url: s3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/evaluation
Model prediction baseline S3 url: s3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/prediction_baseline


## Create a pipeline

### Setup pipeline parameters
SageMaker Pipelines supports [parameterization](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html), which allows you to specify input parameters at runtime without changing your pipeline code. You can use the parameter classes available under the [`sagemaker.workflow.parameters`](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#parameters) module.
Parameters have a default value, which you can override by specifying parameter values when starting a pipeline execution.
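
As a rough illustration of how defaults and overrides interact, the following plain-Python sketch merges execution-time values over default values. The `resolve_parameters` function and the dictionaries are hypothetical and do not call the SageMaker SDK. Rejecting unknown names is an assumption made for this example.

```python
# Hypothetical sketch: plain Python, not SageMaker SDK code.
def resolve_parameters(defaults, overrides=None):
    """Return the effective parameter values for one pipeline execution."""
    overrides = overrides or {}
    # Assumption of this sketch: only declared parameters may be overridden.
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"Unknown pipeline parameters: {sorted(unknown)}")
    # Values supplied at start time win over the defaults.
    return {**defaults, **overrides}


defaults = {
    "TrainingInstanceType": "ml.m5.xlarge",
    "TrainingInstanceCount": 1,
    "TestScoreThreshold": 0.75,
}
effective = resolve_parameters(defaults, {"TestScoreThreshold": 0.8})
print(effective)
```

Parameters that are not overridden keep their default values, so an execution only needs to name the values it wants to change.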

In [458]:
# Set processing instance type
process_instance_type_param = ParameterString(
    name="ProcessingInstanceType",
    default_value=process_instance_type,
)

# Set training instance type
train_instance_type_param = ParameterString(
    name="TrainingInstanceType",
    default_value=train_instance_type,
)

# Set training instance count
train_instance_count_param = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=train_instance_count
)

# Set model approval status for the model registry
model_approval_status_param = ParameterString(
    name="ModelApprovalStatus",
    default_value=model_approval_status
)

# Minimal threshold for model performance on the test dataset
test_score_threshold_param = ParameterFloat(
    name="TestScoreThreshold", 
    default_value=0.75
)

# Parametrize the S3 url for input dataset
input_s3_url_param = ParameterString(
    name="InputDataUrl",
    default_value=input_s3_url,
)

# Model package group name
model_package_group_name_param = ParameterString(
    name="ModelPackageGroupName",
    default_value=model_package_group_name,
)

In [459]:
skprocessor_framework_version = "0.23-1"

### Build the pipeline steps
You create a pipeline with the following steps:

| Step | Description |
|---|---|
| **Data processing** | Runs a SageMaker processing job for feature engineering and dataset split |
| **Training** | Runs a SageMaker training job using the XGBoost algorithm |
| **Evaluation** | Evaluates the performance of the trained model |
| **Condition** | Checks if the performance of the model meets the specified threshold |
| **Register model** | Registers a version of the model in the SageMaker model registry |

💡 [Workflow pipeline job steps](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#steps) accept the same job abstractions you used in the previous notebooks, so you can configure the pipeline with exactly the same code that runs processing, training, transform, and tuning jobs. Use a [PipelineSession](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline_context.PipelineSession) context instead of a regular `sagemaker_session` to capture calls such as `processor.run()` or `estimator.fit()` without running them. The jobs start only when the pipeline is created and executed.
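
The idea of capturing a call instead of running it can be pictured with a toy example. All classes below (`ImmediateSession`, `DeferredSession`, `ToyProcessor`) are hypothetical and only mimic the pattern. They are not SageMaker SDK classes and say nothing about how the SDK implements it.

```python
# Toy illustration only: hypothetical classes, not part of the SageMaker SDK.
class ImmediateSession:
    """Starts a job as soon as it is submitted."""

    def submit(self, job_request):
        return f"started {job_request['job']}"


class DeferredSession:
    """Records job requests instead of starting them."""

    def __init__(self):
        self.captured = []

    def submit(self, job_request):
        self.captured.append(job_request)
        # The recorded request is what a pipeline step would receive as arguments.
        return job_request


class ToyProcessor:
    def __init__(self, session):
        self.session = session

    def run(self, code):
        return self.session.submit({"job": "processing", "code": code})


step_args = ToyProcessor(DeferredSession()).run(code="preprocessing.py")
print(step_args)
```

The same `run()` call produces a started job with one session and a recorded request with the other, which is why the job code from the earlier notebooks can be reused unchanged.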

In [460]:
# Note the usage of PipelineSession instead of Session
session = PipelineSession()

#### Processing step
Re-use the processor definition and `sklearn_processor.run()` code from the step 2 [notebook](./02-sagemaker-containers.ipynb) to create a pipeline processing step.

In [461]:
%%writefile preprocessing.py

from sklearn.preprocessing import MinMaxScaler, LabelEncoder
import pandas as pd
import numpy as np
import argparse
import os

def _parse_args():
    
    parser = argparse.ArgumentParser()
    # Input directory, input file name, and output directory inside the processing container
    parser.add_argument('--filepath', type=str, default='/opt/ml/processing/input/')
    parser.add_argument('--filename', type=str, default='bank-additional-full.csv')
    parser.add_argument('--outputpath', type=str, default='/opt/ml/processing/output/')
    
    return parser.parse_known_args()


def process_data(df_data):
    target_col = "y"
        
    # Indicator variable to capture when pdays takes a value of 999
    df_data["no_previous_contact"] = np.where(df_data["pdays"] == 999, 1, 0)

    # Indicator for individuals not actively employed
    df_data["not_working"] = np.where(
        np.isin(df_data["job"], ["student", "retired", "unemployed"]), 1, 0
    )

    # remove unnecessary data
    df_model_data = df_data.drop(
        ["duration", "emp.var.rate", "cons.price.idx", "cons.conf.idx", "euribor3m", "nr.employed"],
        axis=1,
    )


    bins = [18, 30, 40, 50, 60, 70, 90]
    labels = ['18-29', '30-39', '40-49', '50-59', '60-69', '70-plus']

    df_model_data['age_range'] = pd.cut(df_model_data.age, bins, labels=labels, include_lowest=True)
    df_model_data = pd.concat([df_model_data, pd.get_dummies(df_model_data['age_range'], prefix='age', dtype=int)], axis=1)
    df_model_data.drop('age', axis=1, inplace=True)
    df_model_data.drop('age_range', axis=1, inplace=True)

    scaled_features = ['pdays', 'previous', 'campaign']
    df_model_data[scaled_features] = MinMaxScaler().fit_transform(df_model_data[scaled_features])

    df_model_data = pd.get_dummies(df_model_data, dtype=int)  # Convert categorical variables to sets of indicators

    # Replace "y_no" and "y_yes" with a single label column, and bring it to the front:
    df_model_data = pd.concat(
        [
            df_model_data["y_yes"].rename(target_col),
            df_model_data.drop(["y_no", "y_yes"], axis=1),
        ],
        axis=1,
    )
    
    return df_model_data

if __name__=="__main__":
    # Process arguments
    args, _ = _parse_args()
    
    target_col = "y"
    
    # process data
    df_model_data = process_data(pd.read_csv(os.path.join(args.filepath, args.filename), sep=";"))

    # Shuffle the dataset and split it into 70% train, 20% validation, and 10% test
    train_data, validation_data, test_data = np.split(
        df_model_data.sample(frac=1, random_state=1729),
        [int(0.7 * len(df_model_data)), int(0.9 * len(df_model_data))],
    )

    print(f"Data split > train:{train_data.shape} | validation:{validation_data.shape} | test:{test_data.shape}")
    
    # Save datasets locally
    train_data.to_csv(os.path.join(args.outputpath, 'train/train.csv'), index=False, header=False)
    validation_data.to_csv(os.path.join(args.outputpath, 'validation/validation.csv'), index=False, header=False)
    test_data[target_col].to_csv(os.path.join(args.outputpath, 'test/test_y.csv'), index=False, header=False)
    test_data.drop([target_col], axis=1).to_csv(os.path.join(args.outputpath, 'test/test_x.csv'), index=False, header=False)
    
    # Save the baseline dataset for model monitoring
    df_model_data.drop([target_col], axis=1).to_csv(os.path.join(args.outputpath, 'baseline/baseline.csv'), index=False, header=False)
    
    print("## Processing complete. Exiting.")

Overwriting preprocessing.py
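
The shuffle-and-split logic in `preprocessing.py` cuts the shuffled rows at 70% and 90% of the dataset length. The following standalone sketch reproduces only the cut-point arithmetic with the standard library. `split_indices` is a hypothetical helper, the row count of 1000 is made up, and Python's `random` shuffle does not reproduce the row order that pandas produces with `random_state=1729`.

```python
import random


def split_indices(n_rows, cut_fractions=(0.7, 0.9), seed=1729):
    """Shuffle row indices and cut them into train, validation, and test parts."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    # Same cut points as in preprocessing.py: int(0.7 * n) and int(0.9 * n).
    first_cut, second_cut = (int(fraction * n_rows) for fraction in cut_fractions)
    return indices[:first_cut], indices[first_cut:second_cut], indices[second_cut:]


train, validation, test = split_indices(1000)
print(len(train), len(validation), len(test))
```

Every row lands in exactly one of the three parts, so the test set is never seen during training or validation.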


In [462]:
sklearn_processor = SKLearnProcessor(
        framework_version=skprocessor_framework_version,
        role=sm_role,
        instance_type=process_instance_type_param,
        instance_count=process_instance_count,
        base_job_name=f"{project}-preprocess",
        sagemaker_session=session,
    )
    
processing_inputs=[
    ProcessingInput(
        source=input_s3_url_param, 
        destination="/opt/ml/processing/input"
    )
]

processing_outputs=[
    ProcessingOutput(
        output_name="train_data",
        source="/opt/ml/processing/output/train", 
        destination=train_s3_url
    ),
    ProcessingOutput(
        output_name="validation_data",
        source="/opt/ml/processing/output/validation",
        destination=validation_s3_url
    ),
    ProcessingOutput(
        output_name="test_data", 
        source="/opt/ml/processing/output/test",
        destination=test_s3_url
    ),
    ProcessingOutput(
        output_name="baseline_data",
        source="/opt/ml/processing/output/baseline", 
        destination=baseline_s3_url
    ),
]

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [463]:
processor_args = sklearn_processor.run(
    inputs=processing_inputs,
    outputs=processing_outputs,
    code='preprocessing.py',
    # arguments = ['arg1', 'arg2'],
)
    
# Define processing step
step_process = ProcessingStep(
    name=f"{project}-preprocess",
    step_args=processor_args,
)

#### Training step
Re-use the estimator definition and hyperparameters setup code from the step 2 [notebook](./02-sagemaker-containers.ipynb) to create a model training pipeline step.

In [464]:
xgboost_image_uri = sagemaker.image_uris.retrieve("xgboost", region=region, version="1.5-1")

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


In [465]:
# Instantiate an XGBoost estimator object
estimator = sagemaker.estimator.Estimator(
    image_uri=xgboost_image_uri,
    role=sm_role, 
    instance_type=train_instance_type_param,
    instance_count=train_instance_count_param,
    output_path=output_s3_url,
    sagemaker_session=session,
    base_job_name=f"{project}-train",
)

# Define algorithm hyperparameters
estimator.set_hyperparameters(
    num_round=100, # the number of rounds to run the training
    max_depth=3, # maximum depth of a tree
    eta=0.5, # step size shrinkage used in updates to prevent overfitting
    alpha=2.5, # L1 regularization term on weights
    objective="binary:logistic",
    eval_metric="auc", # evaluation metrics for validation data
    subsample=0.8, # subsample ratio of the training instance
    colsample_bytree=0.8, # subsample ratio of columns when constructing each tree
    min_child_weight=3, # minimum sum of instance weight (hessian) needed in a child
    early_stopping_rounds=10, # the model trains until the validation score stops improving
    verbosity=1, # verbosity of printing messages
)

training_inputs = {
    "train": TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
            "train_data"
        ].S3Output.S3Uri,
        content_type="text/csv",
    ),
    "validation": TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
            "validation_data"
        ].S3Output.S3Uri,
        content_type="text/csv",
    ),
}

training_args = estimator.fit(training_inputs)

# Define training step
step_train = TrainingStep(
    name=f"{project}-train",
    step_args=training_args
)

#### Evaluation step
Create a model evaluation script to check if the model performance meets the specified threshold. 

In [466]:
%%writefile evaluation.py

import json
import os
import pathlib
import pickle as pkl
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost as xgb
import datetime as dt
from sklearn.metrics import roc_curve, auc

if __name__ == "__main__":   
    
    # All paths are local for the processing container
    model_path = "/opt/ml/processing/model/model.tar.gz"
    test_x_path = "/opt/ml/processing/test/test_x.csv"
    test_y_path = "/opt/ml/processing/test/test_y.csv"
    output_dir = "/opt/ml/processing/evaluation"
    output_prediction_path = "/opt/ml/processing/output/"
        
    # Read model tar file
    with tarfile.open(model_path, "r:gz") as t:
        t.extractall(path=".")
    
    # Load model
    model = xgb.Booster()
    model.load_model("xgboost-model")
    
    # Read test data
    X_test = xgb.DMatrix(pd.read_csv(test_x_path, header=None).values)
    y_test = pd.read_csv(test_y_path, header=None).to_numpy()

    # Run predictions
    probability = model.predict(X_test)

    # Evaluate predictions
    fpr, tpr, thresholds = roc_curve(y_test, probability)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }

    # Save evaluation report
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    with open(f"{output_dir}/evaluation.json", "w") as f:
        f.write(json.dumps(report_dict))
    
    # Save prediction baseline file - you need it later for the model quality monitoring
    pd.DataFrame({"prediction":np.array(np.round(probability), dtype=int),
                  "probability":probability,
                  "label":y_test.squeeze()}
                ).to_csv(os.path.join(output_prediction_path, 'prediction_baseline/prediction_baseline.csv'), index=False, header=True)

Overwriting evaluation.py
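
The evaluation script computes the AUC with `roc_curve` and `auc` from scikit-learn. To build intuition for the metric, the sketch below uses a different but equivalent formulation: the AUC is the fraction of (positive, negative) pairs in which the positive example receives the higher score, with ties counted as half. `pairwise_auc` is a hypothetical helper and the four data points are made up.

```python
def pairwise_auc(labels, scores):
    """AUC as the share of positive/negative pairs that are ranked correctly."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative label")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0  # positive ranked above negative
            elif p == n:
                wins += 0.5  # ties count as half
    return wins / (len(positives) * len(negatives))


auc_value = pairwise_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
print(auc_value)  # 0.75: three of the four pairs are ranked correctly
```

This quadratic loop is for illustration only. For real datasets, the scikit-learn functions used in `evaluation.py` are the better choice.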


Create a processor to run the evaluation script and construct the evaluation step:

In [467]:
script_processor = ScriptProcessor(
    image_uri=xgboost_image_uri,
    role=sm_role,
    command=["python3"],
    instance_type=process_instance_type_param,
    instance_count=process_instance_count,
    base_job_name=f"{project}-evaluate",
    sagemaker_session=session,
)

eval_inputs=[
    ProcessingInput(source=step_train.properties.ModelArtifacts.S3ModelArtifacts, 
                    destination="/opt/ml/processing/model"),
    ProcessingInput(source=step_process.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri, 
                    destination="/opt/ml/processing/test"),
]

eval_outputs=[
    ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation", 
                     destination=evaluation_s3_url),
    ProcessingOutput(output_name="prediction_baseline_data", source="/opt/ml/processing/output/prediction_baseline", 
                     destination=prediction_baseline_s3_url),
]

eval_args = script_processor.run(
    inputs=eval_inputs,
    outputs=eval_outputs,
    code="evaluation.py",
)
    
evaluation_report = PropertyFile(
    name="ModelEvaluationReport", output_name="evaluation", path="evaluation.json"
)

step_eval = ProcessingStep(
    name=f"{project}-evaluate",
    step_args=eval_args,
    property_files=[evaluation_report]
)

Use [property files](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html) to store information from the output of a processing step. This is particularly useful when analyzing the results of a processing step to decide how a conditional step should be executed. The `JsonGet` function processes a property file and enables you to use [`JsonPath`](https://github.com/json-path/JsonPath) notation to query the property JSON file.
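
To see what such a query returns, the following sketch resolves a dot-separated path against a report of the same shape that `evaluation.py` writes. `get_by_path` is a hypothetical helper that handles only nested dictionaries, not the full JsonPath syntax, and the score value is made up.

```python
import json


def get_by_path(document, json_path):
    """Follow a dot-separated path through nested dictionaries."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


# Example report with the structure produced by evaluation.py; the score is invented.
report = json.loads('{"classification_metrics": {"auc_score": {"value": 0.77}}}')
auc_score = get_by_path(report, "classification_metrics.auc_score.value")
print(auc_score)
```

The path string matches the `json_path` that the condition step in this notebook passes to `JsonGet`.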

#### Register step
The register step creates a SageMaker model and registers a new version of a model in the SageMaker Model Registry within a [model package group](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry-model-group.html).

In [468]:
model = Model(
    image_uri=xgboost_image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    name="from-idea-to-prod-xgboost-model",
    sagemaker_session=session,
    role=sm_role,
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.xlarge", "ml.m5.large"],
    transform_instances=["ml.m5.xlarge", "ml.m5.large"],
    model_package_group_name=model_package_group_name_param,
    approval_status=model_approval_status_param,
    model_metrics=model_metrics,
)

step_register = ModelStep(
    name=f"{project}-register",
    step_args=register_args
)



#### Fail step
Add a Pipelines [FailStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.fail_step.FailStep) to stop the pipeline execution if the model performance metric doesn't meet the specified threshold. 

In [469]:
step_fail = FailStep(
    name=f"{project}-fail",
    error_message=Join(on=" ", values=["Execution failed because the AUC score is not greater than", test_score_threshold_param]),
)

#### Condition step
The last step to add is a condition step. The condition step checks the model performance score calculated in the evaluation step and conditionally creates a model and registers it in the model registry, or stops and fails the pipeline execution.

In [470]:
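# Despite the variable name, this condition is true only when the AUC score is strictly greater than the threshold
cond_lte = ConditionGreaterThan(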
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=test_score_threshold_param,
)

step_cond = ConditionStep(
    name=f"{project}-check-test-score",
    conditions=[cond_lte],
    if_steps=[step_register],
    else_steps=[step_fail],
)
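
The branching behaviour of the condition step can be checked locally with a small sketch. `choose_branch` is a hypothetical function that mirrors a strict greater-than comparison, and the scores are made up.

```python
def choose_branch(auc_score, threshold):
    """Mimic a strict greater-than check: a tie goes to the else branch."""
    return "register" if auc_score > threshold else "fail"


print(choose_branch(0.77, 0.75))
print(choose_branch(0.75, 0.75))
```

Because the comparison is strict, a score exactly equal to the threshold takes the fail branch. If a tie should pass, `ConditionGreaterThanOrEqualTo`, which this notebook already imports, is the condition to consider.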

### Construct a pipeline 
With all the steps defined, you can now construct an end-to-end pipeline.
You don't need to manually define the sequence of steps, because SageMaker automatically derives the processing flow from the data dependencies between the pipeline's steps. You also don't need to manage the transfer of artifacts and datasets from one step to another, because SageMaker takes care of the data flow.
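
Deriving an execution order from data dependencies is a topological sort. The sketch below applies the standard library's `graphlib` (Python 3.9 or later) to the dependencies between the steps in this notebook. The shortened step names are illustrative, and the sketch does not describe how SageMaker schedules steps internally.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
dependencies = {
    "preprocess": set(),
    "train": {"preprocess"},              # reads the train and validation datasets
    "evaluate": {"train", "preprocess"},  # reads the model artifact and the test dataset
    "check-test-score": {"evaluate"},     # reads the evaluation report
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Each step appears after everything it depends on, which is why the order of the `steps` list passed to the pipeline does not have to match the execution order.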

Pipelines are integrated with SageMaker Experiments. By default, when SageMaker Pipelines creates and executes a pipeline, the following SageMaker Experiments entities are created if they don't exist:

- An experiment for the pipeline. The experiment name is the name of the pipeline
- A run group for every execution of the pipeline
- A run that's added to the run group for each SageMaker job created in a pipeline execution step

Refer to the [Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-experiments.html) for more details on experiment integration.

In [471]:
pipeline_def_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        process_instance_type_param,
        train_instance_type_param,
        train_instance_count_param,
        model_approval_status_param,
        test_score_threshold_param,
        input_s3_url_param,
        model_package_group_name_param,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=session,
    pipeline_definition_config=pipeline_def_config
)

Based on the data dependencies between the pipeline's steps, SageMaker builds the following DAG with the data flow in your pipeline:
![](img/pipeline-graph.png)

In [472]:
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sm_role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:906545278380:pipeline/from-idea-to-prod-pipeline-03-14-21-18',
 'ResponseMetadata': {'RequestId': '486149e3-a15f-440b-8ac5-9b49e265a119',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '486149e3-a15f-440b-8ac5-9b49e265a119',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '106',
   'date': 'Sat, 16 Mar 2024 19:27:01 GMT'},
  'RetryAttempts': 0}}

In [None]:
pipeline_definition = json.loads(pipeline.describe()['PipelineDefinition'])
pipeline_definition

Look at the pipeline definition JSON to understand how the pipeline is structured. For example, you can see how the pipeline parameters are defined and how they are used.

Definition:
```json
'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.xlarge'},
               ...
               ]
```

Parameter substitution:
```json
'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType':{'Get':'Parameters.ProcessingInstanceType'}
```
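
One way to picture the substitution is a recursive walk that replaces every `{'Get': 'Parameters.<name>'}` reference with a concrete value. `resolve_references` is a hypothetical helper written for this illustration. The actual resolution happens inside the SageMaker service when an execution starts.

```python
def resolve_references(node, parameters):
    """Recursively replace {'Get': 'Parameters.<name>'} with the parameter value."""
    if isinstance(node, dict):
        reference = node.get("Get")
        if len(node) == 1 and isinstance(reference, str) and reference.startswith("Parameters."):
            return parameters[reference.split(".", 1)[1]]
        return {key: resolve_references(value, parameters) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve_references(item, parameters) for item in node]
    return node


arguments = {
    "ProcessingResources": {
        "ClusterConfig": {"InstanceType": {"Get": "Parameters.ProcessingInstanceType"}}
    }
}
resolved = resolve_references(arguments, {"ProcessingInstanceType": "ml.c5.xlarge"})
print(resolved)
```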

You can also see that the processing script `preprocessing.py` is automatically uploaded to an Amazon S3 bucket and the S3 url is specified as one of the step's `ProcessingInputs`:
```json
{'InputName': 'code',
      'AppManaged': False,
      'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-XXXX/PreprocessData-e5d4c2b08e616c264c9dc5053871519a/input/code/preprocessing.py',
       'LocalPath': '/opt/ml/processing/input/code',
       'S3DataType': 'S3Prefix',
       'S3InputMode': 'File',
       'S3DataDistributionType': 'FullyReplicated',
       'S3CompressionType': 'None'}}
```

## Execute the pipeline
The following code starts an execution of the pipeline with the specified parameters.

In [473]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType=process_instance_type,
        TrainingInstanceType=train_instance_type,
        TrainingInstanceCount=train_instance_count,
        ModelApprovalStatus="PendingManualApproval",
        TestScoreThreshold=0.75,
        InputDataUrl=input_s3_url
    )
)

In [None]:
sleep(5)
execution.list_steps()

In [None]:
# Uncomment this call if you want the notebook to wait until the pipeline execution finishes
# Execution time of this pipeline is about 13 minutes
# execution.wait()

You can follow the pipeline execution in Studio by selecting **Pipelines** in **SageMaker Home**:

<img src="img/pipelines-widget.png" width="400"/>

Double-click your pipeline name in the list of pipelines to see the pipeline executions and other data:

![](img/pipelines-list.png)

After a successful execution you can open the pipeline execution graph and see details for each pipeline step:

![](img/pipeline-execution-graph.png)

## Batch transform in the pipeline
You can integrate a batch transform step using the trained model in the pipeline.


In [None]:
from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
from sagemaker.inputs import CreateModelInput

In [None]:
# First, create a SageMaker model for the transform
model = Model(
    image_uri=xgboost_image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    name="from-idea-to-prod-xgboost-model",
    sagemaker_session=session,
    role=sm_role,
)

model_create_args = model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium")

# Define create model step
step_create_model = ModelStep(
    name=f"{project}-model",
    step_args=model_create_args,
)

In [None]:
# Use the defined model to create a transformer
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    accept="text/csv",
    assemble_with="Line",
    output_path=f"{output_s3_prefix}/transform",
    sagemaker_session=session,
)
    
transform_args = transformer.transform(    
    data=step_process.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri,
    content_type="text/csv",
    split_type="Line", 
)
            
# Create the transform step
step_transform = TransformStep(
    name=f"{project}-transform", 
    step_args=transform_args,
)

In [None]:
# Add create model, register model, and transform steps to the "true" branch of the condition
step_cond = ConditionStep(
    name=f"{project}-check-test-score",
    conditions=[cond_lte],
    if_steps=[step_create_model, step_register, step_transform],
    else_steps=[step_fail],
)

In [None]:
pipeline_def_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

# Create the pipeline
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        process_instance_type_param,
        train_instance_type_param,
        train_instance_count_param,
        model_approval_status_param,
        test_score_threshold_param,
        input_s3_url_param,
        model_package_group_name_param,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=session,
    pipeline_definition_config=pipeline_def_config,
)

In [None]:
pipeline.upsert(role_arn=sm_role)

The new pipeline now contains the steps for creating the model and running the batch transform:
![](img/pipeline-graph-with-transform.png)

Optionally, you can start a new pipeline execution:

In [None]:
execution_batch_transform = pipeline.start(
    parameters=dict(
        ProcessingInstanceType=process_instance_type,
        TrainingInstanceType=train_instance_type,
        TrainingInstanceCount=train_instance_count,
        ModelApprovalStatus="PendingManualApproval",
        TestScoreThreshold=0.75,
        InputDataUrl=input_s3_url
    )
)

In [None]:
sleep(5)
execution_batch_transform.list_steps()

In [None]:
# Uncomment this call if you want the notebook to wait until the pipeline execution finishes
# Execution time of this pipeline is about 17 minutes
# execution_batch_transform.wait()

## Optional: Lift-and-shift your Python code to a pipeline with @step decorator
One of the useful features of SageMaker is easy conversion of your existing Python code to SageMaker pipelines. You can develop Python functions to implement an ML workflow using SageMaker Python SDK and test all code locally in the notebook. When you want to create a pipeline, you can use a [`@step`](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#step-decorator) decorator to convert Python functions into pipeline steps. Refer to the SageMaker [Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-step-decorator-create-pipeline.html) for more details and examples.

The following section uses Python functions to implement workflow steps and then the `@step` decorator to reuse each function as a pipeline step.

### Configuring defaults for AWS infrastructure
You can use a YAML configuration file to define default values that are automatically passed to SageMaker APIs. This is especially convenient when you need to provide static parameters for infrastructure settings, such as VPC IDs, security groups, and KMS keys, or when you work with remote functions.

Refer to [Configuring and using defaults with the SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk) documentation for examples and more details.
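To make the idea of configuration defaults concrete, here is a minimal conceptual sketch of how a default can be looked up by a dotted key path while an explicitly passed value takes precedence. This is not the SageMaker SDK's implementation; the `resolve` helper and the `ml.m5.2xlarge` override are hypothetical and exist only for illustration. The key path mirrors the `RemoteFunction` section of the configuration file used in this notebook.

```python
# Conceptual sketch only: NOT the SageMaker Python SDK's implementation.
from typing import Any, Optional

# Mirrors part of the structure of the config.yaml used in this notebook.
config = {
    "SchemaVersion": "1.0",
    "SageMaker": {
        "PythonSDK": {
            "Modules": {
                "RemoteFunction": {
                    "InstanceType": "ml.m5.xlarge",
                    "Dependencies": "./requirements.txt",
                }
            }
        }
    },
}


def resolve(explicit: Optional[Any], key_path: str, cfg: dict) -> Any:
    """Hypothetical helper: return the explicit value if given, else the config default."""
    if explicit is not None:
        return explicit
    node: Any = cfg
    for key in key_path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None  # no default configured for this key path
        node = node[key]
    return node


KEY = "SageMaker.PythonSDK.Modules.RemoteFunction.InstanceType"
default_type = resolve(None, KEY, config)              # falls back to the config default
override_type = resolve("ml.m5.2xlarge", KEY, config)  # explicit value wins
print(default_type, override_type)
```

The design point is that code which passes nothing picks up the shared default, while a call that needs something different can still override it locally.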

In [481]:
# Print default location of configuration files
import os
from platformdirs import site_config_dir, user_config_dir

#Prints the location of the admin config file
print(os.path.join(site_config_dir("sagemaker"), "config.yaml"))

#Prints the location of the user config file
print(os.path.join(user_config_dir("sagemaker"), "config.yaml"))

/etc/xdg/sagemaker/config.yaml
/root/.config/sagemaker/config.yaml


The next cell creates a configuration file and sets default values for remote functions. These values are automatically passed to the `@step` decorator, so you don't need to specify them explicitly. Refer to [Configure your pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-step-decorator-cfg-pipeline.html) in the Developer Guide.

In [482]:
%%writefile config.yaml

SchemaVersion: '1.0'
SageMaker:
    PythonSDK:
        Modules:
            RemoteFunction:
                InstanceType: ml.m5.xlarge
                Dependencies: ./requirements.txt
                IncludeLocalWorkDir: true
                CustomFileFilter:
                    IgnoreNamePatterns: # files or directories to ignore
                        - "*.ipynb" # all notebook files
                        - "*.md" # all markdown files
                        - "__pycache__"

Overwriting config.yaml


In [483]:
# copy the configuration file to user config file location
%mkdir -p {user_config_dir("sagemaker")}
%cp config.yaml {os.path.join(user_config_dir("sagemaker"), "config.yaml")}

### Prepare environment

In [None]:
# Install xgboost for the local testing of the code
%pip install -q xgboost

Create a `requirements.txt` file for the pipeline environment:

In [484]:
%%writefile requirements.txt

sagemaker
xgboost

Overwriting requirements.txt


### Develop and test local code

In [485]:
# Python function code is in the local files
from pipeline_steps.preprocess import preprocess
from pipeline_steps.evaluation import evaluate
from pipeline_steps.register import register
# from pipeline_steps.train import train


In [486]:
# !pygmentize pipeline_steps/preprocess.py

In [487]:
# !pygmentize  pipeline_steps/evaluation.py

In [488]:
# !pygmentize  pipeline_steps/register.py

In [489]:
# check that there is dataset under S3 url
!aws s3 ls {input_s3_url}

2024-03-09 21:14:11    5834924 bank-additional-full.csv


In [490]:
# You can run your Python code locally and verify its correctness before constructing a pipeline
r_preprocess = preprocess(
    input_s3_url,
    output_s3_prefix=output_s3_prefix,
)
r_preprocess



Data split > train:(28831, 65) | validation:(8238, 65) | test:(4119, 65)
## Pre-processing complete. Exiting.


{'train_data': 's3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/train/train.csv',
 'validation_data': 's3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/validation/validation.csv',
 'test_x_data': 's3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/test/test_x.csv',
 'test_y_data': 's3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/test/test_y.csv',
 'baseline_data': 's3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/baseline/baseline.csv'}

In [491]:
# check that the function generated output
!aws s3 ls {output_s3_prefix}/test/

2024-03-16 20:57:28     600465 test_x.csv
2024-03-16 20:57:29       8238 test_y.csv


In [492]:
# get a model tar file S3 url from a previous pipeline execution
# you need to wait until the pipeline you created in the previous section completes the training step
execution.wait()
training_job_name = [s['Metadata'] for s in execution.list_steps() if s['StepName'] == f'{project}-train'][0]['TrainingJob']['Arn'].split('/')[1]
model_s3_path = boto3.client('sagemaker').describe_training_job(TrainingJobName=training_job_name)['ModelArtifacts']['S3ModelArtifacts']
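The cell above extracts the training job name from the step's ARN with `split('/')[1]`. As a small illustration of that parsing step in isolation, the sketch below does the same with an explicit check on the resource type. The ARN, account ID, and job name are placeholders, and `training_job_name_from_arn` is a hypothetical helper, not an SDK function.

```python
# Hypothetical ARN with a placeholder account ID and job name.
example_arn = "arn:aws:sagemaker:us-east-1:123456789012:training-job/example-train-job"


def training_job_name_from_arn(arn: str) -> str:
    """Return the resource name that follows 'training-job/' in the ARN."""
    # An ARN has six colon-separated parts; the last one is 'resource-type/resource-name'.
    resource = arn.split(":", 5)[5]
    resource_type, _, name = resource.partition("/")
    if resource_type != "training-job" or not name:
        raise ValueError(f"Not a training job ARN: {arn}")
    return name


job_name = training_job_name_from_arn(example_arn)
print(job_name)
```

Validating the resource type makes a wrong step name or an unexpected metadata shape fail early with a clear message instead of producing a confusing `describe_training_job` error later.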

In [493]:
# check that the model file is there
!aws s3 ls {model_s3_path}

2024-03-16 19:33:17      11336 model.tar.gz


In [494]:
# Run the evaluation code locally
r_eval = evaluate(
    test_x_data_s3_path=r_preprocess['test_x_data'],
    test_y_data_s3_path=r_preprocess['test_y_data'],
    model_s3_path=model_s3_path,
    output_s3_prefix=output_s3_prefix,
)
r_eval

{'classification_metrics': {'auc_score': {'value': 0.7726359592480987}}}

In [495]:
# check that the evaluation function generated output
!aws s3 ls {output_s3_prefix}/prediction_baseline/

                           PRE reports/
                           PRE results/
2024-03-16 20:57:34      63018 prediction_baseline.csv


In [496]:
# Run the model register code locally
model_package_arn = register(
    training_job_name=training_job_name,
    model_package_group_name=model_package_group_name,
    model_approval_status=model_approval_status,
    evaluation_result=r_eval,
    output_s3_prefix=output_s3_url,
)
model_package_arn


2024-03-16 19:33:24 Starting - Preparing the instances for training
2024-03-16 19:33:24 Downloading - Downloading the training image
2024-03-16 19:33:24 Training - Training image download completed. Training in progress.
2024-03-16 19:33:24 Uploading - Uploading generated training model
2024-03-16 19:33:24 Completed - Training job completed


'arn:aws:sagemaker:us-east-1:906545278380:model-package/from-idea-to-prod-model-group-03-14-21-18/3'

In [497]:
# check that a new model version has been registered in the model package group
boto3.client('sagemaker').describe_model_package(ModelPackageName=model_package_arn)

{'ModelPackageGroupName': 'from-idea-to-prod-model-group-03-14-21-18',
 'ModelPackageVersion': 3,
 'ModelPackageArn': 'arn:aws:sagemaker:us-east-1:906545278380:model-package/from-idea-to-prod-model-group-03-14-21-18/3',
 'CreationTime': datetime.datetime(2024, 3, 16, 20, 57, 43, 515000, tzinfo=tzlocal()),
 'InferenceSpecification': {'Containers': [{'Image': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.5-1',
    'ImageDigest': 'sha256:4cb2fa59a895f589854fd57de22cc0d0c749aec1a0afee48403d9d6252cb4ce4',
    'ModelDataUrl': 's3://sagemaker-us-east-1-906545278380/from-idea-to-prod/xgboost/output/from-idea-to-prod-train-kxqmemfg8c6w-nPDapDh3v1/output/model.tar.gz',
    'Environment': {}}],
  'SupportedTransformInstanceTypes': ['ml.m5.xlarge', 'ml.m5.large'],
  'SupportedRealtimeInferenceInstanceTypes': ['ml.m5.xlarge', 'ml.m5.large'],
  'SupportedContentTypes': ['text/csv'],
  'SupportedResponseMIMETypes': ['text/csv']},
 'ModelPackageStatus': 'Completed',
 'ModelPackageS

### Build a pipeline using @step decorator
After local testing you can use the same Python code without any changes to construct a pipeline.

The next cell creates the pipeline steps. Note that you can combine `@step`-decorated functions (`preprocess`, `evaluate`, `register`) and traditional pipeline steps (`train`) in the same pipeline and pass data between them.

In [498]:
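# imports used in this cell (harmless if already imported earlier in the notebook)
from sagemaker.workflow.function_step import step
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import Join

# preprocess data step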
processed_data_s3_path = step(preprocess, 
                              instance_type=process_instance_type_param,
                              name=f"{project}-preprocess")(
    data_s3_path=input_s3_url_param,
    output_s3_prefix=output_s3_prefix,
)

# train step
training_inputs = {
    "train": TrainingInput(
        processed_data_s3_path['train_data'],
        content_type="text/csv",
    ),
    "validation": TrainingInput(
        processed_data_s3_path['validation_data'],
        content_type="text/csv",
    ),
}

step_train = TrainingStep(
    name=f"{project}-train",
    step_args=estimator.fit(training_inputs)
)    

# evaluate step
eval_result = step(evaluate,
                   instance_type=process_instance_type_param,
                   name=f"{project}-evaluate")(
    test_x_data_s3_path=processed_data_s3_path['test_x_data'],
    test_y_data_s3_path=processed_data_s3_path['test_y_data'],
    model_s3_path=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    output_s3_prefix=output_s3_prefix
)

# register step
model_package_arn = step(register,
                         name=f"{project}-register")(
    training_job_name=step_train.properties.TrainingJobName,
    model_package_group_name=model_package_group_name_param,
    model_approval_status=model_approval_status_param,
    evaluation_result=eval_result,
    output_s3_prefix=output_s3_url,
)

# fail step
step_fail = FailStep(
    name=f"{project}-fail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score <", test_score_threshold_param]),
)

# conditionally register step
conditionally_register = ConditionStep(
    name=f"{project}-check-metrics",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            left=eval_result['classification_metrics']['auc_score']['value'],  
            right=test_score_threshold_param,
        )
    ],
    if_steps=[model_package_arn],
    else_steps=[step_fail],
)
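The branching in the condition step can be checked locally with plain Python before running the pipeline. The sketch below is an illustration only: `choose_branch` is a hypothetical helper, the AUC value is the one returned by the local `evaluate` run earlier in this notebook, `0.75` is the threshold passed to the earlier pipeline execution, and `0.80` is an arbitrary value chosen to show the failing branch.

```python
def choose_branch(evaluation_result: dict, threshold: float) -> str:
    """Hypothetical helper: mirror the >= comparison used by the condition step."""
    auc = evaluation_result["classification_metrics"]["auc_score"]["value"]
    return "register" if auc >= threshold else "fail"


# Same structure as the dictionary returned by the local evaluate() call.
local_result = {"classification_metrics": {"auc_score": {"value": 0.7726359592480987}}}

branch_at_075 = choose_branch(local_result, 0.75)  # AUC is above the threshold
branch_at_080 = choose_branch(local_result, 0.80)  # AUC is below the threshold
print(branch_at_075, branch_at_080)
```

In the pipeline the same nested keys are resolved at run time, so the comparison is made against the output of the evaluation step rather than a local dictionary.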



To access a step output or pass data between steps, you use the [`DelayedReturn`](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.DelayedReturn) class of the Python SDK.

### Create and run a pipeline
Now create and run the pipeline. You only need to pass the last step to the `Pipeline` constructor. The SDK automatically builds the pipeline DAG based on the data dependencies between steps. Refer to the [Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-step-decorator-create-pipeline.html#pipelines-step-define-delayed) for more details.
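To see why passing only the last step can be enough, consider the toy model below. It is a conceptual sketch, not the SDK's implementation: `ToyStep` and `collect_steps` are hypothetical names, and the graph contains only the data dependencies between the preprocess, train, evaluate, and register steps. Walking upstream from the last step recovers every step it depends on.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyStep:
    """Hypothetical stand-in for a pipeline step that remembers its inputs."""
    name: str
    upstream: List["ToyStep"] = field(default_factory=list)


def collect_steps(last: ToyStep) -> List[str]:
    """Depth-first walk that lists every upstream step before the step that needs it."""
    ordered: List[str] = []
    seen = set()

    def visit(step: ToyStep) -> None:
        if step.name in seen:
            return
        seen.add(step.name)
        for dep in step.upstream:
            visit(dep)
        ordered.append(step.name)

    visit(last)
    return ordered


# Data dependencies modeled on the steps defined in this notebook.
s_preprocess = ToyStep("preprocess")
s_train = ToyStep("train", [s_preprocess])
s_evaluate = ToyStep("evaluate", [s_preprocess, s_train])
s_register = ToyStep("register", [s_train, s_evaluate])

execution_order = collect_steps(s_register)
print(execution_order)
```

Each step appears once, after all of its inputs, which is the ordering property a DAG built from data dependencies needs.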

In [499]:
last_pipeline_step = conditionally_register

pipeline_decorator = Pipeline(
    name=f"{pipeline_name}-decorator",
    parameters=[
        input_s3_url_param,
        process_instance_type_param,
        train_instance_type_param,
        train_instance_count_param,
        model_approval_status_param,
        test_score_threshold_param,
        model_package_group_name_param,
    ],
    steps=[last_pipeline_step]
)

In [500]:
# The upsert operation serializes the function code, arguments, and other artifacts to S3, where they can be accessed at pipeline runtime
pipeline_decorator.upsert(role_arn=sm_role)

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.Dependencies
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.IncludeLocalWorkDir
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.CustomFileFilter.IgnoreNamePatterns
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.InstanceType


2024-03-16 20:58:22,360 sagemaker.remote_function INFO     Uploading serialized function code to s3://sagemaker-us-east-1-906545278380/from-idea-to-prod-pipeline-03-14-21-18-decorator/from-idea-to-prod-register/2024-03-16-20-58-20-246/function
2024-03-16 20:58:22,428 sagemaker.remote_function INFO     Uploading serialized function arguments to s3://sagemaker-us-east-1-906545278380/from-idea-to-prod-pipeline-03-14-21-18-decorator/from-idea-to-prod-register/2024-03-16-20-58-20-246/arguments
2024-03-16 20:58:22,595 sagemaker.remote_function INFO     Copied dependencies file at './requirements.txt' to '/tmp/tmpj1noo4iq/requirements.txt'
2024-03-16 20:58:22,637 sagemaker.remote_function INFO     Successfully uploaded dependencies and pre execution scripts to 's3://sagemaker-us-east-1-906545278380/from-idea-to-prod-pipeline-03-14-21-18-decorator/from-idea-to-prod-register/2024-03-16-20-58-20-246/pre_exec_script_and_dependencies'
2024-03-16 20:58:23,090 sagemaker.remote_function INFO     Copi

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.Dependencies
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.IncludeLocalWorkDir
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.CustomFileFilter.IgnoreNamePatterns


2024-03-16 20:58:29,098 sagemaker.remote_function INFO     Uploading serialized function code to s3://sagemaker-us-east-1-906545278380/from-idea-to-prod-pipeline-03-14-21-18-decorator/from-idea-to-prod-evaluate/2024-03-16-20-58-20-246/function
2024-03-16 20:58:29,171 sagemaker.remote_function INFO     Uploading serialized function arguments to s3://sagemaker-us-east-1-906545278380/from-idea-to-prod-pipeline-03-14-21-18-decorator/from-idea-to-prod-evaluate/2024-03-16-20-58-20-246/arguments
2024-03-16 20:58:29,269 sagemaker.remote_function INFO     Copied dependencies file at './requirements.txt' to '/tmp/tmp8f3gekyt/requirements.txt'
2024-03-16 20:58:29,295 sagemaker.remote_function INFO     Successfully uploaded dependencies and pre execution scripts to 's3://sagemaker-us-east-1-906545278380/from-idea-to-prod-pipeline-03-14-21-18-decorator/from-idea-to-prod-evaluate/2024-03-16-20-58-20-246/pre_exec_script_and_dependencies'


sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.Dependencies
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.IncludeLocalWorkDir
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.CustomFileFilter.IgnoreNamePatterns


2024-03-16 20:58:32,483 sagemaker.remote_function INFO     Uploading serialized function code to s3://sagemaker-us-east-1-906545278380/from-idea-to-prod-pipeline-03-14-21-18-decorator/from-idea-to-prod-preprocess/2024-03-16-20-58-20-246/function
2024-03-16 20:58:32,544 sagemaker.remote_function INFO     Uploading serialized function arguments to s3://sagemaker-us-east-1-906545278380/from-idea-to-prod-pipeline-03-14-21-18-decorator/from-idea-to-prod-preprocess/2024-03-16-20-58-20-246/arguments
2024-03-16 20:58:32,635 sagemaker.remote_function INFO     Copied dependencies file at './requirements.txt' to '/tmp/tmp8lp6cl1r/requirements.txt'
2024-03-16 20:58:32,681 sagemaker.remote_function INFO     Successfully uploaded dependencies and pre execution scripts to 's3://sagemaker-us-east-1-906545278380/from-idea-to-prod-pipeline-03-14-21-18-decorator/from-idea-to-prod-preprocess/2024-03-16-20-58-20-246/pre_exec_script_and_dependencies'


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:906545278380:pipeline/from-idea-to-prod-pipeline-03-14-21-18-decorator',
 'ResponseMetadata': {'RequestId': '31825e37-e15f-4878-a9bd-5a6758bde027',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '31825e37-e15f-4878-a9bd-5a6758bde027',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '116',
   'date': 'Sat, 16 Mar 2024 20:58:32 GMT'},
  'RetryAttempts': 0}}

In [501]:
execution_decorator = pipeline_decorator.start()

In [None]:
execution_decorator.describe()
execution_decorator.wait()
execution_decorator.list_steps()

### Limitations
Be aware of the specific [limitations](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-step-decorator-limit.html) that apply when you use the `@step` decorator for pipeline steps.

## Add Feature Store
In this section you use SageMaker Feature Store to manage the features and datasets for model training.

In [182]:
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.inputs import TableFormatEnum
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor, to_pipeline
from sagemaker.remote_function import remote
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime, timezone, date
from time import gmtime, strftime, sleep
import time

In [183]:
%store -r 

In [184]:
session = sagemaker.Session()
project = "from-idea-to-prod"
current_timestamp = strftime('%m-%d-%H-%M', gmtime())

In [185]:
feature_store_bucket_prefix = 'from-idea-to-prod/feature-store'
%store feature_store_bucket_prefix

Stored 'feature_store_bucket_prefix' (str)


### Transform raw data into training-ready features
First, transform the raw data into features so that you can derive the feature group schema from the transformed DataFrame.

In [186]:
df_data = pd.read_csv(dataset_file_local_path, sep=";")
pd.set_option("display.max_columns", 500)
df_data

Unnamed: 0,age,job,marital,education,default,housing,loan,contact,month,day_of_week,duration,campaign,pdays,previous,poutcome,emp.var.rate,cons.price.idx,cons.conf.idx,euribor3m,nr.employed,y
0,56,housemaid,married,basic.4y,no,no,no,telephone,may,mon,261,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
1,57,services,married,high.school,unknown,no,no,telephone,may,mon,149,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
2,37,services,married,high.school,no,yes,no,telephone,may,mon,226,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
3,40,admin.,married,basic.6y,no,no,no,telephone,may,mon,151,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
4,56,services,married,high.school,no,no,yes,telephone,may,mon,307,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
41183,73,retired,married,professional.course,no,yes,no,cellular,nov,fri,334,1,999,0,nonexistent,-1.1,94.767,-50.8,1.028,4963.6,yes
41184,46,blue-collar,married,professional.course,no,no,no,cellular,nov,fri,383,1,999,0,nonexistent,-1.1,94.767,-50.8,1.028,4963.6,no
41185,56,retired,married,university.degree,no,yes,no,cellular,nov,fri,189,2,999,0,nonexistent,-1.1,94.767,-50.8,1.028,4963.6,no
41186,44,technician,married,professional.course,no,no,no,cellular,nov,fri,442,1,999,0,nonexistent,-1.1,94.767,-50.8,1.028,4963.6,yes


In [187]:
target_col = "y"

# Indicator variable to capture when pdays takes a value of 999
df_data["no_previous_contact"] = np.where(df_data["pdays"] == 999, 1, 0)

# Indicator for individuals not actively employed
df_data["not_working"] = np.where(
    np.in1d(df_data["job"], ["student", "retired", "unemployed"]), 1, 0
)

# remove unnecessary data
df_model_data = df_data.drop(
    ["duration", "emp.var.rate", "cons.price.idx", "cons.conf.idx", "euribor3m", "nr.employed"],
    axis=1,
)

bins = [18, 30, 40, 50, 60, 70, 90]
labels = ['18-29', '30-39', '40-49', '50-59', '60-69', '70-plus']

df_model_data['age_range'] = pd.cut(df_model_data.age, bins, labels=labels, include_lowest=True)
df_model_data = pd.concat([df_model_data, pd.get_dummies(df_model_data['age_range'], prefix='age', dtype=int)], axis=1)
df_model_data.drop('age', axis=1, inplace=True)
df_model_data.drop('age_range', axis=1, inplace=True)

scaled_features = ['pdays', 'previous', 'campaign']
df_model_data[scaled_features] = MinMaxScaler().fit_transform(df_model_data[scaled_features])

df_model_data = pd.get_dummies(df_model_data, dtype=int)  # Convert categorical variables to sets of indicators

# Replace "y_no" and "y_yes" with a single label column, and bring it to the front:
df_model_data = pd.concat(
    [
        df_model_data["y_yes"].rename(target_col),
        df_model_data.drop(["y_no", "y_yes"], axis=1),
    ],
    axis=1,
)
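The cell above scales `pdays`, `previous`, and `campaign` with `MinMaxScaler`, which by default maps each column to the range [0, 1] using `(x - min) / (max - min)`. The following pure-Python sketch shows that formula on a small made-up sample; `min_max_scale` is a hypothetical helper and the sample values are illustrative, not taken from the dataset.

```python
from typing import List, Sequence


def min_max_scale(values: Sequence[float]) -> List[float]:
    """Scale values to [0, 1] with (x - min) / (max - min); constant input maps to 0.0."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


# Made-up sample: 999 is the "no previous contact" marker used for pdays.
pdays_sample = [999, 6, 3, 0, 999]
scaled = min_max_scale(pdays_sample)
print(scaled)
```

Because 999 is the largest value in the column, it scales to 1.0, which is consistent with the `pdays` values of 1.0 shown for rows with `no_previous_contact` set to 1.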

In [188]:
def generate_event_timestamp():
    # naive datetime representing local time
    naive_dt = datetime.now()
    # take timezone into account
    aware_dt = naive_dt.astimezone()
    # time in UTC
    utc_dt = aware_dt.astimezone(timezone.utc)
    # transform to ISO-8601 format
    event_time = utc_dt.isoformat(timespec='milliseconds')
    event_time = event_time.replace('+00:00', 'Z')
    return event_time

Add `event_time` and `record_id` columns to the dataset as these two fields are required for each feature group:

In [189]:
df_model_data['event_time'] = generate_event_timestamp()
df_model_data['record_id'] = [f'R{i}' for i in range(len(df_model_data))]

Feature names cannot contain '.' and cannot end with '_'. The conversion function below replaces both '.' and '-' with '_' and strips any trailing '_':

In [190]:
def convert_col_name(c):
    return c.replace('.', '_').replace('-', '_').rstrip('_')
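A quick check of the conversion on a few column names shows the effect. The function is repeated here so the snippet runs on its own; the input names are the kind produced by one-hot encoding of the raw `job`, `education`, and `age_range` values.

```python
def convert_col_name(c: str) -> str:
    # Same logic as the notebook cell above, repeated to keep this snippet self-contained.
    return c.replace('.', '_').replace('-', '_').rstrip('_')


examples = ["job_admin.", "job_blue-collar", "education_basic.4y", "age_70-plus"]
converted = [convert_col_name(c) for c in examples]
print(converted)
```

The trailing '.' of `job_admin.` first becomes '_' and is then stripped, which is why the final feature is named `job_admin`.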

In [191]:
df_model_data = df_model_data.rename(columns=convert_col_name)
df_model_data = df_model_data.convert_dtypes(infer_objects=True, convert_boolean=False)
df_model_data['record_id'] = df_model_data['record_id'].astype('string')
df_model_data['event_time'] = df_model_data['event_time'].astype('string')

In [192]:
df_model_data.dtypes

y                                Int64
campaign                       Float64
pdays                          Float64
previous                       Float64
no_previous_contact              Int64
                             ...      
poutcome_failure                 Int64
poutcome_nonexistent             Int64
poutcome_success                 Int64
event_time              string[python]
record_id               string[python]
Length: 67, dtype: object

In [193]:
df_model_data.columns

Index(['y', 'campaign', 'pdays', 'previous', 'no_previous_contact',
       'not_working', 'age_18_29', 'age_30_39', 'age_40_49', 'age_50_59',
       'age_60_69', 'age_70_plus', 'job_admin', 'job_blue_collar',
       'job_entrepreneur', 'job_housemaid', 'job_management', 'job_retired',
       'job_self_employed', 'job_services', 'job_student', 'job_technician',
       'job_unemployed', 'job_unknown', 'marital_divorced', 'marital_married',
       'marital_single', 'marital_unknown', 'education_basic_4y',
       'education_basic_6y', 'education_basic_9y', 'education_high_school',
       'education_illiterate', 'education_professional_course',
       'education_university_degree', 'education_unknown', 'default_no',
       'default_unknown', 'default_yes', 'housing_no', 'housing_unknown',
       'housing_yes', 'loan_no', 'loan_unknown', 'loan_yes',
       'contact_cellular', 'contact_telephone', 'month_apr', 'month_aug',
       'month_dec', 'month_jul', 'month_jun', 'month_mar', 'month_may'

In [194]:
df_model_data.head()

Unnamed: 0,y,campaign,pdays,previous,no_previous_contact,not_working,age_18_29,age_30_39,age_40_49,age_50_59,age_60_69,age_70_plus,job_admin,job_blue_collar,job_entrepreneur,job_housemaid,job_management,job_retired,job_self_employed,job_services,job_student,job_technician,job_unemployed,job_unknown,marital_divorced,marital_married,marital_single,marital_unknown,education_basic_4y,education_basic_6y,education_basic_9y,education_high_school,education_illiterate,education_professional_course,education_university_degree,education_unknown,default_no,default_unknown,default_yes,housing_no,housing_unknown,housing_yes,loan_no,loan_unknown,loan_yes,contact_cellular,contact_telephone,month_apr,month_aug,month_dec,month_jul,month_jun,month_mar,month_may,month_nov,month_oct,month_sep,day_of_week_fri,day_of_week_mon,day_of_week_thu,day_of_week_tue,day_of_week_wed,poutcome_failure,poutcome_nonexistent,poutcome_success,event_time,record_id
0,0,0.0,1.0,0.0,1,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,1,0,0,0,0,0,0,0,1,0,0,1,0,0,1,0,0,0,1,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,2024-03-11T15:15:41.091Z,R0
1,0,0.0,1.0,0.0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,1,0,0,1,0,0,0,1,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,2024-03-11T15:15:41.091Z,R1
2,0,0.0,1.0,0.0,1,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,1,0,0,0,1,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,2024-03-11T15:15:41.091Z,R2
3,0,0.0,1.0,0.0,1,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,1,0,0,1,0,0,1,0,0,0,1,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,2024-03-11T15:15:41.091Z,R3
4,0,0.0,1.0,0.0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,1,0,0,1,0,0,0,0,1,0,1,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,2024-03-11T15:15:41.091Z,R4


In [521]:
df_model_data.shape

(41188, 67)

In [195]:
df_model_data.to_csv('./data/feature_dataset.csv', index=False)

In [196]:
record_count = len(df_model_data)

In [197]:
record_count

41188

### Create a feature group
Now everything is ready to create a feature group with the dataset schema.

In [198]:
dataset_feature_group_name = f'{project}-{current_timestamp}'

In [199]:
%store dataset_feature_group_name

Stored 'dataset_feature_group_name' (str)


In [200]:
dataset_feature_group = FeatureGroup(name=dataset_feature_group_name, sagemaker_session=session)

In [None]:
# use the DataFrame to extract the feature group definitions
dataset_feature_group.load_feature_definitions(data_frame=df_model_data)

In [202]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get('FeatureGroupStatus')
    print(f'Initial status: {status}')
    while status == 'Creating':
        print(f'Waiting for feature group: {feature_group.name} to be created ...')
        time.sleep(5)
        status = feature_group.describe().get('FeatureGroupStatus')
    if status != 'Created':
        raise SystemExit(f'Failed to create feature group {feature_group.name}: {status}')
    print(f'FeatureGroup {feature_group.name} was successfully created.')
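The waiting function above polls until the status leaves `Creating`. If you want the loop to give up after a fixed time, one possible variant is sketched below. It is a generic illustration under stated assumptions: `wait_for_status` is a hypothetical helper, the timeout and polling values are arbitrary, and a stub stands in for the status call so the snippet runs without AWS access.

```python
import time
from typing import Callable


def wait_for_status(
    get_status: Callable[[], str],
    pending: str = "Creating",
    success: str = "Created",
    timeout_s: float = 600.0,
    poll_s: float = 5.0,
) -> str:
    """Poll get_status() until it leaves the pending state or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    status = get_status()
    while status == pending:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still '{pending}' after {timeout_s} seconds")
        time.sleep(poll_s)
        status = get_status()
    if status != success:
        raise RuntimeError(f"Unexpected final status: {status}")
    return status


# Stub that reports 'Creating' twice and then 'Created'.
_statuses = iter(["Creating", "Creating", "Created"])
final_status = wait_for_status(lambda: next(_statuses), poll_s=0.01)
print(final_status)
```

With a real feature group, the callable could be `lambda: feature_group.describe().get('FeatureGroupStatus')`, which is the same call the notebook function uses.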

In [203]:
dataset_feature_group.create(
    s3_uri=f's3://{bucket_name}/{feature_store_bucket_prefix}', 
    record_identifier_name='record_id', 
    event_time_feature_name='event_time', 
    role_arn=sm_role, 
    enable_online_store=False,
    table_format=TableFormatEnum.ICEBERG 
)

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:906545278380:feature-group/from-idea-to-prod-03-11-15-15',
 'ResponseMetadata': {'RequestId': 'a4b19b62-0ba3-44db-8f6c-e4348d49883a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a4b19b62-0ba3-44db-8f6c-e4348d49883a',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '106',
   'date': 'Mon, 11 Mar 2024 15:17:45 GMT'},
  'RetryAttempts': 0}}

In [34]:
wait_for_feature_group_creation_complete(dataset_feature_group)

Initial status: Creating
Waiting for feature group: from-idea-to-prod-03-11-09-47 to be created ...
Waiting for feature group: from-idea-to-prod-03-11-09-47 to be created ...
Waiting for feature group: from-idea-to-prod-03-11-09-47 to be created ...
Waiting for feature group: from-idea-to-prod-03-11-09-47 to be created ...
Waiting for feature group: from-idea-to-prod-03-11-09-47 to be created ...
FeatureGroup from-idea-to-prod-03-11-09-47 was successfully created.


In [204]:
dataset_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:906545278380:feature-group/from-idea-to-prod-03-11-15-15',
 'FeatureGroupName': 'from-idea-to-prod-03-11-15-15',
 'RecordIdentifierFeatureName': 'record_id',
 'EventTimeFeatureName': 'event_time',
 'FeatureDefinitions': [{'FeatureName': 'y', 'FeatureType': 'Integral'},
  {'FeatureName': 'campaign', 'FeatureType': 'Fractional'},
  {'FeatureName': 'pdays', 'FeatureType': 'Fractional'},
  {'FeatureName': 'previous', 'FeatureType': 'Fractional'},
  {'FeatureName': 'no_previous_contact', 'FeatureType': 'Integral'},
  {'FeatureName': 'not_working', 'FeatureType': 'Integral'},
  {'FeatureName': 'age_18_29', 'FeatureType': 'Integral'},
  {'FeatureName': 'age_30_39', 'FeatureType': 'Integral'},
  {'FeatureName': 'age_40_49', 'FeatureType': 'Integral'},
  {'FeatureName': 'age_50_59', 'FeatureType': 'Integral'},
  {'FeatureName': 'age_60_69', 'FeatureType': 'Integral'},
  {'FeatureName': 'age_70_plus', 'FeatureType': 'Integral'},
  {'FeatureName': 

The feature group is ready for use. Now you need to ingest data into it.

### Ingest data into the feature group
As in the previous section, you use the [`@step`](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-step-decorator-create-pipeline.html) decorator to create a feature ingestion pipeline. A SageMaker pipeline lets you process and ingest features in a single repeatable workflow.

In [205]:
# Create a requirements.txt file with dependencies for the pipeline step
sagemaker_version = sagemaker.__version__
with open("requirements.txt", "w") as file:
    file.write(f"sagemaker=={sagemaker_version}")

Combine all of the previous feature transformation and ingestion code into a single remote function. You use the Python SDK [`FeatureGroup.ingest()`](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_group.FeatureGroup.ingest) method to ingest the content of a pandas DataFrame into a feature group.

In [206]:
@step(name=f'{project}-fs-ingest')
def process_and_ingest(
    input_s3_url,
    feature_group_name,
):
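    # Import everything the function body uses so the remote step does not rely on notebook globals
    import sagemaker
    from sagemaker.feature_store.feature_group import FeatureGroup
    from sagemaker.s3_utils import parse_s3_url
    import numpy as np
    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler
    from datetime import datetime, timezone, date
    import boto3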

    def generate_event_timestamp():
        # naive datetime representing local time
        naive_dt = datetime.now()
        # take timezone into account
        aware_dt = naive_dt.astimezone()
        # time in UTC
        utc_dt = aware_dt.astimezone(timezone.utc)
        # transform to ISO-8601 format
        event_time = utc_dt.isoformat(timespec='milliseconds')
        event_time = event_time.replace('+00:00', 'Z')
        return event_time
    
    def convert_col_name(c):
        return c.replace('.', '_').replace('-', '_').rstrip('_')
    
    s3 = boto3.client("s3")
    
    bucket, object_key = parse_s3_url(input_s3_url)
    s3.download_file(bucket, object_key, "input_data.csv")
    
    # Load data
    df_data = pd.read_csv("input_data.csv", sep=";")
    
    target_col = "y"
    
    # Indicator variable to capture when pdays takes a value of 999
    df_data["no_previous_contact"] = np.where(df_data["pdays"] == 999, 1, 0)

    # Indicator for individuals not actively employed
    df_data["not_working"] = np.where(
        np.in1d(df_data["job"], ["student", "retired", "unemployed"]), 1, 0
    )

    # remove unnecessary data
    df_model_data = df_data.drop(
        ["duration", "emp.var.rate", "cons.price.idx", "cons.conf.idx", "euribor3m", "nr.employed"],
        axis=1,
    )

    bins = [18, 30, 40, 50, 60, 70, 90]
    labels = ['18-29', '30-39', '40-49', '50-59', '60-69', '70-plus']

    df_model_data['age_range'] = pd.cut(df_model_data.age, bins, labels=labels, include_lowest=True)
    df_model_data = pd.concat([df_model_data, pd.get_dummies(df_model_data['age_range'], prefix='age', dtype=int)], axis=1)
    df_model_data.drop('age', axis=1, inplace=True)
    df_model_data.drop('age_range', axis=1, inplace=True)

    scaled_features = ['pdays', 'previous', 'campaign']
    df_model_data[scaled_features] = MinMaxScaler().fit_transform(df_model_data[scaled_features])

    df_model_data = pd.get_dummies(df_model_data, dtype=int)  # Convert categorical variables to sets of indicators

    # Replace "y_no" and "y_yes" with a single label column, and bring it to the front:
    df_model_data = pd.concat(
        [
            df_model_data["y_yes"].rename(target_col),
            df_model_data.drop(["y_no", "y_yes"], axis=1),
        ],
        axis=1,
    )
    
    df_model_data['event_time'] = generate_event_timestamp()
    df_model_data['record_id'] = [f'R{i}' for i in range(len(df_model_data))]
    
    df_model_data = df_model_data.rename(columns=convert_col_name)
    
    df_model_data = df_model_data.convert_dtypes(infer_objects=True, convert_boolean=False)
    df_model_data['record_id'] = df_model_data['record_id'].astype('string')
    df_model_data['event_time'] = df_model_data['event_time'].astype('string')
    
    # Ingest data into the feature group
    dataset_feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker.Session())
    
    print(f'Ingesting data into feature group: {dataset_feature_group.name} ...')
    dataset_feature_group.ingest(data_frame=df_model_data, max_processes=4, wait=True)
    print(f'{len(df_model_data)} customer records ingested into feature group: {dataset_feature_group.name}')
    
    return dataset_feature_group.describe()['FeatureGroupArn']

In [287]:
# Create parameters for the feature store ingestion pipeline
input_s3_url_param = ParameterString(
    name="InputDataUrl",
    default_value=input_s3_url,
)

feature_group_name_param = ParameterString(
    name="FeatureGroupName",
    default_value=dataset_feature_group.describe()['FeatureGroupArn'],
)

In [208]:
# Create a pipeline with an ingest step
pipeline_fs_ingest = Pipeline(
    name=f"{pipeline_name}-fs-ingest",
    parameters=[
        input_s3_url_param,
        feature_group_name_param
    ],
    steps=[
        process_and_ingest(
            input_s3_url=input_s3_url_param,
            feature_group_name=feature_group_name_param,
        )
    ]
)

In [209]:
pipeline_fs_ingest.upsert(role_arn=sm_role)

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.Dependencies
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.IncludeLocalWorkDir
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.CustomFileFilter.IgnoreNamePatterns
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.RemoteFunction.InstanceType


2024-03-11 15:20:50,896 sagemaker.remote_function INFO     Uploading serialized function code to s3://sagemaker-us-east-1-906545278380/from-idea-to-prod-pipeline-03-11-09-43-fs-ingest/from-idea-to-prod-fs-ingest/2024-03-11-15-20-48-054/function
2024-03-11 15:20:50,968 sagemaker.remote_function INFO     Uploading serialized function arguments to s3://sagemaker-us-east-1-906545278380/from-idea-to-prod-pipeline-03-11-09-43-fs-ingest/from-idea-to-prod-fs-ingest/2024-03-11-15-20-48-054/arguments
2024-03-11 15:20:51,175 sagemaker.remote_function INFO     Copied dependencies file at './requirements.txt' to '/tmp/tmpnedd3b2q/requirements.txt'
2024-03-11 15:20:51,204 sagemaker.remote_function INFO     Successfully uploaded dependencies and pre execution scripts to 's3://sagemaker-us-east-1-906545278380/from-idea-to-prod-pipeline-03-11-09-43-fs-ingest/from-idea-to-prod-fs-ingest/2024-03-11-15-20-48-054/pre_exec_script_and_dependencies'
2024-03-11 15:20:51,587 sagemaker.remote_function INFO     C

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:906545278380:pipeline/from-idea-to-prod-pipeline-03-11-09-43-fs-ingest',
 'ResponseMetadata': {'RequestId': '61d40a38-e996-4924-9142-86128f7c0d62',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '61d40a38-e996-4924-9142-86128f7c0d62',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '116',
   'date': 'Mon, 11 Mar 2024 15:21:00 GMT'},
  'RetryAttempts': 0}}

In [210]:
execution_fs_ingest = pipeline_fs_ingest.start()

In [211]:
execution_fs_ingest.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:906545278380:pipeline/from-idea-to-prod-pipeline-03-11-09-43-fs-ingest',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:906545278380:pipeline/from-idea-to-prod-pipeline-03-11-09-43-fs-ingest/execution/rmlv3gxa4jm7',
 'PipelineExecutionDisplayName': 'execution-1710170466576',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'from-idea-to-prod-pipeline-03-11-09-43-fs-ingest',
  'TrialName': 'rmlv3gxa4jm7'},
 'CreationTime': datetime.datetime(2024, 3, 11, 15, 21, 6, 515000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 3, 11, 15, 21, 6, 515000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:906545278380:user-profile/d-9oteoq7mqbpp/studio-user-54491630',
  'UserProfileName': 'studio-user-54491630',
  'DomainId': 'd-9oteoq7mqbpp',
  'IamIdentity': {'Arn': 'arn:aws:sts::906545278380:assumed-role/sm-domain-workshop-SageMakerExecutionRole-2tKGP8Kthe7Q/

In [212]:
execution_fs_ingest.wait()
execution_fs_ingest.list_steps()

[{'StepName': 'from-idea-to-prod-fs-ingest',
  'StartTime': datetime.datetime(2024, 3, 11, 15, 21, 7, 785000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 3, 11, 15, 25, 36, 626000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 1,
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:906545278380:training-job/pipelines-rmlv3gxa4jm7-from-idea-to-prod-fs-kaaFXpqrqJ'}}}]

### Retrieve ingested features from the feature group

There are several ways to extract features from the offline feature store. For example, you can run an [Amazon Athena query](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-create-a-dataset.html#feature-store-athena-sample-queries) to query and join data stored in the offline store, or you can use the [Offline Store Python SDK](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-create-a-dataset.html#feature-store-dataset-python-sdk). Here you use the Python SDK to extract features and create a dataset for the model building pipeline.

<div style="border: 4px solid coral; text-align: center; margin: auto;">
    <p style=" text-align: center; margin: auto;">Ingestion into the offline store is buffered, so it can take up to 15 minutes for data to appear in the feature group. After features are ingested and available in the offline store, you can query them and create datasets for model training and scoring.
    </p>
</div>


In [213]:
sagemaker_client = boto3.client('sagemaker')
output_location = f's3://{bucket_name}/{feature_store_bucket_prefix}/offline-store/query_results/'

In [214]:
def get_historical_record_count(fg):
    # Count the rows currently visible in the offline store of the given feature group
    fs_query = fg.athena_query()
    query_string = f'SELECT COUNT(*) FROM "{fs_query.table_name}"'
    output_location = f's3://{bucket_name}/{feature_store_bucket_prefix}/offline-store/query_results/'

    fs_query.run(query_string=query_string, output_location=output_location)
    fs_query.wait()
    fs_df = fs_query.as_dataframe()
    
    return fs_df.iat[0, 0]

In [215]:
# Before accessing the feature data you need to check if the offline feature store was populated
offline_store_contents = None
while offline_store_contents is None:    
    fs_record_count = get_historical_record_count(dataset_feature_group)

    if fs_record_count >= record_count:
        print(f'[{fs_record_count} feature records are available in offline store for {dataset_feature_group.name} feature group]')
        offline_store_contents = fs_record_count
    else:
        print('[Waiting for data arrives in offline store ...]')
        sleep(60)  # `sleep` is imported from `time` at the top of the notebook

INFO:sagemaker:Query 62514bc6-4ac6-4461-a4a6-3cc5f367c7e0 is being executed.
INFO:sagemaker:Query 62514bc6-4ac6-4461-a4a6-3cc5f367c7e0 successfully executed.


[Waiting for data arrives in offline store ...]


INFO:sagemaker:Query 20c8dc1b-2f73-4f9f-9857-f531f57e75b6 is being executed.
INFO:sagemaker:Query 20c8dc1b-2f73-4f9f-9857-f531f57e75b6 successfully executed.


[Waiting for data arrives in offline store ...]


INFO:sagemaker:Query fb5467c1-9f25-4126-a13c-55c57723cb58 is being executed.
INFO:sagemaker:Query fb5467c1-9f25-4126-a13c-55c57723cb58 successfully executed.


[Waiting for data arrives in offline store ...]


INFO:sagemaker:Query 79613601-050a-4c8a-a911-fb00b8979265 is being executed.
INFO:sagemaker:Query 79613601-050a-4c8a-a911-fb00b8979265 successfully executed.


[41188 feature records are available in offline store for from-idea-to-prod-03-11-15-15 feature group]
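
The polling loop above runs until the expected record count is reached, so it never returns if ingestion failed. A minimal sketch of a bounded variant follows. `wait_until` and its parameters are hypothetical names, not part of the SageMaker SDK, and the 20-minute default is an assumption chosen to exceed the 15-minute buffering window mentioned above.

```python
import time


def wait_until(check, timeout_s=1200, interval_s=60,
               sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns a truthy value or timeout_s elapses."""
    deadline = clock() + timeout_s
    while True:
        result = check()
        if result:
            return result
        if clock() >= deadline:
            raise TimeoutError(f"Condition not met within {timeout_s} seconds")
        sleep(interval_s)


# Possible usage in this notebook (names taken from the cells above):
# wait_until(lambda: get_historical_record_count(dataset_feature_group) >= record_count)
```

The `sleep` and `clock` arguments exist only so the helper can be exercised without real waiting.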


#### Use the Amazon SageMaker Python SDK (DatasetBuilder) to query the feature store
This section demonstrates how to use [`DatasetBuilder`](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.dataset_builder.DatasetBuilder) to get data from feature groups. Refer to the [Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-create-a-dataset.html) for detailed examples.

In [216]:
from sagemaker.feature_store.feature_store import FeatureStore

In [217]:
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)

s3_client = boto3.client('s3', region_name=region)
sagemaker_client = boto_session.client(service_name="sagemaker", region_name=region)
featurestore_runtime = boto_session.client(service_name="sagemaker-featurestore-runtime",region_name=region)

In [218]:
# Create FeatureStore session object
feature_store_session = sagemaker.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

feature_store = FeatureStore(sagemaker_session=feature_store_session)

In [219]:
included_feature_names = [f.feature_name for f in dataset_feature_group.feature_definitions]

In [224]:
# Create dataset builder to retrieve the most recent version of each record
builder = feature_store.create_dataset(
    base=dataset_feature_group,
    # included_feature_names=included_feature_names,
    output_path=output_location,
).with_number_of_recent_records_by_record_identifier(1)

In [225]:
df_dataset, query = builder.to_dataframe()

INFO:sagemaker:Query b98da08f-1832-4f4b-b5d7-e9146ab83b11 is being executed.
INFO:sagemaker:Query b98da08f-1832-4f4b-b5d7-e9146ab83b11 successfully executed.


In [439]:
df_dataset

Unnamed: 0,y,campaign,pdays,previous,no_previous_contact,not_working,age_18_29,age_30_39,age_40_49,age_50_59,age_60_69,age_70_plus,job_admin,job_blue_collar,job_entrepreneur,job_housemaid,job_management,job_retired,job_self_employed,job_services,job_student,job_technician,job_unemployed,job_unknown,marital_divorced,marital_married,marital_single,marital_unknown,education_basic_4y,education_basic_6y,education_basic_9y,education_high_school,education_illiterate,education_professional_course,education_university_degree,education_unknown,default_no,default_unknown,default_yes,housing_no,housing_unknown,housing_yes,loan_no,loan_unknown,loan_yes,contact_cellular,contact_telephone,month_apr,month_aug,month_dec,month_jul,month_jun,month_mar,month_may,month_nov,month_oct,month_sep,day_of_week_fri,day_of_week_mon,day_of_week_thu,day_of_week_tue,day_of_week_wed,poutcome_failure,poutcome_nonexistent,poutcome_success,event_time,record_id
0,0,0.018182,1.0,0.000000,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,0,1,0,0,1,0,0,1,0,0,0,0,1,1,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,1,0,2024-03-11T15:23:16.272Z,R19132
1,0,0.000000,1.0,0.000000,1,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,1,0,0,0,0,0,0,0,1,0,0,0,0,1,1,0,0,1,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,2024-03-11T15:23:16.272Z,R19358
2,0,0.036364,1.0,0.000000,1,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,1,0,0,1,0,0,1,0,0,0,1,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,2024-03-11T15:23:16.272Z,R9125
3,0,0.018182,1.0,0.000000,1,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,1,0,0,1,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,1,0,2024-03-11T15:23:16.272Z,R40455
4,0,0.000000,1.0,0.000000,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,1,0,1,0,0,0,0,1,1,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,1,0,2024-03-11T15:23:16.272Z,R40459
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
41183,0,0.018182,1.0,0.142857,1,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,1,0,0,2024-03-11T15:23:16.272Z,R32456
41184,0,0.072727,1.0,0.000000,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,0,0,1,0,0,1,0,0,0,1,1,0,0,0,1,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,1,0,2024-03-11T15:23:16.272Z,R1587
41185,1,0.036364,1.0,0.000000,1,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,1,0,1,0,0,1,0,0,0,1,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,2024-03-11T15:23:16.272Z,R11931
41186,1,0.000000,1.0,0.000000,1,0,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,1,0,0,0,0,1,1,0,0,1,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,2024-03-11T15:23:16.272Z,R32830
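
The builder above is configured with `with_number_of_recent_records_by_record_identifier(1)`, which keeps only the most recent version of each record. Conceptually this is a "latest row per identifier" selection. The sketch below illustrates the idea in plain Python on made-up rows; the real query runs in Athena, and `latest_per_record` is a hypothetical helper, not an SDK function.

```python
def latest_per_record(rows, id_key="record_id", time_key="event_time"):
    """Keep, for each record identifier, the row with the greatest event time."""
    latest = {}
    for row in rows:
        current = latest.get(row[id_key])
        # ISO-8601 UTC strings in one fixed format sort chronologically as text.
        if current is None or row[time_key] > current[time_key]:
            latest[row[id_key]] = row
    return list(latest.values())


# Made-up rows for illustration only; R1 was ingested twice.
rows = [
    {"record_id": "R1", "event_time": "2024-03-11T15:23:16.272Z", "y": 0},
    {"record_id": "R2", "event_time": "2024-03-11T15:23:16.272Z", "y": 1},
    {"record_id": "R1", "event_time": "2024-03-12T09:00:00.000Z", "y": 1},
]
result = latest_per_record(rows)
print(result)
```

This matters when you re-run the ingestion pipeline: the offline store then holds several versions of each record, and the dataset should contain only one of them.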


### Integrate a feature group in a model building pipeline
So far you have ingested all transformed features into the feature store. As the last step in this notebook, you adjust the model building pipeline to use the transformed features from the feature group instead of loading and transforming a raw data file from an S3 bucket.

Create a new script that extracts features and creates datasets for the training and evaluation steps. The script contains no feature processing code because all feature engineering is done before the features are ingested into the feature store.

In [518]:
%%writefile extract_features.py

import boto3
import pandas as pd
import numpy as np
import argparse
import os
from sagemaker.session import Session
from sagemaker.feature_store.feature_store import FeatureStore
from sagemaker.feature_store.feature_group import FeatureGroup

def _parse_args():
    
    parser = argparse.ArgumentParser()
    # Feature group to read from and the local directory for the output datasets
    parser.add_argument('--feature_group_name', type=str, default='from-idea-to-prod')
    parser.add_argument('--outputpath', type=str, default='/opt/ml/processing/output/')
    
    return parser.parse_known_args()

def extract_features(
    feature_group_name,
):
    region = boto3.Session().region_name
    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client(service_name="sagemaker", region_name=region)
    featurestore_runtime = boto_session.client(service_name="sagemaker-featurestore-runtime", region_name=region)
    
    # Create FeatureStore session object
    feature_store_session = Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_featurestore_runtime_client=featurestore_runtime,
    )

    output_location = f"s3://{feature_store_session.default_bucket()}/from-idea-to-prod/feature-store/offline-store/query_results/"
    feature_store = FeatureStore(sagemaker_session=feature_store_session)
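    # Pass the session explicitly so the feature group uses the same clients as the feature store
    dataset_feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)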
    
    # Create dataset builder to retrieve the most recent version of each record
    builder = feature_store.create_dataset(
        base=dataset_feature_group,
        output_path=output_location,
    ).with_number_of_recent_records_by_record_identifier(1)
    
    df_dataset, query = builder.to_dataframe()
    
    return df_dataset
    
if __name__=="__main__":
    # Process arguments
    args, _ = _parse_args()
    
    target_col = 'y'
    feature_store_col = ['event_time', 'record_id']
    
    df_model_data = extract_features(args.feature_group_name).drop(feature_store_col, axis=1)
    
    print(f"Extracted {len(df_model_data)} rows from the feature group {args.feature_group_name}")
    
    # Shuffle and split the dataset: 70% train, 20% validation, 10% test
    train_data, validation_data, test_data = np.split(
        df_model_data.sample(frac=1, random_state=1729),
        [int(0.7 * len(df_model_data)), int(0.9 * len(df_model_data))],
    )

    print(f"Data split > train:{train_data.shape} | validation:{validation_data.shape} | test:{test_data.shape}")
    
    # Save datasets locally
    train_data.to_csv(os.path.join(args.outputpath, 'train/train.csv'), index=False, header=False)
    validation_data.to_csv(os.path.join(args.outputpath, 'validation/validation.csv'), index=False, header=False)
    test_data[target_col].to_csv(os.path.join(args.outputpath, 'test/test_y.csv'), index=False, header=False)
    test_data.drop([target_col], axis=1).to_csv(os.path.join(args.outputpath, 'test/test_x.csv'), index=False, header=False)
    
    # Save the baseline dataset for model monitoring
    df_model_data.drop([target_col], axis=1).to_csv(os.path.join(args.outputpath, 'baseline/baseline.csv'), index=False, header=False)
    
    print("Created datasets. Exiting.")

Overwriting extract_features.py
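
The script splits the shuffled data at two indices, `int(0.7 * n)` and `int(0.9 * n)`, which gives roughly a 70/20/10 split. As a quick check of the arithmetic, the sketch below computes the resulting row counts, assuming all 41188 records reported for the feature group earlier are returned by the query. `split_sizes` is a hypothetical helper used only for this illustration.

```python
def split_sizes(n_rows, train_frac=0.7, validation_end_frac=0.9):
    """Return (train, validation, test) row counts for the two split indices."""
    train_end = int(train_frac * n_rows)
    validation_end = int(validation_end_frac * n_rows)
    return train_end, validation_end - train_end, n_rows - validation_end


# 41188 is the record count reported for the feature group earlier in this notebook.
print(split_sizes(41188))  # (28831, 8238, 4119)
```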


#### Adjust the existing model building pipeline

You need to replace the `preprocess` step with a new `extract` step in the pipeline. Look at the pipeline graph and step dependencies:

![](img/pipeline-graph-dependencies.png)

Two other steps depend on `preprocess`: `train` and `evaluate`. You need to replace every reference to the `preprocess` step in the input parameters of these dependent steps with a reference to the `extract` step.
Because the `train` and `evaluate` steps change, all steps that depend on them must be re-defined as well. In this pipeline, the `register` step depends on the `train` step and the condition step depends on `evaluate`.

The following code creates a new `extract` step, re-defines all dependent steps with the new references, and upserts the adjusted pipeline.
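
To make the reasoning about dependent steps concrete, here is a small sketch that derives the set of steps to re-define from the dependencies described above. The dictionary is a hand-written model of this pipeline, not something read from SageMaker, and `steps_to_redefine` is a hypothetical helper.

```python
from collections import deque

# Direct dependencies as described in the text: step -> steps it reads from.
# The edge evaluate -> train reflects that evaluation loads the trained model.
DEPENDS_ON = {
    "train": ["preprocess"],
    "evaluate": ["preprocess", "train"],
    "register": ["train"],
    "condition": ["evaluate"],
}


def steps_to_redefine(replaced, depends_on=DEPENDS_ON):
    """Return every step that directly or transitively depends on `replaced`."""
    affected, queue = set(), deque([replaced])
    while queue:
        current = queue.popleft()
        for step, deps in depends_on.items():
            if current in deps and step not in affected:
                affected.add(step)
                queue.append(step)
    return sorted(affected)


print(steps_to_redefine("preprocess"))
```

For this pipeline the result covers all four downstream steps, which is why the code that follows re-creates each of them.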

In [508]:
pipeline_session = PipelineSession()

In [509]:
script_image_uri = boto3.client('sagemaker').describe_training_job(
    TrainingJobName=execution_fs_ingest.list_steps()[0]['Metadata']['TrainingJob']['Arn'].split('/')[1]
)['AlgorithmSpecification']['TrainingImage']

env = {'AWS_DEFAULT_REGION':boto3.Session().region_name}

In [519]:
%store script_image_uri

Stored 'script_image_uri' (str)
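
The cell above takes the training job name from the step's ARN with `split('/')[1]`, which works for the ARN format shown in the step listing earlier. A slightly more defensive sketch is shown below; `training_job_name_from_arn` is a hypothetical helper, and the sample ARN is the one from the `list_steps()` output in this notebook.

```python
def training_job_name_from_arn(arn):
    """Return the job name from a SageMaker training job ARN."""
    parts = arn.split(":", 5)  # arn:partition:service:region:account:resource
    if len(parts) != 6:
        raise ValueError(f"Not an ARN: {arn}")
    resource_type, _, name = parts[5].partition("/")
    if resource_type != "training-job" or not name:
        raise ValueError(f"Not a training job ARN: {arn}")
    return name


arn = (
    "arn:aws:sagemaker:us-east-1:906545278380:training-job/"
    "pipelines-rmlv3gxa4jm7-from-idea-to-prod-fs-kaaFXpqrqJ"
)
print(training_job_name_from_arn(arn))
```

Failing early with a clear message is helpful here because the metadata key differs by step type, so a step that is not backed by a training job would otherwise produce a confusing error later.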


In [510]:
# Use a script processor with the same ECR image as the feature ingestion script
extract_processor = ScriptProcessor(
    image_uri=script_image_uri,
    role=sm_role,
    command=["python3"],
    instance_type=process_instance_type_param,
    instance_count=process_instance_count,
    base_job_name=f"{pipeline_name}-extract",
    sagemaker_session=pipeline_session,
    env=env,
)

# Re-use the feature_group_name_param pipeline parameter
processor_args = extract_processor.run(
    code='extract_features.py',
    outputs=processing_outputs,
    arguments=['--feature_group_name', feature_group_name_param],
)
    
# Define feature extraction step
step_extract = ProcessingStep(
    name=f"{project}-extract",
    step_args=processor_args,
)

# Change references in training_inputs from step_process to step_extract
training_inputs = {
    "train": TrainingInput(
        s3_data=step_extract.properties.ProcessingOutputConfig.Outputs[
            "train_data"
        ].S3Output.S3Uri,
        content_type="text/csv",
    ),
    "validation": TrainingInput(
        s3_data=step_extract.properties.ProcessingOutputConfig.Outputs[
            "validation_data"
        ].S3Output.S3Uri,
        content_type="text/csv",
    ),
}

training_args = estimator.fit(training_inputs)

# Re-define training step
step_train = TrainingStep(
    name=f"{project}-train",
    step_args=training_args
)

# Change references in eval_input for evaluation step from step_process to step_extract
eval_inputs=[
    ProcessingInput(source=step_train.properties.ModelArtifacts.S3ModelArtifacts, 
                    destination="/opt/ml/processing/model"),
    ProcessingInput(source=step_extract.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri, 
                    destination="/opt/ml/processing/test"),
]

eval_args = script_processor.run(
    inputs=eval_inputs,
    outputs=eval_outputs,
    code="evaluation.py",
)

# Re-define evaluation step
step_eval = ProcessingStep(
    name=f"{project}-evaluate",
    step_args=eval_args,
    property_files=[evaluation_report]
)

# Create a model object
model = Model(
    image_uri=xgboost_image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=session,
    role=sm_role,
)
    
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.xlarge", "ml.m5.large"],
    transform_instances=["ml.m5.xlarge", "ml.m5.large"],
    model_package_group_name=model_package_group_name_param,
    approval_status=model_approval_status_param,
    model_metrics=model_metrics,
)

# Re-define register step
step_register = ModelStep(
    name=f"{pipeline_name}-register",
    step_args=register_args
)

# Re-define the condition step; the transform step is no longer part of the pipeline
step_cond = ConditionStep(
    name=f"{project}-check-test-score",
    conditions=[cond_lte],
    if_steps=[step_register],
    else_steps=[step_fail],
)



In [511]:
pipeline_def_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

pipeline_feature_store = Pipeline(
    name=f"{pipeline_name}-feature-store",
    parameters=[
        feature_group_name_param,
        process_instance_type_param,
        train_instance_type_param,
        train_instance_count_param,
        model_approval_status_param,
        test_score_threshold_param,
        model_package_group_name_param,
    ],
    steps=[step_extract, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
    pipeline_definition_config=pipeline_def_config
)

In [512]:
pipeline_feature_store.upsert(role_arn=sm_role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:906545278380:pipeline/from-idea-to-prod-pipeline-03-14-21-18-feature-store',
 'ResponseMetadata': {'RequestId': '2b66f32c-7359-4cb0-8dae-52ec0c368669',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '2b66f32c-7359-4cb0-8dae-52ec0c368669',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '120',
   'date': 'Sat, 16 Mar 2024 21:28:37 GMT'},
  'RetryAttempts': 0}}

In [513]:
# pipeline_definition = json.loads(pipeline_feature_store.describe()['PipelineDefinition'])
# pipeline_definition

In [514]:
execution_feature_store = pipeline_feature_store.start()

In [420]:
# Uncomment these two lines if you'd like to wait until the execution is done
# execution_feature_store.wait()
# execution_feature_store.list_steps()

### Further reading: use Feature Processor for feature transformation and ingestion
SageMaker provides you with a Spark-based [Feature Processor SDK](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-decorator) with which you can transform and ingest data from batch data sources into your feature groups. Read through the description and examples in the [Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-feature-processing.html).

For a more detailed example of the feature processor, see the [feature store feature processor](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-featurestore/feature_store_feature_processor.ipynb) notebook.

---

## Continue with the workshop flow
After finishing this lab, you can continue with the step 4 and 5 [notebooks](04-sagemaker-project.ipynb) or go directly to the step 6 [notebook](06-monitoring.ipynb):

- **Step 4 and 5 notebooks**: Use SageMaker Projects to implement CI/CD automation pipelines for model build and deployment

- **Step 6 notebook**: Data and model quality monitoring

## Further development ideas for your real-world projects
- Add [data quality check](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-quality-check) and [model explainability](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-clarify-check) steps. Add model metrics calculated by [SageMaker Clarify](https://docs.aws.amazon.com/sagemaker/latest/dg/clarify-configure-processing-jobs.html) to the model metadata in the model registry
- Add event-driven launching of the ML pipeline as soon as a new dataset is uploaded to an Amazon S3 bucket. You can use [Amazon EventBridge integration](https://docs.aws.amazon.com/sagemaker/latest/dg/pipeline-eventbridge.html#pipeline-eventbridge-schedule) to implement various event-driven workflows
- Use a designated IAM execution role for the pipeline execution
- Add data encryption by using S3 bucket encryption and AWS KMS keys for container EBS volume encryption

## Additional resources
- [Automate Machine Learning Workflows](https://aws.amazon.com/getting-started/hands-on/machine-learning-tutorial-mlops-automate-ml-workflows/)
- [Amazon SageMaker Feature Store workshop](https://github.com/aws-samples/amazon-sagemaker-feature-store-end-to-end-workshop)
- [Amazon SageMaker Model Building Pipeline](https://github.com/aws/sagemaker-python-sdk/blob/master/doc/amazon_sagemaker_model_building_pipeline.rst)
- [MLOPs With SageMaker Pipelines Step Decorator](https://towardsaws.com/mlops-with-sagemaker-pipelines-step-decorator-bb63fce88846)

# Shutdown kernel

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>