In [1]:
import sys
import gzip
import shutil
import os

import tensorflow as tf
tf.disable_v2_behavior()
import numpy as np

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


Instructions for updating:
non-resource variables are not supported in the long term


# TensorFlowの低レベルAPIを使ってCNNを実装する

CNNを構築する上で, 畳み込み層を構築するラッパー関数と全結合そうを構築するラッパー関数を定義する.

In [2]:
def conv_layer(input_tensor, name, kernel_size, n_output_channels,
               padding_mode="SAME", strides=(1,1,1,1)):
    with tf.variable_scope(name):
        # 入力チャネルの個数を取得する, input_tensorの形状 : [バッチ * 幅 * 高さ * チャネル数]
        input_shape = input_tensor.get_shape().as_list()
        n_input_channels = input_shape[-1]
        
        weights_shape = (list(kernel_size) + [n_input_channels, n_output_channels])
        
        weights = tf.get_variable(name="_weights", shape=weights_shape)
        print(weights)
        biases = tf.get_variable(name="_biases", initializer=tf.zeros(shape=[n_output_channels]))
        print(biases)
        conv = tf.nn.conv2d(input=input_tensor,
                            filter=weights,
                            strides=strides,
                            padding=padding_mode)
        print(conv)
        conv = tf.nn.bias_add(conv, biases, name="net_pre-activation")
        print(conv)
        conv = tf.nn.relu(conv, name="activation")
        print(conv)
        
        return conv

In [3]:
# conv_layerの確認
g = tf.Graph()
with g.as_default():
    x = tf.placeholder(tf.float32, shape=[None, 28, 28, 1])
    conv_layer(x, name="convtest", kernel_size=(3, 3), n_output_channels=32)
    
del g, x

Instructions for updating:
Colocations handled automatically by placer.
<tf.Variable 'convtest/_weights:0' shape=(3, 3, 1, 32) dtype=float32_ref>
<tf.Variable 'convtest/_biases:0' shape=(32,) dtype=float32_ref>
Tensor("convtest/Conv2D:0", shape=(?, 28, 28, 32), dtype=float32)
Tensor("convtest/net_pre-activation:0", shape=(?, 28, 28, 32), dtype=float32)
Tensor("convtest/activation:0", shape=(?, 28, 28, 32), dtype=float32)


In [4]:
def fc_layer(input_tensor, name, n_output_units, activation_fn=None):
    with tf.variable_scope(name):
        # バッチ以外のshapeを取得
        input_shape = input_tensor.get_shape().as_list()[1:]
        n_input_units = np.prod(input_shape)
        if len(input_shape) > 1:
            input_tensor = tf.reshape(input_tensor, shape=(-1, n_input_units))
            
        weights_shape = [n_input_units, n_output_units]
        weights = tf.get_variable(name="_weights", shape=weights_shape)
        print(weights)
        biases = tf.get_variable(name="_biases", initializer=tf.zeros(shape=[n_output_units]))
        print(biases)
        layer = tf.matmul(input_tensor, weights)
        print(layer)
        layer = tf.nn.bias_add(layer, biases, name="net_pre-activation")
        print(layer)
        if activation_fn is None:
            return layer
        layer = activation_fn(layer, name="activation")
        print(layer)
        return layer

In [5]:
# fc_layerの確認
g = tf.Graph()
with g.as_default():
    x = tf.placeholder(tf.float32, shape=[None, 28, 28, 1])
    fc_layer(x, name="fctest", n_output_units=32, activation_fn=tf.nn.relu)
    
del g, x

<tf.Variable 'fctest/_weights:0' shape=(784, 32) dtype=float32_ref>
<tf.Variable 'fctest/_biases:0' shape=(32,) dtype=float32_ref>
Tensor("fctest/MatMul:0", shape=(?, 32), dtype=float32)
Tensor("fctest/net_pre-activation:0", shape=(?, 32), dtype=float32)
Tensor("fctest/activation:0", shape=(?, 32), dtype=float32)


この2つの関数を使ってCNNモデルを構築する`build_cnn`を定義する.

