# 非同期推論による RRCF の学習と推論

In [None]:
from typing import Final
import os
import sagemaker
from sagemaker.sklearn import SKLearnModel
import boto3
import json
from time import sleep
import glob
import pandas as pd
import io
from matplotlib import pyplot as plt

# A single SageMaker session is created once and reused for both the region
# and the default bucket, so the two values always come from the same session.
sess = sagemaker.session.Session()

# Low-level AWS clients used throughout the notebook
smr_client: Final = boto3.client('sagemaker-runtime')  # invoke_endpoint_async
sm_client: Final = boto3.client('sagemaker')  # model / endpoint management
s3_client: Final = boto3.client('s3')  # polling for inference results
endpoint_inservice_waiter: Final = sm_client.get_waiter('endpoint_in_service')

role: Final[str] = sagemaker.get_execution_role()
region: Final[str] = sess.boto_region_name
bucket = sess.default_bucket()

## データ準備
* 5_xxx.ipynb のベンチマークデータを使う
* ファイルリストを取得する

In [None]:
# The benchmark CSVs are read from the session's default bucket, which is also
# the bucket the polling code checks for results.
# To (re)upload them first:
#   s3_base_uri = sess.upload_data('./bench_data/', key_prefix='bench_data')
s3_base_uri = f's3://{bucket}/bench_data'

In [None]:
# Collect the benchmark CSVs that sit one directory below ./bench_data and keep
# only the part of each path after './bench_data/' (used later as the S3 key suffix).
csv_paths = sorted(glob.glob('./bench_data/**/*.csv'))
file_name_list = [csv_path.split('/', 2)[2] for csv_path in csv_paths]
print(*file_name_list)

## 推論コード作成

In [None]:
# Directory that holds the inference code; it is deleted and recreated here so
# that each run starts from an empty directory.
source_dir = 'async_src/'
!rm -rf {source_dir}
!mkdir {source_dir}

In [None]:
%%writefile async_src/requirements.txt
rrcf==0.4.3
dill==0.3.4
matplotlib==3.5.3

In [None]:
%%writefile async_src/inference.py
import pandas as pd
import io
import rrcf
import numpy as np
from scipy import stats
import logging
import sys
# Module logger: INFO level, echoed to stdout.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
stdout_handler = logging.StreamHandler(sys.stdout)
logger.addHandler(stdout_handler)

# Hyperparameters of the random cut forest built in calc_score
hps = dict(
    num_trees=128,
    shingle_size=10,
    tree_size=1024,
)


