In [8]:
import boto3
import sagemaker
import sagemaker.session


region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = "sgmpipeline-pratap"
model_package_group_name = f"AbaloneModelPackageGroupName"

In [9]:
print(region)
print(role)
print(type(default_bucket))

us-east-1
arn:aws:iam::499554417458:role/service-role/AmazonSageMakerServiceCatalogProductsUseRole
<class 'str'>


#### Download the dataset into your account's default Amazon S3 bucket.

In [10]:
!mkdir -p data
local_path = "data/abalone-dataset.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset.csv",
    local_path
)

base_uri = f"s3://{default_bucket}/abalone"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, 
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sgmpipeline-pratap/abalone/abalone-dataset.csv


#### Download a second dataset for batch transformation after your model is created.

In [11]:
local_path = "data/abalone-dataset-batch"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch",
    local_path
)

base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, 
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

s3://sgmpipeline-pratap/abalone/abalone-dataset-batch


#### Define Pipeline Parameters

In [12]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)


processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)

#### Step 3: Define a Processing Step for Feature Engineering
##### This section shows how to create a processing step to prepare the data from the dataset for training.
###### To create a processing step
###### Create a directory for the processing script.

In [13]:
!mkdir -p abalone

In [14]:
%%writefile abalone/preprocessing.py
import argparse
import os
import requests
import tempfile
import numpy as np
import pandas as pd


from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# Because this is a headerless CSV file, specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64
}
label_column_dtype = {"rings": np.float64}


def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None, 
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)
    )
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler())
        ]
    )

    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore"))
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features)
        ]
    )
    
    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)
    
    X = np.concatenate((y_pre, X_pre), axis=1)
    
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])

    
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Overwriting abalone/preprocessing.py


#### Create an instance of an SKLearnProcessor to pass in to the processing step.

In [15]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role,
)

In [16]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
    ],
    code="abalone/preprocessing.py",
)

#### Step 4: Define a Training step

##### This section shows how to use the SageMaker XGBoost Algorithm to train a logistic regression model on the training data output from the processing steps.

#### To define a training step

##### Specify the model path where you want to save the models from training.

In [17]:
model_path = f"s3://{default_bucket}/AbaloneTrain"

In [18]:
print(model_path)

s3://sgmpipeline-pratap/AbaloneTrain


##### Configure an estimator for the XGBoost algorithm and the input dataset. The training_instance_type is passed into the estimator. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to model_dir so that it can be hosted later. SageMaker uploads the model to Amazon S3 in the form of a model.tar.gz at the end of the training job.

In [19]:
from sagemaker.estimator import Estimator


image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0
)

##### Create a TrainingStep using the estimator instance and properties of the ProcessingStep. In particular, pass in the S3Uri of the "train" and "validation" output channel to the TrainingStep.

In [20]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="AbaloneTrain",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
)

#### Step 5: Define a Processing Step for Model Evaluation

##### Create a file in the /abalone directory named evaluation.py. This script is used in a processing step to perform model evaluation. It takes a trained model and the test dataset as input, then produces a JSON file containing classification evaluation metrics. These metrics include a precision, recall, and F1 score for each label, and accuracy and ROC curve for the model.

In [21]:
%%writefile abalone/evaluation.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost


from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    
    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    
    X_test = xgboost.DMatrix(df.values)
    
    predictions = model.predict(X_test)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {
                "value": mse,
                "standard_deviation": std
            },
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting abalone/evaluation.py


##### Create an instance of a ScriptProcessor that is used to create a ProcessingStep.

In [22]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-abalone-eval",
    role=role,
)

##### Create a ProcessingStep using the processor instance, the input and output channels, and the  evaluation.py script. In particular, pass in the S3ModelArtifacts property from the step_train training step, as well as the S3Uri of the "test" output channel of the step_process processing step. This is very similar to a processor instance's run method in the SageMaker Python SDK. 

In [23]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)
step_eval = ProcessingStep(
    name="AbaloneEval",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="abalone/evaluation.py",
    property_files=[evaluation_report],
)

#### Step 6: Define a CreateModelStep for Batch Transformation

##### To define a CreateModelStep for batch transformation

###### Create a SageMaker model. Pass in the S3ModelArtifacts property from the step_train training step.

In [24]:
from sagemaker.model import Model


model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

###### Define the model input for your SageMaker model.

In [25]:
from sagemaker.inputs import CreateModelInput


inputs = CreateModelInput(
    instance_type="ml.m5.large",
    accelerator_type="ml.eia1.medium",
)