In [6]:
def build_cnn():
    # Xとyのプレースホルダを作成
    tf_x = tf.placeholder(tf.float32, shape=[None, 784], name="tf_x")
    tf_y = tf.placeholder(tf.int32, shape=[None], name="tf_y")
    
    # 入力を4次元のテンソルに変換, [バッチ * 幅 * 高さ * チャネル数(=1)]
    tf_x_image = tf.reshape(tf_x, shape=[-1, 28, 28, 1], name="tf_x_reshaped")
    
    # one-hotエンコーディング
    tf_y_onehot = tf.one_hot(indices=tf_y, depth=10, dtype=tf.float32, name="tf_y_onehot")
    
    # 第1層 : 畳み込み層1
    print("\nBuilding 1st layer: ")
    h1 = conv_layer(tf_x_image, name="conv_1",
                    kernel_size=(5, 5),
                    padding_mode="VALID",
                    n_output_channels=32)
    # MAXプーリング
    h1_pool = tf.nn.max_pool(h1,
                             ksize=[1, 2, 2, 1],
                             strides=[1, 2, 2, 1],
                             padding="SAME")
    
    # 第2層 : 畳み込み層
    print("\nBuilding 2nd layer: ")
    h2 = conv_layer(h1_pool, name="conv_2",
                    kernel_size=(5, 5),
                    padding_mode="VALID",
                    n_output_channels=64)
    # MAXプーリング
    h2_pool = tf.nn.max_pool(h2,
                             ksize=[1, 2, 2, 1],
                             strides=[1, 2, 2, 1],
                             padding="SAME")
    
    # 第3層 : 全結合層1
    print("\nBuilding 3rd layer: ")
    h3 = fc_layer(h2_pool, name="fc_3",
                  n_output_units=1024,
                  activation_fn=tf.nn.relu)
    # ドロップアウト
    keep_prob = tf.placeholder(tf.float32, name="fc_keep_prob")
    h3_drop = tf.nn.dropout(h3, keep_prob=keep_prob, name="dropout_layer")
    
    # 第4層 : 全結合層2 (線形活性化)
    print('\nBuilding 4th layer:')
    h4 = fc_layer(h3_drop, name='fc_4',
                  n_output_units=10, 
                  activation_fn=None)

    # 予測
    predictions = {
        'probabilities' : tf.nn.softmax(h4, name='probabilities'),
        'labels' : tf.cast(tf.argmax(h4, axis=1), tf.int32,
                           name='labels')
    }

    # 損失関数と最適化
    cross_entropy_loss = tf.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits(
            logits=h4, labels=tf_y_onehot),
        name='cross_entropy_loss')

    # オプティマイザ
    optimizer = tf.train.AdamOptimizer(learning_rate=1e-4)
    optimizer = optimizer.minimize(cross_entropy_loss,
                                   name='train_op')

    # 予測正解率を特定
    correct_predictions = tf.equal(
        predictions['labels'], 
        tf_y, name='correct_preds')

    accuracy = tf.reduce_mean(
        tf.cast(correct_predictions, tf.float32),
        name='accuracy')

    
def save(saver, sess, epoch, path='./model/'):
    if not os.path.isdir(path):
        os.makedirs(path)
    print('Saving model in %s' % path)
    saver.save(sess, os.path.join(path,'cnn-model.ckpt'),
               global_step=epoch)

    
def load(saver, sess, path, epoch):
    print('Loading model from %s' % path)
    saver.restore(sess, os.path.join(
            path, 'cnn-model.ckpt-%d' % epoch))

    
def train(sess, training_set, validation_set=None,
          initialize=True, epochs=20, shuffle=True,
          dropout=0.5, random_seed=None):

    X_data = np.array(training_set[0])
    y_data = np.array(training_set[1])
    training_loss = []

    # 変数を初期化
    if initialize:
        sess.run(tf.global_variables_initializer())

    np.random.seed(random_seed) # batch_generatorをシャッフルする
    for epoch in range(1, epochs+1):
        batch_gen = batch_generator(
                        X_data, y_data, 
                        shuffle=shuffle)
        avg_loss = 0.0
        for i,(batch_x,batch_y) in enumerate(batch_gen):
            feed = {'tf_x:0': batch_x, 
                    'tf_y:0': batch_y, 
                    'fc_keep_prob:0': dropout}
            loss, _ = sess.run(
                    ['cross_entropy_loss:0', 'train_op'],
                    feed_dict=feed)
            avg_loss += loss

        training_loss.append(avg_loss / (i+1))
        print('Epoch %02d Training Avg. Loss: %7.3f' % (
            epoch, avg_loss), end=' ')
        if validation_set is not None:
            feed = {'tf_x:0': validation_set[0],
                    'tf_y:0': validation_set[1],
                    'fc_keep_prob:0':1.0}
            valid_acc = sess.run('accuracy:0', feed_dict=feed)
            print(' Validation Acc: %7.3f' % valid_acc)
        else:
            print()

            
