### Import Libraries

In [None]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.estimator import Estimator
from sagemaker.xgboost import XGBoostPredictor
from sagemaker.workflow.steps import (ProcessingStep, TrainingInput, TuningStep, PropertyFile)
from sagemaker.processing import (ProcessingInput, ProcessingOutput, ScriptProcessor)
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.model_metrics import (MetricsSource, ModelMetrics)
from sagemaker.workflow.functions import (Join, JsonGet)
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession, LocalPipelineSession
from sagemaker.workflow.steps import CacheConfig
from sagemaker.tuner import (ContinuousParameter, HyperparameterTuner)
from time import strftime, gmtime
import sagemaker
import json

### Initialize Variables and Constants

In [None]:
# Identifiers from which every S3 path and resource name below is derived.
PROJECT = "loan-classification"
FRAMEWORK = "xgboost"
# NOTE(review): this XGBoost container version is also handed to
# SKLearnProcessor as its framework_version — confirm before bumping.
FRAMEWORK_VERSION = "1.0-1"

# AWS / SageMaker context for this notebook.
sm_session = sagemaker.Session()
region = sm_session.boto_session.region_name
sm_role = sagemaker.get_execution_role()
# Steps built with pipeline_session only capture job arguments; jobs start
# when the pipeline executes.
pipeline_session = PipelineSession()
local_pipeline_session = LocalPipelineSession()  # not referenced in the visible cells
current_timestamp = strftime('%m-%d-%H-%M', gmtime())  # UTC "MM-DD-HH-MM"; not referenced in the visible cells

BUCKET_NAME = sm_session.default_bucket()

# S3 layout: s3://<bucket>/<project>/<framework>/{train,validation,test,evaluation,input}
target_s3_prefix = f"s3://{BUCKET_NAME}/{PROJECT}/{FRAMEWORK}"
train_s3_url, validation_s3_url, test_s3_url, evaluation_s3_url, input_s3_url = (
    f"{target_s3_prefix}/{leaf}"
    for leaf in ("train", "validation", "test", "evaluation", "input")
)

# Resource names, all of the form "<project>-<framework>-<kind>".
base_job_name_param = f"{PROJECT}-{FRAMEWORK}-job"  # plain string used as base_job_name, not a pipeline parameter
model_package_group_name = f"{PROJECT}-{FRAMEWORK}-model-group"
pipeline_name = f"{PROJECT}-{FRAMEWORK}-pipeline"
model_name = f"{PROJECT}-{FRAMEWORK}-model"
endpoint_config_name = f"{model_name}-endpoint-config"
endpoint_name = f"{model_name}-endpoint"

# Step caching shared by the processing/tuning steps; "PT1H" is an ISO-8601
# duration of one hour.
cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")

In [None]:
# Pipeline parameters: each default_value can be overridden per execution
# through pipeline.start(parameters=...).

# Instance type for the preprocessing and evaluation processing jobs
process_instance_type_param = ParameterString(name="ProcessingInstanceType", default_value="ml.t3.medium")

# Instance count for the preprocessing job (evaluation always uses 1)
process_instance_count_param = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# Instance type for each hyperparameter-tuning training job
train_instance_type_param = ParameterString(name="TrainingInstanceType", default_value="ml.m4.xlarge")

# Inference instance type recorded on the registered model package
inference_instance_type_param = ParameterString(name="InferenceInstanceType", default_value="ml.m4.xlarge")

# Batch-transform instance type recorded on the registered model package
transform_instance_type_param = ParameterString(name="TransformInstanceType", default_value="ml.m4.xlarge")

# Approval status assigned to the model package in the model registry
model_approval_status_param = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

# Minimal threshold for model performance on the test dataset; the model is
# registered only if the evaluated F1 score is strictly greater than this.
test_score_threshold_param = ParameterFloat(name="TestScoreThreshold", default_value=0.75)

# S3 location of the raw input data fed to the preprocessing job
input_s3_url_param = ParameterString(name="InputData", default_value=input_s3_url)

### Preprocessing Step

In [None]:
# Upload the preprocessing script to S3. upload_data returns the S3 URI of the
# uploaded file, which is used directly as the job's code location so there is
# no second, hand-built copy of the path that could drift out of sync.
preprocessing_script_uri = sm_session.upload_data(
    path="preprocessing.py", bucket=BUCKET_NAME, key_prefix=f"{PROJECT}/{FRAMEWORK}/scripts"
)

