# Load Data
We already have the raw data in S3 from our previous pipeline renditions.

# Define a Pipeline
In addition to previous pipeline additions, we will now incorporate hyperparameter optimisation, which essentially runs multiple training jobs to find the set of hyperparameters that yield a model with the best results (e.g. highest validation accuracy).

The main changes from previous pipelines are summarised below:
* The Training step has been replaced with a Tuning step. This step calls the tuner's `fit` method, which runs the estimator's training jobs
* Hyperparameters for the Decision Tree Classifier are no longer baked into the code as we will be running a hyperparameter tuning job to explore best possible hyperparameter values for **tree max depth** and **min samples split**
* Out of the models produced from tuning (at most `max_jobs`, which is set to 4 in the tuner below), we will choose the top performing model and obtain in-depth evaluation metrics (e.g. Confusion Matrix, Test Dataset Accuracy) for it
* The model that eventually gets registered, subject to the ConditionStep, will also be the top performing model

In [1]:
import sagemaker

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.workflow.steps import ProcessingStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput
)

from sagemaker.inputs import TrainingInput

from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.condition_step import ConditionStep

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo, ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import Join

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# additional imports
from sagemaker.workflow.steps import TuningStep
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.functions import JsonGet
from sagemaker.tuner import (
    IntegerParameter,
    HyperparameterTuner
)

Define pipeline parameters

In [3]:
# Sessions and execution role.
# `pipeline_session` is the session passed to the estimator and the model further
# down; `session` is a regular session used to look up the default bucket.
pipeline_session = PipelineSession()
session = sagemaker.Session()
role = sagemaker.get_execution_role()

# Bucket that holds the raw input data.
bucket = "sagemaker-fraud-detection-ml"

# Pipeline parameter (default 0.85): the accuracy threshold used as the
# right-hand side of the model-registration condition.
min_validation_accuracy = ParameterFloat(
    default_value=0.85,
    name="MinValidationAccuracyToRegisterModel",
)

Define processing step

In [4]:
# Processor that runs the preprocessing script on a single ml.m5.large instance.
processor = SKLearnProcessor(
    role=role,
    framework_version="1.0-1",
    instance_count=1,
    instance_type="ml.m5.large",
)

# Where the processor needs to look to find the raw data.
raw_data_input = ProcessingInput(
    input_name="raw-data",
    source=f"s3://{bucket}/raw_data.csv",
    destination="/opt/ml/processing/input/data/",
    s3_data_distribution_type="ShardedByS3Key",
)

# Where the processor needs to look to find the processed data to upload to S3:
# one output per dataset split, each read from its own container directory.
split_outputs = [
    ProcessingOutput(
        output_name=split,
        source=f"/opt/ml/processing/output/{split}",
        s3_upload_mode="EndOfJob",
    )
    for split in ("train", "validation", "test")
]

preprocessing_step = ProcessingStep(
    name="FraudDetectionProcess",
    processor=processor,
    code="data_processing.py",
    inputs=[raw_data_input],
    outputs=split_outputs,
    job_arguments=["--input-data", "/opt/ml/processing/input/data/"],
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


Define processed input data

In [5]:
# The training and validation channels are dynamically linked to the S3 outputs of
# the preprocessing step: each URI is the step's output location joined with the
# CSV file name.
processed_outputs = preprocessing_step.properties.ProcessingOutputConfig.Outputs

train_csv_uri = Join(
    on="/",
    values=[processed_outputs["train"].S3Output.S3Uri, "train.csv"],
)
validation_csv_uri = Join(
    on="/",
    values=[processed_outputs["validation"].S3Output.S3Uri, "validation.csv"],
)

s3_input_train = TrainingInput(s3_data=train_csv_uri, content_type="csv")
s3_input_validate = TrainingInput(s3_data=validation_csv_uri, content_type="csv")

Define estimator & tuning step

In [6]:
from sagemaker.sklearn.estimator import SKLearn

# Each regex tells SageMaker what to look for in the training logs.
# Whatever a regex captures is reported as the metric with the given name.
metric_definitions = [
    {'Name': 'training:accuracy', 'Regex': 'train_acc: ([0-9.]+)'},
    {'Name': 'validation:accuracy', 'Regex': 'val_acc: ([0-9.]+)'},
]

# scikit-learn estimator that runs `train.py` from the current directory on a
# single ml.m5.large instance. It is bound to the pipeline session.
estimator = SKLearn(
    entry_point='train.py',
    source_dir='.',
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    py_version='py3',
    framework_version='1.0-1',
    metric_definitions=metric_definitions,
    input_mode='File',
    sagemaker_session=pipeline_session,
)

In [15]:
# Search space explored by the tuning job.
# NOTE(review): the lower bound for 'min-samples-split' is 2 because scikit-learn
# rejects an integer `min_samples_split` below 2. This assumes train.py passes the
# value straight to the decision tree - confirm against train.py.
hyperparameter_ranges = {
    'max-depth': IntegerParameter(1, 10, scaling_type='Auto'),
    'min-samples-split': IntegerParameter(2, 10, scaling_type='Auto'),
}

# Bayesian search that maximises the 'validation:accuracy' metric parsed from the
# training logs, running at most 4 training jobs, 2 at a time.
hyperparameter_tuner = HyperparameterTuner(
    estimator=estimator,
    objective_metric_name='validation:accuracy',
    hyperparameter_ranges=hyperparameter_ranges,
    metric_definitions=metric_definitions,
    strategy='Bayesian',
    max_jobs=4,
    max_parallel_jobs=2,
    objective_type='Maximize',
)

# The tuning step replaces the plain training step: it is built from the tuner's
# `fit` call with the train and validation channels.
tuning_step = TuningStep(
    name='TuningStep',
    step_args=hyperparameter_tuner.fit(
        inputs={
            'train': s3_input_train,
            'validation': s3_input_validate,
        }
    ),
)

Evaluation Step for the best performing model

In [16]:
# Dedicated processor for the evaluation script (single ml.m5.large instance).
evaluation_processor = SKLearnProcessor(
    framework_version='1.0-1',
    role=role,
    instance_type='ml.m5.large',
    instance_count=1,
)

# Declares `evaluation.json` in the step's 'evaluation' output as a property file,
# so that later steps can read values from it with JsonGet.
evaluation_report = PropertyFile(
    name='BestModelEvaluationReport',
    output_name='evaluation',
    path='evaluation.json',
)

# Evaluate the best model from the tuning step against the test split.
evaluation_step = ProcessingStep(
    name='EvaluateTopModel',
    # Use the processor created for evaluation; it was previously defined but the
    # preprocessing `processor` was passed here instead.
    processor=evaluation_processor,
    code='evaluation.py',
    inputs=[
        ProcessingInput(
            source=tuning_step.get_top_model_s3_uri(
                top_k=0,  # get the best model
                s3_bucket=session.default_bucket(),
            ),
            destination='/opt/ml/processing/model',
        ),
        ProcessingInput(
            source=preprocessing_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        ProcessingOutput(output_name='evaluation', source='/opt/ml/processing/evaluation'),
    ],
    property_files=[evaluation_report],
)

# Convert the evaluation metrics to a ModelMetrics instance so they can be
# attached to the model when it is registered.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(on='/', values=[
            evaluation_step.properties.ProcessingOutputConfig.Outputs['evaluation'].S3Output.S3Uri,
            'evaluation.json',
        ]),
        content_type='application/json',
    )
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


