In [2]:
# Create the SageMaker session and the AWS clients used throughout the notebook.
import sagemaker
import boto3
import botocore.config
import pandas as pd
import time
import datetime

# A single boto3 session shared by the SageMaker session and every client below,
# so credentials and region are resolved once and consistently everywhere.
boto_session = boto3.Session()
region = boto_session.region_name

sess = sagemaker.Session(boto_session=boto_session)
bucket = sess.default_bucket()
role = sagemaker.get_execution_role(sagemaker_session=sess)

sm = boto_session.client(service_name="sagemaker", region_name=region)
s3 = boto_session.client(service_name="s3", region_name=region)
featurestore_runtime = boto_session.client(service_name="sagemaker-featurestore-runtime", region_name=region)

# Run-unique suffix used to name this run's resources.
timestamp = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')

In [3]:
# Names for this run's resources; the timestamp suffix keeps them unique per run.
PIPELINE_NAME = f"reviews-bert-pipeline-{timestamp}"
FEATURE_GROUP_NAME = f"reviews-feature-group-{timestamp}"
FEATURE_STORE_OFFLINE_PREFIX = f"reviews-feature-store-{timestamp}"
MODEL_NAME = f"reviews-bert-model-{timestamp}"
MODEL_PACKAGE_GROUP_NAME = f"reviews-bert-model-group-{timestamp}"

# Raw input: Amazon Customer Reviews (Digital Software) TSV.
# NOTE(review): this bucket is account-specific; point it at your own copy of the data.
INPUT_S3_URI = "s3://youichiro-amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Software_v1_00.tsv.gz"

# Local scripts handed to the processing, training and evaluation jobs.
PREPROCESSING_SCRIPT_PATH = "./src/preprocessing.py"
TRAIN_SCRIPT_PATH = "./src/train.py"
EVALUATION_SCRIPT_PATH = "./src/evaluation.py"

In [4]:
# Create a SageMaker Experiment named after the pipeline.
# NOTE(review): the experiment is not passed to the Pipeline explicitly; confirm that
# pipeline executions end up associated with it.
from smexperiments.experiment import Experiment

pipeline_experiment = Experiment.create(
    sagemaker_boto_client=sm,
    experiment_name=PIPELINE_NAME,
    description="Amazon Customer Reviews BERT Pipeline Experiment",
)
# Name of the experiment that was actually created.
pipeline_experiment_name = pipeline_experiment.experiment_name

In [5]:
# Pipeline parameters: defaults below, overridable when an execution is started.
from sagemaker.workflow.parameters import ParameterString, ParameterInteger, ParameterFloat

# Compute for the preprocessing (and evaluation) processing jobs.
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.c5.2xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# Compute for the training job.
train_instance_type = ParameterString(name="TrainInstanceType", default_value="ml.c5.9xlarge")
train_instance_count = ParameterInteger(name="TrainInstanceCount", default_value=1)

In [6]:
# Processor (scikit-learn container) that runs the preprocessing script.
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    role=role,
    framework_version="0.23-1",
    # Compute is taken from the pipeline parameters.
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    # Expose the notebook's region to the container.
    env={"AWS_DEFAULT_REGION": region},
)

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [8]:
# ProcessingStep: run the preprocessing script on the raw TSV and write
# train / validation / test outputs.
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processing_step = ProcessingStep(
    name="Processing",
    processor=processor,
    code=PREPROCESSING_SCRIPT_PATH,
    inputs=[
        # NOTE(review): INPUT_S3_URI is a single object; with ShardedByS3Key and
        # ProcessingInstanceCount > 1 only one instance would receive it — confirm intent.
        ProcessingInput(
            input_name="input",
            source=INPUT_S3_URI,
            destination="/opt/ml/processing/input/data/",
            s3_data_distribution_type="ShardedByS3Key",
        ),
    ],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output/bert/train", output_name="train"),
        ProcessingOutput(source="/opt/ml/processing/output/bert/validation", output_name="validation"),
        ProcessingOutput(source="/opt/ml/processing/output/bert/test", output_name="test"),
    ],
    # Flag/value pairs forwarded to the preprocessing script.
    job_arguments=[
        "--feature-store-offline-prefix", FEATURE_STORE_OFFLINE_PREFIX,
        "--feature-group-name", FEATURE_GROUP_NAME,
    ],
)

