# 세이지메이커 파이프라인을 사용한 자동 모델 미세 조정 워크플로우 생성

# 주의: 이 노트북을 완료하는데 약 30분이 소요됩니다.

# 잠시 기다려 주세요.

# 세이지메이커 파이프라인

아마존 세이지메이커 파이프라인은 다음을 지원합니다.

* **파이프라인(Pipelines)** - 세이지메이커 작업과 리소스 생성을 조정하기 위한 단계와 조건으로 구성된 방향성 비순환 그래프(DAG).
* **처리 작업 단계(Processing Job Steps)** - 세이지메이커에서 데이터 처리 작업을 실행하기 위한 간소화된 관리 경험. 특징 엔지니어링, 데이터 검증, 모델 평가 및 모델 해석 포함.
* **학습 작업 단계(Training Job Steps)** - 학습 데이터 세트의 예시를 제시해 모델이 예측을 학습하는 반복적인 과정.
* **조건부 단계(Conditional Steps)** - 파이프라인에서 분기(branch)의 조건부 실행 제공.
* **모델 등록(Registering Models)** - 아마존 세이지메이커에서 배포 가능한 모델을 생성하기 위해 모델 레지스트리에 모델 패키지 리소스 생성.
* **매개변수화된 실행(Parameterized Executions)** - 제공된 매개변수에 따라 파이프라인 실행을 다르게 할 수 있도록 지원.
* **변환 작업 단계(Transform Job Steps)** - 데이터 세트에서 학습 또는 추론에 방해되는 노이즈나 편향을 제거하기 위한 배치 변환, 대형 데이터 세트에서 추론을 얻기 위한 배치 변환 또는 영구적인 엔드포인트가 필요하지 않으면 추론 실행.

# 우리의 파이프라인

처리 단계: 허깅 페이스의 `transformer` 라이브러리를 사용해 대화 입력을 토크나이저로 변환하는 특징 엔지니어링을 수행합니다.

학습 단계: `diagsum` 데이터 세트를 사용해 대화를 효과적으로 요약할 수 있도록 모델을 미세 조정합니다.

평가 단계: 미세 조정된 모델과 테스트 데이터 세트를 입력 받아, ROUGE 지표를 기반으로 평가 지표가 포함된 JSON 파일을 생성합니다.

조건부 단계: 평가 단계에서 결정된 모델의 지표가 일정 값을 초과하는 경우 이 모델을 등록할지 여부를 결정합니다.

In [14]:
%pip install sagemaker-experiments==0.1.45

Collecting sagemaker-experiments==0.1.45
  Obtaining dependency information for sagemaker-experiments==0.1.45 from https://files.pythonhosted.org/packages/2b/2b/47d105bbcc328c58b1a23948c3fd9b86930d10b33d220d20c9819e75c41b/sagemaker_experiments-0.1.45-py3-none-any.whl.metadata
  Downloading sagemaker_experiments-0.1.45-py3-none-any.whl.metadata (10 kB)