Register model step (if conditional step is a success)

In [17]:
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep

# S3 location of the best model artifact produced by the tuning step.
best_model_data = tuning_step.get_top_model_s3_uri(
    top_k=0,
    s3_bucket=session.default_bucket(),
)

# Model built from the best artifact, served with the estimator's image.
# The entry point is required because train.py also contains the functions
# executed during inference.
model = Model(
    role=role,
    sagemaker_session=pipeline_session,
    image_uri=estimator.image_uri,
    model_data=best_model_data,
    entry_point=estimator.entry_point,
)

# Registration arguments: CSV in and out, ml.m5.large for inference and transform,
# manual approval required, evaluation metrics attached.
register_args = model.register(
    model_package_group_name="fraud-detection-model-group",
    approval_status="PendingManualApproval",
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_metrics=model_metrics,
)

register_model_step = ModelStep(
    step_args=register_args,
    name="FraudDetectionRegisterModel",
)

Fail the pipeline in the event the validation accuracy is too low

In [18]:
# Step that fails the pipeline run when the accuracy gate is not met.
# The message names accuracy (not log loss), because the condition that routes to
# this step compares the evaluated accuracy with `min_validation_accuracy`.
fail_step = FailStep(
    name='FailStep',
    error_message=Join(
        on=' ',
        values=[
            'Pipeline failed: model accuracy is below the minimum required value of',
            min_validation_accuracy,
        ],
    ),
)

In [19]:
# Accuracy of the evaluated model, read from the evaluation step's report.
best_model_accuracy = JsonGet(
    step_name=evaluation_step.name,
    property_file=evaluation_report,
    json_path="binary_classification_metrics.accuracy.value",
)

# The gate passes when the accuracy is at least the configured minimum.
accuracy_meets_threshold = ConditionGreaterThanOrEqualTo(
    left=best_model_accuracy,
    right=min_validation_accuracy,
)

# Register the model when the gate passes; otherwise fail the pipeline.
condition_step = ConditionStep(
    name="ModelRegistrationConditionStep",
    conditions=[accuracy_meets_threshold],
    if_steps=[register_model_step],
    else_steps=[fail_step],
)

Bring steps together to form the pipeline

In [23]:
# Top-level steps of the pipeline. The register and fail steps are not listed
# here because they are the if/else branches of `condition_step`.
pipeline_steps = [preprocessing_step, tuning_step, condition_step, evaluation_step]

pipeline = Pipeline(
    name="fraud-detection-model-pipeline",
    parameters=[min_validation_accuracy],
    steps=pipeline_steps,
)

In [24]:
# Upsert the pipeline definition under the execution role.
# The response is kept in a variable and echoed as the cell's last expression.
upsert_response = pipeline.upsert(role_arn=role)
upsert_response





{'PipelineArn': 'arn:aws:sagemaker:eu-north-1:263108256547:pipeline/fraud-detection-model-pipeline',
 'ResponseMetadata': {'RequestId': '73d8f4cd-0c0a-4523-a4be-d48e90b270f7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '73d8f4cd-0c0a-4523-a4be-d48e90b270f7',
   'strict-transport-security': 'max-age=47304000; includeSubDomains',
   'x-frame-options': 'DENY',
   'content-security-policy': "frame-ancestors 'none'",
   'cache-control': 'no-cache, no-store, must-revalidate',
   'x-content-type-options': 'nosniff',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '122',
   'date': 'Sat, 25 Oct 2025 11:51:45 GMT'},
  'RetryAttempts': 0}}

In [22]:
# programmatically start the pipeline
# pipeline.start(
#     execution_display_name='conditional-model-registration',
#     execution_description='Starting from the SageMaker Studio'
# )