In [2]:
!pip install -U sagemaker

[0m

### 환경 구성

#### import module

In [3]:
import os
import time
import boto3
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
from sagemaker.workflow.pipeline_context import PipelineSession


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


#### SageMaker Session Setup

In [4]:
sess = boto3.Session()
sm = sess.client("sagemaker")
role = get_execution_role()
sagemaker_session = sagemaker.Session(boto_session=sess)
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name

pipeline_session = PipelineSession()

model_package_group_name = "PipelineModelPackageGroup"
prefix = "pipeline-model-example"
pipeline_name = "serial-inference-pipeline"    # SageMaker Pipeline name

### Download California Housing dataset and upload to Amazon S3

In [5]:
# create data dirs
data_dir = os.path.join(os.getcwd(), "data")
os.makedirs(data_dir, exist_ok=True)

raw_dir = os.path.join(os.getcwd(), "data/raw")
os.makedirs(raw_dir, exist_ok=True)

In [6]:
# download data from s3
s3 = boto3.client("s3")
s3.download_file(
    f"sagemaker-example-files-prod-{region}",
    "datasets/tabular/california_housing/cal_housing.tgz",
    "cal_housing.tgz",
)

In [7]:
!tar -zxf cal_housing.tgz

tar: CaliforniaHousing/cal_housing.data: Cannot change ownership to uid 10017, gid 166: Operation not permitted
tar: CaliforniaHousing/cal_housing.domain: Cannot change ownership to uid 10017, gid 166: Operation not permitted
tar: Exiting with failure status due to previous errors


In [8]:
columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
    "medianHouseValue",
]
cal_housing_df = pd.read_csv("CaliforniaHousing/cal_housing.data", names=columns, header=None)
cal_housing_df["medianHouseValue"] /= 500000 # Scaling
cal_housing_df.to_csv(f"./data/raw/raw_data_all.csv", header=True, index=False)
rawdata_s3_prefix = "{}/data/raw".format(prefix)
raw_s3 = sagemaker_session.upload_data(path="./data/raw/", key_prefix=rawdata_s3_prefix)
print(raw_s3)

s3://sagemaker-ap-northeast-2-532805286864/pipeline-model-example/data/raw


### Define Parameters to Parameterize Pipeline Execution

In [9]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# raw input data
input_data = ParameterString(name="inputData", default_value=raw_s3)

# status of newly trained model in registry
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="Approved")

# processing step parameters
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# training step parameters
training_instance_type = ParameterString("TrainingInstanceType", default_value="ml.m5.xlarge")
training_epochs = ParameterString("TrainingEpochs", default_value="10")

# model performance step parameters
accuracy_mse_threshold = ParameterFloat(name="AccuracyMseThreshold", default_value=0.75)

### Define a Processing Step for Feature Engineering

The below preprocessing script, in addition to creating a scaler, contains the necessary functions for it to be deployed as part of a pipeline model

In [10]:
!mkdir -p code

In [11]:
%%writefile code/preprocess.py

import glob
import numpy as np
import pandas as pd
import os
import json
import joblib
from io import StringIO
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import tarfile

try:
    from sagemaker_containers.beta.framework import (
        content_types,
        encoders,
        env,
        modules,
        transformer,
        worker,
        server,
    )
except ImportError:
    pass

feature_columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
]
label_column = "medianHouseValue"

base_dir = "/opt/ml/processing"
base_output_dir = "/opt/ml/output/"

if __name__ == "__main__":
    df = pd.read_csv(f"{base_dir}/input/raw_data_all.csv")
    feature_data = df.drop(label_column, axis=1, inplace=False)
    label_data = df[label_column]
    x_train, x_test, y_train, y_test = train_test_split(feature_data, label_data, test_size=0.33)

    scaler = StandardScaler()

    scaler.fit(x_train)
    x_train = scaler.transform(x_train)
    x_test = scaler.transform(x_test)

    train_dataset = pd.concat([pd.DataFrame(x_train), y_train.reset_index(drop=True)], axis=1)
    test_dataset = pd.concat([pd.DataFrame(x_test), y_test.reset_index(drop=True)], axis=1)

    train_dataset.columns = feature_columns + [label_column]
    test_dataset.columns = feature_columns + [label_column]

    train_dataset.to_csv(f"{base_dir}/train/train.csv", header=True, index=False)
    test_dataset.to_csv(f"{base_dir}/test/test.csv", header=True, index=False)
    joblib.dump(scaler, "model.joblib")
    with tarfile.open(f"{base_dir}/scaler_model/model.tar.gz", "w:gz") as tar_handle:
        tar_handle.add(f"model.joblib")


def input_fn(input_data, content_type):
    """Parse input data payload

    We currently only take csv input. Since we need to process both labelled
    and unlabelled data we first determine whether the label column is present
    by looking at how many columns were provided.
    """
    if content_type == "text/csv":
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), header=None)

        if len(df.columns) == len(feature_columns) + 1:
            # This is a labelled example, includes the ring label
            df.columns = feature_columns + [label_column]
        elif len(df.columns) == len(feature_columns):
            # This is an unlabelled example.
            df.columns = feature_columns

        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))