###### Create your CreateModelStep using the CreateModelInput and SageMaker model instance you defined.

In [26]:
from sagemaker.workflow.steps import CreateModelStep


step_create_model = CreateModelStep(
    name="AbaloneCreateModel",
    model=model,
    inputs=inputs,
)

#### Step 7: Define a TransformStep to Perform Batch Transformation

##### To define a TransformStep to perform batch transformation

###### Create a transformer instance with the appropriate compute instance type, instance count, and desired output Amazon S3 bucket URI. Pass in the ModelName property from the step_create_model CreateModel step.

In [27]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/AbaloneTransform"
)

###### Create a TransformStep using the transformer instance you defined and the batch_data pipeline parameter.

In [28]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="AbaloneTransform",
    transformer=transformer,
    inputs=TransformInput(data=batch_data)
)

#### Step 8: Define a RegisterModel Step to Create a Model Package
##### To define a RegisterModel step to create a model package
###### Construct a RegisterModel step using the estimator instance you used for the training step . Pass in the S3ModelArtifacts property from the step_train training step and specify a ModelPackageGroup. SageMaker Pipelines creates this ModelPackageGroup for you.

In [29]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)
step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)

#### Step 9: Define a Condition Step to Verify Model Accuracy
##### To define a condition step to verify model accuracy
###### Define a ConditionLessThanOrEqualTo condition using the accuracy value found in the output of the model evaluation processing step, step_eval. Get this output using the property file you indexed in the processing step and the respective JSONPath of the mean squared error value, "mse".

In [30]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value"
    ),
    right=6.0
)

The class JsonGet has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


###### Construct a ConditionStep. Pass the ConditionEquals condition in, then set the model package registration and batch transformation steps as the next steps if the condition passes.

In [31]:
step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[], 
)

#### Step 10: Create a pipeline

##### To create a pipeline
##### 1.Define the following for your pipeline: name, parameters, and steps. Names must be unique within an (account, region) pair.

In [32]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"AbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

In [33]:
import json

json.loads(pipeline.definition())

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sgmpipeline-pratap/abalone/abalone-dataset.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sgmpipeline-pratap/abalone/abalone-dataset-batch'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AbaloneProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'Insta

In [34]:
print(step_eval)

ProcessingStep(name='AbaloneEval', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)


In [35]:
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:499554417458:pipeline/abalonepipeline',
 'ResponseMetadata': {'RequestId': '0b395949-2807-4d6e-96c8-bf6c9e546772',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '0b395949-2807-4d6e-96c8-bf6c9e546772',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Mon, 27 Dec 2021 11:25:22 GMT'},
  'RetryAttempts': 0}}

In [36]:
execution = pipeline.start()

In [37]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:499554417458:pipeline/abalonepipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:499554417458:pipeline/abalonepipeline/execution/jwvznf1ht9t5',
 'PipelineExecutionDisplayName': 'execution-1640604329486',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'abalonepipeline',
  'TrialName': 'jwvznf1ht9t5'},
 'CreationTime': datetime.datetime(2021, 12, 27, 11, 25, 29, 406000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2021, 12, 27, 11, 25, 29, 406000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': 'c0f365b2-3970-46ed-87c2-c48a6bcecc76',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c0f365b2-3970-46ed-87c2-c48a6bcecc76',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '486',
   'date': 'Mon, 27 Dec 2021 11:25:30 GMT'},
  'RetryAttempts': 0}}

In [38]:
execution.wait()

In [39]:
execution.list_steps()

[{'StepName': 'AbaloneTransform',
  'StartTime': datetime.datetime(2021, 12, 27, 11, 37, 32, 455000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 12, 27, 11, 42, 13, 570000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-1:499554417458:transform-job/pipelines-jwvznf1ht9t5-abalonetransform-yojtr65yjz'}}},
 {'StepName': 'AbaloneCreateModel',
  'StartTime': datetime.datetime(2021, 12, 27, 11, 37, 30, 524000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 12, 27, 11, 37, 31, 660000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:499554417458:model/pipelines-jwvznf1ht9t5-abalonecreatemodel-pie2wbelj5'}}},
 {'StepName': 'AbaloneRegisterModel',
  'StartTime': datetime.datetime(2021, 12, 27, 11, 37, 30, 524000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 12, 27, 11, 37, 31, 597000, tzinfo=tzloc

In [79]:
!cd /opt/ml/processing/train

/bin/sh: line 0: cd: /opt/ml/processing/train: No such file or directory
