In [1]:
import boto3
import sagemaker
import sagemaker.session 
from sagemaker.workflow.pipeline_context import PipelineSession

region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session() 
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket() 
pipeline_session = PipelineSession()
model_package_group_name = "AbalonePackageGroupDemo"

In [2]:
role

'arn:aws:iam::866824485776:role/service-role/AmazonSageMaker-ExecutionRole-20240725T121088'

In [3]:
# Pipeline
# Preprocessing
# Training
# evaluation
# Conditional Evaluation
# Model registration


# Features of dataset
```
length – The longest shell measurement of the abalone.

diameter – The diameter of the abalone perpendicular to its length.

height – The height of the abalone with meat in the shell.

whole_weight – The weight of the whole abalone.

shucked_weight – The weight of the meat removed from the abalone.

viscera_weight – The weight of the abalone viscera after bleeding.

shell_weight – The weight of the abalone shell after meat removal and drying.

sex – The sex of the abalone. One of 'M', 'F', or 'I', where 'I' is an infant abalone.

rings – The number of rings in the abalone shell.```

In [4]:
# Download the dataset

!mkdir -p data

In [6]:
local_path = "data/abalone-dataset.csv" 

s3 = boto3.resource("s3") 
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset.csv",
    local_path)

In [11]:
base_uri = f"s3://{default_bucket}/abalone"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path = local_path,
    desired_s3_uri=base_uri)
print(input_data_uri) 

s3://sagemaker-us-east-1-866824485776/abalone/abalone-dataset-batch.csv


In [12]:
local_path = "data/abalone-dataset-batch.csv" 

s3 = boto3.resource("s3") 
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch",
    local_path)

base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path = local_path,
    desired_s3_uri=base_uri)
print(input_data_uri) 

s3://sagemaker-us-east-1-866824485776/abalone/abalone-dataset-batch.csv


In [13]:
print(batch_data_uri)

s3://sagemaker-us-east-1-866824485776/abalone/abalone-dataset-batch.csv


# Define Pipeline Parameters

In [10]:
from sagemaker.workflow.parameters import ParameterInteger,ParameterString

In [14]:
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",default_value=1)

model_approval_status = ParameterString(
    name="ModelApprovalStatus",default_value="PendingManualApproval")

input_data = ParameterString(
    name="InputData",default_value=input_data_uri)

batch_data = ParameterString(
    name="BatchData",default_value=batch_data_uri)

In [15]:
!mkdir abalone

In [16]:
%%writefile abalone/preprocessing.py
import argparse
import os
import requests
import tempfile
import numpy as np
import pandas as pd


from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# Because this is a headerless CSV file, specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64
}
label_column_dtype = {"rings": np.float64}


def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None, 
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)
    )
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler())
        ]
    )

    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore"))
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features)
        ]
    )
    
    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)
    
    X = np.concatenate((y_pre, X_pre), axis=1)
    
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])

    
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Writing abalone/preprocessing.py


In [17]:
import numpy as np 
X = np.random.randn(100,10) 

In [18]:
train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])

In [19]:
train.shape, validation.shape, test.shape

((70, 10), (15, 10), (15, 10))

In [20]:
train, validation, test = np.split(X, [int(.7*len(X)), int(.80*len(X))])

In [21]:
train.shape, validation.shape, test.shape

((70, 10), (10, 10), (20, 10))

In [22]:
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-pipeline-abalone",
    sagemaker_session=pipeline_session, 
    role=role)

In [23]:
from sagemaker.processing import ProcessingInput, ProcessingOutput 
from sagemaker.workflow.steps import ProcessingStep

processor_args = sklearn_processor.run(
    inputs = [ProcessingInput(source=input_data, destination = "/opt/ml/processing/input")],
    outputs = [
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
    ],
    code="abalone/preprocessing.py")

step_process = ProcessingStep(
    name="AbaloneProcessing",
    step_args=processor_args)