# NOTE(review): FRAMEWORK_VERSION is the XGBoost container version, reused here
# as the scikit-learn container version — confirm it remains valid for both.
sklearn_processor = SKLearnProcessor(
    framework_version=FRAMEWORK_VERSION,
    instance_type=process_instance_type_param,
    instance_count=process_instance_count_param,
    base_job_name=base_job_name_param,
    sagemaker_session=pipeline_session,
    role=sm_role,
)

# Because the processor uses pipeline_session, run() returns step arguments
# for ProcessingStep instead of starting a job.
processor_run_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_s3_url_param, destination="/opt/ml/processing/input")
    ],
    # Train/validation feed the tuning step; test feeds the evaluation step.
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=train_s3_url),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=validation_s3_url),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=test_s3_url)
    ],
    code=preprocessing_script_uri,
    arguments=[
        "--filename", "LoanApprovalStatus.csv"
    ]
)

process_step = ProcessingStep(
    name="PrePreprocess",
    step_args=processor_run_args,
    cache_config=cache_config,
)

### Training Step (Hyperparameter Tuning)

In [None]:
# XGBoost container image; it is also reused by the evaluation processor and
# by the registered model.
image_uri = sagemaker.image_uris.retrieve(
    framework=FRAMEWORK,
    version=FRAMEWORK_VERSION,
    region=region,
    py_version="py3",
    instance_type="local",
)

# Base estimator that every tuning trial is launched from; artifacts land
# under <prefix>/hpo, which get_top_model_s3_uri later searches.
estimator = Estimator(
    role=sm_role,
    image_uri=image_uri,
    instance_count=1,
    instance_type=train_instance_type_param,
    base_job_name=base_job_name_param,
    output_path=f"{target_s3_prefix}/hpo",
    code_location=f"{target_s3_prefix}/hpo",
    sagemaker_session=pipeline_session,
)

# Static hyperparameters shared by all trials.
# NOTE(review): "binary:hinge" is the learning objective (not a metric) and
# makes XGBoost emit hard 0/1 predictions rather than probabilities, so the
# validation:auc tuning objective is coarse. Consider "binary:logistic" once
# evaluation.py is confirmed to handle probability outputs.
estimator.set_hyperparameters(
    eval_metric="auc,accuracy,f1",
    objective="binary:hinge",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

objective_metric_name = "validation:auc"

# Search the L1 (alpha) and L2 (lambda) regularization strengths on a log scale.
hyperparameter_ranges = {
    "alpha": ContinuousParameter(0.01, 10, scaling_type="Logarithmic"),
    "lambda": ContinuousParameter(0.01, 10, scaling_type="Logarithmic"),
}

# Three trials, all run in parallel, keeping the one with the highest AUC.
tuner_job = HyperparameterTuner(
    estimator=estimator,
    objective_metric_name=objective_metric_name,
    hyperparameter_ranges=hyperparameter_ranges,
    objective_type="Maximize",
    max_jobs=3,
    max_parallel_jobs=3,
)

# Feed each CSV split produced by the preprocessing step into the channel of
# the same name.
hpo_args = tuner_job.fit(
    inputs={
        split: TrainingInput(
            s3_data=process_step.properties.ProcessingOutputConfig.Outputs[split].S3Output.S3Uri,
            content_type="text/csv",
        )
        for split in ("train", "validation")
    }
)

tuning_step = TuningStep(
    name="HyperParameterTuning",
    step_args=hpo_args,
    cache_config=cache_config,
)

### Evaluation Step

In [None]:
# Upload the evaluation script to S3 and use the URI returned by upload_data
# as the job's code location, rather than rebuilding the path by hand.
evaluation_script_uri = sm_session.upload_data(
    path="evaluation.py", bucket=BUCKET_NAME, key_prefix=f"{PROJECT}/{FRAMEWORK}/scripts"
)

# Runs evaluation.py with python3 inside the same XGBoost image used for
# training (presumably so the model artifact loads with a matching xgboost
# version — confirm against evaluation.py).
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=process_instance_type_param,
    instance_count=1,
    base_job_name=base_job_name_param,
    sagemaker_session=pipeline_session,
    role=sm_role,
)

