# 2023 AI/ML UnicornGym - Track 1

@jesamkim | 2023-Feb-08

# 4. SageMaker Endpoint (Single Model Endpoint)

### setup environment

In [1]:
%load_ext autoreload
%autoreload 2

# src 폴더 경로 설정
import sys
sys.path.append('./src')

<div class="alert alert-warning"><h4>주의</h4><p>
아래 코드 셀은 핸즈온에 필요한 라이브러리들을 설치하고, 주피터 노트북 커널을 셧다운시킵니다. 
    
노트북 커널이 셧다운된다면, 아래 코드 셀에서 <b><font color='darkred'>install_needed = False</font></b>로 변경 후, 코드 셀을 다시 실행해 주세요. 이 작업은 한 번만 수행하면 됩니다. 
</p></div>

In [2]:
import sys, IPython

#install_needed = True
install_needed = False

if install_needed:
    print("===> Installing deps and restarting kernel. Please change 'install_needed = False' and run this code cell again.")
    !{sys.executable} -m pip install -U sagemaker locust pyngrok xgboost==1.3.1
    IPython.Application.instance().kernel.do_shutdown(True)
else:
    import sagemaker, xgboost
    print(f'SageMaker API version={sagemaker.__version__}, XGBoost version={xgboost.__version__}')

SageMaker API version=2.131.1, XGBoost version=1.7.3


In [3]:
%store -r
XGB_FRAMEWORK_VERSION = '1.3-1'
DATASET_PATH = './dataset'

In [4]:
import boto3
import sagemaker
import json

sess = sagemaker.Session()
sm_session = sagemaker.session.Session()
boto_session = boto3.session.Session()
sm_client = boto3.client("sagemaker")
bucket = sm_session.default_bucket()
role = sagemaker.get_execution_role()
region = boto_session.region_name

print((bucket, role))

('sagemaker-us-east-1-376278017302', 'arn:aws:iam::376278017302:role/service-role/AmazonSageMaker-ExecutionRole-20230112T204234')


<br>

## 4-1. Create Model Serving Script

---

아래 코드 셀은 src 디렉토리에 SageMaker 추론 스크립트를 저장합니다.

#### Option 1.
- `model_fn(model_dir)`: S3의 `model_dir`에 저장된 모델 아티팩트를 로드합니다.
- `input_fn(request_body, content_type)`: 입력 데이터를 전처리합니다. `content_type`은 입력 데이터 종류에 따라 다양하게 처리 가능합니다. (예: `application/x-npy`, `application/json`, `application/csv`등)
- `predict_fn(input_object, model)`: `input_fn(...)`을 통해 들어온 데이터에 대해 추론을 수행합니다.
- `output_fn(prediction, accept_type)`: `predict_fn(...)`에서 받은 추론 결과를 후처리를 거쳐 프론트엔드로 전송합니다.

#### Option 2.
- `model_fn(model_dir)`: S3의 model_dir에 저장된 모델 아티팩트를 로드합니다.
- `transform_fn(model, request_body, content_type, accept_type)`: `input_fn(...), predict_fn(...), output_fn(...)`을 `transform_fn(...)`으로 통합할 수 있습니다.

In [32]:
%%writefile src/inference.py

import os
import time
import json
import pickle as pkl
import numpy as np
from io import BytesIO
import xgboost as xgb
import sagemaker_xgboost_container.encoder as xgb_encoders

# 데이터 프레임 컬럼 수
NUM_FEATURES= 8  

def model_fn(model_dir):
    """
    Deserialize and return fitted model.
    """
    model_file = "xgboost-model"
    model = xgb.Booster()
    model.load_model(os.path.join(model_dir, model_file))
    return model
                     

def input_fn(request_body, request_content_type):
    """
    The SageMaker XGBoost model server receives the request data body and the content type,
    and invokes the `input_fn`.
    Return a DMatrix (an object that can be passed to predict_fn).
    """
    print("Content type: ", request_content_type)
    if request_content_type == "application/x-npy":        
        stream = BytesIO(request_body)
        array = np.frombuffer(stream.getvalue())
        array = array.reshape(int(len(array)/NUM_FEATURES), NUM_FEATURES)
        return xgb.DMatrix(array)
    elif request_content_type == "text/csv":
        return xgb_encoders.csv_to_dmatrix(request_body.rstrip("\n"))
    elif request_content_type == "text/libsvm":
        return xgb_encoders.libsvm_to_dmatrix(request_body)
    else:
        raise ValueError(
            "Content type {} is not supported.".format(request_content_type)
        )
        