Downloading sagemaker_experiments-0.1.45-py3-none-any.whl (42 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.7/42.7 kB[0m [31m894.8 kB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hInstalling collected packages: sagemaker-experiments
Successfully installed sagemaker-experiments-0.1.45
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [15]:
from botocore.exceptions import ClientError

import os
import sagemaker
import logging
import boto3
import sagemaker
import pandas as pd

sess = sagemaker.Session()
bucket = sess.default_bucket()
region = boto3.Session().region_name
role = sagemaker.get_execution_role()

import botocore.config

config = botocore.config.Config(
    user_agent_extra='gaia/1.0'
)

sm = boto3.Session().client(service_name="sagemaker", 
                            region_name=region,
                            config=config)
s3 = boto3.Session().client(service_name="s3", 
                            region_name=region,
                            config=config)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


# S3 소스 위치 설정

In [16]:
%store -r raw_input_data_s3_uri

In [17]:
try:
    raw_input_data_s3_uri
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN THE PREVIOUS NOTEBOOK ")
    print("You did not have the required datasets.       ")
    print("++++++++++++++++++++++++++++++++++++++++++++++")

In [18]:
print(raw_input_data_s3_uri)

s3://sagemaker-us-west-2-079002598131/data-summarization/


In [19]:
if not raw_input_data_s3_uri:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN THE PREVIOUS NOTEBOOK ")
    print("You did not have the required datasets.       ")
    print("++++++++++++++++++++++++++++++++++++++++++++++")
else:
    print("[OK]")

[OK]


# 파이프라인을 `실험(Experiment)`로 추적하기

In [20]:
import time

In [21]:
running_executions = 0
completed_executions = 0

try:
    existing_pipeline_executions_response = sm.list_pipeline_executions(
        PipelineName=pipeline_name,
        SortOrder="Descending",
    )

    if "PipelineExecutionSummaries" in existing_pipeline_executions_response.keys():
        if len(existing_pipeline_executions_response["PipelineExecutionSummaries"]) > 0:
            execution = existing_pipeline_executions_response["PipelineExecutionSummaries"][0]
            if "PipelineExecutionStatus" in execution:
                if execution["PipelineExecutionStatus"] == "Executing":
                    running_executions = running_executions + 1
                else:
                    completed_executions = completed_executions + 1

            print(
                "[INFO] You have {} Pipeline execution(s) currently running and {} execution(s) completed.".format(
                    running_executions, completed_executions
                )
            )
    else:
        print("[OK] Please continue.")
except:
    pass

if running_executions == 0:
    timestamp = int(time.time())
    pipeline_name = "dialogue-summary-pipeline-{}".format(timestamp)
    print("Created Pipeline Name: " + pipeline_name)

Created Pipeline Name: dialogue-summary-pipeline-1698262768


In [22]:
print(pipeline_name)

dialogue-summary-pipeline-1698262768


In [23]:
%store pipeline_name

Stored 'pipeline_name' (str)


In [24]:
from smexperiments.experiment import Experiment

pipeline_experiment = Experiment.create(
    experiment_name=pipeline_name,
    description="Dialogue Summarization Pipeline Experiment",
    sagemaker_boto_client=sm,
)
pipeline_experiment_name = pipeline_experiment.experiment_name
print("Created Pipeline Experiment Name: {}".format(pipeline_experiment_name))

Created Pipeline Experiment Name: dialogue-summary-pipeline-1698262768


In [25]:
print(pipeline_experiment_name)

dialogue-summary-pipeline-1698262768


In [26]:
%store pipeline_experiment_name

Stored 'pipeline_experiment_name' (str)


# `트라이얼(Trial)` 생성

In [27]:
from smexperiments.trial import Trial

In [28]:
%store -r pipeline_trial_name

timestamp = int(time.time())
pipeline_trial = Trial.create(
    trial_name="trial-{}".format(timestamp), experiment_name=pipeline_experiment_name, sagemaker_boto_client=sm
)
pipeline_trial_name = pipeline_trial.trial_name
print("Created Trial Name: {}".format(pipeline_trial_name))

no stored variable or alias pipeline_trial_name
Created Trial Name: trial-1698262771


In [29]:
print(pipeline_trial_name)

trial-1698262771


In [30]:
%store pipeline_trial_name

Stored 'pipeline_trial_name' (str)


# 파이프라인 실행을 매개변수화할 매개변수 정의

파이프라인을 매개변수화하고 파이프라인 정의를 수정하지 않고도 파이프라인 실행 및 스케줄에 사용되는 값을 다양하게 할 수 있도록 워크플로우 매개변수를 정의합니다.

지원되는 매개변수 유형에는 다음 항목이 포함됩니다.

* `ParameterString` - 파이썬 타입 `str`을 나타냄
* `ParameterInteger` - 파이썬 타입 `int`를 나타냄
* `ParameterFloat` - 파이썬 타입 `float`을 나타냄

이러한 매개변수는 파이프라인 실행시 기본값을 제공할 수 있으며, 파이프라인 실행 시 덮어쓸 수 있습니다. 지정된 기본값은 매개변수 유형의 인스턴스여야 합니다.

In [31]:
from sagemaker.workflow.parameters import (
    ParameterString,
    ParameterInteger,
    ParameterFloat
)

# 특징 엔지니어링 단계

In [32]:
%store -r raw_input_data_s3_uri

In [33]:
print(raw_input_data_s3_uri)

s3://sagemaker-us-west-2-079002598131/data-summarization/


In [34]:
!aws s3 ls $raw_input_data_s3_uri

2023-10-25 19:28:06    6544107 dialogsum-1.csv
2023-10-25 19:28:06    6572423 dialogsum-2.csv


# 파이프라인 매개변수 설정
이 매개변수는 전체 파이프라인에서 사용합니다.

In [35]:
model_checkpoint='google/flan-t5-base'

In [36]:
model_checkpoint = ParameterString(
    name="ModelCheckpoint",
    default_value=model_checkpoint,
)

# 처리 매개변수 설정

In [37]:
input_data = ParameterString(
    name="InputData",
    default_value=raw_input_data_s3_uri,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)

processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.c5.2xlarge",
)

train_split_percentage = ParameterFloat(
    name="TrainSplitPercentage",
    default_value=0.90,
)

validation_split_percentage = ParameterFloat(
    name="ValidationSplitPercentage",
    default_value=0.05,
)

test_split_percentage = ParameterFloat(
    name="TestSplitPercentage",
    default_value=0.05,
)

`SKLearnProcessor`프로세서 인스턴스를 생성하고 이를 `ProcessingStep`에서 사용합니다.

In [38]:
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={"AWS_DEFAULT_REGION": region},
    max_runtime_in_seconds=432000,
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


### 위의 `경고(WARNING)`를 무시하세요.

### 파이프라인 단계 캐싱 설정
[ISO 8601](https://en.wikipedia.org/wiki/ISO_8601#Durations) 형식을 사용해 일정 기간 동안 파이프라인 단계를 캐싱합니다.  

세이지메이커 파이프라인 단계 캐싱에 대한 자세한 내용은 다음을 참조하세요: [세이지메이커 파이프라인 단계 캐싱 설명서](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html)

In [39]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")

마지막으로 프로세서 인스턴스를 사용해 `ProcessingStep`을 구성하고 입력 및 출력 채널과 파이프라인이 실행될 때 실행될 코드를 함께 구성합니다. 기존 파이썬 SDK에 익숙한 경우, 프로세서 인스턴스의 `run` 메서드와 매우 유사합니다.

`ProcessingStep`의 입력 데이터로 전달된 `input_data` 매개변수에 주목하세요. 이 입력 데이터는 프로세서 인스턴스가 실행될 때 사용됩니다.

처리 작업의 출력 구성에서 지정된 `"train"`, `"validation"`, `"test"` 채널에 주목하세요. 이러한 단계 `Properties`는 후속 단계에서 사용될 수 있으며, 실행 시점에 해당하는 런타임 값으로 해석됩니다. 특히, 학습 단계를 정의할 때 이러한 사용을 강조합니다.

In [40]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processing_inputs = [
    ProcessingInput(
        input_name="raw-input-data",
        source=input_data,
        destination="/opt/ml/processing/input/data/",
        s3_data_distribution_type="ShardedByS3Key",
    )
]

processing_outputs = [
    ProcessingOutput(
        output_name="train",
        s3_upload_mode="EndOfJob",
        source="/opt/ml/processing/output/data/train",
    ),
    ProcessingOutput(
        output_name="validation",
        s3_upload_mode="EndOfJob",
        source="/opt/ml/processing/output/data/validation",
    ),
    ProcessingOutput(
        output_name="test",
        s3_upload_mode="EndOfJob",
        source="/opt/ml/processing/output/data/test",
    ),
]

processing_step = ProcessingStep(
    name="Processing",
    code="preprocess.py",
    processor=processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    job_arguments=[
        "--train-split-percentage",
        str(train_split_percentage.default_value),
        "--validation-split-percentage",
        str(validation_split_percentage.default_value),
        "--test-split-percentage",
        str(test_split_percentage.default_value),
        "--model-checkpoint",
        str(model_checkpoint.default_value),
    ],
    cache_config=cache_config
)

print(processing_step)

ProcessingStep(name='Processing', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)


# 학습 단계

# 학습 하이퍼파라미터 설정

In [41]:
train_instance_type = ParameterString(name="TrainInstanceType", default_value="ml.c5.9xlarge")
train_instance_count = ParameterInteger(name="TrainInstanceCount", default_value=1)

In [42]:
epochs = ParameterInteger(name="Epochs", default_value=1)
learning_rate = ParameterFloat(name="LearningRate", default_value=0.00001)
weight_decay = ParameterFloat(name="WeightDecay", default_value=0.01)
train_batch_size = ParameterInteger(name="TrainBatchSize", default_value=4)
validation_batch_size = ParameterInteger(name="ValidationBatchSize", default_value=4)
test_batch_size = ParameterInteger(name="TestBatchSize", default_value=4)
train_volume_size = ParameterInteger(name="TrainVolumeSize", default_value=1024)
input_mode = ParameterString(name="InputMode", default_value="FastFile")
train_sample_percentage = ParameterFloat(name="TrainSamplePercentage", default_value=0.01)

### 모델 성능을 추적할 메트릭 설정

In [43]:
metrics_definitions = [
    {"Name": "train:loss", "Regex": "'train_loss': ([0-9\\.]+)"},
    {"Name": "validation:loss", "Regex": "'eval_loss': ([0-9\\.]+)"},
]

### 추정기(Estimator) 생성

추정기(Estimator)와 입력 데이터 세트를 구성합니다. 일반적인 학습 스크립트는 입력 채널에서 데이터를 불러오고, 하이퍼파라미터로 학습을 구성하고, 모델을 학습하고, 나중에 호스팅할 수 있도록 `model_dir`에 모델을 저장합니다.

또한 학습된 모델이 저장될 모델 경로를 지정합니다.

`train_instance_type` 매개변수는 파이프라인의 다른 위치에서도 사용할 수 있게 전달됩니다. 이 경우에 `train_instance_type`은 추정기(Estimator)에 전달됩니다.

In [44]:
from sagemaker.pytorch import PyTorch
import uuid

checkpoint_s3_prefix = "checkpoints/{}".format(str(uuid.uuid4()))
checkpoint_s3_uri = "s3://{}/{}/".format(bucket, checkpoint_s3_prefix)

estimator = PyTorch(
    entry_point="train.py",
    source_dir="src",
    role=role,
    instance_count=train_instance_count,
    instance_type=train_instance_type,
    volume_size=train_volume_size,
    py_version="py39",
    framework_version="1.13",
    hyperparameters={
        "epochs": epochs,
        "learning_rate": learning_rate,
        "weight_decay": weight_decay,        
        "train_batch_size": train_batch_size,
        "validation_batch_size": validation_batch_size,
        "test_batch_size": test_batch_size,
        "model_checkpoint": model_checkpoint,
        "train_sample_percentage": train_sample_percentage,
    },
    input_mode=input_mode,
    metric_definitions=metrics_definitions,
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


### 학습 단계 구성

마지막으로 추정기 인스턴스를 사용해 `TrainingStep`을 구성하고 `TrainingStep` 입력에서 이전 `ProcessingStep`의 `Properties`를 입력으로 사용하고 파이프라인이 실행될 때 실행될 코드를 함께 구성합니다. 이는 기존 파이썬 SDK에 익숙한 경우 추정기(Estimator)의 `fit` 메서드와 매우 유사합니다.

특히 `TrainingStep`에 `"train"`, `"validation"` 및 `"test"` 출력 채널의 `S3Uri`를 전달합니다. 워크플로우 단계의 `properties` 속성은 대응하는 API 설명 호출의 응답 모델과 일치합니다. 이러한 속성은 자리 표시자 값으로 참조될 수 있으며, 실행 시점에 해결되거나 채워집니다. 예를 들어, `ProcessingStep`의 `properties` 속성은 [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) 응답 모델과 일치합니다.

In [45]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

training_step = TrainingStep(
    name="Train",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
        ),
        "validation": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
        ),
        "test": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        ),
    },
    cache_config=cache_config,
)

