# MNIST by TensorFlow on SageMaker

<small>Copyright © 2018 by Arata Furukawa. (http://ornew.net)</small>

<small style="font-size:0.5em;">この資料は「機械学習アプリを「賢く」作る：Amazon SageMakerクラウド・ハンズオン」のために作成されたものです。ノートブックをダウンロードし、持ち帰って自由に実行・加工していただいて構いませんが、解説文の外部への公開・転載はご遠慮ください。</small>

---

まずは、「ディープラーニングの"Hello world"」と呼ばれるMNISTの実装を通して、以下の2つを学びます。ついでにJupyterの使い方も解説します。

- SageMakerでの独自モデル開発プロセス
- TensorFlowの使い方

Jupyter上でグラフを表示するためのおまじないです。

In [None]:
output_path = 's3://marukawa0224-XX/tensorflow'
base_job_name = 'marukawa0224-XX-mnist-job'
print('output_path:\n\t{}'.format(output_path))
print('base_job_name:\n\t{}'.format(base_job_name))

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np

TensorFlowをインポートします。慣習的に、tfという名前でインポートすることが多いです。

SageMakerでは現在1.4.0がサポートされています。バージョンが1.4.0である事を確認してください。

In [None]:
import tensorflow as tf
tf.__version__

## SageMakerでTensorFlowモデルが動く流れ

TensorFlowの高レベルAPIの**Estimator**という仕様が（ほぼ）そのままSageMakerで実行できます。

TensorFlowのEstimatorは、

- 学習（training）
- 評価（evaluation）
- 予測（prediction）
- サービング（APIサーバ化）のためのモデルのエクスポート

を担う、機械学習モデルの高水準なインターフェイスです。

予めTensorFlowに用意されたEstimatorを使う他、**EstimatorSpec**の要件を満たすことで、自由なモデルを使った**オリジナルのEstimatorを作る**ことができます。

EstimatorをSageMakerのAPIに渡すことで、

- 学習や評価のためのインスタンスを実行
- 予測を行うAPIエンドポイント作成、公開

などが容易に行なえます。

 名前 | 役割
:----|:----
 tensorflow.estimator.Estimator | モデルの学習、評価、予測、出力などをするインターフェイス
 tensorflow.estimator.EstimatorSpec | Estimatorとして機能するために必要なもの
 sagemaker.tensorflow.TensorFlow | Estimatorの実行に必要なものを定義したプログラムを渡すと、実際のクラウド環境に自動で展開して実行する事ができる

カスタムされたEstimatorの作成の詳細は、[公式ガイド](https://www.tensorflow.org/get_started/custom_estimators)も参考にしてください。

### SageMakerに渡すプログラムの要件

SageMakerに渡すプログラムには、以下の4つの関数が定義されている必要があります。

1. `train_input_fn`
2. `eval_input_fn`
3. `serving_input_fn`
4. `model_fn` または `keras_model_fn` または `estimator_fn` のいずれか

 名前 | 役割
:----|:----
 train_input_fn | 学習データの読み込みと前処理をする関数です。
 eval_input_fn | 評価データの読み込みと前処理をする関数です。
 serving_input_fn | 学習やサービング時にモデルの入力となるプレースホルダなどを定義します。
 model_fn | EstimatorSpecを返す関数です。独自モデルを使いたいときはこれを定義します。
 keras_model_fn | Kerasで実装する場合はこの関数でKerasのEstimatorを返します。
 estimator_fn | 予めTensorFlowで定義されている簡単なEstimatorを使うときは、この関数でEstimatorを返します。

別の言い方をすれば、**たった4つ関数を定義するだけ**でSageMakerで動かすモデルは完成です！

実際にこれらの関数を簡単に実装していきましょう。

## 学習・評価の入力データの読み込み

**サンプル**の要件を定めます。

 名前 | 定義
:----|:----
 フィーチャ | モデルの入力データのこと。
 サンプル  | 学習・評価データのこと。
 データセット | サンプルの集合のこと。

In [None]:
EXAMPLE_SPEC = {
    'image': tf.FixedLenFeature([28 * 28], tf.float32),
    'label': tf.FixedLenFeature([]       , tf.int64),
}

### サンプルデータの構築・アップロード

学習・評価までのサンプルデータの流れは以下のようになります。

1. データセットを用意する
2. 各データをtf.Exampleにシリアライズする
3. シリアライズしたデータセットをTFRecordとして保存する
4. **事前にS3バケット上にアップロードする**
5. 学習等を実行するときにS3パスを指定する
6. クラウド上の実行コンテナにS3パスのデータがマウントされる
7. `*_input_fn`関数の第一引数にマウントパスが渡される
8. TFRecordファイルを読み込む
9. デシリアライズされフィーチャとしてモデルに入力される

TensorFlowでは、データセットのファイル形式として**TFRecord**を推奨しています。TFRecordは大規模データを省メモリ＆並列に処理することができます。

データを準備し`EXAMPLE_SPEC`に合わせて変換しなくてはいけません。この作業はモデルの開発では最初に1回行うもので、その後も頻繁に行うものではありません。

TensorFlowにはチュートリアル用にMNISTデータセットをダウンロードするメソッドが用意されているので、まずはデータセットをダウンロードします。

In [None]:
train, test = tf.keras.datasets.mnist.load_data()

`tf.Example`に変換し、シリアライズして`TFRecordWriter`で書き込みます。長く見えますが、やっていることは単純です。`feature`の形式が`EXAMPLE_SPEC`と一致していることが重要です。

In [None]:
def convert_tfrecord(filename, images, labels):
    options = tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.GZIP)
    with tf.python_io.TFRecordWriter(filename, options) as tfrecord:
        for image, label in zip(images, labels):
            example = tf.train.Example(features=tf.train.Features(feature={
                'image' : tf.train.Feature(
                    float_list=tf.train.FloatList(value=image.flatten() / 255.)),
                'label' : tf.train.Feature(
                    int64_list=tf.train.Int64List(value=[np.argmax(label)])),
            }))
            tfrecord.write(example.SerializeToString())

In [None]:
%%time
tf.gfile.MakeDirs('data/mnist')
convert_tfrecord('data/mnist/train.tfr', train[0], train[1])
convert_tfrecord('data/mnist/test.tfr' , test[0] , test[1])

`data/mnist`ディレクトリに`train.tfr`と`test.tfr`のファイルができていれば成功です。

In [None]:
!ls data/mnist

### サンプルデータの読み込みとフィーチャへの変換

先程「SageMakerに渡すプログラムの要件」で確認した関数のうち、学習と評価のためのデータ入力部分を書きましょう。

S3上にアップロードされたTFRecord形式のデータを読み込みます。TensorFlow Dataset APIを使うと簡単に扱うことができます。

In [None]:
import os

def train_input_fn(data_dir, params):
    tfrecord = os.path.join(data_dir, 'test.tfr')
    return _input_fn(tfrecord, params, training=True)
    
def eval_input_fn(data_dir, params):
    tfrecord = os.path.join(data_dir, 'test.tfr')
    return _input_fn(tfrecord, params, training=False)

def _parse_proto(proto):
    parsed = tf.parse_single_example(proto, EXAMPLE_SPEC)
    image = parsed['image']
    label = tf.one_hot(parsed['label'], 10)
    return image, label

def _input_fn(tfrecord, params, training):
    dataset = (tf.data.TFRecordDataset(tfrecord, compression_type=tf.python_io.TFRecordCompressionType.GZIP)
        .map(_parse_proto)
        .shuffle(1000))
    # 学習のときは無限に生成する
    dataset = dataset.repeat() if training else dataset
    images, labels = (dataset
        .batch(params.get('batch_size', 512))
        .make_one_shot_iterator()
        .get_next())
    return {'images': images}, labels

## 予測時の入力データの定義

学習、評価時のモデルの入力関数では実データを読み込みましたが、予測をAPI化してサービングの入力を受ける場合、実際にデータが来るまで中身がわかりません。プレースホルダを定義してフィーチャのデータ形式を明確にします。

In [None]:
def serving_input_fn(hparams):
    features = {
        'images': tf.placeholder(tf.float32, [None, 784])
    }
    return tf.estimator.export.build_raw_serving_input_receiver_fn(features)()

## Estimatorの定義

最後にもう一つ、何らかのEstimatorを定義しなくてはいけません。試しに、ソフトマックス回帰モデルを実装してみましょう！

In [None]:
def model_fn(features, labels, mode, params):    
    x = features['images']
    
    W = tf.get_variable('W', [784,10])
    b = tf.get_variable('b', [10])
    
    y = tf.nn.softmax(tf.matmul(x, W) + b)
    y_indices = tf.argmax(input=y, axis=1) # 予測したラベルのインデクス
    
    predictions = {
        'classes': y_indices,
        'probabilities': y,
    }
    export_outputs = {
        'predictions': tf.estimator.export.PredictOutput(predictions),
    }
    
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(
            mode=mode,
            predictions=predictions,
            export_outputs=export_outputs)

    learning_rate = params.get('learning_rate', 0.5)
    global_step = tf.train.get_or_create_global_step()
    
    cross_entropy = tf.reduce_mean(
        -tf.reduce_sum(labels * tf.log(y), reduction_indices=[1]))
    
    optimizer = tf.train.GradientDescentOptimizer(learning_rate)
    fit = optimizer.minimize(cross_entropy, global_step)
    
    return tf.estimator.EstimatorSpec(
        mode=mode,
        predictions=predictions,
        loss=cross_entropy,
        train_op=fit,
        export_outputs=export_outputs)

細かい実装は、学習を走らせているときに解説いたします。

## SageMakerで実行する

SageMakerを操作するために、セッションを作成します。

In [None]:
import sagemaker
from sagemaker import get_execution_role

session = sagemaker.Session()
role = sagemaker.get_execution_role()

先程TFRecordを保存した`data`ディレクトリをS3にアップロードします。

ただ、参加者全員がそれぞれにアップロードするとそれなりの容量になってしまうので、ハンズオンでは事前にアップロードしたものをご利用ください。もし持ち帰ってご自身で実行される場合は、以下のプログラムを実行すればアップロードできます。

```python
data_dir = session.upload_data(
    'data/mnist', # ローカルディレクトリ
    '',           # バケット名を入れてください
    'data/mnist') # S3パス
```

In [None]:
# 事前にこちらがアップロードしたS3パスです
data_dir = 's3://sagemaker-seminar-data/mnist'

`Estimator`を作ります。この`Estimator`は、TensorFlowの`Estimator`をSageMakerのAPIでラップしたものです。

第一引数に、`Estimator`の本体となるプログラムのパスを指定します。ここまでで定義した関数を事前に`code/mnist.py`ファイルに保存してありますので、そのパスを指定します。

In [None]:
!cat code/mnist.py

In [None]:
hyperparameters = {
    'learning_rate': 0.5,
    'batch_size': 512,
}

from sagemaker.tensorflow import TensorFlow
estimator = TensorFlow(
    entry_point='./code/mnist.py',
    hyperparameters=hyperparameters,
    role=role,
    output_path=output_path,
    code_location=output_path,
    training_steps=1000,
    evaluation_steps=100,
    base_job_name=base_job_name,
    
    #########################################################
    # ハンズオンでは以下の設定を絶対に変えないでください！！！
    # 他の方が実行できなくなったり、高額な課金が発生することがあります。
    # 異常な課金等を確認した場合、請求額を追加徴収させて頂きます。
    #########################################################
    train_instance_count=1,
    train_instance_type='ml.m4.xlarge')

## 学習の実行

では、学習を実行しましょう！

estimatorのfitメソッドを呼び出すと、学習を実行します。

In [None]:
%%time
estimator.fit(data_dir)

## ソフトマックス回帰モデルの解説

学習には少し時間がかかるので、その間に`model_fn`とその中身の解説をします。

`model_fn`の引数は4つです。

### features

入力となる特徴のデータです。今回は`'images'`キーで登録された784次元の数字を任意長のバッチ単位で受け取ります。

### labels

入力となるラベルのデータです。今回は、0から9までの数字のOHV表現です。

OHV（One Hot Vector）とは、ベクトルの要素のうち1つだけが1であるようなベクトルです。OHV表現と数字の対応は以下のようになります。

```
                           OHV <==> Number
[1, 0, 0, 0, 0, 0, 0, 0, 0, 0] <==> 0
[0, 1, 0, 0, 0, 0, 0, 0, 0, 0] <==> 1
[0, 0, 1, 0, 0, 0, 0, 0, 0, 0] <==> 2
[0, 0, 0, 1, 0, 0, 0, 0, 0, 0] <==> 3
[0, 0, 0, 0, 1, 0, 0, 0, 0, 0] <==> 4
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0] <==> 5
[0, 0, 0, 0, 0, 0, 1, 0, 0, 0] <==> 6
[0, 0, 0, 0, 0, 0, 0, 1, 0, 0] <==> 7
[0, 0, 0, 0, 0, 0, 0, 0, 1, 0] <==> 8
[0, 0, 0, 0, 0, 0, 0, 0, 0, 1] <==> 9
```

１つのベクトルを、確率分布とみなすことができます。「`i`番目の値が大きいということは、数字`"i"`の確率が高い」と読み替えて頂いてもここでは差し支えありません。

### mode

実行モードです。`tf.estimator.ModeKeys.*`に定義される定数です。学習や評価で処理を分けることができます。

### params

実行するときに指定されるハイパーパラメータです。ここでは学習率だけを使っています。ハイパーパラメータはユーザが自由に設定することができます。

### 層の構築

ニューラルネットワークは以下の式に準ずる層の積み重ねとみなされます。

$$ y = \phi (xW + b) $$

- $x$
    - 入力データ
    - 今回は任意個の28x28=784ピクセルのグレースケール画像（1次元配列）
- $W$
    - 重みパラメータ
    - 学習したい定数値
    - 今回は入力との行列積
- $b$
    - バイアスパラメータ
    - 学習したい定数値
- $\phi$
    - 活性化関数
    - 今回はsoftmax
- $y$
    - 出力
    - 10個の値を持つ1次元配列

softmaxは、独立した固定長の分類器で頻繁に用いられます。

$$ \text{softmax}(x)_i = \frac{\exp(x_i)}{\sum_j \exp(x_j)} $$

これを活性化関数とすると、このモデルは以下の式で表せます。

$$ y = \text{softmax}(xW + b) $$

実際に、TensorFlowで上記の式を実装した結果が、`model_fn`の下記になります。

```python
x = features['images']

W = tf.get_variable('W', [784,10])
b = tf.get_variable('b', [10])

y = tf.nn.softmax(tf.matmul(x, W) + b)
```

$W$と$b$は、学習したい**パラメータ（変数）**なので、**実行が終わっても値が保持されてほしい**ですね。こういう値は`tf.get_variable`を用いて作成します。

フィーチャの形式は、`serving_input_fn`で`'tf.placeholder(tf.float32, [None, 784])`と定義しました。`[None,784]`は、値の形状を示します。**Noneは任意長を示すので、1次元目が任意長、2次元目が784個となる二次元配列**という意味です。

`'W'`と`'b'`は変数名で、`[784,10]`と`[10]`は値の形状を示しています。


$$
y =  \text{softmax}(
\begin{bmatrix}
    \begin{array}{cccc}
      x_{1,1} & x_{1,2} & \ldots & x_{1,784} \\
      x_{2,1} & x_{2,2} & \ldots & x_{2,784} \\
      \vdots & \vdots & \ddots & \vdots \\
      x_{n,1} & x_{n,2} & \ldots & x_{n,784}
    \end{array}
\end{bmatrix}
\begin{bmatrix}
    \begin{array}{cccc}
      W_{1,1} & W_{1,2} & \ldots & W_{1,10} \\
      W_{2,1} & W_{2,2} & \ldots & W_{2,10} \\
      \vdots & \vdots & \ddots & \vdots \\
      W_{784,1} & W_{784,2} & \ldots & W_{784,10}
    \end{array}
\end{bmatrix} + \begin{bmatrix}
    \begin{array}{cccc}
      b_{1} \\
      b_{2} \\
      \vdots \\
      b_{10}
    \end{array}
\end{bmatrix}
)
$$

$y$の式は、数式そのままですね。これでモデルが定義できました。

モデルの予測と、出力したい値を定義します。

```python
y_indices = tf.argmax(input=y, axis=1) # 予測したラベルのインデクス

predictions = {
    'classes': y_indices,
    'probabilities': y,
}
export_outputs = {
    'predictions': tf.estimator.export.PredictOutput(predictions),
}
```

`argmax`は入力のうち**一番値が大きな要素のインデクス**を返します。今回、OHVがインデクスと対応しているので、そのまま**最も確率が高い数字**を示すことになります。

予測モードのときはここで終了です。とても簡単でした。

### 誤差を定義する

モデルを学習したり、評価するときには、その方法を記述しなくてはいけません。

モデルの出力$y$と、理想的な答えの出力が**どの程度違うのか**を定式化します。これを**誤差**（または**損失**）と呼びます。

どの程度「違う」のかを示す**誤差が小さくなるほど、精度が高い**と言えますね。ディープラーニングでは、**誤差が小さくなるようにモデルの各パラメータを修正することで、モデルを学習させます**。

誤差を定義しましょう。理想的な答え(ここではラベルと呼びます)を$y'$とします。2つの確率分布がどの程度違うのかを示すために、**交差エントロピー**という関数を使います。交差エントロピーは以下の式で定義されます。

$$ H_{y’}(y) = -\sum_i y’_i \log(y_i) $$

実装しましょう。

```python
cross_entropy = tf.reduce_mean(
    -tf.reduce_sum(labels * tf.log(y), reduction_indices=[1]))
```

任意の数のデータ入力を同時に受け取るため、それぞれのデータごとの誤差の平均を最終的な誤差とします。`reduce_mean`で平均を計算していることに注意してください。

### 誤差を小さくするようにパラメータを最適化する

さて、誤差を定義したので、これを小さくするようにパラメータを修正しなくてはなりません。

最適化アルゴリズムには様々な手法が存在しますが、ここでは全ての基本となる**勾配降下法**を用いましょう。

基本的な最適化アルゴリズムは事前にTensorFlowに実装されています。勾配降下法によってパラメータを最適化するためには、`tf.train.GradientDescentOptimizer`を用います。

```python
optimizer = tf.train.GradientDescentOptimizer(learning_rate)
fit = optimizer.minimize(cross_entropy, global_step)
```

これで、学習率0.5の最急勾配降下法により、**交差エントロピーを最小化するように**全ての学習用パラメータを最適化する`fit`を定義しました。

`EstimatorSpec`に、誤差や学習オペレーションなどを指定することで、学習と評価時の定義は終わりです。

```python
return tf.estimator.EstimatorSpec(
    mode=mode,
    predictions=predictions,
    loss=cross_entropy,
    train_op=fit,
    export_outputs=export_outputs)
```

## モデルのデプロイ

学習の実行が正常に終わったら、モデルをデプロイしましょう。これも少し時間がかかります。

In [None]:
api = estimator.deploy(
    #########################################################
    # ハンズオンでは以下の設定を絶対に変えないでください！！！
    # 他の方が実行できなくなったり、高額な課金が発生することがあります。
    # 異常な課金があった場合、請求額を追加徴収させて頂きます。
    #########################################################
    initial_instance_count=1,
    instance_type='ml.c2.xlarge')
print(api.endpoint)

## 予測の実行

デプロイしたモデルで実際に予測をしてみましょう！

まずは予測機（Predictor）を作ります。

In [None]:
from sagemaker.tensorflow.predictor import tf_serializer, tf_deserializer
predictor = sagemaker.RealTimePredictor(endpoint=api.endpoint,
                              deserializer=tf_deserializer, 
                              serializer=tf_serializer,
                              content_type='application/octet-stream')

`predict`メソッドを使うと、AWS上にデプロイされたTensorFlowモデルで予測を行うことができます。

試しに、MNISTのテストデータセットの幾つかを推論してみましょう。

In [None]:
def create_request(data):
    from sagemaker.tensorflow.tensorflow_serving.apis import predict_pb2
    tensor_proto = tf.make_tensor_proto(
        values=data, shape=[1, 784], dtype=tf.float32)
    request = predict_pb2.PredictRequest()
    request.model_spec.name = 'generic_model'
    request.model_spec.signature_name = 'predictions'
    request.inputs['images'].CopyFrom(tensor_proto)
    return request

def predict(image, label):
    result = predictor.predict(create_request(image))
    predict = result.outputs['classes'].int64_val[0]
    print('答え={} 予測={}'.format(label, predict))
    plt.imshow(np.reshape(image, (28,28)), cmap='gray')
    plt.show()

In [None]:
for i in np.random.randint(0, len(mnist.test.labels), 10):
    image = mnist.test.images[i].tolist()
    label = np.argmax(mnist.test.labels[i])
    predict(image, label)

実際に手書きをして予測をしてみましょう。

※Chrome推奨

In [None]:
from IPython.display import HTML
HTML(open("input.html").read())

In [None]:
predict(np.asarray(data), '-')

TensorFlowのモデルを実際にAPI化するとき、推奨されるのは**TensorFlow Serving**というツールを使うことです。実際、SageMakerでデプロイしたモデルはTensorFlow Servingサーバとして公開されます。

TensorFlow Servingでは、**gRPC**という方式で通信を行います。通信データは**Protocol Buffers形式**です。先程のリクエストで作成しているのは、このProtocol Buffers形式のデータです。

TensorFlow Servingが提供している`.proto`ファイルを用いることで、Android Javaや、C#、Node.jsなど、様々な言語からgRPC通信でTensorFlowモデルを利用することができます。

作成したエンドポイントは、SDKを通して削除できます。次のノートブックに進む前に削除をお願いします。

In [None]:
session.delete_endpoint(api.endpoint)