# Amazon SageMaker Model Monitor
이 노트북은 다음 내용을 보여줍니다:
* Amazon SageMaker에서 머신 러닝 모델을 호스팅하고 추론 요청, 결과 및 메타데이터 캡처하기
* 기준 제약 조건을 생성하기 위해 학습 데이터셋 분석하기
* 제약 조건 위반에 대해 실시간 엔드포인트 또는 배치 변환 모니터링하기

---
## 배경

Amazon SageMaker는 모든 개발자와 데이터 사이언티스트에게 머신 러닝 모델을 빠르게 구축, 학습 및 배포할 수 있는 능력을 제공합니다. Amazon SageMaker는 전체 머신 러닝 워크플로우를 포괄하는 완전 관리형 서비스입니다. 데이터에 레이블을 지정하고 준비하며, 알고리즘을 선택하고, 모델을 학습시킨 다음 배포를 위해 튜닝 및 최적화할 수 있습니다. Amazon SageMaker를 사용하여 모델을 프로덕션에 배포하면 예측을 수행하고 이전보다 더 낮은 비용으로 운영할 수 있습니다.

또한 Amazon SageMaker를 사용하면 배포한 모델의 호출에 대한 입력, 출력 및 메타데이터를 캡처할 수 있습니다. 데이터를 분석하고 품질을 모니터링할 수도 있습니다. 이 노트북에서는 Amazon SageMaker가 이러한 기능을 어떻게 제공하는지 배우게 됩니다.

---
## 설정

시작하려면 다음 사전 요구 사항이 완료되었는지 확인하세요.

* 모델을 호스팅할 AWS 리전을 지정합니다.
* Amazon SageMaker에 Amazon Simple Storage Service(Amazon S3)의 데이터에 접근할 수 있는 권한을 부여하는 데 사용되는 IAM 역할 ARN이 있어야 합니다. 필요한 권한을 미세 조정하는 방법은 문서를 참조하세요.
* 모델 학습에 사용된 데이터, 추가 모델 데이터 및 모델 호출에서 캡처된 데이터를 저장하는 데 사용되는 S3 버킷을 생성합니다. 시연 목적으로 이들 모두에 동일한 버킷을 사용하고 있습니다. 실제로는 서로 다른 보안 정책으로 이들을 분리하고 싶을 수 있습니다.

In [None]:
%%time
# cell 01

# Handful of configuration
%pip install boto3 --upgrade

import os
import boto3
import re
import json
from sagemaker import get_execution_role, session

region= boto3.Session().region_name

role = get_execution_role()
print("RoleArn: {}".format(role))

# You can use a different bucket, but make sure the role you chose for this notebook
# has the s3:PutObject permissions. This is the bucket into which the data is captured
bucket =  session.Session(boto3.Session()).default_bucket()
print("Demo Bucket: {}".format(bucket))
prefix = 'sagemaker/DEMO-ModelMonitor'

data_capture_prefix = '{}/datacapture'.format(prefix)
s3_capture_upload_path = 's3://{}/{}'.format(bucket, data_capture_prefix)
reports_prefix = '{}/reports'.format(prefix)
s3_report_path = 's3://{}/{}'.format(bucket,reports_prefix)
code_prefix = '{}/code'.format(prefix)
s3_code_preprocessor_uri = 's3://{}/{}/{}'.format(bucket,code_prefix, 'preprocessor.py')
s3_code_postprocessor_uri = 's3://{}/{}/{}'.format(bucket,code_prefix, 'postprocessor.py')

print("Capture path: {}".format(s3_capture_upload_path))
print("Report path: {}".format(s3_report_path))
print("Preproc Code path: {}".format(s3_code_preprocessor_uri))
print("Postproc Code path: {}".format(s3_code_postprocessor_uri))

이 노트북의 실행 역할이 진행에 필요한 권한을 가지고 있는지 빠르게 확인할 수 있습니다. 위에서 지정한 S3 버킷에 간단한 테스트 객체를 넣어보세요. 이 명령이 실패하면, 해당 역할에 버킷에 대한 `s3:PutObject` 권한을 추가한 후 다시 시도하세요.

In [None]:
# cell 02
# Upload some test files
boto3.Session().resource('s3').Bucket(bucket).Object("test_upload/test.txt").upload_file('test_data/upload-test-file.txt')
print("Success! You are all set to proceed.")

