In [1]:
import os
import keras
from keras.models import Sequential
from keras.layers.convolutional import Conv2D
from keras.layers.convolutional import MaxPooling2D
from keras.layers.core import Activation
from keras.layers.core import Flatten, Dropout
from keras.layers.core import Dense
from keras.datasets import cifar10
from keras.optimizers import RMSprop
from keras.callbacks import TensorBoard, ModelCheckpoint

Using TensorFlow backend.
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


## ネットワーク定義

In [2]:
def network(input_shape, num_classes):
    """Build a small convolutional classifier.

    The model is two convolution + max-pooling stages followed by a dense
    head that ends in a softmax over ``num_classes`` units.

    Args:
        input_shape: Shape of a single input sample, handed to the first
            ``Conv2D`` layer as ``input_shape``.
        num_classes: Number of units in the final ``Dense`` layer.

    Returns:
        The assembled (not yet compiled) ``Sequential`` model.
    """
    stack = [
        # Stage 1: 32 channels, then 2x2 pooling and dropout.
        Conv2D(32, kernel_size=3, padding="same",
               input_shape=input_shape, activation="relu"),
        MaxPooling2D(pool_size=(2, 2)),
        Dropout(0.25),
        # Stage 2: 64 channels, then 2x2 pooling.
        Conv2D(64, kernel_size=3, padding="same", activation="relu"),
        MaxPooling2D(pool_size=(2, 2)),
        # Classifier head.
        Flatten(),
        Dense(512, activation="relu"),
        Dropout(0.5),
        Dense(num_classes),
        Activation("softmax"),
    ]

    model = Sequential()
    for layer in stack:
        model.add(layer)
    return model

## データセット定義

In [3]:
class CIFAR10Dataset():
    """Loads CIFAR-10 through Keras and converts it to model-ready arrays."""

    def __init__(self):
        # Per-sample input shape that image arrays are reshaped to.
        self.image_shape = (32, 32, 3)
        # Number of target classes used for one-hot encoding.
        self.num_classes = 10

    def get_batch(self):
        """Load CIFAR-10 and preprocess both the train and test splits.

        Returns:
            Tuple ``(x_train, y_train, x_test, y_test)`` where the ``x``
            arrays went through the image branch of ``preprocess`` and the
            ``y`` arrays through its label branch.
        """
        (train_images, train_labels), (test_images, test_labels) = \
            cifar10.load_data()

        x_train = self.preprocess(train_images)
        x_test = self.preprocess(test_images)
        y_train = self.preprocess(train_labels, label_data=True)
        y_test = self.preprocess(test_labels, label_data=True)

        return x_train, y_train, x_test, y_test

    def preprocess(self, data, label_data=False):
        """Convert raw images or labels into the form used for training.

        Args:
            data: Array of images, or of class labels when ``label_data``
                is true.
            label_data: When true, one-hot encode ``data`` into
                ``self.num_classes`` columns; otherwise treat it as images.

        Returns:
            One-hot labels, or a float32 image array divided by 255 and
            reshaped to ``(n_samples,) + self.image_shape``.
        """
        if label_data:
            return keras.utils.to_categorical(data, self.num_classes)

        # Normalize: cast to float32 (a copy, so the input is untouched)
        # and scale by 1/255.
        scaled = data.astype("float32") / 255
        target_shape = (scaled.shape[0],) + self.image_shape
        return scaled.reshape(target_shape)

## 学習処理定義

