In [2]:
import boto3
import sagemaker
import sagemaker.session


# Shared notebook configuration: SageMaker session, region, execution role,
# default S3 bucket and the model package group name.
sagemaker_session = sagemaker.session.Session()

# Read the region from the SageMaker session instead of a separate
# boto3.Session(): boto3's region_name is None when no default region is
# configured, which would silently yield bucket names ending in "-None", and
# this guarantees the region matches the session used for default_bucket().
region = sagemaker_session.boto_region_name

role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = "AbaloneModelPackageGroupName"

In [3]:
# Download the Abalone training dataset from the regional seed-code bucket and
# upload it to this account's default bucket; the resulting S3 URI is printed.
from pathlib import Path

local_path = "data/abalone-dataset.csv"

# Create the download directory from Python (instead of `!mkdir -p data`) so
# the cell does not depend on a POSIX shell and stays in sync with local_path.
Path(local_path).parent.mkdir(parents=True, exist_ok=True)

s3 = boto3.resource("s3")
seed_bucket = s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}")
seed_bucket.download_file("dataset/abalone-dataset.csv", local_path)

base_uri = f"s3://{default_bucket}/abalone"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sagemaker-us-east-1-053718699370/abalone/abalone-dataset.csv


In [4]:
# Download the batch-transform sample from the regional seed-code bucket and
# upload it next to the training data; the resulting S3 URI is printed.
# Note: the source object key has no ".csv" suffix, the local copy does.
local_path = "data/abalone-dataset-batch.csv"

s3 = boto3.resource("s3")
seed_bucket = s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}")
seed_bucket.download_file("dataset/abalone-dataset-batch", local_path)

base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    desired_s3_uri=base_uri,
    local_path=local_path,
)
print(batch_data_uri)

s3://sagemaker-us-east-1-053718699370/abalone/abalone-dataset-batch.csv


In [5]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Pipeline parameters: each one can be overridden per execution and falls back
# to the default given here.

# Number of instances for the preprocessing job.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# Approval status intended for the registered model package.
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

# S3 location of the training dataset uploaded earlier in the notebook.
input_data = ParameterString(name="InputData", default_value=input_data_uri)

# S3 location of the batch-transform dataset uploaded earlier in the notebook.
batch_data = ParameterString(name="BatchData", default_value=batch_data_uri)

In [9]:
from sagemaker.sklearn.processing import SKLearnProcessor


# scikit-learn processor that runs the preprocessing script; the instance
# count is a pipeline parameter so it can be changed per execution.
framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    role=role,
    base_job_name="sklearn-abalone-process",
    framework_version=framework_version,
    instance_count=processing_instance_count,
    instance_type="ml.t3.medium",
)

In [10]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

# Preprocessing step: reads the raw dataset and emits three named outputs
# (train / validation / test), one per directory under /opt/ml/processing.
process_inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]
process_outputs = [
    ProcessingOutput(output_name=split, source=f"/opt/ml/processing/{split}")
    for split in ("train", "validation", "test")
]

step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    code="abalone/preprocessing.py",
    inputs=process_inputs,
    outputs=process_outputs,
)

In [15]:
# S3 prefix under the default bucket, used as the estimator's output path.
model_path = "s3://{}/AbaloneTrain".format(default_bucket)

In [102]:
from sagemaker.estimator import Estimator


# XGBoost estimator used by the training step. The same container image is
# reused later in the notebook for evaluation and for the deployable model.
training_instance_type = "ml.m4.xlarge"

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
)
xgb_train.set_hyperparameters(
    # "reg:linear" is the deprecated alias of "reg:squarederror" in XGBoost;
    # the loss is the same, the current name avoids the deprecation warning.
    objective="reg:squarederror",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0,
)

In [103]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


# Training step: feeds the "train" and "validation" outputs of the
# preprocessing step to the XGBoost estimator as CSV channels.
processed_outputs = step_process.properties.ProcessingOutputConfig.Outputs
training_channels = {
    channel: TrainingInput(
        s3_data=processed_outputs[channel].S3Output.S3Uri,
        content_type="text/csv",
    )
    for channel in ("train", "validation")
}

step_train = TrainingStep(
    name="AbaloneTrain",
    estimator=xgb_train,
    inputs=training_channels,
)

