In [1]:
import tensorflow as tf
import tensorflow_hub as hub

import tensorflow_datasets as tfds

import time

from PIL import Image
import requests
from io import BytesIO

import matplotlib.pyplot as plt
import numpy as np
import glob
import os
import pathlib
import cv2

In [2]:
# パラメータ一覧
DATA_DIR = "C:\\tmp\\bit_image"

MODEL_DIR = '/tmp/my_saved_bit_model/'

DATA_SET_NAME = 'tf_flowers'

TRAIN_SPLIT = 0.9

NUM_CLASSES = 6

# 画像の解像度とデータセットの量でパラメータの指針がある。いったん軽いほうにしてるけど
# RESIZE_TOとCROP_TOはコメントアウトの方に変えてもいいかも
RESIZE_TO = 160 # 512
CROP_TO = 128 # 480

SCHEDULE_LENGTH = 500# 10000, 20000
SCHEDULE_BOUNDARIES = [200, 300, 400] #[3000, 6000, 9000] [6000, 12000, 18000]

# バッチサイズ512だとOOM発生するかも
# その場合2^nの形で修正して
BATCH_SIZE = 512
STEPS_PER_EPOCH = 10

# 今コメントアウトしてるけどこっちで学習させても十分かも
EPOCHS_NUM = 10

In [3]:
# ラベル一覧
tf_flowers_labels = ['dandelion', 'daisy', 'tulips', 'sunflowers', 'roses', 'apple']

In [4]:
# モジュールのロード
model_url = "https://tfhub.dev/google/bit/m-r50x1/1"
module = hub.KerasLayer(model_url)

In [5]:
train_image = []
test_image = []
train_label = []
test_label= []

img_size = 224
train_num = 0 
test_num = 0 

folder_name = DATA_DIR

# フォルダ内のディレクトリの読み込み
classes = os.listdir( folder_name )

for i, d in enumerate(classes):
    files = os.listdir( folder_name + '/' + d  )

    tmp_image = []
    tmp_label = []
    for f in files:
        # 1枚の画像に対する処理
        if not 'jpg' in f:# jpg以外のファイルは無視
            continue

        # 画像読み込み
        img = cv2.imread( folder_name+ '/' + d + '/' + f)
        # one_hot_vectorを作りラベルとして追加
        label = i

        # リサイズをする処理
        if img_size != 0:
            img = cv2.resize( img , (img_size , img_size ))
            tmp_image.append(img)                
            tmp_label.append(label)


    if train_num == 0 :
        train_image.extend( tmp_image )
        train_label.extend( tmp_label )
    elif test_num == 0 :
        sampled_image , sampled_label = random_sampling( tmp_image , tmp_label , train_num )
        train_image.extend( sampled_image )
        train_label.extend( sampled_label )
    else :
        sampled_train_image , sampled_train_label , sampled_test_image , sampled_test_label = random_sampling( tmp_image , tmp_label , train_num , test_num )
        train_image.extend( sampled_train_image )
        train_label.extend( sampled_train_label )
        test_image.extend( sampled_test_image )
        test_label.extend( sampled_test_label )

    print(d + 'read complete ,' + str(len(train_label)) + ' pictures exit')


appleread complete ,11 pictures exit
daisyread complete ,644 pictures exit
dandelionread complete ,1542 pictures exit
rosesread complete ,2183 pictures exit
sunflowersread complete ,2882 pictures exit
tulipsread complete ,3681 pictures exit


In [6]:
train_image =np.stack(train_image)
train_label = np.stack(train_label)

In [7]:
dataset = tf.data.Dataset.from_tensor_slices({'image':train_image,'label':train_label})

In [8]:

AUTOTUNE = tf.data.experimental.AUTOTUNE
dataset2 = dataset.prefetch(buffer_size=AUTOTUNE)

In [9]:
num_train = int(len(train_image)*TRAIN_SPLIT)
dataset2 = dataset2.shuffle(10000)
ds_train = dataset2.take(num_train)
ds_test = dataset2.skip(num_train)

In [10]:
# BiTモデルクラス
class MyBiTModel(tf.keras.Model):
  
    def __init__(self, num_classes, module):
        super().__init__()

        self.num_classes = num_classes
        self.head = tf.keras.layers.Dense(num_classes, kernel_initializer='zeros')
        self.bit_model = module

    # predict関数を最大の選択肢だのindex出力するようにしてる。後でテストするときに視覚的にわかりやすいためだけどこの辺は仕様次第かな
    def predict(self, image):
        return tf.argmax(self(image)[0]).numpy()
    
    # tf,functionつけてみたけどあんま早くないかも。Bitモデル自体がtensorflowオブジェクトの計算に最適化されてなさそう。
    #@tf.function
    def call(self, images):
        bit_embedding = self.bit_model(images)
        # 論文のコードだとsoftmaxかけられてなかったけどさすがにちがくね？ってことでsofmaxかけてる
        # あんま精度でなかったら外してもいいかも
        return tf.keras.activations.softmax(self.head(bit_embedding))