In [7]:
class Trainer():
    """Compiles a model and trains it with TensorBoard logging and checkpoints."""

    def __init__(self, model, loss, optimizer):
        """Compile ``model`` and set up the logging locations.

        Args:
            model: Model object exposing ``compile`` and ``fit``.
            loss: Loss passed through to ``model.compile``.
            optimizer: Optimizer passed through to ``model.compile``.
        """
        self._target = model
        # Compile with accuracy as the only reported metric.
        model.compile(loss=loss, optimizer=optimizer, metrics=["accuracy"])

        self.verbose = 1
        # "__file__" is a string literal here, so dirname() yields "" and
        # the log directory ends up relative to the working directory.
        base_dir = os.path.dirname("__file__")
        self.log_dir = os.path.join(base_dir, "logdir_cifar10_net")
        self.model_file_name = "model_file.hdf5"

    def train(self, x_train, y_train, batch_size, epochs, validation_split):
        """Fit the compiled model, writing logs and the best checkpoint.

        Any existing log directory is deleted and recreated first.

        Args:
            x_train: Training inputs.
            y_train: Training targets.
            batch_size: Batch size forwarded to ``fit``.
            epochs: Number of epochs forwarded to ``fit``.
            validation_split: Validation fraction forwarded to ``fit``.
        """
        import shutil

        # Start each run from an empty log directory.
        if os.path.exists(self.log_dir):
            shutil.rmtree(self.log_dir)
        os.mkdir(self.log_dir)

        checkpoint_path = os.path.join(self.log_dir, self.model_file_name)
        callbacks = [
            TensorBoard(log_dir=self.log_dir),
            ModelCheckpoint(checkpoint_path, save_best_only=True),
        ]

        self._target.fit(
            x_train,
            y_train,
            batch_size=batch_size,
            epochs=epochs,
            validation_split=validation_split,
            callbacks=callbacks,
            verbose=self.verbose,
        )

In [8]:
# Create the dataset descriptor and a fresh network sized to match it.
dataset = CIFAR10Dataset()
model = network(
    input_shape=dataset.image_shape,
    num_classes=dataset.num_classes,
)

In [9]:
# Load the preprocessed splits, compile the model, and train for 12 epochs
# with 20% of the training data held out for validation.
x_train, y_train, x_test, y_test = dataset.get_batch()
trainer = Trainer(
    model=model,
    loss="categorical_crossentropy",
    optimizer=RMSprop(),
)
trainer.train(
    x_train=x_train,
    y_train=y_train,
    batch_size=128,
    epochs=12,
    validation_split=0.2,
)


Train on 40000 samples, validate on 10000 samples


Epoch 1/12

Epoch 2/12
Epoch 3/12
Epoch 4/12
Epoch 5/12
Epoch 6/12
Epoch 7/12
Epoch 8/12
Epoch 9/12
Epoch 10/12
Epoch 11/12
Epoch 12/12


In [10]:
# Evaluate on the test split; `score` holds the loss followed by the
# accuracy metric the model was compiled with.
score = model.evaluate(x_test, y_test, verbose=0)
for label, value in (("Test loss:", score[0]), ("Test accuracy:", score[1])):
    print(label, value)

Test loss: 0.8370696809768676
Test accuracy: 0.7193999886512756


## ネットワークをより深くする

In [11]:
def network(input_shape, num_classes):
    """Build a deeper convolutional classifier.

    This redefines ``network`` from earlier in the notebook with two
    convolutions per stage instead of one. In each stage the first
    convolution uses ``padding="same"`` and the second passes no
    ``padding`` argument.

    Args:
        input_shape: Shape of a single input sample, handed to the first
            ``Conv2D`` layer as ``input_shape``.
        num_classes: Number of units in the final ``Dense`` layer.

    Returns:
        The assembled (not yet compiled) ``Sequential`` model.
    """
    stack = [
        # Stage 1: two 32-channel convolutions, 2x2 pooling, dropout.
        Conv2D(32, kernel_size=3, padding="same",
               input_shape=input_shape, activation="relu"),
        Conv2D(32, kernel_size=3, activation="relu"),
        MaxPooling2D(pool_size=(2, 2)),
        Dropout(0.25),
        # Stage 2: two 64-channel convolutions, 2x2 pooling, dropout.
        Conv2D(64, kernel_size=3, padding="same", activation="relu"),
        Conv2D(64, kernel_size=3, activation="relu"),
        MaxPooling2D(pool_size=(2, 2)),
        Dropout(0.25),
        # Classifier head.
        Flatten(),
        Dense(512, activation="relu"),
        Dropout(0.5),
        Dense(num_classes),
        Activation("softmax"),
    ]

    model = Sequential()
    for layer in stack:
        model.add(layer)
    return model

In [12]:
# Rebuild the dataset descriptor and instantiate the deeper network.
dataset = CIFAR10Dataset()
model = network(
    input_shape=dataset.image_shape,
    num_classes=dataset.num_classes,
)

In [13]:
# Reload the data and train the deeper model with the same settings
# (12 epochs, batch size 128, 20% validation hold-out).
x_train, y_train, x_test, y_test = dataset.get_batch()
trainer = Trainer(
    model=model,
    loss="categorical_crossentropy",
    optimizer=RMSprop(),
)
trainer.train(
    x_train=x_train,
    y_train=y_train,
    batch_size=128,
    epochs=12,
    validation_split=0.2,
)