def predict(sess, X_test, return_proba=False):
    feed = {'tf_x:0': X_test, 
            'fc_keep_prob:0': 1.0}
    if return_proba:
        return sess.run('probabilities:0', feed_dict=feed)
    else:
        return sess.run('labels:0', feed_dict=feed)

In [7]:
import tensorflow as tf
import numpy as np

random_seed = 123

np.random.seed(random_seed)


# グラフを作成する
g = tf.Graph()
with g.as_default():
    tf.set_random_seed(random_seed)
    # モデルの構築
    build_cnn()

    # モデルを保存
    saver = tf.train.Saver()


Building 1st layer: 
<tf.Variable 'conv_1/_weights:0' shape=(5, 5, 1, 32) dtype=float32_ref>
<tf.Variable 'conv_1/_biases:0' shape=(32,) dtype=float32_ref>
Tensor("conv_1/Conv2D:0", shape=(?, 24, 24, 32), dtype=float32)
Tensor("conv_1/net_pre-activation:0", shape=(?, 24, 24, 32), dtype=float32)
Tensor("conv_1/activation:0", shape=(?, 24, 24, 32), dtype=float32)

Building 2nd layer: 
<tf.Variable 'conv_2/_weights:0' shape=(5, 5, 32, 64) dtype=float32_ref>
<tf.Variable 'conv_2/_biases:0' shape=(64,) dtype=float32_ref>
Tensor("conv_2/Conv2D:0", shape=(?, 8, 8, 64), dtype=float32)
Tensor("conv_2/net_pre-activation:0", shape=(?, 8, 8, 64), dtype=float32)
Tensor("conv_2/activation:0", shape=(?, 8, 8, 64), dtype=float32)