# 옵션 1: 실시간 엔드포인트를 통한 모델 모니터링

## PART A: Amazon SageMaker 엔드포인트에서 실시간 추론 데이터 캡처하기
데이터 캡처 기능을 실제로 보여주기 위한 엔드포인트를 생성합니다.

### 사전 훈련된 모델을 Amazon S3에 업로드하기
이 코드는 배포할 준비가 된 사전 훈련된 XGBoost 모델을 업로드합니다. 이 모델은 SageMaker의 XGB Churn Prediction 노트북을 사용하여 훈련되었습니다. 이 단계에서 자신의 사전 훈련된 모델을 사용할 수도 있습니다. 이미 Amazon S3에 사전 훈련된 모델이 있다면, s3_key를 지정하여 추가할 수 있습니다.

In [None]:
# cell 03
model_file = open("model/xgb-churn-prediction-model.tar.gz", 'rb')
s3_key = os.path.join(prefix, 'xgb-churn-prediction-model.tar.gz')
boto3.Session().resource('s3').Bucket(bucket).Object(s3_key).upload_fileobj(model_file)

### Amazon SageMaker에 모델 배포하기
사전 훈련된 이탈 예측 모델을 배포하는 것부터 시작하겠습니다. 여기서는 이미지와 모델 데이터로 모델 객체를 생성합니다.

In [None]:
# cell 04
from time import gmtime, strftime
from sagemaker.model import Model
from sagemaker.image_uris import retrieve

model_name = "DEMO-xgb-churn-pred-model-monitor-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
model_url = 'https://{}.s3-{}.amazonaws.com/{}/xgb-churn-prediction-model.tar.gz'.format(bucket, region, prefix)
image_uri = retrieve(region=boto3.Session().region_name, framework='xgboost', version='0.90-2')

model = Model(image_uri=image_uri, model_data=model_url, role=role)

모델 데이터 품질 모니터링을 위한 데이터 캡처를 활성화하기 위해, `DataCaptureConfig`라는 새로운 캡처 옵션을 지정합니다. 이 구성을 통해 요청 페이로드, 응답 페이로드 또는 둘 다 캡처할 수 있습니다. 캡처 구성은 모든 변형에 적용됩니다. 배포를 진행하겠습니다.

In [None]:
# cell 05
from sagemaker.model_monitor import DataCaptureConfig

endpoint_name = 'DEMO-xgb-churn-pred-model-monitor-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("EndpointName={}".format(endpoint_name))

data_capture_config = DataCaptureConfig(
                        enable_capture=True,
                        sampling_percentage=100,
                        destination_s3_uri=s3_capture_upload_path)

predictor = model.deploy(initial_instance_count=1,
                instance_type='ml.m4.xlarge',
                endpoint_name=endpoint_name,
                data_capture_config=data_capture_config)

### 배포된 모델 호출하기

이제 이 엔드포인트로 데이터를 전송하여 실시간으로 추론을 얻을 수 있습니다. 이전 단계에서 데이터 캡처를 활성화했기 때문에, 요청 및 응답 페이로드는 일부 추가 메타데이터와 함께 DataCaptureConfig에서 지정한 Amazon Simple Storage Service(Amazon S3) 위치에 저장됩니다.

이 단계는 포함된 샘플 데이터로 약 2분 동안 엔드포인트를 호출합니다. 데이터는 지정된 샘플링 비율에 따라 캡처되며, 데이터 캡처 옵션이 꺼질 때까지 캡처가 계속됩니다.

In [None]:
# cell 06
from sagemaker.predictor import Predictor
import sagemaker
import time

predictor = Predictor(endpoint_name=endpoint_name, serializer=sagemaker.serializers.CSVSerializer())

# get a subset of test data for a quick test
!head -120 test_data/test-dataset-input-cols.csv > test_data/test_sample.csv
print("Sending test traffic to the endpoint {}. \nPlease wait...".format(endpoint_name))

with open('test_data/test_sample.csv', 'r') as f:
    for row in f:
        payload = row.rstrip('\n')
        response = predictor.predict(data=payload)
        time.sleep(0.5)
        
print("Done!")        

### 캡처된 데이터 보기