print(training_step)

TrainingStep(name='Train', display_name=None, description=None, step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)




# 평가 단계

먼저 모델 평가를 수행하기 위해서 처리 단계에서 지정할 평가 스크립트를 개발합니다.

평가 스크립트 `evaluate_model_metrics.py`는 학습된 모델과 테스트 데이터 세트를 입력으로 받아, 평가 지표가 포함된 JSON 파일을 생성합니다.

파이프라인 실행 후, 생성된 `evaluation.json` 파일을 분석을 위해 검토합니다.

평가 스크립트:

* 모델을 불러옵니다.
* 테스트 데이터를 읽어옵니다.
* 테스트 데이터에 대한 여러 예측을 수행합니다.
* 평가 보고서를 작성합니다.
* 평가 보고서를 평가 디렉터리에 저장합니다.

다음으로 `SKLearnProcessor` 인스턴스를 생성하고 이를 `ProcessingStep`에서 사용합니다.

프로세서에 전달된 `processing_instance_type` 매개변수에 주목하세요.

In [46]:
from sagemaker.sklearn.processing import SKLearnProcessor

evaluation_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={"AWS_DEFAULT_REGION": region},
    max_runtime_in_seconds=432000,
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


### 위의 `경고(WARNING)`를 무시하세요.

프로세서 인스턴스를 사용해 `ProcessingStep`을 구성하고, 입력 및 출력 채널과 파이프라인이 실행될 때 실행될 코드를 함께 구성합니다. 이는 기존 파이썬 SDK에 익숙한 경우, 프로세서 인스턴스의 `run` 메서드와 매우 유사합니다.