model = MyBiTModel(num_classes=NUM_CLASSES, module=module)

In [11]:

# データの前処理パイプライン
# 論文にこれやってって書いてあったので従っておいたほうがよさそう

SCHEDULE_LENGTH = SCHEDULE_LENGTH * 512 / BATCH_SIZE

def cast_to_tuple(features):
  return (features['image'], features['label'])
  
def preprocess_train(features):
  features['image'] = tf.image.random_flip_left_right(features['image'])
  features['image'] = tf.image.resize(features['image'], [RESIZE_TO, RESIZE_TO])
  features['image'] = tf.image.random_crop(features['image'], [CROP_TO, CROP_TO, 3])
  features['image'] = tf.cast(features['image'], tf.float32) / 255.0
  return features

def preprocess_test(features):
  features['image'] = tf.image.resize(features['image'], [RESIZE_TO, RESIZE_TO])
  features['image'] = tf.cast(features['image'], tf.float32) / 255.0
  return features

pipeline_train = (ds_train
                  .shuffle(10000)
                  .repeat()
                  .map(preprocess_train, num_parallel_calls=8)
                  .batch(BATCH_SIZE)
                  .map(cast_to_tuple) 
                  .prefetch(2))

pipeline_test = (ds_test.map(preprocess_test, num_parallel_calls=1)
                  .map(cast_to_tuple) 
                  .batch(BATCH_SIZE)
                  .prefetch(2))

In [12]:
# optimizerと損失関数の定義
# 損失関数がクロスエントロピーなのにもともと負の値が許されてたの意味が分からない

lr = 0.003 * BATCH_SIZE / 512 

lr_schedule = tf.keras.optimizers.schedules.PiecewiseConstantDecay(boundaries=SCHEDULE_BOUNDARIES, 
                                                                   values=[lr, lr*0.1, lr*0.001, lr*0.0001])
optimizer = tf.keras.optimizers.SGD(learning_rate=lr_schedule, momentum=0.9)

loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

model.compile(optimizer=optimizer,
              loss=loss_fn,
              metrics=['accuracy'])


In [13]:
# これだと50エポックやると思うから少なくしてもいいと思う
history = model.fit(
    pipeline_train,
    batch_size=BATCH_SIZE,
    steps_per_epoch=STEPS_PER_EPOCH,
    epochs= EPOCHS_NUM,
    validation_data=pipeline_test 
)

Epoch 1/10
Epoch 2/10
 1/10 [==>...........................] - ETA: 0s - loss: 1.1709 - accuracy: 0.9062

KeyboardInterrupt: 

In [14]:
def preprocess_image(image):
    image = np.array(image)
    img_reshaped = tf.reshape(
        image, [1, image.shape[0], image.shape[1], image.shape[2]])
    image = tf.image.convert_image_dtype(img_reshaped, tf.float32)
    return image


def load_image_from_url(url):
    response = requests.get(url)
    image = Image.open(BytesIO(response.content))
    image = preprocess_image(image)
    return image

In [None]:
# モデルの保存①モデル丸ごと保存
export_module_dir = '/tmp/my_saved_bit_model/'
tf.saved_model.save(model, export_module_dir)

In [None]:
# モデルのロード①
# load_model1 =  hub.KerasLayer(export_module_dir, trainable=True)
load_model1 =  tf.keras.models.load_model(export_module_dir)

In [15]:
# モデルの保存②重み行列だけ保存（こっちが普通,早い）
model.save_weights(MODEL_DIR)

In [16]:
# モデルのロード②

# こっちの場合モデルを作ってあげないといけないのでロードに時間かかる。今回は省略
# model_url = "https://tfhub.dev/google/bit/m-r50x1/1"
# module = hub.KerasLayer(model_url)

load_model2 = MyBiTModel(num_classes=NUM_CLASSES, module=module)
load_model2.load_weights(MODEL_DIR)

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x295c4fabaf0>

In [None]:
# それぞれで5個予測させてみる
for features in list(ds_test.take(5).as_numpy_iterator()):
    image = features['image']
    image = preprocess_image(image)
    image = tf.image.resize(image, [CROP_TO, CROP_TO])
    # オーバーライドした関数はセーブしてないっぽいからせっかく作ったpredict関数が動かない。悲しい
    print(f"予測値：{load_model1.predict(image)}, 正解：{features['label']}")

In [19]:
for features in list(ds_test.take(5).as_numpy_iterator()):
    image = features['image']
    image = preprocess_image(image)
    image = tf.image.resize(image, [CROP_TO, CROP_TO])
    load_model2(image)
    print(f"予測値：{load_model2.predict(image)}, 正解：{features['label']}")

予測値：2, 正解：2
予測値：2, 正解：1
予測値：1, 正解：1
予測値：3, 正解：3
予測値：3, 正解：3
