In [1]:
import tensorflow as tf, os, tarfile, json, numpy as np, base64, sagemaker, boto3
from sagemaker.tensorflow import TensorFlowModel
from io import BytesIO
from matplotlib import pyplot as plt
from PIL import Image
from time import sleep
sm_client = boto3.client('sagemaker')
smr_client = boto3.client('sagemaker-runtime')
sm_role = sagemaker.get_execution_role()
sess = sagemaker.session.Session()
bucket = sess.default_bucket()

## 使用するモデルのロードと動作確認

In [2]:
model = tf.keras.applications.mobilenet_v2.MobileNetV2()
model.summary()

[2021-07-27 06:43:07.987 tensorflow-2-3-cpu-py3-ml-c5-large-e43af8eadf999ccc51decfde869f:3259 INFO utils.py:27] RULE_JOB_STOP_SIGNAL_FILENAME: None
[2021-07-27 06:43:08.144 tensorflow-2-3-cpu-py3-ml-c5-large-e43af8eadf999ccc51decfde869f:3259 INFO profiler_config_parser.py:102] Unable to find config at /opt/ml/input/config/profilerconfig.json. Profiler is disabled.
Model: "mobilenetv2_1.00_224"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1_pad (ZeroPadding2D)       (None, 225, 225, 3)  0           input_1[0][0]                    
__________________________________________________________________________________________________
Conv1 (Co

In [None]:
# サンプル画像をダウンロード
file = tf.keras.utils.get_file(
    f'{os.getcwd()}/mountains.jpg',
    'https://storage.googleapis.com/gcptutorials.com/examples/mountains.jpg')

# 分類クラスをダウンロード
labels_path = tf.keras.utils.get_file(
    f'{os.getcwd()}/ImageNetLabels.txt',
    'https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt')
labels = list(np.array(open(labels_path).read().splitlines())[1:])

In [None]:
with open('./code/labels.txt','wt') as f:
    for txt in labels:
        f.write(txt+'\n')

In [None]:
# 画像のresizeと前処理結果の確認
img = Image.open(file).resize((model.input_shape[1],model.input_shape[2]))
img_arr = ((np.array(img)-127.5)/127.5).astype(np.float32).reshape(-1,model.input_shape[1],model.input_shape[2],3)
img

In [None]:
# モデルの動作確認
print(labels[np.argmax(model.predict(img_arr))])

In [None]:
# 既存の h5 モデルをロード
# model = tf.keras.models.load_model('classifier.h5')

# 保存ディレクトリを指定
model_dir = './0001'

# tar.gz の出力先を指定
tar_dir = 'MyModel'
os.makedirs(tar_dir, exist_ok=True)
tar_name = os.path.join(tar_dir, 'model.tar.gz')

# モデルを SavedModel 形式で保存
model.save(model_dir)

# tar.gz ファイルを出力
with tarfile.open(tar_name, mode='w:gz') as tar:
    tar.add(model_dir)

In [None]:
import sagemaker

# S3 にアップロードして、返り値としてS3のURIを受け取る
model_s3_path = f's3://{bucket}/{tar_dir}'

model_s3_uri = sagemaker.s3.S3Uploader.upload(
    local_path = tar_name,
    desired_s3_uri = model_s3_path
)

print(model_s3_uri)


In [None]:
# Sagemaker SDK でマネージドコンテナの URI を取得
container_image_uri = sagemaker.image_uris.retrieve(
    "tensorflow",  # TensorFlow のマネージドコンテナを利用
    sagemaker.session.Session().boto_region_name, # ECR のリージョンを指定
    version='2.4', # TensorFlow のバージョンを指定
    instance_type = 'ml.m5.large', # インスタンスタイプを指定
    image_scope = 'inference' # 推論コンテナを指定
)

print(container_image_uri)


## SageMaker Python SDK で Hosting

In [None]:
model_name = 'MyTFModelFromSMSDK'
endpoint_config_name = model_name + 'Endpoint'
endpoint_name = endpoint_config_name

In [None]:
# モデルとコンテナの指定
tf_model = TensorFlowModel(
    name = model_name,
    model_data=model_s3_uri, # モデルの S3 URI
    role= sm_role, # 割り当てるロール
    image_uri = container_image_uri, # コンテナイメージの S3 URI
)
# デプロイ(endpoint 生成)
predictor = tf_model.deploy(
    endpoint_name=endpoint_name, # エンドポイントの名前
    initial_instance_count=1, # インスタンス数
    instance_type='ml.m5.large', # インスタンスタイプ
)


In [None]:
img = Image.open(file).resize((model.input_shape[1],model.input_shape[2]))
img_arr = ((np.array(img)-127.5)/127.5).astype(np.float32).reshape(-1,model.input_shape[1],model.input_shape[2],3)

In [None]:
result = np.argmax(predictor.predict(img_arr)['predictions'][0])
print(labels[result])

In [None]:
r = sm_client.delete_endpoint(EndpointName=endpoint_name)
r = sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
r = sm_client.delete_model(ModelName=model_name)

## Boto3 で Hosting

In [None]:
model_name = 'MyTFModelAddProcessFromBoto3'
endpoint_config_name = model_name + 'EndpointConfig'
endpoint_name = model_name + 'Endpoint'

In [None]:
response = sm_client.create_model(
    ModelName=model_name,
    PrimaryContainer={
        # SageMaker SDK の時と同じ URI を指定
        'Image': container_image_uri,
        # SageMaker SDK の時と同じ URI を指定
        'ModelDataUrl': model_s3_uri,
    },
    # SageMaker SDK の時と同じ role を指定
    ExecutionRoleArn=sm_role,
)


In [None]:
response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            'VariantName': 'AllTrafic',
            'ModelName': model_name,
            'InitialInstanceCount': 1,
            'InstanceType': 'ml.m5.xlarge',
        },
    ],
)