`TrainingStep`과 `ProcessingStep`의 `properties` 속성은 각각 [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) 및 [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) 응답 객체의 모델과 일치합니다.

In [47]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(name="EvaluationReport", output_name="metrics", path="evaluation.json")

In [48]:
evaluation_step = ProcessingStep(
    name="EvaluateModel",
    processor=evaluation_processor,
    code="evaluate_model_metrics.py",
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/input/model"            
            
        ),
        ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/input/data"       
        ),
    ],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output/metrics/",
            output_name="metrics", 
            s3_upload_mode="EndOfJob"            
        ),
    ],
    property_files=[evaluation_report],
)

In [49]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

print(model_metrics)



<sagemaker.model_metrics.ModelMetrics object at 0x7ff12b1f2590>


# 모델 등록 단계

![](img/pipeline-5.png)

학습 단계에 사용된 추정기 인스턴스를 사용해 `RegisterModel` 인스턴스를 생성합니다. 파이프라인에서 `RegisterModel`을 실행한 결과는 모델 패키지입니다. 모델 패키지는 추론에 필요한 모든 요소를 패키지화한 재사용 가능한 모델 아티팩트 추상화입니다. 기본적으로 이 패키지는 사용할 추론 이미지를 정의하는 추론 사양과 선택적으로 모델 가중치 위치로 구성됩니다.

