In [32]:
# !pip install sagemaker --upgrade

# Set up session

## Session type (local/non-local), region, default bucket, execution role

In [1]:
import sys

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import LocalPipelineSession

# A `LocalPipelineSession` makes every pipeline step execute on this machine.
# Swap `LocalPipelineSession()` for `PipelineSession()` to run the pipeline in the cloud.
local_pipeline_session = LocalPipelineSession()

# Region and default bucket both come from the session.
region = local_pipeline_session.boto_region_name
default_bucket = local_pipeline_session.default_bucket()
prefix = "sagemaker-pipelines-local-mode-example"

# Resolve the execution role: ask the SageMaker SDK first; when it raises
# ValueError, look the role ARN up in IAM by its name instead.
role = None
try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client("iam")
    role_lookup = iam.get_role(RoleName="AirflowSageMakerExecutionRole")
    role = role_lookup["Role"]["Arn"]
   


sagemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /Users/trungtran/Library/Application Support/sagemaker/config.yaml


In [17]:
# !mkdir -p data

In [30]:
# !wget https://raw.githubusercontent.com/jbrownlee/Datasets/master/abalone.csv
# !mv ./abalone.csv ./data/abalone-dataset.csv

--2024-04-08 14:25:20--  https://raw.githubusercontent.com/jbrownlee/Datasets/master/abalone.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 191872 (187K) [text/plain]
Saving to: ‘abalone.csv’


2024-04-08 14:25:20 (2.01 MB/s) - ‘abalone.csv’ saved [191872/191872]



In [27]:
# # Pull the dataset from SageMaker's public S3 bucket and upload it to your own S3 bucket

# local_path = "data/abalone-dataset.csv"

# s3 = boto3.resource("s3")
# s3.Bucket(f"sagemaker-example-files-prod-us-west-2").download_file(
#     "datasets/tabular/uci_abalone/abalone.csv", local_path
# )


## Upload input data to the default bucket

In [34]:
# Path of the dataset file relative to the notebook's working directory.
local_path = "data/abalone-dataset.csv"

# Destination: <default bucket>/<prefix>/abalone-data-set
base_uri = "/".join([f"s3://{default_bucket}", prefix, "abalone-data-set"])

# Upload the file; the returned URI is reused below as the default value of
# the pipeline's InputData parameter.
input_data_uri = sagemaker.s3.S3Uploader.upload(desired_s3_uri=base_uri, local_path=local_path)
print(input_data_uri)

s3://sagemaker-ap-southeast-1-879654127886/sagemaker-pipelines-local-mode-example/abalone-data-set/abalone-dataset.csv


## Configure the pipeline parameters
* Processing
* Training
* Transform
* Input data
* MSE threshold

In [5]:
from sagemaker.workflow.parameters import ParameterString, ParameterFloat

# Every job in this notebook uses a single instance of the same type.
processing_instance_count = training_instance_count = transform_instance_count = 1
instance_type = "ml.m5.xlarge"

# Pipeline parameters; both are registered on the Pipeline object further down.
# InputData defaults to the URI of the dataset uploaded above.
input_data = ParameterString(default_value=input_data_uri, name="InputData")

# Threshold compared against the evaluated MSE by the condition step.
mse_threshold = ParameterFloat(default_value=7.0, name="MseThreshold")

In [6]:
# !mkdir -p code

In [8]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Framework version passed to the SKLearnProcessor.
framework_version = "1.0-1"

# Processor for the preprocessing script, bound to the local pipeline session.
sklearn_processor = SKLearnProcessor(
    role=role,
    sagemaker_session=local_pipeline_session,
    base_job_name="sklearn-abalone-process",
    framework_version=framework_version,
    instance_count=processing_instance_count,
    instance_type=instance_type,
)

# Setup pipeline

## Pre-processing

In [35]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# One named output per data split; each is read from /opt/ml/processing/<split>.
split_outputs = [
    ProcessingOutput(output_name=split, source=f"/opt/ml/processing/{split}")
    for split in ("train", "validation", "test")
]

# The InputData pipeline parameter is made available at /opt/ml/processing/input.
processor_args = sklearn_processor.run(
    code="code/preprocessing.py",
    inputs=[ProcessingInput(source=input_data, destination="/opt/ml/processing/input")],
    outputs=split_outputs,
)

# Wrap the run arguments in a pipeline step.
step_process = ProcessingStep(step_args=processor_args, name="AbaloneProcess")



## Training step

In [11]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

# S3 location handed to the estimator as its output_path.
model_path = f"s3://{default_bucket}/{prefix}/model"

# XGBoost 1.5-1 container image for the session's region and instance type.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    version="1.5-1",
    region=region,
    instance_type=instance_type,
)

# Estimator that runs code/abalone.py in that image on the local pipeline session.
xgb_train = Estimator(
    role=role,
    sagemaker_session=local_pipeline_session,
    image_uri=image_uri,
    entry_point="code/abalone.py",
    output_path=model_path,
    instance_count=training_instance_count,
    instance_type=instance_type,
)

# NOTE(review): both `learning_rate` and `eta` are set with different values;
# XGBoost documents `eta` as an alias of `learning_rate`. Confirm which one
# code/abalone.py actually reads.
hyperparameters = {
    "objective": "reg:squarederror",
    "learning_rate": 0.01,
    "num_round": 50,
    "max_depth": 5,
    "eta": 0.2,
    "gamma": 4,
    "min_child_weight": 6,
    "subsample": 0.7,
}
xgb_train.set_hyperparameters(**hyperparameters)