Building 3rd layer: 
<tf.Variable 'fc_3/_weights:0' shape=(1024, 1024) dtype=float32_ref>
<tf.Variable 'fc_3/_biases:0' shape=(1024,) dtype=float32_ref>
Tensor("fc_3/MatMul:0", shape=(?, 1024), dtype=float32)
Tensor("fc_3/net_pre-activation:0", shape=(?, 1024), dtype=float3

ここで, データをロードする.

In [8]:
if (sys.version_info > (3, 0)):
    writemode = 'wb'
else:
    writemode = 'w'

zipped_mnist = [f for f in os.listdir('./')
                if f.endswith('ubyte.gz')]
for z in zipped_mnist:
    with gzip.GzipFile(z, mode='rb') as decompressed, open(z[:-3], writemode) as outfile:
        outfile.write(decompressed.read())

In [9]:
import struct
import numpy as np


def load_mnist(path, kind='train'):
    """Load MNIST data from `path`"""
    labels_path = os.path.join(path,
                               '%s-labels-idx1-ubyte'
                                % kind)
    images_path = os.path.join(path,
                               '%s-images-idx3-ubyte'
                               % kind)

    with open(labels_path, 'rb') as lbpath:
        magic, n = struct.unpack('>II',
                                 lbpath.read(8))
        labels = np.fromfile(lbpath,
                             dtype=np.uint8)

    with open(images_path, 'rb') as imgpath:
        magic, num, rows, cols = struct.unpack(">IIII",
                                               imgpath.read(16))
        images = np.fromfile(imgpath,
                             dtype=np.uint8).reshape(len(labels), 784)

    return images, labels


X_data, y_data = load_mnist('./', kind='train')
print('Rows: %d,  Columns: %d' % (X_data.shape[0], X_data.shape[1]))
X_test, y_test = load_mnist('./', kind='t10k')
print('Rows: %d,  Columns: %d' % (X_test.shape[0], X_test.shape[1]))

X_train, y_train = X_data[:50000,:], y_data[:50000]
X_valid, y_valid = X_data[50000:,:], y_data[50000:]

print('Training:   ', X_train.shape, y_train.shape)
print('Validation: ', X_valid.shape, y_valid.shape)
print('Test Set:   ', X_test.shape, y_test.shape)

Rows: 60000,  Columns: 784
Rows: 10000,  Columns: 784
Training:    (50000, 784) (50000,)
Validation:  (10000, 784) (10000,)
Test Set:    (10000, 784) (10000,)


In [10]:
def batch_generator(X, y, batch_size=64, 
                    shuffle=False, random_seed=None):
    
    idx = np.arange(y.shape[0])
    
    if shuffle:
        rng = np.random.RandomState(random_seed)
        rng.shuffle(idx)
        X = X[idx]
        y = y[idx]
    
    for i in range(0, X.shape[0], batch_size):
        yield (X[i:i+batch_size, :], y[i:i+batch_size])

In [11]:
mean_vals = np.mean(X_train, axis=0)
std_val = np.std(X_train)

X_train_centered = (X_train - mean_vals)/std_val
X_valid_centered = (X_valid - mean_vals)/std_val
X_test_centered = (X_test - mean_vals)/std_val

del X_data, y_data, X_train, X_valid, X_test

読み込んだデータに対して, 学習を回していく.

In [12]:
## TFセッションの作成とCNNモデルのトレーニング

with tf.Session(graph=g) as sess:
    train(sess,
          training_set=(X_train_centered, y_train), 
          validation_set=(X_valid_centered, y_valid), 
          initialize=True,
          random_seed=123)
    save(saver, sess, epoch=20)

Epoch 01 Training Avg. Loss: 279.542  Validation Acc:   0.975
Epoch 02 Training Avg. Loss:  73.216  Validation Acc:   0.983
Epoch 03 Training Avg. Loss:  49.446  Validation Acc:   0.985
Epoch 04 Training Avg. Loss:  39.279  Validation Acc:   0.987
Epoch 05 Training Avg. Loss:  31.282  Validation Acc:   0.988
Epoch 06 Training Avg. Loss:  26.679  Validation Acc:   0.987
Epoch 07 Training Avg. Loss:  23.063  Validation Acc:   0.989
Epoch 08 Training Avg. Loss:  20.499  Validation Acc:   0.990
Epoch 09 Training Avg. Loss:  16.571  Validation Acc:   0.991
Epoch 10 Training Avg. Loss:  14.480  Validation Acc:   0.991
Epoch 11 Training Avg. Loss:  13.506  Validation Acc:   0.992
Epoch 12 Training Avg. Loss:  11.383  Validation Acc:   0.991
Epoch 13 Training Avg. Loss:   9.521  Validation Acc:   0.991
Epoch 14 Training Avg. Loss:   8.515  Validation Acc:   0.992
Epoch 15 Training Avg. Loss:   7.804  Validation Acc:   0.992
Epoch 16 Training Avg. Loss:   6.598  Validation Acc:   0.991
Epoch 17

In [15]:
# 新しい計算グラフを作成し, モデルを構築
g2 = tf.Graph()
with g2.as_default():
    tf.set_random_seed(random_seed)
    
    build_cnn()

    saver = tf.train.Saver()

# 新しいセッションを作成し, モデルを復元
with tf.Session(graph=g2) as sess:
    load(saver, sess, 
         epoch=20, path='./model/')
    
    # 予測正解率を計算
    preds = predict(sess, X_test_centered, 
                    return_proba=False)
    print('Test Accuracy: %.3f%%' % (100*
                np.sum(preds == y_test)/len(y_test)))
    


Building 1st layer: 
<tf.Variable 'conv_1/_weights:0' shape=(5, 5, 1, 32) dtype=float32_ref>
<tf.Variable 'conv_1/_biases:0' shape=(32,) dtype=float32_ref>
Tensor("conv_1/Conv2D:0", shape=(?, 24, 24, 32), dtype=float32)
Tensor("conv_1/net_pre-activation:0", shape=(?, 24, 24, 32), dtype=float32)
Tensor("conv_1/activation:0", shape=(?, 24, 24, 32), dtype=float32)

Building 2nd layer: 
<tf.Variable 'conv_2/_weights:0' shape=(5, 5, 32, 64) dtype=float32_ref>
<tf.Variable 'conv_2/_biases:0' shape=(64,) dtype=float32_ref>
Tensor("conv_2/Conv2D:0", shape=(?, 8, 8, 64), dtype=float32)
Tensor("conv_2/net_pre-activation:0", shape=(?, 8, 8, 64), dtype=float32)
Tensor("conv_2/activation:0", shape=(?, 8, 8, 64), dtype=float32)

Building 3rd layer: 
<tf.Variable 'fc_3/_weights:0' shape=(1024, 1024) dtype=float32_ref>
<tf.Variable 'fc_3/_biases:0' shape=(1024,) dtype=float32_ref>
Tensor("fc_3/MatMul:0", shape=(?, 1024), dtype=float32)
Tensor("fc_3/net_pre-activation:0", shape=(?, 1024), dtype=float3