이제 Amazon S3에 저장된 데이터 캡처 파일을 나열해 보겠습니다. 호출이 발생한 시간을 기준으로 정리된 다양한 시간대의 파일들을 볼 수 있을 것입니다. Amazon S3 경로 형식은 다음과 같습니다:

`s3://{destination-bucket-prefix}/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/filename.jsonl`

In [None]:
# cell 07
s3_client = boto3.Session().client('s3')
current_endpoint_capture_prefix = '{}/{}'.format(data_capture_prefix, endpoint_name)
result = s3_client.list_objects(Bucket=bucket, Prefix=current_endpoint_capture_prefix)
capture_files = [capture_file.get("Key") for capture_file in result.get('Contents')]
print("Found Capture Files:")
print("\n ".join(capture_files))

다음으로, 단일 캡처 파일의 내용을 확인해보겠습니다. 여기서는 Amazon SageMaker 특정 JSON-line 형식의 파일에 캡처된 모든 데이터를 볼 수 있습니다. 캡처된 파일의 처음 몇 줄을 빠르게 살펴보겠습니다.

In [None]:
# cell 08
def get_obj_body(obj_key):
    return s3_client.get_object(Bucket=bucket, Key=obj_key).get('Body').read().decode("utf-8")

capture_file = get_obj_body(capture_files[-1])
print(capture_file[:2000])

I'll translate the content once you provide the JSON file. Please share the content you'd like me to translate to Korean.

In [None]:
# cell 09
import json
print(json.dumps(json.loads(capture_file.split('\n')[0]), indent=2))

보시다시피, 각 추론 요청은 jsonl 파일에서 한 줄로 캡처됩니다. 이 줄에는 입력과 출력이 함께 병합되어 있습니다. 예제에서는 ContentType을 `text/csv`로 제공했으며, 이는 `observedContentType` 값에 반영됩니다. 또한 캡처 형식에서 입력 및 출력 페이로드를 인코딩하는 데 사용한 인코딩을 `encoding` 값으로 표시합니다.

요약하자면, 새로운 파라미터를 사용하여 엔드포인트에 대한 입력 또는 출력 페이로드를 캡처하는 방법을 살펴보았습니다. 또한 캡처된 형식이 Amazon S3에서 어떻게 보이는지도 확인했습니다. 다음으로, Amazon SageMaker가 Amazon S3에 수집된 데이터를 모니터링하는 데 어떻게 도움이 되는지 계속 살펴보겠습니다.

## PART B: 모델 모니터 - 기준선 설정 및 지속적 모니터링

데이터를 수집하는 것 외에도, Amazon SageMaker는 엔드포인트에서 관찰된 데이터를 모니터링하고 평가할 수 있는 기능을 제공합니다. 이를 위해:
1. 실시간 트래픽을 비교할 기준선(베이스라인)을 생성합니다.
1. 기준선이 준비되면, 지속적으로 평가하고 기준선과 비교하는 일정을 설정합니다.

### 1. 제약 조건 제안과 기준/훈련 데이터셋

모델을 훈련시킨 훈련 데이터셋은 일반적으로 좋은 기준 데이터셋입니다. 훈련 데이터셋의 데이터 스키마와 추론 데이터셋의 스키마가 정확히 일치해야 함(즉, 특성의 수와 순서)에 유의하세요.

훈련 데이터셋에서 Amazon SageMaker에 기준 `constraints`(제약 조건)를 제안하고 데이터를 탐색하기 위한 설명적인 `statistics`(통계)를 생성하도록 요청할 수 있습니다. 이 예제에서는 이 예제에 포함된 사전 훈련된 모델을 훈련시키는 데 사용된 훈련 데이터셋을 업로드하세요. 이미 Amazon S3에 데이터셋이 있다면 직접 가리킬 수 있습니다.

In [None]:
# cell 10
# copy over the training dataset to Amazon S3 (if you already have it in Amazon S3, you could reuse it)
baseline_prefix = prefix + '/baselining'
baseline_data_prefix = baseline_prefix + '/data'
baseline_results_prefix = baseline_prefix + '/results'