# The train and validation channels read the matching outputs of the
# preprocessing step as CSV.
processed_data = step_process.properties.ProcessingOutputConfig.Outputs
train_args = xgb_train.fit(
    inputs={
        channel: TrainingInput(
            s3_data=processed_data[channel].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("train", "validation")
    }
)

In [12]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Pipeline step built from the estimator's fit arguments.
step_train = TrainingStep(step_args=train_args, name="AbaloneTrain")

## Evaluation Step

In [14]:
from sagemaker.processing import ScriptProcessor

# Script processor that runs python3 in the same image the estimator uses.
script_eval = ScriptProcessor(
    role=role,
    sagemaker_session=local_pipeline_session,
    image_uri=image_uri,
    command=["python3"],
    base_job_name="script-abalone-eval",
    instance_count=processing_instance_count,
    instance_type=instance_type,
)

# Inputs: the trained model artifacts and the test split from preprocessing.
eval_inputs = [
    ProcessingInput(
        source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        destination="/opt/ml/processing/model",
    ),
    ProcessingInput(
        source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        destination="/opt/ml/processing/test",
    ),
]

# Output: whatever the script writes under /opt/ml/processing/evaluation.
eval_outputs = [
    ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
]

eval_args = script_eval.run(
    code="code/evaluation.py",
    inputs=eval_inputs,
    outputs=eval_outputs,
)

## Set up the evaluation processing step

In [15]:
from sagemaker.workflow.properties import PropertyFile

# Property file pointing at evaluation.json inside the "evaluation" output;
# the condition step reads the MSE from it via JsonGet.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Evaluation step, with the property file attached.
step_eval = ProcessingStep(
    property_files=[evaluation_report],
    step_args=eval_args,
    name="AbaloneEval",
)

## Create Model step

In [17]:
from sagemaker.model import Model

# Model backed by the training step's artifacts, served with
# code/inference.py in the same image the estimator uses.
model = Model(
    role=role,
    sagemaker_session=local_pipeline_session,
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    entry_point="inference.py",
    source_dir="code",
)

In [18]:
from sagemaker.workflow.model_step import ModelStep

# Build the model-creation arguments first, then wrap them in a pipeline step.
model_step_args = model.create(instance_type=instance_type)
step_create_model = ModelStep(step_args=model_step_args, name="AbaloneCreateModel")

## Transformation Step

In [19]:
from sagemaker.transformer import Transformer


# Transform results are written under <default bucket>/<prefix>/transform.
transform_output_path = f"s3://{default_bucket}/{prefix}/transform"

# Transformer that uses the model created by the ModelStep.
transformer = Transformer(
    sagemaker_session=local_pipeline_session,
    model_name=step_create_model.properties.ModelName,
    output_path=transform_output_path,
    instance_count=transform_instance_count,
    instance_type=instance_type,
)

In [20]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
from sagemaker.workflow.functions import Join

# Input to transform: "<test output URI of the preprocessing step>/test.csv".
test_output_uri = step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri
transform_data = Join(on="/", values=[test_output_uri, "test.csv"])

# Transform arguments for CSV input, wrapped in a pipeline step.
transform_args = transformer.transform(transform_data, content_type="text/csv")
step_transform = TransformStep(step_args=transform_args, name="AbaloneTransform")


## Handle fail step

In [21]:
from sagemaker.workflow.fail_step import FailStep

# Error message: fixed text joined with the MseThreshold pipeline parameter.
fail_message = Join(on=" ", values=["Execution failed due to MSE >", mse_threshold])

# Step used as the else-branch of the MSE condition.
step_fail = FailStep(error_message=fail_message, name="AbaloneMSEFail")

In [22]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# MSE value read from the evaluation step's property file.
evaluated_mse = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="regression_metrics.mse.value",
)

# Condition: evaluated MSE <= MseThreshold.
cond_lte = ConditionLessThanOrEqualTo(left=evaluated_mse, right=mse_threshold)

# When the condition holds, create the model and run the transform;
# otherwise run the fail step.
step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_create_model, step_transform],
    else_steps=[step_fail],
)

In [23]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "LocalModelPipeline"

# The create-model, transform and fail steps are reached through step_cond,
# so only the four steps below are listed directly.
pipeline_steps = [step_process, step_train, step_eval, step_cond]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[input_data, mse_threshold],
    steps=pipeline_steps,
    sagemaker_session=local_pipeline_session,
)

# Execute the pipeline

In [25]:
# Register (create or update) the pipeline definition, passing `role` as its role ARN.
# The call's return value is displayed as the cell output.
pipeline.upsert(role_arn=role)


{'PipelineArn': 'LocalModelPipeline'}

In [36]:
# Start a pipeline execution with the parameters' default values (none are overridden here).
# The handle is kept so the execution's steps can be listed afterwards.
execution = pipeline.start()

INFO:sagemaker.local.entities:Starting execution for pipeline LocalModelPipeline. Execution ID is 13a73e55-1a1a-4186-955e-10f9252aa090
INFO:sagemaker.local.entities:Starting pipeline step: 'AbaloneProcess'
INFO:sagemaker.local.image:'Docker Compose' found using Docker CLI.
INFO:sagemaker.local.local_session:Starting processing job
INFO:sagemaker.local.image:Using the long-lived AWS credentials found in session
INFO:sagemaker.local.image:docker compose file: 
networks:
  sagemaker-local:
    name: sagemaker-local
services:
  algo-1-537dx:
    container_name: u53jkpe0te-algo-1-537dx
    entrypoint:
    - python3
    - /opt/ml/processing/input/code/preprocessing.py
    environment:
    - '[Masked]'
    - '[Masked]'
    image: 121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3
    networks:
      sagemaker-local:
        aliases:
        - algo-1-537dx
    stdin_open: true
    tty: true
    volumes:
    - /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0

 Container u53jkpe0te-algo-1-537dx  Creating
 algo-1-537dx The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested 
 Container u53jkpe0te-algo-1-537dx  Created