In [10]:
# Create the TensorFlow estimator that runs the BERT training script.
from sagemaker.tensorflow import TensorFlow

# EBS volume size (GB) for each training instance. Defined in this cell because no
# earlier cell defines it; without it this cell raises NameError on a fresh kernel.
# NOTE(review): 1024 is an assumed value — size it to the processed dataset.
train_volume_size = 1024

# Metrics SageMaker scrapes from the training job's log output.
# NOTE(review): "loss: " / "accuracy: " are substrings of "val_loss: " / "val_accuracy: ";
# confirm train.py's log format so the train:* metrics cannot pick up validation values.
metrics_definitions = [
    {"Name": "train:loss", "Regex": "loss: ([0-9\\.]+)"},
    {"Name": "train:accuracy", "Regex": "accuracy: ([0-9\\.]+)"},
    {"Name": "validation:loss", "Regex": "val_loss: ([0-9\\.]+)"},
    {"Name": "validation:accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"},
]

estimator = TensorFlow(
    entry_point=TRAIN_SCRIPT_PATH,
    role=role,
    # Instance type/count come from pipeline parameters.
    instance_count=train_instance_count,
    instance_type=train_instance_type,
    volume_size=train_volume_size,
    py_version="py37",
    framework_version="2.3.1",
    # Hyperparameters for the training script; all values given as strings for consistency.
    hyperparameters={
        "epochs": "1",
        "learning_rate": "0.00001",
        "epsilon": "0.00000001",
        "train_batch_size": "128",
        "validation_batch_size": "128",
        "test_batch_size": "128",
        "train_steps_per_epoch": "50",
        "validation_steps": "50",
        "test_steps": "50",
        "max_seq_length": "64",
        "freeze_bert_layer": "False",
        "enable_sagemaker_debugger": "True",
        "enable_checkpointing": "True",
        "enable_tensorboard": "True",
        "run_validation": "True",
        "run_test": "False",
        "run_sample_predictions": "False",
    },
    input_mode="File",
    metric_definitions=metrics_definitions,
    # NOTE(review): fixed S3 prefix shared by every run of this notebook.
    model_dir=f"s3://{bucket}/model",
)

In [11]:
# TrainingStep: feed each processed split to the estimator as a same-named channel.
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

training_step = TrainingStep(
    name="Train",
    estimator=estimator,
    # One channel per split, each reading the S3 location of the processing output
    # with the same name (resolved when the pipeline runs).
    # NOTE(review): content_type is "text/csv"; confirm it matches what preprocessing writes.
    inputs={
        channel: TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[channel].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("train", "validation", "test")
    },
)

In [12]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Processor for the evaluation step: same container and compute parameters as the
# preprocessing processor, plus a two-hour runtime cap.
evaluation_processor = SKLearnProcessor(
    role=role,
    framework_version="0.23-1",
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    env={"AWS_DEFAULT_REGION": region},
    max_runtime_in_seconds=2 * 60 * 60,  # 7200 seconds
)

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [13]:
# EvaluationStep: evaluate the trained model and publish evaluation.json.
from sagemaker.workflow.properties import PropertyFile

# Exposes evaluation.json from the "metrics" output so later steps can read
# values out of it with JsonGet.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="metrics",
    path="evaluation.json",
)

evaluation_step = ProcessingStep(
    name="EvaluateModel",
    code=EVALUATION_SCRIPT_PATH,
    processor=evaluation_processor,
    inputs=[
        # Model artifact produced by the training step.
        ProcessingInput(
            destination="/opt/ml/processing/input/model",
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
        ),
        # The same raw input the preprocessing step read (not a processed split).
        # NOTE(review): confirm the evaluation script expects raw data rather than the test split.
        ProcessingInput(
            destination="/opt/ml/processing/input/data",
            source=processing_step.properties.ProcessingInputs["input"].S3Input.S3Uri,
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="metrics",
            source="/opt/ml/processing/output/metrics/",
            s3_upload_mode="EndOfJob",
        ),
    ],
    property_files=[evaluation_report],
)