def predict_fn(input_data, model):
    """
    SageMaker XGBoost model server invokes `predict_fn` on the return value of `input_fn`.

    Return a two-dimensional NumPy array (predictions and scores)
    """
    start_time = time.time()
    y_probs = model.predict(input_data)
    print("--- Inference time: %s secs ---" % (time.time() - start_time))    
    y_preds = [1 if e >= 0.5 else 0 for e in y_probs] 
    #feature_contribs = model.predict(input_data, pred_contribs=True, validate_features=False)
    return np.vstack((y_preds, y_probs))


def output_fn(predictions, content_type="application/json"):
    """
    After invoking predict_fn, the model server invokes `output_fn`.
    """
    if content_type == "text/csv":
        return ','.join(str(x) for x in outputs)
    elif content_type == "application/json":
        outputs = json.dumps({
            'pred': predictions[0,:].tolist(),
            'prob': predictions[1,:].tolist()
        })        
        
        return outputs
    else:
        raise ValueError("Content type {} is not supported.".format(content_type))

Overwriting src/inference.py


<br>

## 4-2. Check Inference Results & Debugging
로컬 엔드포인트나 호스팅 엔드포인트 배포 전, 로컬 환경 상에서 직접 추론을 수행하여 결과를 확인합니다.

In [6]:
#!rm -rf model && mkdir model && tar -xzvf model.tar.gz -C model

In [45]:
import xgboost as xgb
import numpy as np
import pandas as pd

model = xgb.Booster()
model.load_model("model/xgboost-model")

train_df = pd.read_csv(f'{DATASET_PATH}/train.csv')
test_df = pd.read_csv(f'{DATASET_PATH}/test.csv')
y_test = test_df.iloc[:, 0].astype('int')
test_df = test_df.drop('Unnamed: 0', axis=1)
dtest = xgb.DMatrix(test_df)

In [33]:
y_prob = model.predict(dtest)
y_pred = np.array([1 if e >= 0.5 else 0 for e in y_prob])
y_prob[0], y_pred[0]

(0.99374384, 1)

<br>

## 4-3. Deploy a trained model from Amazon S3
---

SageMaker API의 `Model` 클래스는 훈련한 모델을 서빙하기 위한 모델 아티팩트와 도커 이미지를 정의합니다. 
`Model` 클래스 인스턴스 호출 시 AWS에서 사전 빌드한 도커 이미지 URL을 직접 가져올 수도 있지만, Model의 자식 클래스로(예: `XGBoostModel`, `TensorFlowModel`) 초기화하면 파라메터에 버전만 지정하는 것만으로 편리하게 추론을 수행하는 환경을 정의할 수 있습니다.

### Upload model artifacts to S3
압축한 모델 아티팩트를 Amazon S3로 복사합니다.

In [35]:
prefix = 'titanic-ml/deploy'
s3_path = f's3://{bucket}/{prefix}/model.tar.gz'
!aws s3 cp ./model/model.tar.gz {s3_path}

upload: model/model.tar.gz to s3://sagemaker-us-east-1-376278017302/titanic-ml/deploy/model.tar.gz


<br>

## 4-4. Deploy to Hosting Instance

로컬 모드에서 충분히 디버깅했으면 실제 호스팅 인스턴스로 배포할 차례입니다. 코드는 로컬 배포와 거의 동일하며, `instance_type`만 다르다는 점을 주목해 주세요! 

### Create Model

In [36]:
from sagemaker.xgboost.model import XGBoostModel

xgb_model = XGBoostModel(
    model_data=s3_path,
    role=role,
    entry_point="src/inference.py",
    framework_version=XGB_FRAMEWORK_VERSION,
)

### Create Endpoint

SageMaker SDK는 `deploy(...)` 메소드를 호출 시, `create-endpoint-config`와 `create-endpoint`를 같이 수행합니다. 좀 더 세분화된 파라메터 조정을 원하면 AWS CLI나 boto3 SDK client 활용을 권장 드립니다.

In [37]:
xgb_predictor = xgb_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge', 
    wait=False
)

### Wait for the endpoint jobs to complete

엔드포인트가 생성될 때까지 기다립니다. 엔드포인트가 가리키는 호스팅 리소스를 프로비저닝하는 데에 몇 분의 시간이 소요됩니다. 

In [38]:
from IPython.core.display import display, HTML
def make_endpoint_link(region, endpoint_name, endpoint_task):
    endpoint_link = f'<b><a target="blank" href="https://console.aws.amazon.com/sagemaker/home?region={region}#/endpoints/{endpoint_name}">{endpoint_task} Review Endpoint</a></b>'   
    return endpoint_link 
        