Attaching to u53jkpe0te-algo-1-537dx
u53jkpe0te-algo-1-537dx exited with code 0
Aborting on container exit...
 Container u53jkpe0te-algo-1-537dx  Stopping
 Container u53jkpe0te-algo-1-537dx  Stopped


INFO:sagemaker.local.image:===== Job Complete =====
INFO:sagemaker.local.entities:Pipeline step 'AbaloneProcess' SUCCEEDED.
INFO:sagemaker.local.entities:Starting pipeline step: 'AbaloneTrain'
INFO:sagemaker.local.image:'Docker Compose' found using Docker CLI.
INFO:sagemaker.local.local_session:Starting training job
INFO:sagemaker.local.image:Using the long-lived AWS credentials found in session
INFO:sagemaker.local.image:docker compose file: 
networks:
  sagemaker-local:
    name: sagemaker-local
services:
  algo-1-r8i9d:
    command: train
    container_name: ivj5rbwx8e-algo-1-r8i9d
    environment:
    - '[Masked]'
    - '[Masked]'
    - '[Masked]'
    - '[Masked]'
    - '[Masked]'
    image: 121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-xgboost:1.5-1
    networks:
      sagemaker-local:
        aliases:
        - algo-1-r8i9d
    stdin_open: true
    tty: true
    volumes:
    - /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpyrgi8yqe/algo-1-r8i9d/output/d

Login Succeeded



What's Next?
  View a summary of image vulnerabilities and recommendations → docker scout quickview 121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-xgboost:1.5-1
INFO:sagemaker.local.image:image pulled: 121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-xgboost:1.5-1
INFO:sagemaker.local.image:docker command: docker compose -f /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpyrgi8yqe/docker-compose.yaml up --build --abort-on-container-exit


 Container ivj5rbwx8e-algo-1-r8i9d  Creating
 algo-1-r8i9d The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested 
 Container ivj5rbwx8e-algo-1-r8i9d  Created