def calc_score(df):
    """Build a random cut forest over one series and score every shingle.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose column ``0`` holds the series; values are cast to float.

    Returns
    -------
    tuple
        ``(result_df, forest)``. ``result_df`` has one row per shingle: the
        shingle values in columns ``0 .. shingle_size - 1``, the collusive
        displacement averaged over the trees that contain the shingle in
        ``score``, and ``score`` divided by its maximum in ``scaled_score``.
        A shingle that was never sampled into any tree gets ``NaN`` (0 / 0).
        ``forest`` is the list of ``rrcf.RCTree`` objects that were built.
    """
    # Prepare the shingled point matrix
    logger.debug('preparing RCF...')
    data = df[0].astype(float).values
    points = np.vstack(list(rrcf.shingle(data, size=hps['shingle_size'])))
    n = points.shape[0]
    # A tree cannot hold more points than exist. Without this clamp a short
    # series (n < tree_size) gives n // tree_size == 0 trees per batch, so the
    # forest never grows and the loop below never terminates.
    tree_size = min(hps['tree_size'], n)
    sample_size_range = (n // tree_size, tree_size)
    logger.debug('prepared RCF')

    # Build the forest in batches of disjoint samples
    logger.debug('generating RCF...')
    forest = []
    while len(forest) < hps['num_trees']:
        # Each row of ixs is the index set of one tree; rows do not overlap
        # because the whole batch is drawn without replacement.
        ixs = np.random.choice(n, size=sample_size_range,
                               replace=False)
        trees = [rrcf.RCTree(points[ix], index_labels=ix) for ix in ixs]
        forest.extend(trees)
    logger.debug('generated RCF')

    # Compute the anomaly score
    logger.debug('calculating score...')
    avg_codisp = pd.Series(0.0, index=np.arange(n))
    index = np.zeros(n)  # number of trees each point appeared in
    for tree in forest:
        codisp = pd.Series({leaf : tree.codisp(leaf) for leaf in tree.leaves})
        avg_codisp[codisp.index] += codisp
        np.add.at(index, codisp.index.values, 1)
    avg_codisp /= index
    logger.debug('calculated score')

    # Assemble the result frame
    logger.debug('organizing score...')
    columns = list(range(points.shape[1]))
    result_df = pd.DataFrame(points, columns=columns, dtype='float')
    result_df['score'] = pd.Series(avg_codisp)
    result_df['scaled_score'] = result_df['score']/result_df['score'].max()

    logger.debug('organized score')
    return result_df, forest

def calc_threshold(df):
    """Add ``zscore`` and ``anomaly`` columns to ``df`` and return it.

    ``zscore`` is the z-score of the ``score`` column; ``anomaly`` is True
    where the z-score is greater than 3. The frame is modified in place and
    the same object is returned.
    """
    logger.debug('calculating calc_threshold...')
    df['zscore'] = stats.zscore(df['score'])
    # Vectorised comparison; NaN z-scores compare as False, same as before.
    df['anomaly'] = df['zscore'] > 3
    return df

def model_fn(model_dir):
    """Return no model: predict_fn ignores its ``model`` argument and
    calc_score builds a fresh forest from each request's data.
    """
    return None
def input_fn(input_data, content_type):
    """Parse a header-less CSV request body into a DataFrame.

    Parameters
    ----------
    input_data : str or bytes
        Raw request body. Bytes are decoded as UTF-8 before parsing.
    content_type : str
        Must be ``text/csv``; parameters such as ``; charset=utf-8`` and
        letter case are ignored.

    Raises
    ------
    TypeError
        If the content type is anything other than ``text/csv``.
    """
    # Compare only the media type, not optional parameters after ';'
    media_type = (content_type or '').split(';', 1)[0].strip().lower()
    if media_type != 'text/csv':
        raise TypeError('allowed only text/csv')
    # io.StringIO rejects bytes, so decode a binary payload first
    if isinstance(input_data, (bytes, bytearray)):
        input_data = input_data.decode('utf-8')
    return pd.read_csv(io.StringIO(input_data), header=None)
def predict_fn(transformed_data, model):
    """Score the parsed series and flag anomalies.

    ``model`` is unused; the forest is built inside calc_score for each call.
    Returns the frame produced by calc_score after calc_threshold has added
    its columns.
    """
    scored_df, _forest = calc_score(transformed_data)
    return calc_threshold(scored_df)
def output_fn(df, accept_type):
    """Serialise the result frame as CSV text with a header row and no index
    column. ``accept_type`` is not inspected.
    """
    return df.to_csv(index=False)

## 推論コードアップロード

In [None]:
# Archive everything in source_dir into sourcedir.tar.gz (written inside
# source_dir itself), then return to the parent directory.
%cd {source_dir}
!tar zcvf sourcedir.tar.gz ./*
%cd ..

In [None]:
# Upload the packaged inference code with the shared session. The returned S3
# URI is what the model's SAGEMAKER_SUBMIT_DIRECTORY is set to later on.
# os.path.join avoids the doubled slash that './{source_dir}/...' produced,
# since source_dir already ends with '/'.
source_s3_uri: Final[str] = sess.upload_data(
    os.path.join(source_dir, 'sourcedir.tar.gz'),
    key_prefix='rrcf_async',
)
print(source_s3_uri)

## 非同期推論エンドポイント作成

In [None]:
# Names of the SageMaker resources created below.
# `role` is not re-declared here: it is already bound as Final in the setup
# cell at the top of the notebook, and a Final name may be declared only once.
model_name: Final[str] = 'rrcf-async'
endpoint_config_name: Final[str] = model_name + 'EndpointConfig'
endpoint_name: Final[str] = model_name + 'Endpoint'

In [None]:
# Look up the URI of the managed scikit-learn inference container image
container_image_uri: Final[str] = sagemaker.image_uris.retrieve(
    framework="sklearn",  # managed SKLearn container
    region=sagemaker.session.Session().boto_region_name,  # ECR region
    version='1.0-1',  # SKLearn version
    instance_type='ml.c5.xlarge',  # instance type the image is resolved for
    image_scope='inference',  # inference (not training) image
)
print(container_image_uri)

## c5.9xlarge x 1

In [None]:
# Create the Model
response = sm_client.create_model(
    ModelName=model_name,
    PrimaryContainer={
        'Image': container_image_uri,
        'Environment': {
            'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
            'SAGEMAKER_PROGRAM': 'inference.py',
            'SAGEMAKER_REGION': region,
            'SAGEMAKER_SUBMIT_DIRECTORY': source_s3_uri}
    },
    ExecutionRoleArn=role,
)
# Create the EndpointConfig (asynchronous inference, 1 x ml.c5.9xlarge)
response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            'VariantName': 'AllTrafic',
            'ModelName': model_name,
            'InitialInstanceCount':1,
            'InstanceType': 'ml.c5.9xlarge',
        },
    ],
    AsyncInferenceConfig={
        "OutputConfig": {
            "S3OutputPath": f"s3://{bucket}/rrcf_async/output"
        },
    }
)
# Create the Endpoint
response = sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name,
)
# Wait until the Endpoint is in service.
# The waiter defaults are Delay=30 and MaxAttempts=120 (60 minutes in total).
# Lowering only Delay to 5 would cap the wait at 10 minutes, so MaxAttempts is
# raised to keep the same 60-minute budget while polling more often.
endpoint_inservice_waiter.wait(
    EndpointName=endpoint_name,
    WaiterConfig={'Delay': 5, 'MaxAttempts': 720}
)

## 非同期推論

In [None]:
%%time
output_key_list = []
# 推論キュー追加
for file_name in file_name_list:
    response = smr_client.invoke_endpoint_async(
        EndpointName=endpoint_name, 
        InputLocation=f'{s3_base_uri}/{file_name}',
        ContentType='text/csv',
        Accept='text/csv',
    )
    output_s3_uri = response['OutputLocation']
    output_key = output_s3_uri.replace(f's3://{bucket}/','')
    output_key_list.append(output_key)
# 推論完了確認
for output_key in output_key_list:    
    while True:
        result = s3_client.list_objects(Bucket=bucket, Prefix=output_key)
        exists = True if "Contents" in result else False
        if exists:
            break
        else:
            sleep(0.1)

In [None]:
# Delete the endpoint, its endpoint config and the model. The following
# sections create resources under these same names.
sm_client.delete_endpoint(EndpointName=endpoint_name)
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
sm_client.delete_model(ModelName=model_name)

## c5.xlarge x 1

In [None]:
# Create the Model
response = sm_client.create_model(
    ModelName=model_name,
    PrimaryContainer={
        'Image': container_image_uri,
        'Environment': {
            'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
            'SAGEMAKER_PROGRAM': 'inference.py',
            'SAGEMAKER_REGION': region,
            'SAGEMAKER_SUBMIT_DIRECTORY': source_s3_uri}
    },
    ExecutionRoleArn=role,
)
# Create the EndpointConfig (asynchronous inference, 1 x ml.c5.xlarge)
response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            'VariantName': 'AllTrafic',
            'ModelName': model_name,
            'InitialInstanceCount': 1,
            'InstanceType': 'ml.c5.xlarge',
        },
    ],
    AsyncInferenceConfig={
        "OutputConfig": {
            "S3OutputPath": f"s3://{bucket}/rrcf_async/output"
        },
    }
)
# Create the Endpoint
response = sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name,
)
# Wait until the Endpoint is in service.
# The waiter defaults are Delay=30 and MaxAttempts=120 (60 minutes in total).
# Lowering only Delay to 5 would cap the wait at 10 minutes, so MaxAttempts is
# raised to keep the same 60-minute budget while polling more often.
endpoint_inservice_waiter.wait(
    EndpointName=endpoint_name,
    WaiterConfig={'Delay': 5, 'MaxAttempts': 720}
)

In [None]:
%%time
output_key_list = []
# 推論キュー追加
for file_name in file_name_list:
    response = smr_client.invoke_endpoint_async(
        EndpointName=endpoint_name, 
        InputLocation=f'{s3_base_uri}/{file_name}',
        ContentType='text/csv',
        Accept='text/csv',
    )
    output_s3_uri = response['OutputLocation']
    output_key = output_s3_uri.replace(f's3://{bucket}/','')
    output_key_list.append(output_key)
# 推論完了確認
for output_key in output_key_list:    
    while True:
        result = s3_client.list_objects(Bucket=bucket, Prefix=output_key)
        exists = True if "Contents" in result else False
        if exists:
            break
        else:
            sleep(0.1)

In [None]:
# Delete the endpoint, its endpoint config and the model. The following
# sections create resources under these same names.
sm_client.delete_endpoint(EndpointName=endpoint_name)
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
sm_client.delete_model(ModelName=model_name)

## c5.xlarge x 8

In [None]:
# Create the Model
response = sm_client.create_model(
    ModelName=model_name,
    PrimaryContainer={
        'Image': container_image_uri,
        'Environment': {
            'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
            'SAGEMAKER_PROGRAM': 'inference.py',
            'SAGEMAKER_REGION': region,
            'SAGEMAKER_SUBMIT_DIRECTORY': source_s3_uri}
    },
    ExecutionRoleArn=role,
)
# Create the EndpointConfig (asynchronous inference, 8 x ml.c5.xlarge)
response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            'VariantName': 'AllTrafic',
            'ModelName': model_name,
            'InitialInstanceCount': 8,
            'InstanceType': 'ml.c5.xlarge',
        },
    ],
    AsyncInferenceConfig={
        "OutputConfig": {
            "S3OutputPath": f"s3://{bucket}/rrcf_async/output"
        },
    }
)
# Create the Endpoint
response = sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name,
)
# Wait until the Endpoint is in service.
# The waiter defaults are Delay=30 and MaxAttempts=120 (60 minutes in total).
# Lowering only Delay to 5 would cap the wait at 10 minutes, so MaxAttempts is
# raised to keep the same 60-minute budget while polling more often.
endpoint_inservice_waiter.wait(
    EndpointName=endpoint_name,
    WaiterConfig={'Delay': 5, 'MaxAttempts': 720}
)

In [None]:
%%time
output_key_list = []
# 推論キュー追加
for file_name in file_name_list:
    response = smr_client.invoke_endpoint_async(
        EndpointName=endpoint_name, 
        InputLocation=f'{s3_base_uri}/{file_name}',
        ContentType='text/csv',
        Accept='text/csv',
    )
    output_s3_uri = response['OutputLocation']
    output_key = output_s3_uri.replace(f's3://{bucket}/','')
    output_key_list.append(output_key)
# 推論完了確認
for output_key in output_key_list:    
    while True:
        result = s3_client.list_objects(Bucket=bucket, Prefix=output_key)
        exists = True if "Contents" in result else False
        if exists:
            break
        else:
            sleep(0.1)

In [None]:
# Delete the endpoint, its endpoint config and the model. The following
# section creates resources under these same names.
sm_client.delete_endpoint(EndpointName=endpoint_name)
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
sm_client.delete_model(ModelName=model_name)

## c5.4xlarge x 2

In [None]:
# Create the Model
response = sm_client.create_model(
    ModelName=model_name,
    PrimaryContainer={
        'Image': container_image_uri,
        'Environment': {
            'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
            'SAGEMAKER_PROGRAM': 'inference.py',
            'SAGEMAKER_REGION': region,
            'SAGEMAKER_SUBMIT_DIRECTORY': source_s3_uri}
    },
    ExecutionRoleArn=role,
)
# Create the EndpointConfig (asynchronous inference, 2 x ml.c5.4xlarge)
response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            'VariantName': 'AllTrafic',
            'ModelName': model_name,
            'InitialInstanceCount': 2,
            'InstanceType': 'ml.c5.4xlarge',
        },
    ],
    AsyncInferenceConfig={
        "OutputConfig": {
            "S3OutputPath": f"s3://{bucket}/rrcf_async/output"
        },
    }
)
# Create the Endpoint
response = sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name,
)
# Wait until the Endpoint is in service.
# The waiter defaults are Delay=30 and MaxAttempts=120 (60 minutes in total).
# Lowering only Delay to 5 would cap the wait at 10 minutes, so MaxAttempts is
# raised to keep the same 60-minute budget while polling more often.
endpoint_inservice_waiter.wait(
    EndpointName=endpoint_name,
    WaiterConfig={'Delay': 5, 'MaxAttempts': 720}
)

In [None]:
%%time
output_key_list = []
# 推論キュー追加
for file_name in file_name_list:
    response = smr_client.invoke_endpoint_async(
        EndpointName=endpoint_name, 
        InputLocation=f'{s3_base_uri}/{file_name}',
        ContentType='text/csv',
        Accept='text/csv',
    )
    output_s3_uri = response['OutputLocation']
    output_key = output_s3_uri.replace(f's3://{bucket}/','')
    output_key_list.append(output_key)
# 推論完了確認
for output_key in output_key_list:    
    while True:
        result = s3_client.list_objects(Bucket=bucket, Prefix=output_key)
        exists = True if "Contents" in result else False
        if exists:
            break
        else:
            sleep(0.1)

In [None]:
# Delete the endpoint, its endpoint config and the model created for this run.
sm_client.delete_endpoint(EndpointName=endpoint_name)
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
sm_client.delete_model(ModelName=model_name)