endpoint_link = make_endpoint_link(region, xgb_predictor.endpoint_name, '[Deploy model from S3]')
display(HTML(endpoint_link))

In [39]:
sess.wait_for_endpoint(xgb_predictor.endpoint_name, poll=5)

----------------------!

{'EndpointName': 'sagemaker-xgboost-2023-02-08-05-34-18-143',
 'EndpointArn': 'arn:aws:sagemaker:us-east-1:376278017302:endpoint/sagemaker-xgboost-2023-02-08-05-34-18-143',
 'EndpointConfigName': 'sagemaker-xgboost-2023-02-08-05-34-18-143',
 'ProductionVariants': [{'VariantName': 'AllTraffic',
   'DeployedImages': [{'SpecifiedImage': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.3-1',
     'ResolvedImage': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost@sha256:cc1509099c9c2b3fd9116d28928e994c22f144f32e8e9bbb7a4402c19d709250',
     'ResolutionTime': datetime.datetime(2023, 2, 8, 5, 34, 18, 927000, tzinfo=tzlocal())}],
   'CurrentWeight': 1.0,
   'DesiredWeight': 1.0,
   'CurrentInstanceCount': 1,
   'DesiredInstanceCount': 1}],
 'EndpointStatus': 'InService',
 'CreationTime': datetime.datetime(2023, 2, 8, 5, 34, 18, 385000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 2, 8, 5, 36, 43, 700000, tzinfo=tzlocal()),
 'ResponseMetadata': 

### Prediction - SageMaker SDK & text/csv
샘플 데이터에 대해 추론을 수행합니다.

In [40]:
from sagemaker.serializers import CSVSerializer, NumpySerializer
from sagemaker.deserializers import JSONDeserializer
xgb_predictor.serializer = CSVSerializer()
xgb_predictor.deserializer = JSONDeserializer() 

outputs = xgb_predictor.predict(test_df.values[0:4,:])
y_test_sample = y_test[0:4].values
y_pred_sample = outputs['pred']; y_prob_sample = outputs['prob']
y_test_sample, y_pred_sample, y_prob_sample

(array([12, 14, 24, 26]),
 [1.0, 1.0, 1.0, 1.0],
 [0.9937438368797302,
  0.9923297762870789,
  0.9923297762870789,
  0.9937438368797302])

### Prediction - boto3 SDK & application/x-npy

위의 코드 셀처럼 SageMaker SDK의 `predict(...)` 메소드로 추론을 수행할 수도 있지만, 이번에는 boto3의 `invoke_endpoint(...)` 메소드로 추론을 수행해 보겠습니다.
Boto3는 서비스 레벨의 저수준(low-level) SDK로, ML 실험에 초점을 맞춰 일부 기능들이 추상화된 고수준(high-level) SDK인 SageMaker SDK와 달리 SageMaker API를 완벽하게 제어할 수 있습으며, 프로덕션 및 자동화 작업에 적합합니다.

In [41]:
runtime_client = boto3.client('sagemaker-runtime')
endpoint_name = xgb_model.endpoint_name

response = runtime_client.invoke_endpoint(
    EndpointName=endpoint_name, 
    ContentType='application/x-npy',
    Accept='application/json',
    Body=test_df.values[0:4,:].tobytes()
)

print(json.loads(response['Body'].read().decode()))

{'pred': [1.0, 1.0, 1.0, 1.0], 'prob': [0.9937438368797302, 0.9923297762870789, 0.9923297762870789, 0.9937438368797302]}


### Prediction - boto3 SDK & text/csv

In [42]:
import io
from io import StringIO
csv_file = io.StringIO()
test_df[0:4].to_csv(csv_file, sep=",", header=False, index=False)
payload = csv_file.getvalue()

response = runtime_client.invoke_endpoint(
    EndpointName=endpoint_name, 
    ContentType='text/csv',
    Accept='application/json',
    Body=payload
)

print(json.loads(response['Body'].read().decode()))

{'pred': [1.0, 1.0, 1.0, 1.0], 'prob': [0.9937438368797302, 0.9923297762870789, 0.9923297762870789, 0.9937438368797302]}


<br>

다음 모듈에서 재사용할 변수들을 저장합니다. 만약 다음 모듈로 진행하지 않는다면 아래 섹션의 코드 셀을 주석 해제 후 실행해 주세요.

In [50]:
%store endpoint_name test_df s3_path

Stored 'endpoint_name' (str)
Stored 'test_df' (DataFrame)
Stored 's3_path' (str)


<br>

### (Optional) Endpoint Clean-up

In [51]:
xgb_predictor.delete_endpoint()
xgb_model.delete_model()