baseline_data_uri = 's3://{}/{}'.format(bucket,baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(bucket, baseline_results_prefix)
print('Baseline data uri: {}'.format(baseline_data_uri))
print('Baseline results uri: {}'.format(baseline_results_uri))


In [None]:
# cell 11
training_data_file = open("test_data/training-dataset-with-header.csv", 'rb')
s3_key = os.path.join(baseline_prefix, 'data', 'training-dataset-with-header.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(s3_key).upload_fileobj(training_data_file)

#### 학습 데이터셋으로 기준 작업 생성하기

이제 Amazon S3에 학습 데이터가 준비되었으니, 제약 조건을 `suggest`하는 작업을 시작하겠습니다. `DefaultModelMonitor.suggest_baseline(..)`은 제약 조건을 생성하기 위해 Amazon SageMaker에서 제공하는 Model Monitor 컨테이너를 사용하여 `ProcessingJob`을 시작합니다.

In [None]:
# cell 12
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

my_default_monitor_baseline = my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri+'/training-dataset-with-header.csv',
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True
)

#### 생성된 제약 조건 및 통계 탐색하기

In [None]:
# cell 13
s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Files:")
print("\n ".join(report_files))

In [None]:
# cell 14
import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

In [None]:
# cell 15
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df.head(10)

### 2. 데이터 품질 문제를 위한 수집된 데이터 분석하기

위에서 데이터를 수집했으면, Monitoring Schedules를 사용하여 데이터를 분석하고 모니터링하세요.

#### 일정 생성하기

In [None]:
# cell 16
# First, copy over some test scripts to the S3 bucket so that they can be used for pre and post processing
boto3.Session().resource('s3').Bucket(bucket).Object(code_prefix+"/preprocessor.py").upload_file('preprocessor.py')
boto3.Session().resource('s3').Bucket(bucket).Object(code_prefix+"/postprocessor.py").upload_file('postprocessor.py')

앞서 생성한 엔드포인트에 대한 모델 모니터링 일정을 생성할 수 있습니다. 기준 리소스(제약 조건 및 통계)를 사용하여 실시간 트래픽과 비교할 수 있습니다.

In [None]:
# cell 17
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

mon_schedule_name = 'DEMO-xgb-churn-pred-model-monitor-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=predictor.endpoint_name,
    #record_preprocessor_script=pre_processor_script,
    post_analytics_processor_script=s3_code_postprocessor_uri,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,

)

#### 인공 트래픽 생성 시작하기
아래 셀은 엔드포인트로 트래픽을 보내는 스레드를 시작합니다. 이 스레드를 종료하려면 커널을 중지해야 합니다. 트래픽이 없으면 처리할 데이터가 없기 때문에 모니터링 작업이 `Failed`로 표시됩니다.

In [None]:
# cell 18
from threading import Thread
from time import sleep
import time

endpoint_name=predictor.endpoint_name
runtime_client = boto3.client('runtime.sagemaker')

# (just repeating code from above for convenience/ able to run this section independently)
def invoke_endpoint(ep_name, file_name, runtime_client):
    with open(file_name, 'r') as f:
        for row in f:
            payload = row.rstrip('\n')
            response = runtime_client.invoke_endpoint(EndpointName=ep_name,
                                          ContentType='text/csv', 
                                          Body=payload)
            response['Body'].read()
            time.sleep(1)
            
def invoke_endpoint_forever():
    while True:
        invoke_endpoint(endpoint_name, 'test_data/test-dataset-input-cols.csv', runtime_client)
        
thread = Thread(target = invoke_endpoint_forever)
thread.start()

# Note that you need to stop the kernel to stop the invocations

#### 일정 설명 및 검사하기
일정을 설명하면 MonitoringScheduleStatus가 Scheduled로 변경되는 것을 확인할 수 있습니다.

In [None]:
# cell 19
desc_schedule_result = my_default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

#### 실행 목록 보기
일정은 이전에 지정한 간격으로 작업을 시작합니다. 여기서는 최근 다섯 번의 실행을 나열합니다. 시간별 일정을 생성한 후 이를 시작하는 경우, 실행 목록이 비어 있을 수 있습니다. 실행이 시작되는 것을 보려면 시간 경계(UTC 기준)를 넘길 때까지 기다려야 할 수 있습니다. 아래 코드에는 대기 로직이 포함되어 있습니다.

