# SageMaker Pipelines Step by Step

## Environment

In [1]:
import sagemaker

# Execution role of the current SageMaker environment; reused below for the
# processing job, the training job, the model and the pipeline itself.
role = sagemaker.get_execution_role()

## 데이터셋 준비

In [2]:
import os
import tensorflow as tf

# SageMaker session and its default S3 bucket (the bucket is used later for the
# processing output prefix).
sess = sagemaker.Session()
bucket = sess.default_bucket()

# Local working directories: data/raw receives the feature arrays consumed by the
# ProcessingStep; data/train and data/test receive the label arrays.
data_dir = os.path.join(os.getcwd(), 'data')
train_dir = os.path.join(data_dir, 'train')
test_dir = os.path.join(data_dir, 'test')
raw_dir = os.path.join(data_dir, 'raw')

for local_dir in (data_dir, train_dir, test_dir, raw_dir):
    os.makedirs(local_dir, exist_ok=True)

In [3]:
import numpy as np
# Public Keras API path; `tensorflow.python.keras` is TensorFlow's private,
# legacy copy of Keras and is not a supported import location.
from tensorflow.keras.datasets import boston_housing
from sklearn.preprocessing import StandardScaler  # NOTE(review): unused in this cell

# Download the Boston Housing dataset, already split into train and test sets.
(x_train, y_train), (x_test, y_test) = boston_housing.load_data()

# Features go to data/raw and are preprocessed by the ProcessingStep.
np.save(os.path.join(raw_dir, 'x_train.npy'), x_train)
np.save(os.path.join(raw_dir, 'x_test.npy'), x_test)
# Labels bypass preprocessing: they are written straight to data/train and
# data/test and copied to S3 by the `aws s3 cp` cell further down.
np.save(os.path.join(train_dir, 'y_train.npy'), y_train)
np.save(os.path.join(test_dir, 'y_test.npy'), y_test)

# Upload only the raw feature files; the returned S3 URI is the default value
# of the pipeline's InputData parameter.
s3_prefix = 'sm-pipelines'
rawdata_s3_prefix = '{}/data/raw'.format(s3_prefix)
raw_s3 = sess.upload_data(path=raw_dir, key_prefix=rawdata_s3_prefix)
print(raw_s3)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/boston_housing.npz
s3://sagemaker-ap-northeast-2-889750940888/sm-pipelines/data/raw


## Pipelines

### Pipelines 변수 선언

In [4]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat
)

# Pipeline parameters: each default below can be overridden per execution via
# pipeline.start(parameters=...).

### ProcessingStep
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)

# S3 location of the raw feature files; defaults to the upload made above.
processing_input_data = ParameterString(
    name="InputData",
    default_value=raw_s3,
)

### TrainingStep
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)

training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)

# The hyperparameters are numeric, but they are declared as ParameterString with
# string defaults. They are placed directly into the estimator's `hyperparameters`
# dict. NOTE(review): train.py is presumably responsible for casting these to
# int/float; confirm.
training_hp_epochs = ParameterString(
    name="TrainingHPEpochs",
    default_value='5'
)

training_hp_batch_size = ParameterString(
    name="TrainingHPBatchSize",
    default_value='128'
)

training_hp_learning_rate = ParameterString(
    name="TrainingHPLearningRate",
    default_value='0.01'
)

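### Caching config

Step caching lets a step reuse the result of a previous successful run with the same arguments instead of re-running it. See [Caching pipeline steps](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html).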

In [10]:
# Import explicitly. `sagemaker.workflow.steps` is not guaranteed to be loaded as an
# attribute of the `sagemaker` package on a fresh kernel.
from sagemaker.workflow.steps import CacheConfig

# Reuse a previous successful run of a step for up to one day. `expire_after` must
# be an ISO 8601 duration ("P1D" = one day); "1d" is not a valid ISO 8601 duration.
cache_config = CacheConfig(enable_caching=True, expire_after="P1D")

## ProcessingStep

In [11]:
# Uncomment to display preprocessing.py, the script run by the ProcessingStep below.
# !pygmentize preprocessing.py

In [12]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Version of the SageMaker scikit-learn processing container.
framework_version = "0.23-1"

# Instance type and count are pipeline parameters, so they can be overridden
# when an execution is started.
sklearn_processor = SKLearnProcessor(
    role=role,
    framework_version=framework_version,
    base_job_name="sm-pipelines",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
)

# Show the InputData parameter (its default S3 URI is printed).
print("input_data: \n", processing_input_data)

input_data: 
 s3://sagemaker-ap-northeast-2-889750940888/sm-pipelines/data/raw


In [32]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# S3 prefix under which the processing job writes its "train" and "test" outputs.
output_destination = 's3://{}/{}/data'.format(bucket, s3_prefix)

step_process = ProcessingStep(
    name="Processing",
    processor=sklearn_processor,
    inputs=[
        # Read from the InputData pipeline parameter (default: raw_s3). Passing
        # raw_s3 directly meant overriding InputData at execution had no effect.
        ProcessingInput(source=processing_input_data,
                        destination='/opt/ml/processing/input',
                        # NOTE(review): ShardedByS3Key gives each processing instance
                        # only a subset of the input objects (here x_train.npy and
                        # x_test.npy). With ProcessingInstanceCount > 1, confirm that
                        # preprocessing.py copes with receiving only one file.
                        # Otherwise use 'FullyReplicated'.
                        s3_data_distribution_type='ShardedByS3Key')
    ],
    outputs=[ProcessingOutput(output_name="train",
                              source='/opt/ml/processing/train',
                              destination='{}/train'.format(output_destination)),
             ProcessingOutput(output_name="test",
                              source='/opt/ml/processing/test',
                              destination='{}/test'.format(output_destination))
    ],
    code='preprocessing.py',
    cache_config=cache_config
)

In [33]:
# The labels were saved locally to data/train and data/test, not data/raw, so
# they never pass through the ProcessingStep. Copy them directly into the S3
# prefixes where the processing job writes its train/test outputs. The training
# channels read those prefixes, so they then see both features and labels.
# NOTE(review): this assumes preprocessing.py does not itself write y_*.npy; confirm.
!aws s3 cp {train_dir}/y_train.npy {output_destination}/train/y_train.npy
!aws s3 cp {test_dir}/y_test.npy {output_destination}/test/y_test.npy

upload: data/train/y_train.npy to s3://sagemaker-ap-northeast-2-889750940888/sm-pipelines/data/train/y_train.npy
upload: data/test/y_test.npy to s3://sagemaker-ap-northeast-2-889750940888/sm-pipelines/data/test/y_test.npy


In [34]:
# List every object under the pipeline's data prefix to check that the raw, train and test files are in place.
!aws s3 ls {output_destination} --recursive

2021-12-10 03:34:37      10736 sm-pipelines/data/raw/x_test.npy
2021-12-10 03:34:37      42144 sm-pipelines/data/raw/x_train.npy
2021-12-10 03:57:45      10736 sm-pipelines/data/test/x_test.npy
2021-12-10 04:05:10        944 sm-pipelines/data/test/y_test.npy
2021-12-10 03:57:44      42144 sm-pipelines/data/train/x_train.npy
2021-12-10 04:05:10       3360 sm-pipelines/data/train/y_train.npy


In [35]:
# Pipeline property reference to the S3 URI of the processing step's "train"
# output; the TrainingStep below passes this same expression as its "train" channel.
# step_process.properties.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri

## TrainingStep

In [36]:
from sagemaker.tensorflow import TensorFlow

# Hyperparameters come from the pipeline parameters declared earlier, so they can
# be overridden per execution. The estimator passes them to the training script.
hyperparameters = dict(
    epochs=training_hp_epochs,
    batch_size=training_hp_batch_size,
    learning_rate=training_hp_learning_rate,
)

# Regexes that pull the "loss" and "val_loss" values out of the training job's
# log output so they are recorded as job metrics.
metric_definitions = [
    dict(Name='loss', Regex=' loss: ([0-9\\.]+)'),
    dict(Name='val_loss', Regex=' val_loss: ([0-9\\.]+)'),
]

# Script-mode TensorFlow estimator running train_model/train.py.
estimator = TensorFlow(
    entry_point='train.py',
    source_dir='train_model',
    role=role,
    base_job_name='sm-pipelines',
    framework_version='2.1',
    py_version='py3',
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    hyperparameters=hyperparameters,
    metric_definitions=metric_definitions,
)

In [37]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# The channels contain NumPy .npy files: x_*.npy from the processing job and
# y_*.npy copied in separately. Declare a NumPy content type rather than
# "text/csv". NOTE(review): confirm train.py does not rely on the channel's
# ContentType value.
npy_content_type = "application/x-npy"

# Referencing step_process properties both points the channels at the processing
# outputs and makes this step run after the ProcessingStep.
step_train = TrainingStep(
    name="Training",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type=npy_content_type
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            content_type=npy_content_type
        )
    },
    # Caching is not enabled for this step. Pass cache_config=cache_config to
    # reuse a previous identical training run.
)

## CreateModelStep

In [38]:
from sagemaker import image_uris
from sagemaker.model import Model

# The image reported by the TrainingStep (AlgorithmSpecification.TrainingImage)
# is the TensorFlow *training* container, which does not serve models. Use the
# TensorFlow *inference* image for the estimator's framework version instead,
# for the instance type passed to CreateModelInput.
inference_image_uri = image_uris.retrieve(
    framework="tensorflow",
    region=sess.boto_region_name,
    version=estimator.framework_version,
    instance_type="ml.m5.large",
    image_scope="inference",
)

model = Model(
    image_uri=inference_image_uri,
    # Pipeline property: S3 URI of the model artifact produced by the training
    # step; this reference also makes model creation depend on training.
    # NOTE(review): the TF serving container expects a SavedModel export; confirm
    # train.py writes one.
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role,
)

In [39]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep

# Target instance type for the model created by this step.
inputs = CreateModelInput(instance_type="ml.m5.large")

# Registers `model` (built from the training step's artifact) as a SageMaker model.
step_create_model = CreateModelStep(
    name="CreateModel",
    inputs=inputs,
    model=model,
)

## Pipelines 정의

In [40]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = 'sm-pipelines-demo'

# The steps are chained through property references:
# Processing -> Training -> CreateModel.
# The parameters listed here are the ones that can be overridden in pipeline.start().
pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process, step_train, step_create_model],
    parameters=[
        # ProcessingStep
        processing_instance_type,
        processing_instance_count,
        processing_input_data,
        # TrainingStep
        training_instance_type,
        training_instance_count,
        training_hp_epochs,
        training_hp_batch_size,
        training_hp_learning_rate,
    ],
)

**파이프라인을 SageMaker에 제출하고 실행하기**

In [41]:
# Create the pipeline definition in SageMaker, or update it if a pipeline with this name already exists.
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:889750940888:pipeline/sm-pipelines-demo',
 'ResponseMetadata': {'RequestId': '7fd864a2-45ef-454e-ad94-ff1e01ede244',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '7fd864a2-45ef-454e-ad94-ff1e01ede244',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '90',
   'date': 'Fri, 10 Dec 2021 04:05:12 GMT'},
  'RetryAttempts': 0}}

In [42]:
# Start an execution, overriding some parameter defaults for this run only.
# TrainingHPEpochs is declared as a ParameterString, so pass a string that
# matches its declared type.
# NOTE(review): with ProcessingInstanceCount=2 and the ShardedByS3Key processing
# input, each processing instance receives only part of the raw files. Confirm
# that preprocessing.py supports this.
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        ProcessingInstanceCount=2,
        TrainingHPEpochs="100",
    )
)