Attaching to ivj5rbwx8e-algo-1-r8i9d
ivj5rbwx8e-algo-1-r8i9d  | <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
ivj5rbwx8e-algo-1-r8i9d  | <jemalloc>: (This is the expected behaviour if you are running under QEMU)
ivj5rbwx8e-algo-1-r8i9d  |   from pandas import MultiIndex, Int64Index
ivj5rbwx8e-algo-1-r8i9d  | [2024-04-08 07:44:36.192 42dc2ee5a3f7:1 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None
ivj5rbwx8e-algo-1-r8i9d  | [2024-04-08 07:44:36.299 42dc2ee5a3f7:1 INFO profiler_config_parser.py:111] Unable to find config at /opt/ml/input/config/profilerconfig.json. Profiler is disabled.
ivj5rbwx8e-algo-1-r8i9d  | [2024-04-08:07:44:38:INFO] Imported framework sagemaker_xgboost_container.training
ivj5rbwx8e-algo-1-r8i9d

INFO:root:creating /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpyrgi8yqe/artifacts/output/data
INFO:root:copying /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpyrgi8yqe/model/xgboost-model -> /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpyrgi8yqe/artifacts/model


ivj5rbwx8e-algo-1-r8i9d exited with code 0
Aborting on container exit...
 Container ivj5rbwx8e-algo-1-r8i9d  Stopping
 Container ivj5rbwx8e-algo-1-r8i9d  Stopped


INFO:sagemaker.local.image:===== Job Complete =====
INFO:sagemaker.local.entities:Pipeline step 'AbaloneTrain' SUCCEEDED.
INFO:sagemaker.local.entities:Starting pipeline step: 'AbaloneEval'
INFO:sagemaker.local.image:'Docker Compose' found using Docker CLI.
INFO:sagemaker.local.local_session:Starting processing job
INFO:sagemaker.local.image:Using the long-lived AWS credentials found in session
INFO:sagemaker.local.image:docker compose file: 
networks:
  sagemaker-local:
    name: sagemaker-local
services:
  algo-1-9gxlu:
    container_name: usexdrcyif-algo-1-9gxlu
    entrypoint:
    - python3
    - /opt/ml/processing/input/code/evaluation.py
    environment:
    - '[Masked]'
    - '[Masked]'
    image: 121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-xgboost:1.5-1
    networks:
      sagemaker-local:
        aliases:
        - algo-1-9gxlu
    stdin_open: true
    tty: true
    volumes:
    - /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpa3bxboxg/algo-1-9gxlu

 Container usexdrcyif-algo-1-9gxlu  Creating
 algo-1-9gxlu The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested 
 Container usexdrcyif-algo-1-9gxlu  Created
Attaching to usexdrcyif-algo-1-9gxlu
usexdrcyif-algo-1-9gxlu  | <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
usexdrcyif-algo-1-9gxlu  | <jemalloc>: (This is the expected behaviour if you are running under QEMU)
usexdrcyif-algo-1-9gxlu  |   from pandas import MultiIndex, Int64Index
usexdrcyif-algo-1-9gxlu exited with code 0
Aborting on container exit...
 Container usexdrcyif-algo-1-9gxlu  Stopping
 Container usexdrcyif-algo-1-9gxlu  Stopped


INFO:sagemaker.local.image:===== Job Complete =====
INFO:sagemaker.local.entities:Pipeline step 'AbaloneEval' SUCCEEDED.
INFO:sagemaker.local.entities:Starting pipeline step: 'AbaloneMSECond'
INFO:sagemaker.local.entities:Pipeline step 'AbaloneMSECond' SUCCEEDED.
INFO:sagemaker.local.entities:Starting pipeline step: 'AbaloneCreateModel-RepackModel-0'
INFO:sagemaker.local.image:'Docker Compose' found using Docker CLI.
INFO:sagemaker.local.local_session:Starting training job
INFO:sagemaker.local.image:Using the long-lived AWS credentials found in session
INFO:sagemaker.local.image:docker compose file: 
networks:
  sagemaker-local:
    name: sagemaker-local
services:
  algo-1-5gw4d:
    command: train
    container_name: 9e65ccx0dv-algo-1-5gw4d
    environment:
    - '[Masked]'
    - '[Masked]'
    - '[Masked]'
    - '[Masked]'
    - '[Masked]'
    image: 121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3
    networks:
      sagemaker-local:
        al

Login Succeeded



What's Next?
  View a summary of image vulnerabilities and recommendations → docker scout quickview 121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3
INFO:sagemaker.local.image:image pulled: 121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3
INFO:sagemaker.local.image:docker command: docker compose -f /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpm20_rqcu/docker-compose.yaml up --build --abort-on-container-exit


 Container 9e65ccx0dv-algo-1-5gw4d  Creating
 algo-1-5gw4d The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested 
 Container 9e65ccx0dv-algo-1-5gw4d  Created
Attaching to 9e65ccx0dv-algo-1-5gw4d
9e65ccx0dv-algo-1-5gw4d  | 2024-04-08 07:56:59,725 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training
9e65ccx0dv-algo-1-5gw4d  | 2024-04-08 07:56:59,734 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
9e65ccx0dv-algo-1-5gw4d  | 2024-04-08 07:56:59,739 sagemaker-training-toolkit INFO     Failed to parse hyperparameter model_archive value s3://sagemaker-ap-southeast-1-879654127886/sagemaker-pipelines-local-mode-example/model/AbaloneTrain-1712561402-30d6/output/model.tar.gz to Json.
9e65ccx0dv-algo-1-5gw4d  | Returning the value itself
9e65ccx0dv-algo-1-5gw4d  | 2024-04-08 07:56:59,815 sagemaker_sklearn_container.training INFO     Invoking u

INFO:root:copying /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpm20_rqcu/algo-1-5gw4d/output/success -> /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpm20_rqcu/artifacts/output
INFO:root:creating /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpm20_rqcu/artifacts/output/data
INFO:root:creating /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpm20_rqcu/artifacts/model/code
INFO:root:copying /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpm20_rqcu/model/code/_repack_script_launcher.sh -> /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpm20_rqcu/artifacts/model/code
INFO:root:copying /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpm20_rqcu/model/code/evaluation.py -> /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpm20_rqcu/artifacts/model/code
INFO:root:copying /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmpm20_rqcu/model/code/preprocessing.py -> /private/var/folders/11/8pq7lf2166bb

9e65ccx0dv-algo-1-5gw4d exited with code 0
Aborting on container exit...
 Container 9e65ccx0dv-algo-1-5gw4d  Stopping
 Container 9e65ccx0dv-algo-1-5gw4d  Stopped


INFO:sagemaker.local.image:===== Job Complete =====
INFO:sagemaker.local.entities:Pipeline step 'AbaloneCreateModel-RepackModel-0' SUCCEEDED.
INFO:sagemaker.local.entities:Starting pipeline step: 'AbaloneCreateModel-CreateModel'
INFO:sagemaker.local.entities:Pipeline step 'AbaloneCreateModel-CreateModel' SUCCEEDED.
INFO:sagemaker.local.entities:Starting pipeline step: 'AbaloneTransform'
INFO:sagemaker.local.image:'Docker Compose' found using Docker CLI.
INFO:sagemaker.local.image:serving
INFO:sagemaker.local.image:creating hosting dir in /private/var/folders/11/8pq7lf2166bbtfcvp2m1z20c0000gn/T/tmp7odp2l7m
INFO:sagemaker.local.image:Using the long-lived AWS credentials found in session
INFO:sagemaker.local.image:docker compose file: 
networks:
  sagemaker-local:
    name: sagemaker-local
services:
  algo-1-9x0ds:
    command: serve
    container_name: 9lv208xgcl-algo-1-9x0ds
    environment:
    - '[Masked]'
    - '[Masked]'
    - '[Masked]'
    - '[Masked]'
    - '[Masked]'
    - '[Mas

Attaching to 9lv208xgcl-algo-1-9x0ds


INFO:sagemaker.local.entities:Checking if serving container is up, attempt: 10
INFO:sagemaker.local.entities:Container still not up, got: -1


9lv208xgcl-algo-1-9x0ds  | <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
9lv208xgcl-algo-1-9x0ds  | <jemalloc>: (This is the expected behaviour if you are running under QEMU)


INFO:sagemaker.local.entities:Checking if serving container is up, attempt: 15
INFO:sagemaker.local.entities:Container still not up, got: -1


9lv208xgcl-algo-1-9x0ds  |   from pandas import MultiIndex, Int64Index
9lv208xgcl-algo-1-9x0ds  | [2024-04-08:07:57:21:INFO] No GPUs detected (normal if no gpus installed)
9lv208xgcl-algo-1-9x0ds  | [2024-04-08:07:57:21:INFO] No GPUs detected (normal if no gpus installed)
9lv208xgcl-algo-1-9x0ds  | [2024-04-08:07:57:21:INFO] nginx config: 
9lv208xgcl-algo-1-9x0ds  | worker_processes auto;
9lv208xgcl-algo-1-9x0ds  | daemon off;
9lv208xgcl-algo-1-9x0ds  | pid /tmp/nginx.pid;
9lv208xgcl-algo-1-9x0ds  | error_log  /dev/stderr;
9lv208xgcl-algo-1-9x0ds  | 
9lv208xgcl-algo-1-9x0ds  | worker_rlimit_nofile 4096;
9lv208xgcl-algo-1-9x0ds  | 
9lv208xgcl-algo-1-9x0ds  | events {
9lv208xgcl-algo-1-9x0ds  |   worker_connections 2048;
9lv208xgcl-algo-1-9x0ds  | }
9lv208xgcl-algo-1-9x0ds  | 
9lv208xgcl-algo-1-9x0ds  | http {
9lv208xgcl-algo-1-9x0ds  |   include /etc/nginx/mime.types;
9lv208xgcl-algo-1-9x0ds  |   default_type application/octet-stream;
9lv208xgcl-algo-1-9x0ds  |   access_log /dev/stdout 

INFO:sagemaker.local.entities:Checking if serving container is up, attempt: 20
INFO:sagemaker.local.entities:Container still not up, got: -1
INFO:sagemaker.local.entities:Checking if serving container is up, attempt: 25
INFO:sagemaker.local.entities:Container still not up, got: 502


9lv208xgcl-algo-1-9x0ds  | Processing /opt/ml/code
9lv208xgcl-algo-1-9x0ds  |   Preparing metadata (setup.py) ... [?25l2024/04/08 07:57:28 [crit] 32#32: *1 connect() to unix:/tmp/gunicorn.sock failed (2: No such file or directory) while connecting to upstream, client: 192.168.65.1, server: , request: "GET /ping HTTP/1.1", upstream: "http://unix:/tmp/gunicorn.sock:/ping", host: "localhost:8080"
9lv208xgcl-algo-1-9x0ds  | 192.168.65.1 - - [08/Apr/2024:07:57:28 +0000] "GET /ping HTTP/1.1" 502 182 "-" "python-urllib3/1.26.18"
9lv208xgcl-algo-1-9x0ds  | done
9lv208xgcl-algo-1-9x0ds  | [?25hBuilding wheels for collected packages: inference
9lv208xgcl-algo-1-9x0ds  |   Building wheel for inference (setup.py) ... [?25ldone
9lv208xgcl-algo-1-9x0ds  | [?25h  Created wheel for inference: filename=inference-1.0.0-py2.py3-none-any.whl size=17002 sha256=7b3f631ffe47de3468b4bb9338b8eacb7fa093fc1d9b6aca1dc74182cb1219c3
9lv208xgcl-algo-1-9x0ds  |   Stored in directory: /home/model-server/tmp/pip-ep

INFO:sagemaker.local.entities:Checking if serving container is up, attempt: 30
INFO:sagemaker.local.entities:Container still not up, got: 502


9lv208xgcl-algo-1-9x0ds  | Successfully installed inference-1.0.0
9lv208xgcl-algo-1-9x0ds  | [0m2024/04/08 07:57:33 [crit] 32#32: *3 connect() to unix:/tmp/gunicorn.sock failed (2: No such file or directory) while connecting to upstream, client: 192.168.65.1, server: , request: "GET /ping HTTP/1.1", upstream: "http://unix:/tmp/gunicorn.sock:/ping", host: "localhost:8080"
9lv208xgcl-algo-1-9x0ds  | 192.168.65.1 - - [08/Apr/2024:07:57:33 +0000] "GET /ping HTTP/1.1" 502 182 "-" "python-urllib3/1.26.18"
9lv208xgcl-algo-1-9x0ds  | [2024-04-08 07:57:36 +0000] [62] [INFO] Starting gunicorn 19.10.0
9lv208xgcl-algo-1-9x0ds  | [2024-04-08 07:57:36 +0000] [62] [INFO] Listening at: unix:/tmp/gunicorn.sock (62)
9lv208xgcl-algo-1-9x0ds  | [2024-04-08 07:57:36 +0000] [62] [INFO] Using worker: gevent
9lv208xgcl-algo-1-9x0ds  |   return io.open(fd, *args, **kwargs)
9lv208xgcl-algo-1-9x0ds  | [2024-04-08 07:57:36 +0000] [68] [INFO] Booting worker with pid: 68
9lv208xgcl-algo-1-9x0ds  | [2024-04-08 07:5

INFO:sagemaker.local.entities:Checking if serving container is up, attempt: 35


9lv208xgcl-algo-1-9x0ds  | <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
9lv208xgcl-algo-1-9x0ds  | <jemalloc>: (This is the expected behaviour if you are running under QEMU)
9lv208xgcl-algo-1-9x0ds  | <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
9lv208xgcl-algo-1-9x0ds  | <jemalloc>: (This is the expected behaviour if you are running under QEMU)
9lv208xgcl-algo-1-9x0ds  | <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
9lv208xgcl-algo-1-9x0ds  | <jemalloc>: (This is the expected behaviour if you are running under QEMU)
9lv208xgcl-algo-1-9x0ds  | <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
9lv208xgcl-algo-1-9x0ds  | <jemalloc>: (This is the expected behaviour if you are running under QEMU)
9lv208xgcl-algo-1-9x0ds  |   from pandas import MultiIndex, Int64Index
9lv208xgcl-algo-1-9x0ds  |   from pandas import MultiIndex, Int64Index
9lv208xgcl-algo-1-9x0ds  |   from pandas import MultiIndex, In

INFO:sagemaker.local.entities:Pipeline step 'AbaloneTransform' SUCCEEDED.
INFO:sagemaker.local.entities:Pipeline execution 13a73e55-1a1a-4186-955e-10f9252aa090 SUCCEEDED


## Output pipeline to JSON

In [None]:
import json

# pipeline.definition() is passed to json.loads, i.e. it is a JSON string;
# parse it into Python objects and display the result.
definition = json.loads(pipeline.definition())
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-ap-southeast-1-879654127886/sagemaker-pipelines-local-mode-example/abalone-data-set/abalone-dataset.csv'},
  {'Name': 'MseThreshold', 'Type': 'Float', 'DefaultValue': 7.0}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AbaloneProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': 1,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/preprocessing.py']},
    'RoleArn': 'arn:aws:iam::879654127886:role/AirflowSageMakerExecutionRole',
    'ProcessingInputs': [{'InputNa

In [37]:
# List the steps of the execution started above and display them.
# The cells below read job names from steps["PipelineExecutionSteps"].
steps = execution.list_steps()
steps

{'PipelineExecutionSteps': [{'EndTime': 1712561402.308772,
   'Metadata': {'ProcessingJob': {'Arn': 'AbaloneProcess-1712561387-53ed'}},
   'StartTime': 1712561387.224455,
   'StepName': 'AbaloneProcess',
   'StepStatus': 'Succeeded'},
  {'EndTime': 1712562306.733172,
   'Metadata': {'TrainingJob': {'Arn': 'AbaloneTrain-1712561402-30d6'}},
   'StartTime': 1712561402.309519,
   'StepName': 'AbaloneTrain',
   'StepStatus': 'Succeeded'},
  {'EndTime': 1712562324.267074,
   'Metadata': {'ProcessingJob': {'Arn': 'AbaloneEval-1712562306-90ac'}},
   'StartTime': 1712562306.736914,
   'StepName': 'AbaloneEval',
   'StepStatus': 'Succeeded'},
  {'EndTime': 1712562324.620099,
   'Metadata': {'Condition': {'Outcome': True}},
   'StartTime': 1712562324.268884,
   'StepName': 'AbaloneMSECond',
   'StepStatus': 'Succeeded'},
  {'EndTime': 1712563026.678106,
   'Metadata': {'TrainingJob': {'Arn': 'AbaloneCreateModel-RepackModel-0-1712562324-8755'}},
   'StartTime': 1712562324.620788,
   'StepDescripti

## Get Output values

In [38]:
# Get output files from processing job

# Select the step by name rather than by list position, so the lookup keeps
# working if steps are added or the listing order changes.
processing_job_name = next(
    step["Metadata"]["ProcessingJob"]["Arn"]
    for step in steps["PipelineExecutionSteps"]
    if step["StepName"] == step_process.name
)

# Describe the job and print the S3 URI of each of its outputs.
outputs = local_pipeline_session.sagemaker_client.describe_processing_job(
    ProcessingJobName=processing_job_name
)["ProcessingOutputConfig"]["Outputs"]
for key in outputs:
    print(outputs[key]["S3Output"]["S3Uri"])

s3://sagemaker-ap-southeast-1-879654127886/LocalModelPipeline/13a73e55-1a1a-4186-955e-10f9252aa090/AbaloneProcess/output/train
s3://sagemaker-ap-southeast-1-879654127886/LocalModelPipeline/13a73e55-1a1a-4186-955e-10f9252aa090/AbaloneProcess/output/validation
s3://sagemaker-ap-southeast-1-879654127886/LocalModelPipeline/13a73e55-1a1a-4186-955e-10f9252aa090/AbaloneProcess/output/test


In [39]:
# Get output from training job

# Select the step by name rather than by list position, so the lookup keeps
# working if steps are added or the listing order changes.
training_job_name = next(
    step["Metadata"]["TrainingJob"]["Arn"]
    for step in steps["PipelineExecutionSteps"]
    if step["StepName"] == step_train.name
)

# Describe the job and print where its model artifacts were written.
outputs = local_pipeline_session.sagemaker_client.describe_training_job(
    TrainingJobName=training_job_name
)
print("Model location : ", outputs["ModelArtifacts"]["S3ModelArtifacts"])

Model location :  s3://sagemaker-ap-southeast-1-879654127886/sagemaker-pipelines-local-mode-example/model/AbaloneTrain-1712561402-30d6/output/model.tar.gz


In [40]:
# Get output from model evaluation step (processing job)

# Select the step by name rather than by list position, so the lookup keeps
# working if steps are added or the listing order changes.
processing_job_name = next(
    step["Metadata"]["ProcessingJob"]["Arn"]
    for step in steps["PipelineExecutionSteps"]
    if step["StepName"] == step_eval.name
)

# Describe the job and print the S3 URI of each of its outputs.
outputs = local_pipeline_session.sagemaker_client.describe_processing_job(
    ProcessingJobName=processing_job_name
)["ProcessingOutputConfig"]["Outputs"]
for key in outputs:
    print(outputs[key]["S3Output"]["S3Uri"])

s3://sagemaker-ap-southeast-1-879654127886/LocalModelPipeline/13a73e55-1a1a-4186-955e-10f9252aa090/AbaloneEval/output/evaluation


In [46]:
# Get output of ModelStep
import json

# The ModelStep appears in the execution as "<ModelStep name>-CreateModel"
# (see the execution log). Select it by name rather than by list position,
# so the lookup keeps working if steps are added or the listing order changes.
model_name = next(
    step["Metadata"]["Model"]["Arn"]
    for step in steps["PipelineExecutionSteps"]
    if step["StepName"] == f"{step_create_model.name}-CreateModel"
)
outputs = local_pipeline_session.sagemaker_client.describe_model(ModelName=model_name)
print(outputs)

{'ModelName': 'AbaloneCreateModel-CreateModel-1712563026-ff46', 'CreationTime': datetime.datetime(2024, 4, 8, 14, 57, 6, 685493), 'ExecutionRoleArn': 'local:arn-does-not-matter', 'ModelArn': 'local:arn-does-not-matter', 'PrimaryContainer': {'Image': '121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-xgboost:1.5-1', 'Environment': {'SAGEMAKER_PROGRAM': 'inference.py', 'SAGEMAKER_SUBMIT_DIRECTORY': '/opt/ml/model/code', 'SAGEMAKER_CONTAINER_LOG_LEVEL': '20', 'SAGEMAKER_REGION': 'ap-southeast-1'}, 'ModelDataUrl': 's3://sagemaker-ap-southeast-1-879654127886/sagemaker-xgboost-2024-04-08-07-03-57-578/AbaloneCreateModel-RepackModel-0-1712562324-8755/output/model.tar.gz'}}


In [48]:
# Get output from the TransformStep

# Select the step by name rather than by list position, so the lookup keeps
# working if steps are added or the listing order changes.
transform_job_name = next(
    step["Metadata"]["TransformJob"]["Arn"]
    for step in steps["PipelineExecutionSteps"]
    if step["StepName"] == step_transform.name
)

# Describe the transform job and print the full description.
outputs = local_pipeline_session.sagemaker_client.describe_transform_job(
    TransformJobName=transform_job_name
)
print(outputs)

{'TransformJobStatus': 'Completed', 'ModelName': 'AbaloneCreateModel-CreateModel-1712563026-ff46', 'TransformJobName': 'AbaloneTransform-1712563026-a2f4', 'TransformJobArn': 'local:arn-does-not-matter', 'TransformEndTime': datetime.datetime(2024, 4, 8, 14, 58, 1, 796574), 'CreationTime': datetime.datetime(2024, 4, 8, 14, 57, 55, 675007), 'TransformStartTime': datetime.datetime(2024, 4, 8, 14, 57, 55, 675007), 'Environment': {}, 'BatchStrategy': 'MultiRecord', 'TransformResources': {'InstanceCount': 1, 'InstanceType': 'ml.m5.xlarge'}, 'TransformOutput': {'S3OutputPath': 's3://sagemaker-ap-southeast-1-879654127886/sagemaker-pipelines-local-mode-example/transform'}, 'TransformInput': {'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix', 'S3Uri': 's3://sagemaker-ap-southeast-1-879654127886/LocalModelPipeline/13a73e55-1a1a-4186-955e-10f9252aa090/AbaloneProcess/output/test/test.csv'}}, 'ContentType': 'text/csv'}}


# Transition to running pipeline as SageMaker Managed Pipeline
We will now use a non-local PipelineSession object to re-run the Pipeline steps via SageMaker as a managed service. This will run all pipeline steps as SageMaker-managed processes. This will also allow us to view and track the results directly in the SageMaker Studio UI.

In [49]:
from sagemaker.workflow.pipeline_context import PipelineSession

# Non-local session: the steps recreated below are bound to it so that they run as
# SageMaker-managed jobs instead of through the local pipeline session.
pipeline_session = PipelineSession()

In [50]:
# Rebuild the preprocessing step so that it is bound to the managed (non-local) session.

framework_version = "1.0-1"

sklearn_processor = SKLearnProcessor(
    role=role,
    sagemaker_session=pipeline_session,  # managed session instead of the local one
    framework_version=framework_version,
    base_job_name="sklearn-abalone-process",
    instance_count=processing_instance_count,
    instance_type=instance_type,
)

# One input (the dataset parameter) and three named output channels.
process_inputs = [
    ProcessingInput(destination="/opt/ml/processing/input", source=input_data),
]
process_outputs = [
    ProcessingOutput(source="/opt/ml/processing/train", output_name="train"),
    ProcessingOutput(source="/opt/ml/processing/validation", output_name="validation"),
    ProcessingOutput(source="/opt/ml/processing/test", output_name="test"),
]

processor_args = sklearn_processor.run(
    code="code/preprocessing.py",
    inputs=process_inputs,
    outputs=process_outputs,
)

step_process = ProcessingStep(step_args=processor_args, name="AbaloneProcess")

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [51]:
# Show the container image and the model output location that the recreated steps reuse.
print(f"image_uri: {image_uri}", f"model_path: {model_path}", sep="\n")

image_uri: 121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-xgboost:1.5-1
model_path: s3://sagemaker-ap-southeast-1-879654127886/sagemaker-pipelines-local-mode-example/model


In [52]:
# Rebuild the training step so that it is bound to the managed (non-local) session.

xgb_train = Estimator(
    role=role,
    sagemaker_session=pipeline_session,  # managed session instead of the local one
    image_uri=image_uri,
    entry_point="code/abalone.py",
    output_path=model_path,
    instance_count=training_instance_count,
    instance_type=instance_type,
)

# NOTE(review): XGBoost documents `eta` as an alias of `learning_rate`, yet both are set
# here with different values — confirm which one code/abalone.py actually consumes.
xgb_hyperparameters = {
    "objective": "reg:squarederror",
    "learning_rate": 0.01,
    "num_round": 50,
    "max_depth": 5,
    "eta": 0.2,
    "gamma": 4,
    "min_child_weight": 6,
    "subsample": 0.7,
}
xgb_train.set_hyperparameters(**xgb_hyperparameters)

# Both training channels are fed from the outputs of the processing step.
process_step_outputs = step_process.properties.ProcessingOutputConfig.Outputs
train_channel = TrainingInput(
    content_type="text/csv",
    s3_data=process_step_outputs["train"].S3Output.S3Uri,
)
validation_channel = TrainingInput(
    content_type="text/csv",
    s3_data=process_step_outputs["validation"].S3Output.S3Uri,
)

train_args = xgb_train.fit(
    inputs={"train": train_channel, "validation": validation_channel}
)

step_train = TrainingStep(step_args=train_args, name="AbaloneTrain")

In [53]:
# Rebuild the evaluation step so that it is bound to the managed (non-local) session.

script_eval = ScriptProcessor(
    role=role,
    sagemaker_session=pipeline_session,  # managed session instead of the local one
    image_uri=image_uri,
    command=["python3"],
    base_job_name="script-abalone-eval",
    instance_count=processing_instance_count,
    instance_type=instance_type,
)

# The evaluation script receives the trained model artifact and the "test" processing output.
model_artifact_input = ProcessingInput(
    destination="/opt/ml/processing/model",
    source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
)
test_data_input = ProcessingInput(
    destination="/opt/ml/processing/test",
    source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
)
evaluation_output = ProcessingOutput(
    source="/opt/ml/processing/evaluation", output_name="evaluation"
)

eval_args = script_eval.run(
    code="code/evaluation.py",
    inputs=[model_artifact_input, test_data_input],
    outputs=[evaluation_output],
)

# Property file registered on the step: evaluation.json inside the "evaluation" output.
evaluation_report = PropertyFile(
    path="evaluation.json", output_name="evaluation", name="EvaluationReport"
)

step_eval = ProcessingStep(
    property_files=[evaluation_report],
    step_args=eval_args,
    name="AbaloneEval",
)

In [54]:
# Rebuild the model-creation step so that it is bound to the managed (non-local) session.

model = Model(
    role=role,
    sagemaker_session=pipeline_session,  # managed session instead of the local one
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    entry_point="inference.py",
    source_dir="code",
)

# Build the step arguments first, then wrap them in the pipeline step.
create_model_args = model.create(instance_type=instance_type)
step_create_model = ModelStep(step_args=create_model_args, name="AbaloneCreateModel")

In [55]:
# Rebuild the batch-transform step so that it is bound to the managed (non-local) session.

transformer = Transformer(
    sagemaker_session=pipeline_session,  # managed session instead of the local one
    model_name=step_create_model.properties.ModelName,
    output_path=f"s3://{default_bucket}/{prefix}/transform",
    instance_count=transform_instance_count,
    instance_type=instance_type,
)

# The transform input is "<test output S3 URI>/test.csv", joined when the pipeline runs.
test_output_uri = step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri
transform_data = Join(values=[test_output_uri, "test.csv"], on="/")

transform_args = transformer.transform(transform_data, content_type="text/csv")

step_transform = TransformStep(step_args=transform_args, name="AbaloneTransform")

In [56]:
# Rebuild the condition step on top of the recreated step instances.
# `cond_lte` and `step_fail` are not redefined in this section; they come from earlier cells.

steps_if_true = [step_create_model, step_transform]
steps_if_false = [step_fail]

step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    else_steps=steps_if_false,
    if_steps=steps_if_true,
)

In [None]:
from sagemaker.workflow.pipeline import Pipeline

# NOTE(review): this cell has no execution count and binds the step instances recreated
# for the managed session to `local_pipeline_session`. It looks like a leftover copy of
# the local pipeline definition — confirm whether it should stay in the notebook.
pipeline_name = "LocalModelPipeline"  # plain literal; the f-prefix had no placeholders
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        mse_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=local_pipeline_session,
)

In [63]:
from sagemaker.workflow.pipeline import Pipeline

# Assemble the SageMaker-managed pipeline from the step instances that were recreated
# with the non-local `pipeline_session`.
pipeline_name = "SM-Managed-Pipeline"  # plain literal; the f-prefix had no placeholders
sm_pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        mse_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
)

In [64]:
# Register the pipeline definition with SageMaker under `role`; the response shown
# below contains the resulting PipelineArn.
sm_pipeline.upsert(role_arn=role)




{'PipelineArn': 'arn:aws:sagemaker:ap-southeast-1:879654127886:pipeline/SM-Managed-Pipeline',
 'ResponseMetadata': {'RequestId': '52fef7e9-a6ad-43a7-940e-b89dc613f2b9',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '52fef7e9-a6ad-43a7-940e-b89dc613f2b9',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '92',
   'date': 'Mon, 08 Apr 2024 08:13:57 GMT'},
  'RetryAttempts': 0}}

In [65]:
# Start an execution of the SageMaker-managed pipeline and keep the handle so that
# later cells can wait on it and inspect its steps.
sm_execution = sm_pipeline.start()

In [66]:
# Block until the managed execution finishes.
# NOTE(review): presumably `delay` is the polling interval in seconds and `max_attempts`
# the number of polls (about 60 minutes in total) — confirm against the SDK docs.
sm_execution.wait(delay=60, max_attempts=60)


In [67]:
# Display each step of the managed execution with its status, timing and job metadata.
sm_execution.list_steps()


[{'StepName': 'AbaloneTransform',
  'StartTime': datetime.datetime(2024, 4, 8, 15, 28, 9, 565000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 4, 8, 15, 33, 1, 819000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:ap-southeast-1:879654127886:transform-job/pipelines-g9s99w1fka0h-AbaloneTransform-mpA8lDtxLK'}},
  'AttemptCount': 1},
 {'StepName': 'AbaloneCreateModel-CreateModel',
  'StartTime': datetime.datetime(2024, 4, 8, 15, 28, 8, 28000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 4, 8, 15, 28, 9, 252000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:ap-southeast-1:879654127886:model/pipelines-g9s99w1fka0h-AbaloneCreateModel-C-URGhEs7Ljc'}},
  'AttemptCount': 1},
 {'StepName': 'AbaloneCreateModel-RepackModel-0',
  'StartTime': datetime.datetime(2024, 4, 8, 15, 25, 32, 505000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 4, 8, 15, 28, 7, 44