In [14]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

# Model statistics pointing at evaluation.json inside the evaluation step's first
# output location. NOTE(review): this is only recorded on a model package when it
# is passed to RegisterModel(model_metrics=...).
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=f'{evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]}/evaluation.json',
        content_type="application/json",
    )
)

In [15]:
# Pipeline parameters for deployment-side compute.
# NOTE(review): deploy_instance_count is declared on the pipeline, but no step in this
# notebook reads it.
deploy_instance_count = ParameterInteger(name="DeployInstanceCount", default_value=1)
deploy_instance_type = ParameterString(name="DeployInstanceType", default_value="ml.m5.4xlarge")

In [16]:
# Resolve the TensorFlow 2.3.1 inference container image.
# The URI is computed once, here, at definition time, so it cannot follow a
# DeployInstanceType override supplied at execution time. Resolve it explicitly from
# the parameter's default instance type instead of handing a pipeline variable to
# image_uris.retrieve (whose handling of pipeline variables varies across SDK versions).
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version="2.3.1",
    instance_type=deploy_instance_type.default_value,
    image_scope="inference",
)

In [17]:
from sagemaker.workflow.step_collections import RegisterModel

# Register the trained model in the model package group, pending manual approval.
register_step = RegisterModel(
    name="RegisterModel",
    estimator=estimator,
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/jsonlines"],
    response_types=["application/jsonlines"],
    inference_instances=[deploy_instance_type],
    transform_instances=["ml.m5.4xlarge"],
    model_package_group_name=MODEL_PACKAGE_GROUP_NAME,
    approval_status="PendingManualApproval",
    # Attach the evaluation report so the model package records its metrics;
    # otherwise the model_metrics object built earlier is never used.
    model_metrics=model_metrics,
)

In [18]:
from sagemaker.model import Model

# SageMaker Model built from the training artifact and the inference image;
# consumed by the CreateModel step.
model = Model(
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=sess,
    name=MODEL_NAME,
)

In [19]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep

# CreateModelStep: create the SageMaker model resource as part of the pipeline run.
create_inputs = CreateModelInput(instance_type=deploy_instance_type)
create_step = CreateModelStep(name="CreateModel", model=model, inputs=create_inputs)

In [20]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
# JsonGet lives in sagemaker.workflow.functions; the name exported by condition_step
# is a deprecated alias that emits a SageMaker SDK v2 deprecation warning.
from sagemaker.workflow.functions import JsonGet

# Gate model registration/creation on the accuracy reported by the evaluation step.
min_accuracy_value = ParameterFloat(name="MinAccuracyValue", default_value=0.10)
minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
    # metrics.accuracy.value read from evaluation.json produced by the evaluation step.
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value",
    ),
    right=min_accuracy_value,
)
minimum_accuracy_condition_step = ConditionStep(
    name="AccuracyCondition",
    conditions=[minimum_accuracy_condition],
    if_steps=[register_step, create_step],  # condition met: register and create the model
    else_steps=[],  # condition not met: no further steps, the pipeline ends
)

See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


In [21]:
# Define the pipeline, register it with SageMaker, and start an execution.
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=PIPELINE_NAME,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        train_instance_type,
        train_instance_count,
        deploy_instance_type,
        deploy_instance_count,
        min_accuracy_value,
    ],
    steps=[processing_step, training_step, evaluation_step, minimum_accuracy_condition_step],
    sagemaker_session=sess,
)
# upsert creates the pipeline, or updates it if PIPELINE_NAME already exists, so
# re-running this cell does not error the way a second create() call would.
pipeline_arn = pipeline.upsert(role_arn=role)["PipelineArn"]
execution = pipeline.start()

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: latest.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
