# Dogs vs. Cats を CNN で解く

## Drive をマウント
(Google Colab でやる場合)

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

## パッケージ群のインストール

In [None]:
import tensorflow
from tensorflow import keras
%matplotlib inline
import matplotlib.pyplot as plt

In [None]:
# TensorFlow経由でデバイス設定の確認が可能です.
# from tensorflow.python.client import device_lib
# device_lib.list_local_devices()

In [None]:
import os, cv2, random
import numpy as np
import pandas as pd
from matplotlib import ticker
import seaborn as sns

## ファイル一覧の取得

In [None]:
# Google Colab でやる場合
TRAIN_DIR = './gdrive/My Drive/colab/kaggle_dogs_cats/train/'
TEST_DIR = './gdrive/My Drive/colab/kaggle_dogs_cats/test/'

# ローカルでやる場合
# TRAIN_DIR = 'train/'
# TEST_DIR = 'test/'

In [None]:
# エラーが出ないこと (マウントが完了していること) を確認する.
print(len(os.listdir(TRAIN_DIR)))
print(len(os.listdir(TEST_DIR)))

In [None]:
# 訓練用データ (全部、犬だけ、猫だけ)
train_images = [TRAIN_DIR + i for i in os.listdir(TRAIN_DIR)] # use this for full dataset
train_dogs =   [TRAIN_DIR + i for i in os.listdir(TRAIN_DIR) if 'dog' in i]
train_cats =   [TRAIN_DIR + i for i in os.listdir(TRAIN_DIR) if 'cat' in i]

# 評価用データ
test_images =  [TEST_DIR+i for i in os.listdir(TEST_DIR)]

In [None]:
# 手っ取り早く試すために、データを減らす (犬と猫 1000個ずつ).
train_images = train_dogs[:1000] + train_cats[:1000]
random.shuffle(train_images)

# 評価用データを 25個に減らす.
test_images = test_images[:25]

## 画像を配列に格納

In [None]:
ROWS = 64
COLS = 64
CHANNELS = 3

In [None]:
def read_image(file_path):
    """
    画像をファイルから読み込んで、ROWS * COLS にリサイズして返す.
    """
    img = cv2.imread(file_path, cv2.IMREAD_COLOR) # モノクロの場合は cv2.IMREAD_GRAYSCALE
    return cv2.resize(img, (ROWS, COLS), interpolation=cv2.INTER_CUBIC)

In [None]:
def prep_data(image_files):
    """
    画像のファイル名のリストを受け取って、画像を数値化した多次元配列を返す.
    """
    count = len(image_files)
    data = np.ndarray((count, CHANNELS, ROWS, COLS), dtype=np.uint8)

    for i, image_file in enumerate(image_files):
        image = read_image(image_file) # COLS * ROWS * CHANNELS
        data[i] = image.T # Transpose (転置) -> CHANNELS * ROWS * COLS
        if i % 250 == 0: print('Processed {} of {}'.format(i, count))
    
    return data

In [None]:
# 画像を配列に格納 (時間がかかります).
train = prep_data(train_images)
test = prep_data(test_images)

In [None]:
print(train.shape)
print(test.shape)

## ラベル (正解) データをリストに格納する

In [None]:
labels = []
for i in train_images:
    if 'dog.' in i: # 単純に 'dog' とすると、フォルダ名がマッチするため.
        labels.append(1) # 犬なら 1.
    else:
        labels.append(0) # 猫なら 0.

In [None]:
# 正しくラベルがついたか確認.
import pprint
print(labels[:10])
pprint.pprint(train_images[:10])

In [None]:
# 各ラベルの個数をグラフで確認.
sns.countplot(labels)
plt.title('Cats and Dogs')

## 画像データの確認

In [None]:
def show_cats_and_dogs(idx):
    """
    idx 番目の猫と、idx 番目の犬を、並べて表示する.
    """
    cat = read_image(train_cats[idx])
    dog = read_image(train_dogs[idx])

    pair = np.concatenate((cat, dog), axis=1)
    # Cols * Rows * Channels を結合するので、列方向に追加され、横長の画像になる.

    plt.figure(figsize=(10, 5))
    plt.imshow(pair)
    plt.show()

In [None]:
for idx in range(0, 5):
  show_cats_and_dogs(idx)

## ピクセル値を平均した画像を生成する

In [None]:
dog_avg = np.array([dog[0].T for i, dog in enumerate(train) if labels[i]==1]).mean(axis=0)
# 0 番目のチャンネルだけを使っている？