참고: 시간별 일정이라도 Amazon SageMaker는 실행을 예약하기 위해 20분의 버퍼 기간을 가집니다. 실행이 시간 경계로부터 0분에서 약 20분 사이에 시작되는 것을 볼 수 있습니다. 이는 예상된 동작이며 백엔드의 부하 분산을 위해 수행됩니다.

In [None]:
# cell 20
mon_executions = my_default_monitor.list_executions()
print("We created a hourly schedule above and it will kick off executions ON the hour (plus 0 - 20 min buffer.\nWe will have to wait till we hit the hour...")

while len(mon_executions) == 0:
    print("Waiting for the 1st execution to happen...")
    time.sleep(60)
    mon_executions = my_default_monitor.list_executions()    

#### 특정 실행 검사하기(최신 실행)
이전 셀에서 최신 완료 또는 실패한 예약 실행을 선택했습니다. 다음은 가능한 최종 상태와 각각의 의미입니다:
* Completed - 모니터링 실행이 완료되었으며 위반 보고서에서 문제가 발견되지 않았음을 의미합니다.
* CompletedWithViolations - 실행은 완료되었지만 제약 조건 위반이 감지되었음을 의미합니다.
* Failed - 클라이언트 오류(아마도 잘못된 역할 권한) 또는 인프라 문제로 인해 모니터링 실행이 실패했습니다. 정확히 무슨 일이 일어났는지 확인하려면 FailureReason과 ExitMessage를 추가로 검토해야 합니다.
* Stopped - 작업이 최대 실행 시간을 초과했거나 수동으로 중지되었습니다.

In [None]:
# cell 21
latest_execution = mon_executions[-1] # latest execution's index is -1, second to last is -2 and so on..
time.sleep(60)
latest_execution.wait(logs=False)

print("Latest execution status: {}".format(latest_execution.describe()['ProcessingJobStatus']))
print("Latest execution result: {}".format(latest_execution.describe()['ExitMessage']))

latest_job = latest_execution.describe()
if (latest_job['ProcessingJobStatus'] != 'Completed'):
        print("====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures.")

In [None]:
# cell 22
report_uri=latest_execution.output.destination
print('Report Uri: {}'.format(report_uri))

#### 생성된 보고서 목록

In [None]:
# cell 23
from urllib.parse import urlparse
s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip('/')
print('Report bucket: {}'.format(report_bucket))
print('Report key: {}'.format(report_key))

s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=report_bucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Report Files:")
print("\n ".join(report_files))

#### 위반 보고서

기준선과 비교하여 위반 사항이 있으면 여기에 나열됩니다.

In [None]:
# cell 24
violations = my_default_monitor.latest_monitoring_constraint_violations()
pd.set_option('display.max_colwidth', None)
constraints_df = pd.json_normalize(violations.body_dict["violations"])
constraints_df.head(10)

#### 기타 명령어
모니터링 일정을 시작하고 중지할 수도 있습니다.

In [None]:
# cell 25
#my_default_monitor.stop_monitoring_schedule()
#my_default_monitor.start_monitoring_schedule()

## 리소스 삭제하기

데이터를 계속 캡처하기 위해 엔드포인트를 실행 상태로 유지할 수 있습니다. 더 많은 데이터를 수집하거나 이 엔드포인트를 더 이상 사용할 계획이 없다면, 추가 요금이 발생하지 않도록 엔드포인트를 삭제해야 합니다. 엔드포인트를 삭제해도 모델 호출 중에 캡처된 데이터는 삭제되지 않습니다. 해당 데이터는 직접 삭제할 때까지 Amazon S3에 유지됩니다.

하지만 그 전에 먼저 일정을 삭제해야 합니다.

In [None]:
# cell 26
my_default_monitor.delete_monitoring_schedule()
time.sleep(60) # actually wait for the deletion

In [None]:
# cell 27
predictor.delete_endpoint()

In [None]:
# cell 28
predictor.delete_model()

# 옵션 2: 배치 변환을 통한 모델 모니터링

## 파트 A: 배치 변환 작업에서 데이터 캡처하기
데이터 캡처 기능을 실제로 보여주기 위해 배치 변환 작업을 생성합니다.

