# SageMakerのserializerとdeserializerを理解する

SageMakerの推論エンドポイントにリクエストを投げる際に、

SageMaker -> endpointにて

このノートブックでは、LightGBMがインストールされたカスタムコンテナ構築し、SageMaker Trainingジョブで学習後、推論を行います。
カスタムコンテナの挙動を観察し、SageMakerの推論動作について理解を深めます。

ノートブックは20分程度で実行できます。

## SageMakerの仕組み（仮説）

* SageMakerのコントロールプレーン（サーバ）がある。
    * それは、pingを打って各推論エンドポイントが動いてるかヘルスチェックして、把握している。
    
* 推論エンドポイントへは、.predict()(SageMakerSDKの場合) or invoke_endpoint()(boto3の場合）でデータを投げる
    * predictも結局はinvoke_endpoint()している

https://github.com/aws/sagemaker-python-sdk/blob/885423c26ce7288283bbca7d9c1c53c4d0ccf103/src/sagemaker/predictor.py#L123


invoke_endpoint()すると、SageMakerに推論先(同じエンドポイントでも、variantごとにインスタンスタイプを持っているので、別IPアドレスのはず。同じvariantでも複数のインスタンスを持ち、それらも別IPのはず）を聞きに行き、返された宛先のエンドポイントにデータを投げていると予想。
* endpointやvariantを指定しているので、SageMakerに場所を聞く必要があると予想。SageMakerはDNSのような役割をする。
    * これにより、variantsへのロードバランスをSageMakerが行える。（AutoScaleはSageMakerではなく、他の機構が行なっているはず）
* SageMakerから帰ってきた宛先に/invocationを投げる。推論エンドポイントは/invocationに返答する。

invoke_endpoint()の前の、Predictorクラス作成の時に、SerializerとDeserializerを指定している。
つまり、データ投げる前のクライアント側でシリアライズして、エンドポイントに投げる。
エンドポイントからの応答（推論結果）は、シリアルデータでクライアントに返ってくる。
デシリアライズをクライアント側で実施する。

# シリアライズの確認
シリアライズはクライアント側で実行され、シリアライズされたデータは推論エンドポイントにinvokeされます。

GitHubのソースコード

https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/serializers.py


input形式は様々

outputはシリアライザで指定したクラスによる

In [25]:
import numpy as np

In [26]:
data_nparr = np.array([0.25387,
                       0.0,
                       6.91,
                       0.0,
                       0.4480,
                       5.399,
                       95.3,
                       5.8700,
                       3.0,
                       233.0,
                       17.9,
                       396.90,
                       30.81])

data_str = '0.25387,0.0,6.91,0.0,0.448,5.399,95.3,5.87,3.0,233.0,17.9,396.9,30.81\n0.01951,17.5,1.38,0.0,0.4161,7.104,59.5,9.2229,3.0,216.0,18.6,393.24,8.05\n4.64689,0.0,18.1,0.0,0.614,6.98,67.6,2.5329,24.0,666.0,20.2,374.68,11.66'


In [28]:
print(type(data_nparr))
print(data_nparr)
data_nparr

<class 'numpy.ndarray'>
[2.5387e-01 0.0000e+00 6.9100e+00 0.0000e+00 4.4800e-01 5.3990e+00
 9.5300e+01 5.8700e+00 3.0000e+00 2.3300e+02 1.7900e+01 3.9690e+02
 3.0810e+01]


array([2.5387e-01, 0.0000e+00, 6.9100e+00, 0.0000e+00, 4.4800e-01,
       5.3990e+00, 9.5300e+01, 5.8700e+00, 3.0000e+00, 2.3300e+02,
       1.7900e+01, 3.9690e+02, 3.0810e+01])

In [29]:
print(type(data_str))
print(data_str)
data_str

<class 'str'>
0.25387,0.0,6.91,0.0,0.448,5.399,95.3,5.87,3.0,233.0,17.9,396.9,30.81
0.01951,17.5,1.38,0.0,0.4161,7.104,59.5,9.2229,3.0,216.0,18.6,393.24,8.05
4.64689,0.0,18.1,0.0,0.614,6.98,67.6,2.5329,24.0,666.0,20.2,374.68,11.66


'0.25387,0.0,6.91,0.0,0.448,5.399,95.3,5.87,3.0,233.0,17.9,396.9,30.81\n0.01951,17.5,1.38,0.0,0.4161,7.104,59.5,9.2229,3.0,216.0,18.6,393.24,8.05\n4.64689,0.0,18.1,0.0,0.614,6.98,67.6,2.5329,24.0,666.0,20.2,374.68,11.66'

In [23]:
# 推論実行
with open(local_test, 'r') as f:
    payload = f.read().strip()
    print(type(payload))
    print(payload)
print('=' * 20)
payload

<class 'str'>
0.25387,0.0,6.91,0.0,0.448,5.399,95.3,5.87,3.0,233.0,17.9,396.9,30.81
0.01951,17.5,1.38,0.0,0.4161,7.104,59.5,9.2229,3.0,216.0,18.6,393.24,8.05
4.64689,0.0,18.1,0.0,0.614,6.98,67.6,2.5329,24.0,666.0,20.2,374.68,11.66
3.67367,0.0,18.1,0.0,0.583,6.312,51.9,3.9917,24.0,666.0,20.2,388.62,10.58
0.29819,0.0,6.2,0.0,0.504,7.686,17.0,3.3751,8.0,307.0,17.4,377.51,3.92
8.15174,0.0,18.1,0.0,0.7,5.39,98.9,1.7281,24.0,666.0,20.2,396.9,20.85
6.65492,0.0,18.1,0.0,0.713,6.317,83.0,2.7344,24.0,666.0,20.2,396.9,13.99
0.17171,25.0,5.13,0.0,0.453,5.966,93.4,6.8185,8.0,284.0,19.7,378.08,14.44
5.73116,0.0,18.1,0.0,0.532,7.061,77.0,3.4106,24.0,666.0,20.2,395.28,7.01
3.1636,0.0,18.1,0.0,0.655,5.759,48.2,3.0665,24.0,666.0,20.2,334.4,14.13
11.8123,0.0,18.1,0.0,0.718,6.824,76.5,1.794,24.0,666.0,20.2,48.45,22.74
8.64476,0.0,18.1,0.0,0.693,6.193,92.6,1.7912,24.0,666.0,20.2,396.9,15.17
0.02177,82.5,2.03,0.0,0.415,7.61,15.7,6.27,2.0,348.0,14.7,395.38,3.11
0.13914,0.0,4.05,0.0,0.51,5.572,88.5,2.5961,5.0

'0.25387,0.0,6.91,0.0,0.448,5.399,95.3,5.87,3.0,233.0,17.9,396.9,30.81\n0.01951,17.5,1.38,0.0,0.4161,7.104,59.5,9.2229,3.0,216.0,18.6,393.24,8.05\n4.64689,0.0,18.1,0.0,0.614,6.98,67.6,2.5329,24.0,666.0,20.2,374.68,11.66\n3.67367,0.0,18.1,0.0,0.583,6.312,51.9,3.9917,24.0,666.0,20.2,388.62,10.58\n0.29819,0.0,6.2,0.0,0.504,7.686,17.0,3.3751,8.0,307.0,17.4,377.51,3.92\n8.15174,0.0,18.1,0.0,0.7,5.39,98.9,1.7281,24.0,666.0,20.2,396.9,20.85\n6.65492,0.0,18.1,0.0,0.713,6.317,83.0,2.7344,24.0,666.0,20.2,396.9,13.99\n0.17171,25.0,5.13,0.0,0.453,5.966,93.4,6.8185,8.0,284.0,19.7,378.08,14.44\n5.73116,0.0,18.1,0.0,0.532,7.061,77.0,3.4106,24.0,666.0,20.2,395.28,7.01\n3.1636,0.0,18.1,0.0,0.655,5.759,48.2,3.0665,24.0,666.0,20.2,334.4,14.13\n11.8123,0.0,18.1,0.0,0.718,6.824,76.5,1.794,24.0,666.0,20.2,48.45,22.74\n8.64476,0.0,18.1,0.0,0.693,6.193,92.6,1.7912,24.0,666.0,20.2,396.9,15.17\n0.02177,82.5,2.03,0.0,0.415,7.61,15.7,6.27,2.0,348.0,14.7,395.38,3.11\n0.13914,0.0,4.05,0.0,0.51,5.572,88.5,2.5961,5.0

In [31]:
### str型のCSVフォーマットをシリアライズする場合
from sagemaker.serializers import CSVSerializer

serialized = CSVSerializer().serialize(data_str)
print(type(serialized))
print(serialized)
serialized

<class 'str'>
0.25387,0.0,6.91,0.0,0.448,5.399,95.3,5.87,3.0,233.0,17.9,396.9,30.81
0.01951,17.5,1.38,0.0,0.4161,7.104,59.5,9.2229,3.0,216.0,18.6,393.24,8.05
4.64689,0.0,18.1,0.0,0.614,6.98,67.6,2.5329,24.0,666.0,20.2,374.68,11.66


'0.25387,0.0,6.91,0.0,0.448,5.399,95.3,5.87,3.0,233.0,17.9,396.9,30.81\n0.01951,17.5,1.38,0.0,0.4161,7.104,59.5,9.2229,3.0,216.0,18.6,393.24,8.05\n4.64689,0.0,18.1,0.0,0.614,6.98,67.6,2.5329,24.0,666.0,20.2,374.68,11.66'

In [None]:
### str型のCSVフォーマットをシリアライズする場合
from sagemaker.serializers import CSVSerializer

serialized = CSVSerializer().serialize(data_str)
print(type(serialized))
print(serialized)
serialized

In [38]:
from sagemaker.serializers import NumpySerializer

serialized = NumpySerializer().serialize(data_str)
print(type(serialized))
print(serialized)
serialized

<class 'bytes'>
b"\x93NUMPY\x01\x00v\x00{'descr': '<U216', 'fortran_order': False, 'shape': (), }                                                            \n0\x00\x00\x00.\x00\x00\x002\x00\x00\x005\x00\x00\x003\x00\x00\x008\x00\x00\x007\x00\x00\x00,\x00\x00\x000\x00\x00\x00.\x00\x00\x000\x00\x00\x00,\x00\x00\x006\x00\x00\x00.\x00\x00\x009\x00\x00\x001\x00\x00\x00,\x00\x00\x000\x00\x00\x00.\x00\x00\x000\x00\x00\x00,\x00\x00\x000\x00\x00\x00.\x00\x00\x004\x00\x00\x004\x00\x00\x008\x00\x00\x00,\x00\x00\x005\x00\x00\x00.\x00\x00\x003\x00\x00\x009\x00\x00\x009\x00\x00\x00,\x00\x00\x009\x00\x00\x005\x00\x00\x00.\x00\x00\x003\x00\x00\x00,\x00\x00\x005\x00\x00\x00.\x00\x00\x008\x00\x00\x007\x00\x00\x00,\x00\x00\x003\x00\x00\x00.\x00\x00\x000\x00\x00\x00,\x00\x00\x002\x00\x00\x003\x00\x00\x003\x00\x00\x00.\x00\x00\x000\x00\x00\x00,\x00\x00\x001\x00\x00\x007\x00\x00\x00.\x00\x00\x009\x00\x00\x00,\x00\x00\x003\x00\x00\x009\x00\x00\x006\x00\x00\x00.\x00\x00\x009\x00\x00\x00,\x00\x00\x003\x00\x00

b"\x93NUMPY\x01\x00v\x00{'descr': '<U216', 'fortran_order': False, 'shape': (), }                                                            \n0\x00\x00\x00.\x00\x00\x002\x00\x00\x005\x00\x00\x003\x00\x00\x008\x00\x00\x007\x00\x00\x00,\x00\x00\x000\x00\x00\x00.\x00\x00\x000\x00\x00\x00,\x00\x00\x006\x00\x00\x00.\x00\x00\x009\x00\x00\x001\x00\x00\x00,\x00\x00\x000\x00\x00\x00.\x00\x00\x000\x00\x00\x00,\x00\x00\x000\x00\x00\x00.\x00\x00\x004\x00\x00\x004\x00\x00\x008\x00\x00\x00,\x00\x00\x005\x00\x00\x00.\x00\x00\x003\x00\x00\x009\x00\x00\x009\x00\x00\x00,\x00\x00\x009\x00\x00\x005\x00\x00\x00.\x00\x00\x003\x00\x00\x00,\x00\x00\x005\x00\x00\x00.\x00\x00\x008\x00\x00\x007\x00\x00\x00,\x00\x00\x003\x00\x00\x00.\x00\x00\x000\x00\x00\x00,\x00\x00\x002\x00\x00\x003\x00\x00\x003\x00\x00\x00.\x00\x00\x000\x00\x00\x00,\x00\x00\x001\x00\x00\x007\x00\x00\x00.\x00\x00\x009\x00\x00\x00,\x00\x00\x003\x00\x00\x009\x00\x00\x006\x00\x00\x00.\x00\x00\x009\x00\x00\x00,\x00\x00\x003\x00\x00\x000\x00\x00\x0

SageMakerの動き
* クライアント側でデータがシリアライズされる。シリアルデータを推論エンドポイントに送る。
* ====== SageMaker 内部 ==========
* 推論エンドポイントは、シリアル化されたデータを受け取る
* SageMakerのコードで、デシリアライズする。
* input_fn実行
* predict_fn実行
* output_fn実行
* データをシリアライズする。
* クライアントに送信
* ====== SageMaker 内部 ==========
* クライアント側で、デシリアライズする。

# デシリアライズの確認
クライアントは、推論エンドポイントからシリアルデータを受け取りますので、それをクライアント側でデシリアライズします。


GitHubのソースコード

https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/deserializers.py


inputは、推論エンドポイントから受信したシリアルデータ。形式はJSONやndarray

outputは、クラスで指定した形式（JSON, ndarray, pandasなど）


LightGBMは推論結果をndarray型で出力するので、ndarray型をシリアライズして、クライアントに渡すことを想定する。

In [43]:
from sagemaker.deserializers import PandasDeserializer

In [44]:
import botocore
import json
from io import BytesIO

In [45]:
# 返却したいオブジェクト
body_json = {
    "aaa": 3,
    "bbb": [
        {
            "ccc": "ddd"
        }
    ]
}

# エンコード。(encode()はデフォルトでutf-8。)
body_encoded = json.dumps(body_json).encode()

# StreamingBodyへ整形する。
body = botocore.response.StreamingBody(BytesIO(body_encoded),len(body_encoded))

In [46]:
#deserialized = PandasDeserializer().deserialize(body, 'text/csv')
deserialized = PandasDeserializer().deserialize(body, 'application/json') ### JSONがdeserializerのインプット


In [47]:
print(type(deserialized))
print('='*30)
print(deserialized)
print('='*30)
deserialized

<class 'pandas.core.frame.DataFrame'>
   aaa             bbb
0    3  {'ccc': 'ddd'}


Unnamed: 0,aaa,bbb
0,3,{'ccc': 'ddd'}


In [52]:
print(body_nparr)

[19.95642073 27.84489184 23.74743743]


In [72]:
from sagemaker.deserializers import NumpyDeserializer

# 返却したいオブジェクト
body_json = {
    "aaa": 3,
    "bbb": [
        {
            "ccc": "ddd"
        }
    ]
}
body_nparr = np.array([
                        19.95642073217597,
                        27.844891841022335,
                        23.747437427003455
                        ])

# エンコード。(encode()はデフォルトでutf-8。)
body_encoded = json.dumps(body_json).encode()
body_encoded2 = body_nparr.tobytes()

# StreamingBodyへ整形する。
body = botocore.response.StreamingBody(BytesIO(body_encoded),len(body_encoded))
body2 = botocore.response.StreamingBody(BytesIO(body_encoded2),len(body_encoded2))

#deserialized = NumpyDeserializer().deserialize(body, 'application/json') ### JSONがdeserializerのインプット
deserialized = NumpyDeserializer().deserialize(body2, 'application/x-npy') ### JSONがdeserializerのインプット

print(type(deserialized))
print('='*30)
print(deserialized)
print('='*30)
deserialized

OSError: Failed to interpret file <_io.BytesIO object at 0x7efcaf7732c0> as a pickle

In [64]:
print(type(body_encoded))
print(body_encoded)

print(type(body))
print(body)


print(type(body_nparr))
print(body_nparr)

print(type(body_encoded2))
print(body_encoded2)

print(type(body2))
print(body2)

<class 'bytes'>
b'{"aaa": 3, "bbb": [{"ccc": "ddd"}]}'
<class 'botocore.response.StreamingBody'>
<botocore.response.StreamingBody object at 0x7efcafb4faf0>
<class 'numpy.ndarray'>
[19.95642073 27.84489184 23.74743743]
<class 'bytes'>
b'\x84\xe95\xfd\xd7\xf43@!\xd9\xe9\xd4J\xd8;@F\xc9(\x0fX\xbf7@'
<class 'botocore.response.StreamingBody'>
<botocore.response.StreamingBody object at 0x7efcafb4f460>


# 参考

botocore.response

https://botocore.amazonaws.com/v1/documentation/api/latest/reference/response.html

raw_streamを入力する必要がある。


バイナリ I/O
https://docs.python.org/ja/3/library/io.html#binary-i-o


BytesIO はインメモリーのバイナリストリームです:

f = io.BytesIO(b"some initial binary data: \x00\x01")

In [77]:
from sagemaker.deserializers import NumpyDeserializer


# StreamingBodyへ整形する。
body = botocore.response.StreamingBody(BytesIO(b'{"hogehoge":1}'),len(b'{"hogehoge":1}'))

deserialized = NumpyDeserializer().deserialize(body, 'application/json') ### JSONがdeserializerのインプット
#deserialized = NumpyDeserializer().deserialize(body, 'application/x-npy') ### JSONがdeserializerのインプット

print(type(deserialized))
print('='*30)
print(deserialized)
print('='*30)
deserialized

<class 'numpy.ndarray'>
{'hogehoge': 1}


array({'hogehoge': 1}, dtype=object)

In [78]:
from sagemaker.deserializers import NumpyDeserializer

# StreamingBodyへ整形する。
body = botocore.response.StreamingBody(BytesIO(b'{"hogehoge":1}'),len(b'{"hogehoge":1}'))

deserialized = NumpyDeserializer().deserialize(body, 'application/x-npy') ### ndarrayがdeserializerのインプット

print(type(deserialized))
print('='*30)
print(deserialized)
print('='*30)
deserialized

OSError: Failed to interpret file <_io.BytesIO object at 0x7efcaf764e00> as a pickle

In [None]:
body_nparr = np.array([
                        19.95642073217597,
                        27.844891841022335,
                        23.747437427003455
                        ])

In [105]:
# np.load(io.BytesIO(stream.read()), allow_pickle=self.allow_pickle)　が動かないとエラー

np.load(BytesIO(b'[1,1,1]'), allow_pickle='allow_pickle') # np.load()でエラー発生

OSError: Failed to interpret file <_io.BytesIO object at 0x7efcaf473090> as a pickle

In [83]:
body_nparr = np.array([
                        19.95642073217597,
                        27.844891841022335,
                        23.747437427003455
                        ])

In [85]:
body_nparr

array([19.95642073, 27.84489184, 23.74743743])

In [86]:
np.save('hoge', body_nparr)

In [101]:
body = botocore.response.StreamingBody(b'{"hogehoge":1}',len(b'{"hogehoge":1}'))

In [102]:
body.read()

AttributeError: 'bytes' object has no attribute 'read'

In [99]:
body.seek()

UnsupportedOperation: seek

In [90]:
BytesIO(b'{"hogehoge":1}')

<_io.BytesIO at 0x7efcaf7731d0>


nupy.load()

https://numpy.org/doc/stable/reference/generated/numpy.load.html

The file to read. File-like objects must support the seek() and read() methods and must always be opened in binary mode. 

In [116]:
BytesIO(b'[1,1,1]').seek(50000)

50000

In [119]:
BytesIO(b'[1,1,1]').read(10000)

b'[1,1,1]'

BytesIOはseekもreadもできる。

In [132]:
np.load(BytesIO(b' a'), allow_pickle=True) # np.load()でエラー発生

OSError: Failed to interpret file <_io.BytesIO object at 0x7efcaf3def90> as a pickle

b'aaaa'のバイト列がいけてないのか？pickleであることを示す文字列がない？で、seek()で失敗している？？


中身が想定しているものではないのかも
https://teratail.com/questions/302899

ファイルがおかしい場合にエラーとなっている事例のようだ


In [141]:
np.load(BytesIO(body_nparr.dumps()), allow_pickle=True) # np.load()でエラー発生

array([19.95642073, 27.84489184, 23.74743743])

# 解答
numpyのndarrayを、ファイルではなく、pickle文字列に変換する必要がある。
そのために、numpy.ndarray.dumps()を使う

https://numpy.org/doc/stable/reference/generated/numpy.ndarray.dumps.html

In [143]:
np.load(BytesIO(body_nparr.dumps()), allow_pickle=True) # np.load()でエラー発生

### こうすることで、doby_nparrがpickleのstringに変換され、BytesIO()によってseek()もread()もできるストリーム（file_alike)に変換される。
### np.load()でこれを読み込むことができる。

array([19.95642073, 27.84489184, 23.74743743])

In [135]:
body_nparr = np.array([
                        19.95642073217597,
                        27.844891841022335,
                        23.747437427003455
                        ])

In [138]:
body_nparr.tobytes()

b'\x84\xe95\xfd\xd7\xf43@!\xd9\xe9\xd4J\xd8;@F\xc9(\x0fX\xbf7@'