In [None]:
import sagemaker
# Resolve the execution role and the session's default S3 bucket once;
# later cells reuse `role` and `bucket`.
role = sagemaker.get_execution_role()
session = sagemaker.Session()
bucket = session.default_bucket()

In [None]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession

# Session object handed to the estimator and the model below via `sagemaker_session=`.
pipeline_session = PipelineSession()

from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.steps import ProcessingStep, TuningStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)

from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import Join, JsonGet

from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput
)

from sagemaker.tuner import (
    IntegerParameter,
    HyperparameterTuner
)

In [None]:
# Copy the synthetic churn sample file into the session's default bucket under the
# key churn_raw_data.csv, which the preprocessing step reads as its raw input.
!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/synthetic/churn.txt s3://{bucket}/churn_raw_data.csv

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)

In [None]:
# Pipeline parameters; both are registered on the Pipeline object in a later cell.

# NOTE(review): no step in this notebook references TreeMaxDepth; tree depth is
# searched by the tuner's 'max-depth' range instead.
tree_max_depth_parameter = ParameterInteger(name='TreeMaxDepth', default_value=5)

# Threshold the condition step compares the evaluated accuracy against.
churn_validation_accuracy = ParameterFloat(name='ChurnValidationAccuracy', default_value=0.85)


In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Single-instance scikit-learn processor used by the preprocessing step.
processor = SKLearnProcessor(
    role=role,
    framework_version='1.0-1',
    instance_count=1,
    instance_type='ml.m5.large',
)

# Container directory where the raw CSV is mounted; the same path is passed to
# data_processing.py through --input-data.
raw_data_dir = '/opt/ml/processing/input/data/'

# One output per dataset split, each with s3_upload_mode='EndOfJob'.
split_outputs = [
    ProcessingOutput(
        output_name=split_name,
        source=f'/opt/ml/processing/output/{split_name}',
        s3_upload_mode='EndOfJob',
    )
    for split_name in ('train', 'validation', 'test')
]

# Runs data_processing.py over the raw churn file and emits train/validation/test outputs.
preprocessing_step = ProcessingStep(
    name='CustomChurnProcessor',
    processor=processor,
    code='data_processing.py',
    inputs=[
        ProcessingInput(
            input_name='raw-data',
            source=f's3://{bucket}/churn_raw_data.csv',
            destination=raw_data_dir,
            s3_data_distribution_type='ShardedByS3Key',
        )
    ],
    outputs=split_outputs,
    job_arguments=['--input-data', raw_data_dir],
)

In [None]:
from sagemaker.inputs import TrainingInput

def _csv_channel(output_name, file_name):
    """Build a CSV TrainingInput for one file inside a preprocessing-step output.

    Args:
        output_name: Name of the preprocessing output ('train', 'validation', ...).
        file_name: File to append to that output's S3 URI.

    Returns:
        A TrainingInput whose s3_data is the output URI joined with file_name by '/'.
    """
    output_uri = preprocessing_step.properties.ProcessingOutputConfig.Outputs[output_name].S3Output.S3Uri
    return TrainingInput(
        s3_data=Join(on='/', values=[output_uri, file_name]),
        content_type='csv',
    )


s3_input_train = _csv_channel('train', 'train.csv')
s3_input_validate = _csv_channel('validation', 'validation.csv')

In [None]:
from sagemaker.sklearn.estimator import SKLearn
# Metric name / log-regex pairs. The same list is passed to the estimator here
# and to the hyperparameter tuner in a later cell.
# (The original line read `metric_definitions=metric_definitions = [...]`, a
# duplicated assignment target; it is now a single plain assignment.)
metric_definitions = [
    {'Name': 'training:accuracy', 'Regex': 'train_acc: ([0-9.]+)'},
    {'Name': 'validation:accuracy', 'Regex': 'val_acc: ([0-9.]+)'},
]

# scikit-learn estimator that runs train.py from the current directory on a
# single ml.m5.large instance, bound to the pipeline session.
estimator = SKLearn(
    entry_point='train.py',
    source_dir='.',
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    py_version='py3',
    framework_version='1.0-1',
    metric_definitions=metric_definitions,
    input_mode='File',
    sagemaker_session=pipeline_session
)

In [None]:
# Search space for the tuner; the keys are the hyperparameter names handed to
# the estimator's entry point (train.py).
hyperparameter_ranges = {
    'max-depth': IntegerParameter(1, 10, scaling_type='Auto'),
    # Lower bound is 2, not 1: scikit-learn rejects an integer min_samples_split
    # below 2, so a sampled value of 1 would make that training job fail.
    # NOTE(review): assumes train.py forwards this value to scikit-learn's
    # min_samples_split unchanged — confirm against train.py.
    'min-samples-split': IntegerParameter(2, 10, scaling_type='Auto'),
}

# Bayesian search maximizing 'validation:accuracy': 10 training jobs in total,
# at most 2 running in parallel.
hyperparameter_tuner = HyperparameterTuner(
    estimator=estimator,
    hyperparameter_ranges=hyperparameter_ranges,
    metric_definitions=metric_definitions,
    objective_metric_name='validation:accuracy',
    objective_type='Maximize',
    strategy='Bayesian',
    max_jobs=10,
    max_parallel_jobs=2,
)

# Input channels for the tuning jobs; both point at files produced by the
# preprocessing step.
tuning_channels = {'train': s3_input_train, 'validation': s3_input_validate}