Train on 40000 samples, validate on 10000 samples
Epoch 1/12
Epoch 2/12
Epoch 3/12
Epoch 4/12
Epoch 5/12
Epoch 6/12
Epoch 7/12
Epoch 8/12
Epoch 9/12
Epoch 10/12
Epoch 11/12
Epoch 12/12


In [14]:
# Evaluate the deeper model on the test split; `score` holds the loss
# followed by the accuracy metric.
score = model.evaluate(x_test, y_test, verbose=0)
for label, value in (("Test loss:", score[0]), ("Test accuracy:", score[1])):
    print(label, value)

Test loss: 0.7125389976978302
Test accuracy: 0.7699000239372253


## data augmentation

In [15]:
from keras.preprocessing.image import ImageDataGenerator
import numpy as np

In [17]:
class Trainer():
    """Compiles a model and trains it on augmented image batches.

    Training data is shuffled and split into train/validation parts; only
    the training part is fed through the ``ImageDataGenerator``. Logs and the
    best checkpoint are written under ``self.log_dir``.
    """

    def __init__(self, model, loss, optimizer):
        """Compile ``model`` and set up the logging locations.

        Args:
            model: Model object exposing ``compile`` and ``fit_generator``.
            loss: Loss passed through to ``model.compile``.
            optimizer: Optimizer passed through to ``model.compile``.
        """
        self._target = model
        self._target.compile(
            loss=loss, optimizer=optimizer, metrics=["accuracy"]
        )
        self.verbose = 1
        logdir = "logdir_cifar10_deep_with_aug"
        # "__file__" is a string literal here, so dirname() yields "" and
        # the log directory ends up relative to the working directory.
        self.log_dir = os.path.join(os.path.dirname("__file__"), logdir)
        self.model_file_name = "model_file.hdf5"

    @staticmethod
    def _split_train_valid(x, y, validation_split):
        """Shuffle and split paired arrays into train and validation parts.

        Args:
            x: Array of inputs, indexed by sample along axis 0.
            y: Array of targets with the same number of samples as ``x``.
            validation_split: Fraction of samples for validation,
                ``0 <= validation_split < 1``.

        Returns:
            Tuple ``(x_train, y_train, x_valid, y_valid)``. The validation
            arrays are empty when the computed validation size is 0.

        Raises:
            ValueError: If ``validation_split`` is outside ``[0, 1)``.
        """
        if not 0 <= validation_split < 1:
            raise ValueError(
                "validation_split must satisfy 0 <= validation_split < 1, "
                "got {!r}".format(validation_split)
            )

        n_samples = x.shape[0]
        validation_size = int(n_samples * validation_split)

        indices = np.arange(n_samples)
        np.random.shuffle(indices)

        # Slice by an explicit boundary instead of a negative offset:
        # indices[:-0] would be empty and indices[-0:] would be everything,
        # swapping the two sets when the validation size is 0.
        boundary = n_samples - validation_size
        train_idx, valid_idx = indices[:boundary], indices[boundary:]
        return x[train_idx], y[train_idx], x[valid_idx], y[valid_idx]

    def train(self, x_train, y_train, batch_size, epochs, validation_split):
        """Train with on-the-fly augmentation, logging and checkpointing.

        Any existing log directory is deleted and recreated first.

        Args:
            x_train: Training images, indexed by sample along axis 0.
            y_train: Training targets aligned with ``x_train``.
            batch_size: Batch size of the augmented generator.
            epochs: Number of epochs to train.
            validation_split: Fraction of ``x_train`` held out (after
                shuffling) for validation, ``0 <= validation_split < 1``.

        Raises:
            ValueError: If ``validation_split`` is outside ``[0, 1)``.
        """
        if os.path.exists(self.log_dir):
            import shutil
            shutil.rmtree(self.log_dir)
        os.mkdir(self.log_dir)

        datagen = ImageDataGenerator(
            featurewise_center=False,
            samplewise_center=False,
            featurewise_std_normalization=False,
            samplewise_std_normalization=False,
            zca_whitening=False,
            rotation_range=0,
            width_shift_range=0.1,  # fraction of width to shift horizontally
            height_shift_range=0.1,  # fraction of height to shift vertically
            horizontal_flip=True,  # random horizontal flips
            vertical_flip=False)

        # Split into train and validation parts.
        x_train, y_train, x_valid, y_valid = self._split_train_valid(
            x_train, y_train, validation_split
        )

        # Fit the generator's data statistics on the training part only, so
        # validation samples never influence them.
        datagen.fit(x_train)

        # NOTE(review): with no validation data the checkpoint callback
        # presumably has no validation metric to monitor -- confirm before
        # relying on checkpoints in that mode.
        validation_data = (x_valid, y_valid) if x_valid.shape[0] else None

        model_path = os.path.join(self.log_dir, self.model_file_name)
        # Train on augmented batches.
        self._target.fit_generator(
            datagen.flow(x_train, y_train, batch_size=batch_size),
            steps_per_epoch=x_train.shape[0] // batch_size,
            epochs=epochs,
            validation_data=validation_data,
            callbacks=[
                TensorBoard(log_dir=self.log_dir),
                ModelCheckpoint(model_path, save_best_only=True)
            ],
            verbose=self.verbose,
            workers=4
        )