모델 패키지 그룹은 모델 패키지의 모음입니다. 특정 ML 비즈니스 문제를 위해 모델 패키지 그룹을 생성하고 해당 그룹에 버전/모델 패키지를 계속 추가할 수 있습니다. 일반적으로 고객은 세이지메이커 워크플로우 파이프라인을 위해 모델 패키지 그룹을 생성해, 각 워크플로우 파이프라인 실행마다 그룹에 버전/모델 패키지를 계속 추가할 수 있습니다.

`RegisterModel`의 구성은 기존 파이썬 SDK에 익숙한 경우 추정기 인스턴스의 `register` 메서드와 매우 유사합니다.

특히 `TrainingStep`의 `properties`에서 `S3ModelArtifacts`를 `RegisterModel`에 전달합니다. `TrainingStep`의 `properties` 속성은 [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) 응답 객체의 모델과 일치합니다.

특히 모델 레지스트리 및 CI/CD 작업에서 나중에 사용할 특정 모델 패키지 그룹 이름을 제공합니다.

In [50]:
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

deploy_instance_type = ParameterString(name="DeployInstanceType", default_value="ml.m5.4xlarge")

deploy_instance_count = ParameterInteger(name="DeployInstanceCount", default_value=1)

In [51]:
import time

timestamp = int(time.time())

model_package_group_name = f"Summarization-{timestamp}"

print(model_package_group_name)

Summarization-1698262850


In [52]:
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=region,
    version="1.13",
    instance_type=deploy_instance_type,
    image_scope="inference",
)
print(inference_image_uri)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py39


763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference:1.13-cpu-py39


In [53]:
from sagemaker.workflow.step_collections import RegisterModel

register_step = RegisterModel(
    name="Summarization",
    estimator=estimator,
    image_uri=inference_image_uri,  # we have to specify, by default it's using training image
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/jsonlines"],
    response_types=["application/jsonlines"],
    inference_instances=[deploy_instance_type],
    transform_instances=[deploy_instance_type],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

# 배포를 위한 모델 생성 단계

In [54]:
from sagemaker.model import Model

model_name = "model-{}".format(timestamp)

model = Model(
    name=model_name,
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role,
)

In [55]:
from sagemaker.inputs import CreateModelInput

create_inputs = CreateModelInput(
    instance_type=deploy_instance_type,
)

In [56]:
from sagemaker.workflow.steps import CreateModelStep

create_step = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs=create_inputs,
)

# 조건부 배포 단계
![](img/pipeline-6.png)

마지막으로 평가 단계에서 결정된 모델의 메트릭이 지정된 임계값을 초과하는 경우에만 이 모델을 등록하고자 합니다. `ConditionStep`을 사용하면 파이프라인에서 단계 속성의 조건에 따라 파이프라인 DAG에서 조건부 실행할 수 있습니다.

아래에서 다음을 수행합니다.

* 평가 단계 출력에 있는 평가 메트릭에 대한 조건을 정의합니다.
* `ConditionStep`의 조건 목록에서 조건을 사용합니다.
* `ConditionStep`의 `if_steps`에 `RegisterModel` 단계 컬렉션을 전달합니다.

In [57]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)

min_rouge_value = ParameterFloat(name="MinRouge1Value", default_value=0.005)

min_rouge_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=evaluation_step,
        property_file=evaluation_report,
        json_path="metrics.eval_rouge1.value",
    ),
    right=min_rouge_value,  # eval_loss
)