In [None]:
response = sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name,
)
while True:
    status = sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']
    if status in ['InService','RollingBack','SystemUpdating','OutOfService']:
        print('!')
        print(status)
        break
    else:
        print('.',end='')
        sleep(5)

In [None]:
# リストを文字列にして渡すパターン
request_args = {
    'EndpointName': 'MyTFModelEndpointFromBoto3',
    'ContentType' : 'application/json',
    'Accept' : 'application/json',
    'Body' : str(img_arr.tolist())
}
response = smr_client.invoke_endpoint(**request_args)
result = np.argmax(json.load(response['Body'])['predictions'][0])
print(labels[result])

In [None]:
# jsonにして渡すパターン
request_args = {
    'EndpointName': 'MyTFModelEndpointFromBoto3',
    'ContentType' : 'application/json',
    'Accept' : 'application/json',
    'Body' : json.dumps({"instances": img_arr.tolist()})
}
response = smr_client.invoke_endpoint(**request_args)
result = np.argmax(json.load(response['Body'])['predictions'][0])
print(labels[result])

In [None]:
r = sm_client.delete_endpoint(EndpointName=endpoint_name)
r = sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
r = sm_client.delete_model(ModelName=model_name)

## 前処理/後処理追加
* リスト形式でデータを作成し(た後で json形式に変換し)て predict を行うが、 inference.py を使うことで前処理/後処理を endpoint 側うことも可能。
  * 重い画像の前処理を潤沢なエンドポイントのコンピューティングリソースで実行することで、呼び出し側　(Lambda など)の頻繁かつ長期的に処理するコンピューティングリソースのスペックを低減できる
  * 呼び出し側が前処理を意識せずに実装できるようになる(呼び出し側はデータサイエンティストの領域に入らずに済み、エンドポイントで実行する前処理までをDSの領域にできる）
* 以下を例に実装する。  
    * 前処理の例）画像分類であれば、画像のバイナリデータを base64 エンコーディングしたものを直接送りつけて、 endpoint 側でリストに変換可能
    * 後処理の例）softmax の結果から一番可能性の高い値を取得し、そのインデックスからラベルに変換する

### SageMaker Python SDK でホスティング

In [None]:
!pygmentize ./code/inference.py

In [None]:
model_name = 'MyTFModelAddProcessFromSMSDK'
endpoint_config_name = model_name + 'Endpoint'
endpoint_name = endpoint_config_name

In [None]:
# すでにあった場合の削除
r = sm_client.delete_endpoint(EndpointName=endpoint_name)
r = sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
r = sm_client.delete_model(ModelName=model_name)

In [None]:
# tar.gz の出力先を指定
tar_dir = 'MyModelAddProcess'
code_dir = './code'
os.makedirs(tar_dir, exist_ok=True)
tar_name = os.path.join(tar_dir, 'model.tar.gz')
with tarfile.open(tar_name, mode='w:gz') as tar:
    tar.add(model_dir)

In [None]:
model_s3_path = f's3://{bucket}/{tar_dir}'

model_s3_uri = sagemaker.s3.S3Uploader.upload(
    local_path = tar_name,
    desired_s3_uri = model_s3_path
)
print(model_s3_uri)

#### inference.py と必要なファイルの設定
* entry_point 引数で `inference.py` (名前固定)を指定すると `input_handler` と `output_handler` を推論前後に実行してくれる
* 必要なモジュール等がある場合は `source_dir` 引数に格納してあるディレクトリを指定すると一緒に読み込むが、 inference.py が `source_dir` のルートに存在する必要がある
* ホスティング先の展開ディレクトリは `/opt/ml/model/code` になるので、テキストファイルを読み込む時は絶対パスで指定するとよい（カレントディレクトリは `/sagemaker` で実行される）

In [None]:
# モデルとコンテナの指定
tf_model = TensorFlowModel(
    name = model_name,
    model_data=model_s3_uri, # モデルの S3 URI
    role= sm_role, # 割り当てるロール
    image_uri = container_image_uri, # コンテナイメージの S3 URI
    entry_point = './code/inference.py',
    source_dir = './code/'
)
# デプロイ(endpoint 生成)
predictor = tf_model.deploy(
    endpoint_name=endpoint_name,
    initial_instance_count=1, # インスタンス数
    instance_type='ml.m5.xlarge', # インスタンスタイプ
)


In [None]:
# 推論
with open('./mountains.jpg', 'rb') as img:
    data = img.read()
bio = BytesIO()
bio.write(data)
b64_data = base64.b64encode(bio.getvalue()).decode('utf-8')
json_b64 = json.dumps({'b64_image':b64_data})
request_args = {
    'EndpointName': endpoint_name,
    'ContentType' : 'application/json',
    'Accept' : 'application/json',
    'Body' : json_b64
}
response = smr_client.invoke_endpoint(**request_args)
print(response['Body'].read().decode('utf-8'))

In [None]:
# すでにあった場合の削除
r = sm_client.delete_endpoint(EndpointName=endpoint_name)
r = sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
r = sm_client.delete_model(ModelName=model_name)

### Boto3 でホスティングと推論

In [None]:
model_name = 'MyTFModelAddProcessFromBoto3'
endpoint_config_name = model_name + 'EndpointConfig'
endpoint_name = model_name + 'Endpoint'

In [None]:
# すでにあった場合の削除
r = sm_client.delete_endpoint(EndpointName=endpoint_name)
r = sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
r = sm_client.delete_model(ModelName=model_name)

#### inference.py 他、必要なファイルはmodel.tar.gzに一緒に入れる必要がある

In [None]:
# tar.gz の出力先を指定
tar_dir = 'MyModelAddProcess'
code_dir = './code'
os.makedirs(tar_dir, exist_ok=True)
tar_name = os.path.join(tar_dir, 'model.tar.gz')
with tarfile.open(tar_name, mode='w:gz') as tar:
    tar.add(model_dir)
    tar.add(code_dir)

In [None]:
model_s3_path = f's3://{bucket}/{tar_dir}'

model_s3_uri = sagemaker.s3.S3Uploader.upload(
    local_path = tar_name,
    desired_s3_uri = model_s3_path
)
print(model_s3_uri)

In [None]:
response = sm_client.create_model(
    ModelName=model_name,
    PrimaryContainer={
        # SageMaker SDK の時と同じ URI を指定
        'Image': container_image_uri,
        # SageMaker SDK の時と同じ URI を指定
        'ModelDataUrl': model_s3_uri,
    },
    # SageMaker SDK の時と同じ role を指定
    ExecutionRoleArn=sm_role,
)
response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            'VariantName': 'AllTrafic',
            'ModelName': model_name,
            'InitialInstanceCount': 1,
            'InstanceType': 'ml.m5.xlarge',
        },
    ],
)
response = sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name,
)

while True:
    status = sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']
    if status in ['InService','RollingBack','SystemUpdating','OutOfService']:
        print('!')
        print(status)
        break
    else:
        print('.',end='')
        sleep(5)

In [None]:
# 推論
with open('./mountains.jpg', 'rb') as img:
    data = img.read()
bio = BytesIO()
bio.write(data)
b64_data = base64.b64encode(bio.getvalue()).decode('utf-8')
json_b64 = json.dumps({'b64_image':b64_data})
request_args = {
    'EndpointName': endpoint_name,
    'ContentType' : 'application/json',
    'Accept' : 'application/json',
    'Body' : json_b64
}
response = smr_client.invoke_endpoint(**request_args)
print(response['Body'].read().decode('utf-8'))

In [None]:
# 削除
r = sm_client.delete_endpoint(EndpointName=endpoint_name)
r = sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
r = sm_client.delete_model(ModelName=model_name)