def output_fn(prediction, accept):
    """Format prediction output

    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append(row)
        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), mimetype=accept)
    elif accept == "text/csv":
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
    else:
        raise RuntimeException("{} accept type is not supported by this script.".format(accept))


def predict_fn(input_data, model):
    """Preprocess input data

    We implement this because the default predict_fn uses .predict(), but our model is a preprocessor
    so we want to use .transform().

    The output is returned in the following order:

        rest of features either one hot encoded or standardized
    """
    features = model.transform(input_data)

    if label_column in input_data:
        # Return the label (as the first column) and the set of features.
        return np.insert(features, 0, input_data[label_column], axis=1)
    else:
        # Return only the set of features
        return features


def model_fn(model_dir):
    """Deserialize fitted model"""
    preprocessor = joblib.load(os.path.join(model_dir, "model.joblib"))
    return preprocessor

Overwriting code/preprocess.py


### Pipelie Session

In [12]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version = sklearn_framework_version,
    instance_type = "ml.m5.large",
    instance_count = processing_instance_count,
    base_job_name = "sklearn-housing-data-process",
    role=role,
    sagemaker_session=pipeline_session,
)

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="scaler_model", source="/opt/ml/processing/scaler_model"),
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocess.py",
    job_name="housing-data-process",
)
    



In [13]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

step_process = ProcessingStep(
    name="PreprocessData",
    step_args=processor_args,
)

### Define a Training Step to Train Model

In [14]:
%%writefile code/train.py

import argparse
import numpy as np
import os
import tensorflow as tf
import pandas as pd

feature_columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
]
label_column = "medianHouseValue"

def parse_args():
    parser = argparse.ArgumentParser()
    
    # hyperparameters sent by the client are passed as command-line arguements to to script
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--batch_size", type=int, default=64)
    parser.add_argument("--learning_rate", type=float, default=0.1)
    
    # data directories
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))
    
    # model directory
    parser.add_argument("--sm-model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    
    return parser.parse_known_args()

def get_train_data(train_dir):
    train_data = pd.read_csv(os.path.join(train_dir, "train.csv"))
    x_train = train_data[feature_columns].to_numpy()
    y_train = train_data[label_column].to_numpy()
    print("x train: ", x_train.shape, ", y train: ", y_train.shape)
    
    return x_train, y_train

def get_test_data(test_dir):
    test_data = pd.read_csv(os.path.join(test_dir, "test.csv"))
    x_test = test_data[feature_columns].to_numpy()
    y_test = test_data[label_column].to_numpy()
    
    return x_test, y_test

def get_model():
    inputs = tf.keras.Input(shape=(8,0))
    hidden_1 = tf.keras.layers.Dense(8, activation="tanh")(inpus)
    hidden_2 = tf.keras.layers.Dense(4, activation="sigmoid")(hidden_1)
    outputs = tf.keras.layers.Dense(1)(hidden_2)
    return tf.keras.Model(inputs=inputs, outputs=outputs)

if __name__ == "__main__":
    args, _ = parse_args()
    
    print("Training data location: {}".format(args.train))
    print("Test data location: {}".format(args.test))
    x_train, y_train = get_train_data(args.train)
    x_test, y_test = get_test_data(args.test)
    
    batch_size = args.batch_size
    epochs = args.epochs
    learning_rate = args.learning_rate
    print(
        "batch_size = {}, epochs = {}, learning_rate = {}".format(batch_size, epochs, learning_rate)
    )
    
    model = get_model()
    optimizer = tf.keras.optimizer.SGD(learning_rate)
    model.fit(
        x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test)
    )
    
    # evaluate on test set
    scores = model.evaluate(x_test, y_test, batch_size, verbose=2)
    print("\nTest MSE: ", scores)
    
    # save model
    model.save(args.sm_model_dir + "/1")
    

Overwriting code/train.py


In [15]:
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.model_step import ModelStep
import time

# Where to store the trained model
model_path = f"s3://{bucket}/{prefix}/model/"

hyperparameters = {"epochs": training_epochs }
tensorflow_version = "2.4.1"
python_version = "py37"

tf2_estimator = TensorFlow(
    source_dir="code",
    entry_point="train.py",
    instance_type=training_instance_type,
    instance_count=1,
    framework_version=tensorflow_version,
    role=role,
    base_job_name="tensorflow-train-model",
    output_path=model_path,
    hyperparameters=hyperparameters,
    py_version=python_version,
    sagemaker_session=pipeline_session,
)

# Note how the input to the training job directly references the output of the previous step
train_args = tf2_estimator.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

step_train_model = TrainingStep(name="TrainTensorflowModel", step_args=train_args)

### 임시 테스트

In [17]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        training_epochs,
        input_data,
    ],
    steps=[step_process, step_train_model],
    pipeline_definition_config=pipeline_definition_config,
)

import json

definition = json.loads(pipeline.definition())
print(definition)

pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


{'Version': '2020-12-01', 'Metadata': {}, 'Parameters': [{'Name': 'ProcessingInstanceType', 'Type': 'String', 'DefaultValue': 'ml.m5.xlarge'}, {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1}, {'Name': 'TrainingInstanceType', 'Type': 'String', 'DefaultValue': 'ml.m5.xlarge'}, {'Name': 'TrainingEpochs', 'Type': 'String', 'DefaultValue': '100'}, {'Name': 'inputData', 'Type': 'String', 'DefaultValue': 's3://sagemaker-ap-northeast-2-532805286864/pipeline-model-example/data/raw'}], 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'}, 'TrialName': {'Get': 'Execution.PipelineExecutionId'}}, 'Steps': [{'Name': 'PreprocessData', 'Type': 'Processing', 'Arguments': {'ProcessingJobName': 'sklearn-housing-data-process', 'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.large', 'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'}, 'VolumeSizeInGB': 30}}, 'AppSpecification': {'ImageUri': '366743142698.dkr.ecr.ap-northeast-

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"