min_rouge_condition_step = ConditionStep(
    name="EvaluationCondition",
    conditions=[min_rouge_condition],
    if_steps=[register_step, create_step],  # success, continue with model registration
    else_steps=[],  # fail, end the pipeline
)

See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


# 파라미터, 단계, 조건으로 구성된 파이프라인 정의하기

이제 모든 것을 하나의 워크플로우 파이프라인으로 묶어 실행하고, 심지어 예약할 수 있도록 설정해봅니다.

파이프라인에는 `이름(name)`, `파라미터(parameters)`, 그리고 `단계(steps)`가 필요합니다. 이름은 `(계정(account), 지역(region))` 쌍 내에서 고유해야 하므로 이름에 타임스탬프를 추가합니다.

참고 사항

* 정의에 사용된 모든 파라미터는 존재해야 합니다.
* 파이프라인에 전달된 단계는 실행 순서에 있을 필요는 없습니다. 세이지메이커 워크플로우 서비스는 데이터 종속성 DAG를 해결해 단계별로 실행을 완료합니다.
* 단계는 파이프라인 단계 목록 또는 단일 조건부 단계의 if/else 목록에 고유해야 합니다.

# 파이프라인을 세이지메이커에 등록해 실행하기

이제 파이프라인 정의를 워크플로우 서비스에 등록합니다. 전달된 역할은 워크플로우 서비스가 단계별로 정의된 모든 작업을 생성하는데 사용됩니다.

# 파이프라인 생성

### 아래의 `경고(WARNING)`를 무시하세요.

In [58]:
from sagemaker.workflow.pipeline import Pipeline

existing_pipelines = 0

existing_pipelines_response = sm.list_pipelines(
    PipelineNamePrefix=pipeline_name,
    SortOrder="Descending",
)

if "PipelineSummaries" in existing_pipelines_response.keys():
    if len(existing_pipelines_response["PipelineSummaries"]) > 0:
        existing_pipelines = existing_pipelines + 1
        print("[INFO] You already have created {} pipeline with name {}.".format(existing_pipelines, pipeline_name))
    else:
        pass

if existing_pipelines == 0:  # 파이프라인은 한번만 생성합니다.
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            input_data,
            processing_instance_count,
            processing_instance_type,
            train_split_percentage,
            validation_split_percentage,
            test_split_percentage,
            train_instance_type,
            train_instance_count,
            epochs,
            learning_rate,
            weight_decay,
            train_sample_percentage,
            train_batch_size,
            validation_batch_size,
            test_batch_size,
            train_volume_size,
            input_mode,
            min_rouge_value,
            model_approval_status,
            deploy_instance_type,
            deploy_instance_count,
            model_checkpoint.to_string(),
        ],
        steps=[processing_step, training_step, evaluation_step, min_rouge_condition_step],
        sagemaker_session=sess,
    )

    pipeline.upsert(role_arn=role)["PipelineArn"]
    print("Created pipeline with name {}".format(pipeline_name))
else:
    print(
        "****************************************************************************************************************"
    )
    print(
        "You have already create a pipeline with the name {}. This is OK. Please continue to the next cell.".format(
            pipeline_name
        )
    )
    print(
        "****************************************************************************************************************"
    )

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Using provided s3_resource


INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Created pipeline with name dialogue-summary-pipeline-1698262768


### 위의 `경고(WARNING)`를 무시하세요.

# 파이프라인 시작

### 아래의 `경고(WARNING)`를 무시하세요.

In [59]:
running_executions = 0
completed_executions = 0

if existing_pipelines > 0:
    existing_pipeline_executions_response = sm.list_pipeline_executions(
        PipelineName=pipeline_name,
        SortOrder="Descending",
    )

    if "PipelineExecutionSummaries" in existing_pipeline_executions_response.keys():
        if len(existing_pipeline_executions_response["PipelineExecutionSummaries"]) > 0:
            execution = existing_pipeline_executions_response["PipelineExecutionSummaries"][0]
            if "PipelineExecutionStatus" in execution:
                if execution["PipelineExecutionStatus"] == "Executing":
                    running_executions = running_executions + 1
                else:
                    completed_executions = completed_executions + 1

            print(
                "[INFO] You have {} Pipeline execution(s) currently running and {} execution(s) completed.".format(
                    running_executions, completed_executions
                )
            )
    else:
        pass
else:
    pass

if running_executions == 0:  # 한 번에 하나의 파이프라인 실행만 허용해서 필요한 리소스를 제한합니다.
    execution = pipeline.start()
    running_executions = running_executions + 1
    print("Started pipeline {}.  Ignore any warnings above.".format(pipeline_name))
    print(execution.arn)