# evaluation.json inside the "evaluation" output, exposed so the condition
# step can read metrics from it with JsonGet.
evaluation_report = PropertyFile(
    name="ModelEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

processor_args = script_eval.run(
    inputs=[
        # Best model from the tuning step (top_k=0 is the top-ranked trial).
        ProcessingInput(
            source=tuning_step.get_top_model_s3_uri(top_k=0, s3_bucket=BUCKET_NAME, prefix=f"{PROJECT}/{FRAMEWORK}/hpo"),
            destination="/opt/ml/processing/model",
        ),
        # Held-out test split produced by the preprocessing step.
        ProcessingInput(
            source=process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation", destination=evaluation_s3_url),
    ],
    code=evaluation_script_uri,
)

# This can be extended to evaluate multiple models from the HPO step
evaluation_step = ProcessingStep(
    name="EvaluateTopModel",
    step_args=processor_args,
    property_files=[evaluation_report],
    cache_config=cache_config,
)

### Register Model Step

In [None]:
# Model built from the best tuning trial's artifact (top_k=0), served with the
# same XGBoost image used for training.
model = Model(
    image_uri=image_uri,
    model_data=tuning_step.get_top_model_s3_uri(top_k=0, s3_bucket=BUCKET_NAME, prefix=f"{PROJECT}/{FRAMEWORK}/hpo"),
    predictor_cls=XGBoostPredictor,
    sagemaker_session=pipeline_session,
    role=sm_role,
)

# Attach the evaluation report to the model package. The URI is built from
# the evaluation step's named "evaluation" output property, resolved at
# execution time. Previously it read evaluation_step.arguments positionally,
# which silently assumed "evaluation" was Outputs[0].
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(
            on="/",
            values=[
                evaluation_step.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
                "evaluation.json",
            ],
        ),
        content_type="application/json",
    )
)

# With pipeline_session, register() returns step arguments for ModelStep.
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=[inference_instance_type_param],
    transform_instances=[transform_instance_type_param],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status_param,
    model_metrics=model_metrics
)

register_step = ModelStep(
    name="Latest",
    step_args=register_args
)

### Fail Step

In [None]:
# Terminal step for the else-branch of the F1 condition: the pipeline stops
# with this message instead of registering the model.
fail_step = FailStep(
    name="ExpectationNotMet",
    # "<=" because the condition is a strict greater-than, so an F1 score equal
    # to the threshold also lands here.
    error_message=Join(on=" ", values=["Execution failed due to F1 score <=", test_score_threshold_param]),
)

### Condition Step

In [None]:
# Strict check: F1 (read from evaluation.json) > TestScoreThreshold.
# NOTE(review): despite its name, cond_lte is a greater-than condition; the
# name is kept unchanged for compatibility.
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        json_path="classification_metrics.f1.value",
        property_file=evaluation_report,
        step_name=evaluation_step.name,
    ),
    right=test_score_threshold_param,
)

# Register the model when the check passes; otherwise stop via fail_step.
condition_step = ConditionStep(
    name="F1Score-GreaterThan-Threshold",
    conditions=[cond_lte],
    else_steps=[fail_step],
    if_steps=[register_step],
)

### Pipeline

In [None]:
# Enable custom job-name prefixing during orchestration, so the base_job_name
# configured on each processor/estimator is honoured.
pipeline_def_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

# Only top-level steps are listed; register_step and fail_step are reached
# through condition_step's branches.
pipeline = Pipeline(
    name=pipeline_name,
    sagemaker_session=pipeline_session,
    pipeline_definition_config=pipeline_def_config,
    steps=[process_step, tuning_step, evaluation_step, condition_step],
    # Parameters the steps above reference, overridable per execution.
    parameters=[
        process_instance_type_param,
        process_instance_count_param,
        train_instance_type_param,
        inference_instance_type_param,
        transform_instance_type_param,
        model_approval_status_param,
        test_score_threshold_param,
        input_s3_url_param,
    ],
)

In [None]:
# Create the pipeline, or update it in place if one with this name exists.
pipeline.upsert(role_arn=sm_role)

# The stored definition comes back as a JSON string; parse it for display.
definition_json = pipeline.describe()["PipelineDefinition"]
pipeline_definition = json.loads(definition_json)
pipeline_definition

In [None]:
# # Execute Pipeline
# # Keys must match parameter names declared on the Pipeline above; omitted
# # keys fall back to each parameter's default_value. Instance types must be
# # real SageMaker types ('local' only works with a local pipeline session).
# pipeline.start(parameters=dict(
#         ProcessingInstanceType='ml.t3.medium',
#         ProcessingInstanceCount=1,
#         TrainingInstanceType='ml.m4.xlarge',
#         InferenceInstanceType='ml.m4.xlarge',
#         TransformInstanceType='ml.m4.xlarge',
#         ModelApprovalStatus='PendingManualApproval',
#         TestScoreThreshold=0.75,
#         InputData='s3_path_of_input_data'))