In [104]:
from sagemaker.processing import ScriptProcessor


# Script processor for model evaluation; it runs on the same container image
# that was retrieved for training.
script_eval = ScriptProcessor(
    role=role,
    base_job_name="script-abalone-eval",
    image_uri=image_uri,
    command=["python3"],
    instance_count=1,
    instance_type="ml.m4.xlarge",
)

In [105]:
from sagemaker.workflow.properties import PropertyFile


# Property file exposing evaluation.json from the "evaluation" output, so that
# later steps can read values from it.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Evaluation step: receives the trained model artifacts and the "test" split
# from preprocessing, and writes its report to the "evaluation" output.
trained_model_artifacts = step_train.properties.ModelArtifacts.S3ModelArtifacts
test_split_uri = step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri

eval_inputs = [
    ProcessingInput(source=trained_model_artifacts, destination="/opt/ml/processing/model"),
    ProcessingInput(source=test_split_uri, destination="/opt/ml/processing/test"),
]
eval_outputs = [
    ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
]

step_eval = ProcessingStep(
    name="AbaloneEval",
    processor=script_eval,
    code="abalone/evaluation.py",
    inputs=eval_inputs,
    outputs=eval_outputs,
    property_files=[evaluation_report],
)

---
## From here the implementation drifts from the tutorial because the SDK has changed

# Step 6': Define a CreateModelStep for Batch Transformation

Get the model from `step_train`. The model artifacts are stored at the S3 location given by ```step_train.properties.ModelArtifacts.S3ModelArtifacts```

In [106]:
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.model import Model


# Model built from the training step's artifacts. It is bound to a
# PipelineSession, and that session is reused when the model is wrapped in a
# PipelineModel later on.
pipeline_session = PipelineSession()

model = Model(
    role=role,
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
)

Wrap the previous model in a `PipelineModel`. I don't yet understand why, but this is the new way to register a model.

In [107]:
from sagemaker import PipelineModel
# ModelStep is used in this cell and again for model registration; the import
# had been commented out, so on a fresh kernel this cell raised NameError.
from sagemaker.workflow.model_step import ModelStep

# Step that creates the SageMaker model later referenced by the batch
# transformer through step_model_create.properties.ModelName.
step_model_create = ModelStep(
    name="AbaloneCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

# Wrap the model in a PipelineModel (sharing the model's session); this
# wrapper is what gets registered as a model package.
pipeline_model = PipelineModel(
    models=[model],
    role=role,
    sagemaker_session=model.sagemaker_session,
)

# Step 7': Define a TransformStep to Perform Batch Transformation

In [108]:
from sagemaker.transformer import Transformer


# Batch transformer for the model created by the create-model step; results
# are written under the AbaloneTransform prefix of the default bucket.
transform_output_path = f"s3://{default_bucket}/AbaloneTransform"

transformer = Transformer(
    model_name=step_model_create.properties.ModelName,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=transform_output_path,
)

In [109]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


# Batch transform step: runs the transformer over the dataset referenced by
# the BatchData pipeline parameter.
batch_transform_input = TransformInput(data=batch_data)

step_transform = TransformStep(
    name="AbaloneTransform",
    transformer=transformer,
    inputs=batch_transform_input,
)

# Step 8': Define the Model Metrics for the Model Package

In [110]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 

# Model metrics for the registered package: point at evaluation.json inside
# the first (and only) output location of the evaluation step.
evaluation_output_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
evaluation_statistics = MetricsSource(
    s3_uri=f"{evaluation_output_uri}/evaluation.json",
    content_type="application/json",
)

model_metrics = ModelMetrics(model_statistics=evaluation_statistics)

# Step 9': Define a RegisterModel Step to Create a Model Package

This step uses the [ModelStep](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-model) approach.

In [111]:
# Arguments for registering the pipeline model as a model package, with the
# evaluation metrics attached.
# NOTE(review): the group name is hard-coded to 'sipgroup' while the
# `model_package_group_name` variable defined at the top of the notebook is
# unused; confirm which group is intended.
register_model_step_args = pipeline_model.register(
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.m4.xlarge"],
    transform_instances=["ml.m4.xlarge"],
    model_package_group_name='sipgroup',
    # Wire in the ModelApprovalStatus pipeline parameter; without this the
    # parameter was declared and passed to the pipeline but had no effect on
    # the registered package.
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

In [112]:
# Registration step built from the arguments prepared by
# pipeline_model.register(...).
step_model_registration = ModelStep(
    step_args=register_model_step_args,
    name="AbaloneRegisterModel",
)

# Step 10: Define a Condition Step to Verify Model Accuracy

In [113]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


# Condition: the MSE read from the evaluation report must be <= 6.0.
evaluation_mse = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="regression_metrics.mse.value",
)

cond_lte = ConditionLessThanOrEqualTo(left=evaluation_mse, right=6.0)

In [114]:
# Gate registration, model creation and batch transform on the MSE condition;
# no steps are scheduled when the condition is not met.
steps_when_condition_holds = [step_model_registration, step_model_create, step_transform]

step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=steps_when_condition_holds,
    else_steps=[],
)