else:
    print(
        "********************************************************************************************************************"
    )
    print(
        "You have already launched {} pipeline execution(s).  This is OK.  Please continue to see the next cell.".format(
            running_executions
        )
    )
    print(
        "********************************************************************************************************************"
    )

arn:aws:sagemaker:us-west-2:079002598131:pipeline/dialogue-summary-pipeline-1698262768/execution/zn482a3g3fqo


### 위의 `경고(WARNING)`를 무시하세요.

# 파이프라인이 완료될 때까지 기다리기

### 다음 셀은 약 40분이 소요됩니다. 잠시 기다려 주세요.

In [60]:
%%time

import time
from pprint import pprint

executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

while pipeline_execution_status == "Executing":
    try:
        executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
        pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
    except Exception as e:
        print("Please wait...")
        time.sleep(30)

pprint(executions_response)

Executing
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
[{'PipelineExecutionArn': 'arn:aws:sagemaker:us-west-2:079002598131:pipeline/dialogue-summary-pipeline-1698262768/execution/zn482a3g3fqo',
  'PipelineExecutionDisplayName': 'execution-1698262875378',
  'PipelineExecutionStatus': 'Succeeded',
  'StartTime': datetime.datetime(2023, 10, 25, 19, 41, 15, 325000, tzinfo=tzlocal())}]
CPU times: user 12.4 s, sys: 766 ms, total: 13.2 s
Wall time: 23min 3s


### 위의 파이프라인이 완료될 때까지 기다려 주세요.

# 파이프라인 완료 후 실행 단계 및 상태 목록

In [61]:
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
pipeline_execution_arn = executions_response[0]["PipelineExecutionArn"]

print("Pipeline execution status {}".format(pipeline_execution_status))
print("Pipeline execution arn {}".format(pipeline_execution_arn))

Pipeline execution status Succeeded
Pipeline execution arn arn:aws:sagemaker:us-west-2:079002598131:pipeline/dialogue-summary-pipeline-1698262768/execution/zn482a3g3fqo


In [62]:
from pprint import pprint

steps = sm.list_pipeline_execution_steps(PipelineExecutionArn=pipeline_execution_arn)

pprint(steps)

{'PipelineExecutionSteps': [{'AttemptCount': 0,
                             'EndTime': datetime.datetime(2023, 10, 25, 20, 4, 19, 538000, tzinfo=tzlocal()),
                             'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-west-2:079002598131:model-package/Summarization-1698262850/1'}},
                             'StartTime': datetime.datetime(2023, 10, 25, 20, 4, 18, 648000, tzinfo=tzlocal()),
                             'StepName': 'Summarization-RegisterModel',
                             'StepStatus': 'Succeeded'},
                            {'AttemptCount': 0,
                             'EndTime': datetime.datetime(2023, 10, 25, 20, 4, 19, 908000, tzinfo=tzlocal()),
                             'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-west-2:079002598131:model/pipelines-zn482a3g3fqo-createmodel-jurnxxdlaz'}},
                             'StartTime': datetime.datetime(2023, 10, 25, 20, 4, 18, 648000, tzinfo=tzlocal()),
                    

# 파이프라인이 생성한 모든 아티팩트 목록

In [63]:
processing_job_name = None
training_job_name = None