plt.imshow(dog_avg)
plt.title('Your Average Dog')

In [None]:
cat_avg = np.array([cat[0].T for i, cat in enumerate(train) if labels[i]==0]).mean(axis=0)
plt.imshow(cat_avg)
plt.title('Your Average Cat')

## モデル (CatdogNet-16) の定義

In [None]:
from keras.models import Sequential
from keras.layers import Input, Dropout, Flatten, Conv2D, MaxPooling2D, Dense, Activation
from keras.optimizers import RMSprop
from keras.callbacks import ModelCheckpoint, Callback, EarlyStopping
from keras.utils import np_utils

In [None]:
optimizer = RMSprop(lr=1e-4) # lr = learning rate
objective = 'binary_crossentropy' # 二値分類なので.

In [None]:
def catdog():
    """
    モデルを作って返す.
    """
    model = Sequential()
    
    model.add(Conv2D(32, (3, 3), input_shape=(CHANNELS, ROWS, COLS), activation='relu', padding='same'))
    model.add(Conv2D(32, (3, 3), activation='relu', padding='same'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    
    model.add(Conv2D(64, (3, 3), activation='relu', padding='same'))
    model.add(Conv2D(64, (3, 3), activation='relu', padding='same'))
    model.add(MaxPooling2D(data_format='channels_first', pool_size=(2, 2)))
    
    model.add(Conv2D(128, (3, 3), activation='relu', padding='same'))
    model.add(Conv2D(128, (3, 3), activation='relu', padding='same'))
    model.add(MaxPooling2D(data_format='channels_first', pool_size=(2, 2)))
    
    model.add(Conv2D(256, (3, 3), activation='relu', padding='same'))
    model.add(Conv2D(256, (3, 3), activation='relu', padding='same'))
    # model.add(Conv2D(256, 3, 3, activation='relu', padding='same'))
    model.add(MaxPooling2D(data_format='channels_first', pool_size=(2, 2)))
    
    model.add(Flatten())
    model.add(Dense(256, activation='relu'))
    model.add(Dropout(0.5))
    
    model.add(Dense(256, activation='relu'))
    model.add(Dropout(0.5))
    
    model.add(Dense(1))
    model.add(Activation('sigmoid'))
    
    model.compile(loss=objective, optimizer=optimizer, metrics=['accuracy'])
    return model

In [None]:
model = catdog()

## トレーニングの実行

In [None]:
epochs = 10
batch_size = 16

In [None]:
## Callback for loss logging per epoch
class LossHistory(Callback):
    """
    途中の損失を記録しておくためのクラス
    """
    def on_train_begin(self, logs={}):
        """
        学習を開始する前に行う処理
        """
        self.losses = []     # Loss の初期化
        self.val_losses = [] # Validation の Loss の初期化
        
    def on_epoch_end(self, batch, logs={}):
        """
        各 Epoch が終わった時に行う処理
        """
        self.losses.append(logs.get('loss'))         # Epoch での Loss を全体の Loss に追加
        self.val_losses.append(logs.get('val_loss')) # Epoch での Validation の Loss を全体のに追加

In [None]:
# Validation Loss をモニタして、変化しなくなったら、トレーニングを停止する.
early_stopping = EarlyStopping(monitor='val_loss', patience=3, verbose=1, mode='auto')

In [None]:
def run_catdog():
    
    history = LossHistory()
    
    # 学習してパラメタを設定する.
    model.fit(train, labels, batch_size=batch_size, epochs=epochs,
              validation_split=0.25, verbose=0, shuffle=True, callbacks=[history, early_stopping])
    
    # validation_split=0.25 : 75%のデータを学習に使って、25%のデータを Validation に使う.

    # 学習したパラメタを使って予測をする.
    predictions = model.predict(test, verbose=0)
    return predictions, history

In [None]:
predictions, history = run_catdog()

In [None]:
loss = history.losses
val_loss = history.val_losses

In [None]:
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('CatdogNet-16 Loss Trend')
plt.plot(loss, 'blue', label='Training Loss')
plt.plot(val_loss, 'green', label='Validation Loss')
plt.xticks(range(0, epochs)[0::2])
plt.legend()
plt.show()

## 推定結果の可視化

In [None]:
for i in range(0,10):
    if predictions[i, 0] >= 0.5: 
        print('I am {:.2%} sure this is a Dog'.format(predictions[i][0]))
    else: 
        print('I am {:.2%} sure this is a Cat'.format(1-predictions[i][0]))
        
    plt.imshow(test[i].T)
    plt.show()