In [24]:
# Define the Training step
model_path = f"s3://{default_bucket}/AbaloneTrain"

In [28]:
from sagemaker.estimator import Estimator 

image_uri = sagemaker.image_uris.retrieve(framework="xgboost", region=region, 
                                          version="latest", py_version="py310", 
                                          instance_type= "ml.m5.xlarge")

xgb_train = Estimator(image_uri=image_uri,
                      instance_type="ml.m5.xlarge",
                      instance_count=1, 
                      output_path = model_path, 
                      sagemaker_session=pipeline_session,
                      role=role )

xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0
)

In [29]:
from sagemaker.inputs import TrainingInput 
from sagemaker.workflow.steps import TrainingStep

train_args = xgb_train.fit(
    inputs = {
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv"),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv"),
    })

step_train = TrainingStep(
    name="AbaloneDemoTrain",
    step_args=train_args)



In [30]:
%%writefile abalone/evaluation.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost


from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    
    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    
    X_test = xgboost.DMatrix(df.values)
    
    predictions = model.predict(X_test)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {
                "value": mse,
                "standard_deviation": std
            },
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Writing abalone/evaluation.py


In [31]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-abalone-eval",
    sagemaker_session=pipeline_session,
    role=role,
)


In [32]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)

eval_args = script_eval.run(
        inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="abalone/evaluation.py",
)

step_eval = ProcessingStep(
    name="AbaloneEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

In [33]:
# Create the Model
from sagemaker.model import Model 

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

In [34]:
from sagemaker.inputs import CreateModelInput


inputs = CreateModelInput(
    instance_type="ml.m5.large",
    accelerator_type="ml.eia1.medium",
)

In [35]:
from sagemaker.workflow.steps import CreateModelStep


step_create_model = CreateModelStep(
    name="AbaloneCreateModel",
    model=model,
    inputs=inputs,
)

In [36]:
# Batch Transformation

from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/AbaloneTransform"
)

In [37]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="AbaloneTransform",
    transformer=transformer,
    inputs=TransformInput(data=batch_data)
)

In [38]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)
step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)



In [39]:
# condition step

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value"
    ),
    right=6.0
)

step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[], 
)

In [40]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"AbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        model_approval_status,
        input_data,
        batch_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

In [41]:
import json 
json.loads(pipeline.definition())



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-866824485776/abalone/abalone-dataset-batch.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-866824485776/abalone/abalone-dataset-batch.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AbaloneProcessing',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.co

In [42]:
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:866824485776:pipeline/AbalonePipeline',
 'ResponseMetadata': {'RequestId': '77a6e1c4-ca0c-4440-bd15-816329fc2e3c',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '77a6e1c4-ca0c-4440-bd15-816329fc2e3c',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Thu, 08 Aug 2024 03:20:34 GMT'},
  'RetryAttempts': 0}}

In [43]:
execution = pipeline.start() 

In [44]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:866824485776:pipeline/AbalonePipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:866824485776:pipeline/AbalonePipeline/execution/owmdx9309x6v',
 'PipelineExecutionDisplayName': 'execution-1723087266914',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'abalonepipeline',
  'TrialName': 'owmdx9309x6v'},
 'CreationTime': datetime.datetime(2024, 8, 8, 3, 21, 6, 856000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 8, 8, 3, 21, 6, 856000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:866824485776:user-profile/d-cjg9gqfsnguv/default-1722760116629',
  'UserProfileName': 'default-1722760116629',
  'DomainId': 'd-cjg9gqfsnguv',
  'IamIdentity': {'Arn': 'arn:aws:sts::866824485776:assumed-role/AmazonSageMaker-ExecutionRole-20240725T121088/SageMaker',
   'PrincipalId': 'AROA4TUWD7OIN6HE7MKBM:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:

In [None]:
execution.wait() # wait till the execution to complete

In [None]:
execution.list_steps()