In [18]:
# Rebuild the dataset descriptor and a fresh network for the augmented run.
dataset = CIFAR10Dataset()
model = network(
    input_shape=dataset.image_shape,
    num_classes=dataset.num_classes,
)

In [21]:
# Reload the data and train with augmentation for 15 epochs
# (batch size 128, 20% validation hold-out).
x_train, y_train, x_test, y_test = dataset.get_batch()
trainer = Trainer(
    model=model,
    loss="categorical_crossentropy",
    optimizer=RMSprop(),
)
trainer.train(
    x_train=x_train,
    y_train=y_train,
    batch_size=128,
    epochs=15,
    validation_split=0.2,
)

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


In [22]:
# Evaluate the augmented-training model on the test split; `score` holds
# the loss followed by the accuracy metric.
score = model.evaluate(x_test, y_test, verbose=0)
for label, value in (("Test loss:", score[0]), ("Test accuracy:", score[1])):
    print(label, value)

Test loss: 0.7179710119247437
Test accuracy: 0.7653999924659729


## 予測

In [23]:
from pathlib import Path
import numpy as np
from PIL import Image
from keras.models import load_model

In [24]:
# Folder containing the sample PNG images to classify.
images_folder = "sample_images"
# Checkpoint file written by the augmented training run.
model_path = "logdir_cifar10_deep_with_aug/model_file.hdf5"

In [25]:
# Per-sample input shape the images are resized to before prediction.
image_shape = (32, 32, 3)
# Restore the trained model from the checkpoint file.
model = load_model(model_path)

In [26]:
# load images
def crop_resize(image_path):
    """Load an image file and convert it to a normalized model input.

    The image is cropped to the square anchored at its top-left corner
    (side = the shorter edge), converted to RGB, resized to the first two
    entries of the module-level ``image_shape``, and scaled by 1/255.

    Args:
        image_path: Path of the image file to open.

    Returns:
        float32 array of the resized RGB image with values divided by 255.
    """
    # image_shape[:2] is read as (height, width); PIL's resize() expects
    # (width, height).
    height, width = image_shape[:2]

    # The context manager closes the underlying file once pixels are read.
    with Image.open(image_path) as image:
        side = min(image.size)
        square = image.crop((0, 0, side, side))
        # Force three channels: PNG files may be RGBA, palette or
        # grayscale, which would not match the 3-channel model input.
        resized = square.convert("RGB").resize((width, height))

    img = np.array(resized).astype("float32")
    img /= 255
    return img

In [27]:
folder = Path(images_folder)
# Sort the matches: glob() yields files in an unspecified, filesystem-
# dependent order, and the checks on the predictions index them by position.
image_paths = [str(f) for f in sorted(folder.glob("*.png"))]
# Stack the preprocessed images into a single batch array.
images = np.asarray([crop_resize(p) for p in image_paths])

In [28]:
# Take the most probable class per image. argmax over the softmax output
# is used instead of Sequential.predict_classes, which is deprecated and
# absent from newer Keras releases.
predicted = np.argmax(model.predict(images), axis=-1)

# Sanity checks on the two sample images (class 3 = cat, class 5 = dog).
assert predicted[0] == 3, "image should be cat."
assert predicted[1] == 5, "image should be dog."

print("You can detect cat & dog!")

You can detect cat & dog!