### 1) 사전 훈련된 모델을 Amazon S3에 업로드하기
이 코드는 배포할 준비가 된 사전 훈련된 XGBoost 모델을 업로드합니다. 이 모델은 SageMaker의 XGB Churn Prediction 노트북을 사용하여 훈련되었습니다. 이 단계에서 자신의 사전 훈련된 모델을 사용할 수도 있습니다. Amazon S3에 이미 사전 훈련된 모델이 있는 경우, s3_key를 지정하여 추가할 수 있습니다.

In [None]:
model_file = open("model/xgb-churn-prediction-model.tar.gz", "rb")
s3_key = os.path.join(prefix, "xgb-churn-prediction-model.tar.gz")
boto3.Session().resource("s3").Bucket(bucket).Object(s3_key).upload_fileobj(model_file)

In [None]:
from time import gmtime, strftime
from sagemaker.model import Model
from sagemaker.image_uris import retrieve

model_name = "DEMO-xgb-churn-pred-model-monitor-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
model_url = "https://{}.s3-{}.amazonaws.com/{}/xgb-churn-prediction-model.tar.gz".format(
    bucket, region, prefix
)

image_uri = retrieve("xgboost", boto3.Session().region_name, "0.90-1")

model = Model(image_uri=image_uri, model_data=model_url, role=role)

### 2) 배치 추론에 사용될 테스트 데이터 업로드하기

In [None]:
!aws s3 cp test_data/test-dataset-input-cols.csv s3://{bucket}/transform-input/test-dataset-input-cols.csv

### 3) 배치 변환 작업 생성하기

In [None]:
from sagemaker.inputs import BatchDataCaptureConfig

In [None]:
transfomer = model.transformer(
    instance_count=1,
    instance_type="ml.m4.xlarge",
    accept="text/csv",
    assemble_with="Line",
)

transfomer.transform(
    "s3://{}/transform-input".format(bucket),
    content_type="text/csv",
    split_type="Line",
    # configure the data capturing
    batch_data_capture_config=BatchDataCaptureConfig(
        destination_s3_uri=s3_capture_upload_path,
    ),
    wait=True,
)

### 4) 배치 변환 캡처 데이터 검사하기

`s3_capture_upload_path` 아래에는 두 개의 디렉토리가 있습니다. 하나는 `/input`이고 다른 하나는 `/output`입니다. `/input` 아래에는 변환 입력에 대한 캡처된 데이터 파일이 있고, `/output` 아래에는 변환 출력에 대한 캡처된 데이터 파일이 있습니다. 배치 변환 데이터 캡처는 엔드포인트 데이터 캡처와 달리, 데이터를 캡처하여 s3에 로그하지 않습니다. 이는 엄청난 양의 중복을 생성할 수 있기 때문입니다. 대신, 배치 변환은 매니페스트에 데이터를 캡처합니다. 매니페스트에는 소스 변환 입력 또는 출력 s3 위치가 포함됩니다.

캡처된 데이터를 살펴보겠습니다.

In [None]:
!aws s3 ls {s3_capture_upload_path}/input/ --recursive

In [None]:
s3 = boto3.client("s3")

captured_input_s3_key = [
    k["Key"]
    for k in s3.list_objects_v2(Bucket=bucket, Prefix=f"{data_capture_prefix}/input/")["Contents"]
]
assert len(captured_input_s3_key) > 0

In [None]:
sample_input_body = s3.get_object(Bucket=bucket, Key=captured_input_s3_key[0])["Body"]
sample_input_content = json.loads(sample_input_body.read())

In [None]:
sample_input_content

데이터 중복을 피하기 위해 캡처된 데이터는 매니페스트 파일입니다. 각 매니페스트는 소스 객체의 Amazon S3 위치를 포함하는 JSONL 파일입니다.

In [None]:
!aws s3 ls {s3_capture_upload_path}/output/ --recursive

In [None]:
captured_input_s3_key = [
    k["Key"]
    for k in s3.list_objects_v2(Bucket=bucket, Prefix=f"{data_capture_prefix}/output/")["Contents"]
]
assert len(captured_input_s3_key) > 0
sample_output_body = s3.get_object(Bucket=bucket, Key=captured_input_s3_key[0])["Body"]
sample_output_content = json.loads(sample_output_body.read())

In [None]:
sample_output_content