# Pipeline step wrapping the tuner's fit() call.
churn_tuning_step = TuningStep(
    name='ChurnTuningStep',
    step_args=hyperparameter_tuner.fit(inputs=tuning_channels),
)


In [None]:
# Separate scikit-learn processor for model evaluation; configured identically
# to `processor` (same framework version, instance type and count).
evaluation_processor = SKLearnProcessor(
    role=role,
    framework_version='1.0-1',
    instance_count=1,
    instance_type='ml.m5.large',
)

# Property file describing evaluation.json in the 'evaluation' output; it is
# attached to the evaluation step and read by JsonGet in the condition step.
evaluation_report = PropertyFile(
    name='BestChurnModelEvaluationReport', output_name='evaluation', path='evaluation.json'
)

# Runs evaluation.py against the top model from the tuning step and the 'test'
# output of the preprocessing step, writing its report to the 'evaluation' output.
# Uses `evaluation_processor` (defined just above for this purpose) rather than
# the preprocessing `processor`; the original passed `processor`, which left
# `evaluation_processor` unused.
evaluation_step = ProcessingStep(
    name='EvaluateTopChurnModel',
    processor=evaluation_processor,
    code='evaluation.py',
    inputs=[
        ProcessingInput(
            # Model artifact of the first-ranked (top_k=0) training job of the tuner.
            source=churn_tuning_step.get_top_model_s3_uri(
                top_k=0,
                s3_bucket=bucket
            ),
            destination='/opt/ml/processing/model',
        ),
        ProcessingInput(
            source=preprocessing_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        ProcessingOutput(output_name='evaluation', source='/opt/ml/processing/evaluation'),
    ],
    # Exposes evaluation.json so the condition step can query it with JsonGet.
    property_files=[evaluation_report]
)

# S3 location of evaluation.json inside the evaluation step's 'evaluation' output.
evaluation_report_uri = Join(
    on='/',
    values=[
        evaluation_step.properties.ProcessingOutputConfig.Outputs['evaluation'].S3Output.S3Uri,
        'evaluation.json',
    ],
)

# Metrics attached to the model package at registration time.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=evaluation_report_uri,
        content_type='application/json',
    ),
)

In [None]:
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep

# S3 location of the first-ranked (top_k=0) training job's model artifact
# from the tuning step.
best_model_uri = churn_tuning_step.get_top_model_s3_uri(top_k=0, s3_bucket=bucket)

# Model definition reusing the estimator's container image and entry point.
model = Model(
    image_uri=estimator.image_uri,
    model_data=best_model_uri,
    entry_point=estimator.entry_point,
    role=role,
    sagemaker_session=pipeline_session,
)

# Arguments for registering the model in the 'churn-model-group' package group.
# The package is created with approval status 'PendingManualApproval' and
# carries `model_metrics`.
register_args = model.register(
    model_package_group_name='churn-model-group',
    approval_status='PendingManualApproval',
    model_metrics=model_metrics,
    content_types=['text/csv'],
    response_types=['text/csv'],
    inference_instances=['ml.m5.large'],
    transform_instances=['ml.m5.large'],
)

# Pipeline step wrapping the registration arguments; it is attached to the
# pipeline through the condition step's if-branch.
register_model_step = ModelStep(name='ChurnRegisterModel', step_args=register_args)

In [None]:
# Terminal step used as the else-branch of the condition step.
# The message literal already ends with a space, so the parts are joined with an
# empty separator; joining on ' ' produced a double space before the threshold.
fail_step = FailStep(
    name='ChurnFailStep',
    error_message=Join(on='', values=['Failed a pipeline due to accuracy < ', churn_validation_accuracy]),
)

In [None]:
# Accuracy value read from the evaluation step's report via its property file.
evaluated_accuracy = JsonGet(
    step_name=evaluation_step.name,
    property_file=evaluation_report,
    json_path='binary_classification_metrics.accuracy.value',
)

# Register the model when accuracy >= ChurnValidationAccuracy; otherwise run the fail step.
condition_step = ConditionStep(
    name='ModelRegistrationConditionStep',
    conditions=[
        ConditionGreaterThanOrEqualTo(left=evaluated_accuracy, right=churn_validation_accuracy),
    ],
    if_steps=[register_model_step],
    else_steps=[fail_step],
)

In [None]:
# Top-level steps. register_model_step and fail_step are not listed here because
# they are attached through condition_step's if/else branches.
# NOTE(review): the list is not in execution order (condition_step precedes
# evaluation_step); presumably ordering is resolved from the steps' property
# references — confirm against the SageMaker Pipelines documentation.
pipeline_steps = [preprocessing_step, churn_tuning_step, condition_step, evaluation_step]
pipeline_parameters = [tree_max_depth_parameter, churn_validation_accuracy]

pipeline = Pipeline(
    name='churn-prediction-model-pipeline',
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

In [None]:
# Create or update the pipeline definition, using the notebook's execution role.
# Left as a bare expression so the cell displays the call's return value.
pipeline.upsert(role_arn=role)

In [None]:
# Start an execution of the pipeline; no parameter overrides are passed here.
# Left as a bare expression so the cell displays the returned execution object.
pipeline.start(
    execution_display_name='pipeline-with-model-evaluation',
    execution_description='Starting from the SageMaker Studio'
)