In [None]:
import boto3
import sagemaker
import sagemaker.session


In [None]:
# Session-wide handles reused by every cell below.
region = boto3.Session().region_name
session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
bucket = session.default_bucket()

# Naming: key prefix for uploaded data and the model package group that
# registered models are filed under.
prefix = "sagemaker/credit-xgboost"
model_package_group_name = "CreditRiskModelPackageGroupName"

In [None]:
# Upload the raw CSV under the "<prefix>/data" key prefix and keep the
# returned URI; it becomes the default value of the InputData parameter.
input_data_uri = session.upload_data(
    path="dataset/UCI_Credit_Card.csv",
    key_prefix=prefix + "/data",
)
print("Data set uploaded to ", input_data_uri)

### Pipeline input parameters

A pipeline can be started with its default parameter values, but values can also be overridden by passing them to the `pipeline.start()` method.


In [None]:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Pipeline parameters: each default applies unless a different value is
# supplied when an execution is started.

# Sizing of the processing jobs.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.large")

# Sizing of the training job.
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")

# URI of the raw dataset uploaded earlier in the notebook.
input_data = ParameterString(name="InputData", default_value=input_data_uri)

# Approval status attached to the model when it is registered.
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")


## Pipeline Step: Pre process data (step_process)
In the first step, we create an sklearn processor and use it to preprocess the data.

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor

# scikit-learn processor used by the preprocessing step. Instance type and
# count are pipeline parameters, so they can be changed per execution.
sklearn_processor = SKLearnProcessor(
    base_job_name="credit-processing-job",
    framework_version="0.20.0",
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    role=role,
)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep, CacheConfig

# Cache settings shared by the process, train and eval steps.
# NOTE(review): "T30m" is presumably a 30-minute cache lifetime; confirm
# against the SDK's accepted duration formats.
cache_config = CacheConfig(enable_caching=True, expire_after="T30m")

# Preprocessing step: runs preprocessing.py on the InputData parameter and
# publishes one named output per data split (train / validation / test).
step_process = ProcessingStep(
    name="CreditProcess",
    processor=sklearn_processor,
    code="preprocessing.py",
    cache_config=cache_config,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name=split, source=f"/opt/ml/processing/output/{split}")
        for split in ("train", "validation", "test")
    ],
)

## Pipeline Step: Train a model (step_train)
In the second step, we use the train and validation outputs from the previous processing step.

We retrieve the XGBoost container, create an XGBoost estimator, specify hyper parameters, and create the step.

In [None]:
from sagemaker.estimator import Estimator

# Training output is written under this location in the session bucket.
model_path = f"s3://{bucket}/CreditTrain"

# XGBoost container image for the current region; reused later by the
# evaluation processor and the model definition.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    version="1.2-2",
    py_version="py3",
    region=region,
    instance_type=training_instance_type,
)

# Generic estimator running the XGBoost image on a single instance.
xgb_train = Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type=training_instance_type,
    output_path=model_path,
)

# Binary classifier trained for 25 boosting rounds.
xgb_train.set_hyperparameters(
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.8,
    objective="binary:logistic",
    num_round=25,
)

#### Create training step
Note how the input to the training job directly references the output of the previous processing step

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Training step: each input channel reads the processing-step output of the
# same name, so the step is wired to step_process through its properties.
step_train = TrainingStep(
    name="CreditTrain",
    estimator=xgb_train,
    cache_config=cache_config,
    inputs={
        channel: TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[channel].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("train", "validation")
    },
)

## Pipeline Step: Evaluate model (script_eval)
To evaluate the model we just trained, we need to write an evaluation script that we run in a processing job.

In [None]:
%%writefile evaluation.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost


from sklearn.metrics import accuracy_score

if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    
    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    
    X_test = xgboost.DMatrix(df.values)
    
    #predictions = model.predict(X_test)
    predictions = model.predict(X_test)

    #mse = mean_squared_error(y_test, predictions)
    accuracy = accuracy_score(y_test, np.round(predictions))
    print('Accuracy: ', accuracy)
    
    #std = np.std(y_test - predictions)
    report_dict = {
        "classification_metrics": {
            "accuracy": {
                "value": accuracy,
            },
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Using the evaluation script, we create a processor object

In [None]:
from sagemaker.processing import ScriptProcessor


# Processor that runs the evaluation script with python3 inside the same
# XGBoost image used for training.
script_eval = ScriptProcessor(
    base_job_name="script-credit-eval",
    image_uri=image_uri,
    command=["python3"],
    role=role,
    instance_count=1,
    instance_type=processing_instance_type,
)

Then we create the processing step

In [None]:
from sagemaker.workflow.properties import PropertyFile


# Property file pointing at evaluation.json inside the "evaluation" output;
# it is attached to the step below via property_files.
evaluation_report = PropertyFile(name="EvaluationReport", output_name="evaluation", path="evaluation.json")

# Evaluation step: mounts the trained model artifacts and the "test" split
# from the processing step, then runs evaluation.py.
step_eval = ProcessingStep(
    name="CreditEval",
    processor=script_eval,
    code="evaluation.py",
    cache_config=cache_config,
    property_files=[evaluation_report],
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
)

## Pipeline Step: Create model (step_create_model)

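This step creates a SageMaker Model from the artifacts produced by the training step, so that the trained model can later be deployed to an endpoint or used in a batch transform job.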

In [None]:
from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep

# Model definition: the training step's artifacts served by the XGBoost image.
model = Model(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    image_uri=image_uri,
    role=role,
    sagemaker_session=session,
)

# NOTE(review): accelerator_type requests an "ml.eia1.medium" accelerator;
# confirm this is still wanted and supported for this image.
inputs = CreateModelInput(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium")

step_create_model = CreateModelStep(name="CreditCreateModel", model=model, inputs=inputs)

## Pipeline Step: Register model (step_register)


In [None]:
# Inspect the S3 URI of the evaluation step's first (and only) declared output;
# the register-model cell below builds the metrics location from this value.
step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]

In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel


# Metrics attached to the registered model: the evaluation.json file written
# under the evaluation step's first output location.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        content_type="application/json",
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
    )
)

# Registers the trained model in the model package group, using the approval
# status supplied through the ModelApprovalStatus pipeline parameter.
step_register = RegisterModel(
    name="CreditRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge", "ml.m5.large"],
    transform_instances=["ml.m5.xlarge"],
)

## Pipeline Condition Step: Meets accuracy requirements? (cond_gte)


In [None]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)


# Condition: the accuracy reported in the evaluation property file must be
# greater than or equal to 0.7.
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="classification_metrics.accuracy.value",
    ),
    right=0.7,
)

In [None]:
# Gate: when the accuracy condition holds, register the model and create it;
# otherwise run nothing further.
step_cond = ConditionStep(
    name="CreditAccCond",
    conditions=[cond_gte],
    if_steps=[step_register, step_create_model],
    else_steps=[],
)

## Pipeline Creation: Orchestrate all steps

In [None]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "CreditPipeline"

# Assemble the pipeline: declare the parameters it accepts and list its
# steps. The register and create-model steps are reached through step_cond.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
    ],
    steps=[
        step_process,
        step_train,
        step_eval,
        step_cond,
    ],
)

#### inspect pipeline definition

In [None]:
import json

# Parse the pipeline definition with json.loads so the cell displays it as
# Python objects rather than as one long string.
json.loads(pipeline.definition())

In [None]:
# Submit the pipeline definition using the notebook's execution role.
pipeline.upsert(role_arn=role)

In [None]:
# Start a pipeline execution; no parameters are passed, so every pipeline
# parameter uses its default value.
execution = pipeline.start()

In [None]:
# Show the details of the execution started above (displayed as the cell output).
execution.describe()

In [None]:
# Wait for the pipeline execution to finish before continuing with the notebook.
execution.wait()

### Run Pipeline again, but with explicit param
The first run registers the model in the model registry with the status "PendingManualApproval". Since the earlier steps are cached, we can start a new execution and pass in "Approved" as the approval status instead.

In [None]:
# Start a second execution, overriding only the ModelApprovalStatus parameter.
# All other parameters (including InputData) keep their default values.
# A stray positional `input` after the keyword argument previously made this
# cell a SyntaxError, so it has been removed.
execution = pipeline.start(
    parameters={"ModelApprovalStatus": "Approved"},
)