요약하자면, 새로운 매개변수를 사용하여 배치 변환 작업의 입력 또는 출력 페이로드를 캡처하는 방법을 확인했습니다. 또한 Amazon S3에서 캡처된 형식이 어떻게 보이는지도 확인했습니다. 다음으로, Amazon SageMaker가 Amazon S3에서 수집된 데이터를 모니터링하는 데 어떻게 도움이 되는지 계속 살펴보겠습니다.

## 파트 B: 모델 모니터 - 기준선 및 지속적 모니터링

### 5) 모델 모니터에서 사용할 기준선 생성하기

데이터 수집 외에도 Amazon SageMaker는 배치 변환이 관찰한 데이터를 모니터링하고 평가하는 기능을 제공합니다. 이를 위해:
1. 실시간 트래픽과 비교할 기준선을 생성합니다.
1. 기준선이 준비되면 지속적으로 평가하고 기준선과 비교하는 일정을 설정합니다.

일반적으로 이 작업은 변환 작업과 병행하여 수행할 수 있습니다.

모델을 훈련시킨 훈련 데이터셋은 일반적으로 좋은 기준선 데이터셋입니다. 훈련 데이터셋 스키마와 추론 데이터셋 스키마가 정확히 일치해야 합니다(즉, 특성의 수와 순서).

훈련 데이터셋에서 Amazon SageMaker에게 기준선 `제약 조건` 세트를 제안하고 데이터를 탐색하기 위한 설명적 `통계`를 생성하도록 요청할 수 있습니다. 이 예제에서는 이 예제에 포함된 사전 훈련된 모델을 훈련하는 데 사용된 훈련 데이터셋을 업로드합니다. Amazon S3에 이미 있는 경우 직접 가리킬 수 있습니다.

In [None]:
# copy over the training dataset to Amazon S3 (if you already have it in Amazon S3, you could reuse it)
baseline_prefix = prefix + "/baselining"
baseline_data_prefix = baseline_prefix + "/data"
baseline_results_prefix = baseline_prefix + "/results"

baseline_data_uri = "s3://{}/{}".format(bucket, baseline_data_prefix)
baseline_results_uri = "s3://{}/{}".format(bucket, baseline_results_prefix)
print("Baseline data uri: {}".format(baseline_data_uri))
print("Baseline results uri: {}".format(baseline_results_uri))

In [None]:
training_data_file = open("test_data/training-dataset-with-header.csv", "rb")
s3_key = os.path.join(baseline_prefix, "data", "training-dataset-with-header.csv")
boto3.Session().resource("s3").Bucket(bucket).Object(s3_key).upload_fileobj(training_data_file)

이제 Amazon S3에 훈련 데이터가 준비되었으므로, 제약 조건을 `제안`하는 작업을 시작합니다. `DefaultModelMonitor.suggest_baseline(..)` 은 제약 조건을 생성하기 위해 Amazon SageMaker에서 제공하는 Model Monitor 컨테이너를 사용하여 `ProcessingJob`을 시작합니다.

In [None]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri + "/training-dataset-with-header.csv",
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True,
)

### 생성된 제약 조건 및 통계 탐색

In [None]:
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get("Contents")]
print("Found Files:")
print("\n ".join(report_files))

In [None]:
import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

In [None]:
constraints_df = pd.json_normalize(
    baseline_job.suggested_constraints().body_dict["features"]
)
constraints_df.head(10)

### 6) 모니터링 일정

### 일정 생성

모델 모니터링 일정을 생성할 수 있습니다. 기준 리소스(제약 조건 및 통계)를 사용하여 배치 변환 추론 입력 및 출력과 비교합니다.

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator
from sagemaker.model_monitor import BatchTransformInput
from sagemaker.model_monitor import MonitoringDatasetFormat
from time import gmtime, strftime

statistics_path = "{}/statistics.json".format(baseline_results_uri)
constraints_path = "{}/constraints.json".format(baseline_results_uri)

mon_schedule_name = "DEMO-xgb-churn-pred-model-monitor-schedule-" + strftime(
    "%Y-%m-%d-%H-%M-%S", gmtime()
)
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    batch_transform_input=BatchTransformInput(
        data_captured_destination_s3_uri=s3_capture_upload_path,
        destination="/opt/ml/processing/input",
        dataset_format=MonitoringDatasetFormat.csv(header=False),
    ),
    output_s3_uri=s3_report_path,
    statistics=statistics_path,
    constraints=constraints_path,
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