# Step 11: Create Pipeline and hope for the best

In [115]:
from sagemaker.workflow.pipeline import Pipeline

# Assemble the pipeline. Only the top-level steps are listed; the register,
# create-model and transform steps are attached through the condition step.
pipeline_name = "AbalonePipeline"

pipeline_parameters = [
    processing_instance_count,
    model_approval_status,
    input_data,
    batch_data,
]
pipeline_steps = [step_process, step_train, step_eval, step_cond]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

In [100]:
import json

# Render the pipeline definition as a Python dict for inspection.
definition_json = pipeline.definition()
json.loads(definition_json)

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-053718699370/abalone/abalone-dataset.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-053718699370/abalone/abalone-dataset-batch.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AbaloneProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.t3.medium',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemak

# Step 12: Running the Pipeline 

## Start the Pipeline

In [116]:
# Create or update the pipeline in SageMaker using the execution role; the
# displayed response carries the PipelineArn.
pipeline.upsert(role_arn=role)

Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:053718699370:pipeline/AbalonePipeline',
 'ResponseMetadata': {'RequestId': '96717bb4-968e-41d3-9674-37a2485ccba7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '96717bb4-968e-41d3-9674-37a2485ccba7',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Sun, 02 Jul 2023 22:24:21 GMT'},
  'RetryAttempts': 0}}

In [117]:
# Start an execution; no parameter overrides are passed here. The returned
# handle is used by the following cells.
execution = pipeline.start()

In [118]:
# Show the execution's metadata (ARNs, status, timestamps, creator).
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:053718699370:pipeline/AbalonePipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:053718699370:pipeline/AbalonePipeline/execution/es5vibvdddx2',
 'PipelineExecutionDisplayName': 'execution-1688336753612',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'abalonepipeline',
  'TrialName': 'es5vibvdddx2'},
 'CreationTime': datetime.datetime(2023, 7, 2, 22, 25, 53, 516000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 7, 2, 22, 25, 53, 516000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:053718699370:user-profile/d-f2blqzae5sen/maurygreen-1687361455364',
  'UserProfileName': 'maurygreen-1687361455364',
  'DomainId': 'd-f2blqzae5sen'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:053718699370:user-profile/d-f2blqzae5sen/maurygreen-1687361455364',
  'UserProfileName': 'maurygreen-1687361455364',
  'DomainId': 'd-f2blqzae5

In [119]:
# Block until the execution is no longer running, so that the step listing
# in the next cell reflects the finished run.
execution.wait()

In [120]:
# List the executed steps with their status, timing and resource ARNs.
execution.list_steps()

[{'StepName': 'AbaloneTransform',
  'StartTime': datetime.datetime(2023, 7, 2, 22, 46, 49, 193000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 7, 2, 22, 51, 39, 706000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-1:053718699370:transform-job/pipelines-es5vibvdddx2-AbaloneTransform-jT9lSHaaT5'}},
  'AttemptCount': 0},
 {'StepName': 'AbaloneRegisterModel-RegisterModel',
  'StartTime': datetime.datetime(2023, 7, 2, 22, 46, 47, 474000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 7, 2, 22, 46, 48, 635000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:053718699370:model-package/sipgroup/1'}},
  'AttemptCount': 0},
 {'StepName': 'AbaloneCreateModel-CreateModel',
  'StartTime': datetime.datetime(2023, 7, 2, 22, 46, 47, 474000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 7, 2, 22, 46, 48, 817000, tzinfo=tzlocal()),
  'St