In [64]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(steps["PipelineExecutionSteps"]):
    print(execution_step)
    # LineageTableVisualizer가 처리 단계(Processing Step)를 처리하는 데 버그가 있어서 이런 방식으로 수행합니다.
    if execution_step["StepName"] == "Processing":
        processing_job_name = execution_step["Metadata"]["ProcessingJob"]["Arn"].split("/")[-1]
        print(processing_job_name)
        display(viz.show(processing_job_name=processing_job_name))
    elif execution_step["StepName"] == "Train":
        training_job_name = execution_step["Metadata"]["TrainingJob"]["Arn"].split("/")[-1]
        print(training_job_name)
        display(viz.show(training_job_name=training_job_name))
    else:
        display(viz.show(pipeline_execution_step=execution_step))
        time.sleep(5)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
{'StepName': 'Processing', 'StartTime': datetime.datetime(2023, 10, 25, 19, 41, 16, 584000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 10, 25, 19, 48, 5, 957000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:079002598131:processing-job/pipelines-zn482a3g3fqo-Processing-HHXWfIoro5'}}}
pipelines-zn482a3g3fqo-Processing-HHXWfIoro5


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...864640ba832fa6b/input/code/preprocess.py,Input,DataSet,ContributedTo,artifact
1,s3://...-west-2-079002598131/data-summarization/,Input,DataSet,ContributedTo,artifact
2,24661...om/sagemaker-scikit-learn:0.23-1-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://...2768/zn482a3g3fqo/Processing/output/test,Output,DataSet,Produced,artifact
4,s3://...n482a3g3fqo/Processing/output/validation,Output,DataSet,Produced,artifact
5,s3://...768/zn482a3g3fqo/Processing/output/train,Output,DataSet,Produced,artifact


{'StepName': 'Train', 'StartTime': datetime.datetime(2023, 10, 25, 19, 48, 6, 762000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 10, 25, 19, 55, 36, 310000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:079002598131:training-job/pipelines-zn482a3g3fqo-Train-mW5MOxvAl8'}}}
pipelines-zn482a3g3fqo-Train-mW5MOxvAl8


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...2768/zn482a3g3fqo/Processing/output/test,Input,DataSet,ContributedTo,artifact
1,s3://...n482a3g3fqo/Processing/output/validation,Input,DataSet,ContributedTo,artifact
2,s3://...768/zn482a3g3fqo/Processing/output/train,Input,DataSet,ContributedTo,artifact
3,76310...onaws.com/pytorch-training:1.13-cpu-py39,Input,Image,ContributedTo,artifact
4,s3://...fqo-Train-mW5MOxvAl8/output/model.tar.gz,Output,Model,Produced,artifact


{'StepName': 'EvaluateModel', 'StartTime': datetime.datetime(2023, 10, 25, 19, 55, 37, 423000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 10, 25, 20, 4, 16, 660000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:079002598131:processing-job/pipelines-zn482a3g3fqo-EvaluateModel-CaKjzdlrUm'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...cb1/input/code/evaluate_model_metrics.py,Input,DataSet,ContributedTo,artifact
1,s3://...2768/zn482a3g3fqo/Processing/output/test,Input,DataSet,ContributedTo,artifact
2,s3://...fqo-Train-mW5MOxvAl8/output/model.tar.gz,Input,Model,ContributedTo,artifact
3,24661...om/sagemaker-scikit-learn:0.23-1-cpu-py3,Input,Image,ContributedTo,artifact
4,s3://...7c439c38c9a334a10e12fbcb1/output/metrics,Output,DataSet,Produced,artifact


{'StepName': 'EvaluationCondition', 'StartTime': datetime.datetime(2023, 10, 25, 20, 4, 17, 696000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 10, 25, 20, 4, 18, 222000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'Condition': {'Outcome': 'True'}}}


None

{'StepName': 'CreateModel', 'StartTime': datetime.datetime(2023, 10, 25, 20, 4, 18, 648000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 10, 25, 20, 4, 19, 908000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-west-2:079002598131:model/pipelines-zn482a3g3fqo-createmodel-jurnxxdlaz'}}}


None

{'StepName': 'Summarization-RegisterModel', 'StartTime': datetime.datetime(2023, 10, 25, 20, 4, 18, 648000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2023, 10, 25, 20, 4, 19, 538000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-west-2:079002598131:model-package/Summarization-1698262850/1'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...fqo-Train-mW5MOxvAl8/output/model.tar.gz,Input,Model,ContributedTo,artifact
1,76310...naws.com/pytorch-inference:1.13-cpu-py39,Input,Image,ContributedTo,artifact
2,Summarization-1698262850-1-PendingManualApprov...,Input,Approval,ContributedTo,action
3,Summarization-1698262850-1698264259-aws-model-...,Output,ModelGroup,AssociatedWith,context


## 실행 결과를 실험의 시도로 추가하기

In [65]:
# -aws-processing-job은 ProcessingJob에 의해 할당된 기본 이름입니다.
processing_job_tc = "{}-aws-processing-job".format(processing_job_name)
print(processing_job_tc)

pipelines-zn482a3g3fqo-Processing-HHXWfIoro5-aws-processing-job


In [66]:
response = sm.associate_trial_component(TrialComponentName=processing_job_tc, TrialName=pipeline_trial_name)

In [67]:
# -aws-training-job은 TrainingJob에 의해 할당된 기본 이름입니다.
training_job_tc = "{}-aws-training-job".format(training_job_name)
print(training_job_tc)

pipelines-zn482a3g3fqo-Train-mW5MOxvAl8-aws-training-job


In [68]:
response = sm.associate_trial_component(TrialComponentName=training_job_tc, TrialName=pipeline_trial_name)