---

### 7) 일정 설명 및 검사

일정을 설명하면 MonitoringScheduleStatus가 Scheduled로 변경되는 것을 확인할 수 있습니다.

In [None]:
desc_schedule_result = my_default_monitor.describe_schedule()
print("Schedule status: {}".format(desc_schedule_result["MonitoringScheduleStatus"]))

### 실행 목록 조회
일정은 이전에 지정한 간격으로 작업을 시작합니다. 여기서는 최근 다섯 개의 실행을 나열합니다. 시간별 일정을 생성한 후 이를 실행하는 경우 실행 목록이 비어 있을 수 있습니다. 실행이 시작되는 것을 보려면 (UTC 기준) 시간 경계를 넘어갈 때까지 기다려야 할 수 있습니다. 아래 코드에는 대기 로직이 포함되어 있습니다.

참고: 시간별 일정이라도 Amazon SageMaker는 실행을 예약하기 위한 20분의 버퍼 기간을 가집니다. 실행이 시간 경계로부터 0분에서 약 20분 사이에 시작되는 것을 볼 수 있습니다. 이는 예상된 동작이며 백엔드의 부하 분산을 위해 수행됩니다.

In [None]:
import time

mon_executions = my_default_monitor.list_executions()
print(
    "We created a hourly schedule above and it will kick off executions ON the hour (plus 0 - 20 min buffer.\nWe will have to wait till we hit the hour..."
)

while len(mon_executions) == 0:
    print("Waiting for the 1st execution to happen...")
    time.sleep(60)
    mon_executions = my_default_monitor.list_executions()

### 특정 실행 검사(최신 실행)
이전 셀에서 최근에 완료되거나 실패한 예약된 실행을 선택했습니다. 다음은 가능한 최종 상태와 각각의 의미입니다:
* Completed - 모니터링 실행이 완료되었으며 위반 보고서에서 문제가 발견되지 않았음을 의미합니다.
* CompletedWithViolations - 실행은 완료되었지만 제약 조건 위반이 감지되었음을 의미합니다.
* Failed - 모니터링 실행이 실패했습니다. 클라이언트 오류(아마도 잘못된 역할 권한) 또는 인프라 문제 때문일 수 있습니다. 정확히 무슨 일이 일어났는지 확인하려면 FailureReason과 ExitMessage를 추가로 검토해야 합니다.
* Stopped - 작업이 최대 실행 시간을 초과했거나 수동으로 중지되었습니다.

In [None]:
latest_execution = mon_executions[
    -1
]  # latest execution's index is -1, second to last is -2 and so on..
# time.sleep(60)
latest_execution.wait(logs=False)

print("Latest execution status: {}".format(latest_execution.describe()["ProcessingJobStatus"]))
print("Latest execution result: {}".format(latest_execution.describe()["ExitMessage"]))

latest_job = latest_execution.describe()
if latest_job["ProcessingJobStatus"] != "Completed":
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )

In [None]:
report_uri = latest_execution.output.destination
print("Report Uri: {}".format(report_uri))

### 생성된 보고서 목록

In [None]:
from urllib.parse import urlparse

s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip("/")
print("Report bucket: {}".format(report_bucket))
print("Report key: {}".format(report_key))

s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=report_bucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get("Contents")]
print("Found Report Files:")
print("\n ".join(report_files))

### 위반 보고서

기준선과 비교하여 위반 사항이 있으면 여기에 나열됩니다.

In [None]:
violations = my_default_monitor.latest_monitoring_constraint_violations()
constraints_df = pd.json_normalize(violations.body_dict["violations"])
constraints_df.head(10)

### 기타 명령
모니터링 일정을 시작하고 중지할 수도 있습니다.

In [None]:
# my_default_monitor.stop_monitoring_schedule()
# my_default_monitor.start_monitoring_schedule()

## 8) 리소스 삭제

In [None]:
# my_default_monitor.stop_monitoring_schedule()
# my_default_monitor.delete_monitoring_schedule()
# time.sleep(60)  # actually wait for the deletion

In [None]:
# predictor.delete_model()