In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import matplotlib.pyplot as plt
from PIL import Image
import cv2
import random
import tensorflow as tf
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Input, Dense, Conv2D, MaxPooling2D, UpSampling2D, Flatten, Reshape, Conv2DTranspose, ZeroPadding2D, Cropping2D
from tensorflow.keras.models import Model
from shutil import copyfile, rmtree
from timeit import default_timer as timer

In [None]:
# Helper for addressing files relative to the root directory of the dataset.
INPUT_ROOT = "../input/gtsrb-german-traffic-sign"
def from_input(path):
    """Return ``path`` joined onto ``INPUT_ROOT``.

    Args:
        path: path of a file or directory relative to the dataset root.

    Returns:
        The combined path as produced by ``os.path.join``.
    """
    resolved = os.path.join(INPUT_ROOT, path)
    return resolved

In [None]:
# Load the training metadata table (Train.csv) and preview its first rows.
train_info = pd.read_csv(from_input("Train.csv"))
train_info.head()

In [None]:
# Take a look at the data: summary statistics of the training metadata.
train_info.describe()

In [None]:
# Number of training examples in each class.
train_info.groupby('ClassId')['ClassId'].count()

In [None]:
# Load the test metadata table (Test.csv) and preview its first rows.
test_info =  pd.read_csv(from_input("Test.csv"))
test_info.head()

In [None]:
# Summary statistics of the test metadata.
test_info.describe()

In [None]:
# Number of test examples in each class.
test_info.groupby('ClassId')['ClassId'].count()

In [None]:
%matplotlib inline

import matplotlib.image as mpimg
import matplotlib.pyplot as plt

# Images are shown in a grid of 6 columns by 8 rows.
nrows = 8
ncols = 6

pic_offset = 0 # Lets us step through the images each time the display cell below is run.

In [None]:
def show_images(offset, num_classes=43):
    """Show one training image per class in an ``nrows`` x ``ncols`` grid.

    For every class id in ``range(num_classes)`` the files of the directory
    ``train/<class id>`` are listed and the one at position ``offset`` is
    displayed (wrapping around when ``offset`` exceeds the number of files).

    Args:
        offset: index of the image to show within each class directory.
        num_classes: number of class directories to display; must not exceed
            ``nrows * ncols`` because each class takes one subplot.
    """
    fig = plt.gcf()
    fig.set_size_inches(ncols*3, nrows*3)

    for class_id in range(num_classes):
        # subplot indices start at 1
        sp = plt.subplot(nrows, ncols, class_id + 1)
        sp.axis('Off')
        subdir = os.path.join(from_input('train'), str(class_id))
        # os.listdir returns entries in arbitrary order; sort them so that the
        # same offset always selects the same image.
        files = sorted(os.listdir(subdir))
        img_path = os.path.join(subdir, files[offset % len(files)])
        img = mpimg.imread(img_path)
        plt.imshow(img)

    plt.show()

In [None]:
# Show one image per class, then advance the offset so that re-running this
# cell displays the next image of every class.
show_images(pic_offset)
pic_offset += 1

Загрузка и подготовка данных:

In [None]:
TARGET_SIZE = (38, 38) # images are resized to this size
# Number of values in one resized image once flattened (3 values per pixel).
FLATTEN_SIZE = TARGET_SIZE[0] * TARGET_SIZE[1] * 3
# Batch size used by the data generators and to derive steps per epoch.
BATCH_SIZE=300

In [None]:
# Image paths and integer class ids, in the order they appear in Train.csv.
paths = train_info['Path'].values
y_train = train_info['ClassId'].values

# Shuffle paths and labels with the same permutation; the generator is seeded
# so the order is the same on every run.
indices = np.arange(y_train.shape[0])
randgen = random.Random(62)
randgen.shuffle(indices)

paths = paths[indices]
y_train = y_train[indices]
# One-hot encode the labels over the 43 classes.
y_train = to_categorical(y_train, 43)

data=[]

for i, f in enumerate(paths):
    print('\rLoading data {0:.1f}%...'.format((i / len(paths)) * 100), end = '\r')
    # The context manager closes the file handle as soon as the pixels have
    # been read, instead of leaving thousands of files open until garbage
    # collection. convert('RGB') guarantees the 3 channels the models expect.
    with Image.open(os.path.join(from_input('train'), f.replace('Train/', ''))) as image:
        resized_image = image.convert('RGB').resize(TARGET_SIZE)
    data.append(np.array(resized_image))

# Stack into a single float array scaled to [0, 1].
X_train = np.array(data).astype('float32') / 255.0

print('Data loaded.              ')

# Batches of (image, one-hot label) for classifier training.
train_datagen = ImageDataGenerator()
train_generator = train_datagen.flow(X_train,
                                    y_train,
                                    batch_size=BATCH_SIZE,
                                    shuffle=True,
                                    seed=17)

# Batches of (image, image) for autoencoder training: the target is the input.
ae_datagen = ImageDataGenerator()
ae_generator = ae_datagen.flow(X_train,
                                X_train,
                                batch_size=BATCH_SIZE,
                                shuffle=True,
                                seed=11)

In [None]:
# Image paths and one-hot labels (43 classes) of the test set, in CSV order.
paths = test_info['Path'].values
y_test = test_info['ClassId'].values
y_test = to_categorical(y_test, 43)

data=[]

for i, f in enumerate(paths):
    print('\rLoading data {0:.1f}%...'.format((i / len(paths)) * 100), end = '\r')
    # The context manager closes the file handle as soon as the pixels have
    # been read, instead of leaving the files open until garbage collection.
    # convert('RGB') guarantees the 3 channels the models expect.
    with Image.open(os.path.join(from_input('test'), f.replace('Test/', ''))) as image:
        resized_image = image.convert('RGB').resize(TARGET_SIZE)
    data.append(np.array(resized_image))

print('Data loaded.              ')

# Stack into a single float array scaled to [0, 1].
X_test = np.array(data).astype('float32') / 255.0 

# Unshuffled batches of (image, one-hot label) for evaluation.
test_datagen = ImageDataGenerator()
test_generator = test_datagen.flow(X_test,
                                    y_test,
                                    batch_size=BATCH_SIZE,
                                    shuffle=False,
                                    seed=17)

Некоторые вспомогательные функции:

In [None]:
def plot(history, plot_acc = True):
    """Plot the per-epoch training loss and, optionally, training accuracy.

    Each curve is drawn in its own new figure.

    Args:
        history: object whose ``history`` attribute is a dict of per-epoch
            metric lists; it must contain ``'loss'`` and, when ``plot_acc``
            is true, ``'acc'``.
        plot_acc: whether to also plot the ``'acc'`` series.
    """
    loss = history.history['loss']
    epochs = range(len(loss))

    plt.figure()
    # The series name has to be given as `label=`: a bare fourth positional
    # string is interpreted by plt.plot as the data of a second series.
    plt.plot(epochs, loss, 'r', label="Training Loss")
    plt.xlabel('Epoch')
    plt.title('Training loss')
    plt.legend()

    if plot_acc:
        acc = history.history['acc']
        plt.figure()
        plt.plot(epochs, acc, 'r', label="Training Accuracy")
        plt.title('Training accuracy')
        plt.xlabel('Epoch')
        plt.legend()



In [None]:
def show_layers(model):
    """Print a tab-separated table describing every layer of ``model``.

    One row is printed per entry of ``model.layers`` with the layer name and
    class, its ``output_shape``, the name of its activation and the class name
    of its kernel initializer. ``<none>`` is printed when the layer has no
    ``activation`` / ``kernel_initializer`` attribute or the activation is
    ``None``.

    Args:
        model: object exposing a ``layers`` iterable of layer objects.
    """
    print('Name\tOutput shape\tActivation\tInitializer')
    for layer in model.layers:
        activation = getattr(layer, 'activation', None)
        if activation is None:
            activation_name = '<none>'
        else:
            # Plain functions carry __name__; callable objects do not, so fall
            # back to their class name instead of raising AttributeError.
            activation_name = getattr(activation, '__name__', activation.__class__.__name__)

        if hasattr(layer, 'kernel_initializer'):
            initializer_name = layer.kernel_initializer.__class__.__name__
        else:
            initializer_name = '<none>'

        print('{0}({1})\t{2}\t{3}\t{4}'
            .format(layer.name,
              layer.__class__.__name__,
              layer.output_shape,
              activation_name,
              initializer_name))


def custom_summary(model):
    """Print ``model.summary()`` followed by the per-layer table of ``show_layers``."""
    model.summary()
    show_layers(model)

In [None]:
# Value passed as `verbose` to the fit / fit_generator calls of the training helpers.
VERBOSE=1

In [None]:
def train_model(model, kernel_initializer, optimizer, epochs):
    """Compile, train and evaluate a classifier, then print a report.

    The model is trained on the module-level ``train_generator`` with early
    stopping on training accuracy, evaluated on ``test_generator``, and its
    training curves are drawn with ``plot``.

    Args:
        model: Keras model to train; compiled in place.
        kernel_initializer: initializer name; only echoed in the report.
        optimizer: Keras optimizer instance used for compilation.
        epochs: maximum number of training epochs.
    """
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    start_time = timer()
    # NOTE(review): 'acc' is the history/monitor key this notebook expects for
    # the 'accuracy' metric; newer Keras releases may report it as 'accuracy'
    # instead - confirm against the installed version.
    history = model.fit_generator(train_generator,
                        epochs=epochs,
                        verbose=VERBOSE,
                        callbacks=[tf.keras.callbacks.EarlyStopping(monitor='acc', min_delta=0.0001, patience=2)],
                        steps_per_epoch= round(X_train.shape[0] / BATCH_SIZE))
    end_time = timer()
    
    custom_summary(model)
    print('==============================')
    print('Initializer: ', kernel_initializer)
    print('Optimizer: ', optimizer.__class__.__name__)
    print('Learning rate: ', optimizer.get_config()['learning_rate'])
    print('Epochs: ', epochs)
    print('==============================')
    print('Trained in {0:.2f} minutes'.format((end_time - start_time) / 60))
    
    # Training accuracy of the last completed epoch.
    acc=history.history['acc'][-1]
    # evaluate_generator returns [loss, accuracy]; keep the accuracy.
    test_acc = model.evaluate_generator(test_generator)[1]
    
    # Format only the values that are printed. The previous version also passed
    # a stray global `i` (left over from the data-loading loops) and therefore
    # failed with NameError whenever that loop variable did not exist.
    print('Results at the end of training: acc={0:.02f}%, test_acc={1:.02f}%'
          .format(acc*100, test_acc*100))

    plot(history)

In [None]:
def train_autoencoder(ae_model, kernel_initializer, optimizer, epochs):
    """Train an autoencoder on ``ae_generator`` and print a report.

    The model is compiled with a mean-squared-error loss, fitted on the
    module-level ``ae_generator``, summarised with ``custom_summary`` and its
    loss curve is drawn with ``plot``.

    Args:
        ae_model: Keras model to train; compiled in place.
        kernel_initializer: initializer name; only echoed in the report.
        optimizer: Keras optimizer instance used for compilation.
        epochs: number of training epochs.
    """
    ae_model.compile(optimizer=optimizer, loss='mean_squared_error')

    started = timer()
    history = ae_model.fit_generator(
        ae_generator,
        steps_per_epoch=round(X_train.shape[0] / BATCH_SIZE),
        epochs=epochs,
        verbose=VERBOSE,
    )
    elapsed_minutes = (timer() - started) / 60

    custom_summary(ae_model)

    divider = '=============================='
    print(divider)
    print('Initializer: ', kernel_initializer)
    print('Optimizer: ', optimizer.__class__.__name__)
    print('Learning rate: ', optimizer.get_config()['learning_rate'])
    print('Epochs: ', epochs)
    print(divider)
    print('Trained in {0:.2f} minutes'.format(elapsed_minutes))

    # Autoencoders are compiled without an accuracy metric, so plot loss only.
    plot(history, plot_acc = False)

In [None]:
def train_ae_stack_layer(ae_model, kernel_initializer, optimizer, epochs, data):
    """Train one layer of a stacked autoencoder to reconstruct ``data``.

    The model is compiled with a mean-squared-error loss and fitted with
    ``data`` as both input and target, in mini-batches of ``BATCH_SIZE``.
    A short report is printed and the loss curve is drawn with ``plot``.

    Args:
        ae_model: Keras model to train; compiled in place.
        kernel_initializer: initializer name; only echoed in the report.
        optimizer: Keras optimizer instance used for compilation.
        epochs: number of training epochs.
        data: array of samples the autoencoder has to reconstruct.
    """
    ae_model.compile(loss='mean_squared_error', optimizer = optimizer)
    
    start_time = timer()
    # Pass the batch size explicitly and let Keras derive the number of steps
    # from `data` itself. The previous call passed a `steps_per_epoch` computed
    # from the global X_train and no batch size, so the batches were not
    # BATCH_SIZE samples and the epoch length was not tied to `data`.
    history = ae_model.fit(data,
                           data,
                           batch_size=BATCH_SIZE,
                           epochs=epochs,
                           verbose=VERBOSE)
    end_time = timer()
    
    print('==============================')
    print('Initializer: ', kernel_initializer)
    print('Optimizer: ', optimizer.__class__.__name__)
    print('Learning rate: ', optimizer.get_config()['learning_rate'])
    print('Epochs: ', epochs)
    print('==============================')
    print('Trained in {0:.2f} minutes'.format((end_time - start_time) / 60))
    plot(history, plot_acc = False)

Тренируем стек полносвязных автокодировщиков:

Для сравнения тренируем и тестируем сеть без предварительной настройки параметров.

In [None]:
# Baseline for comparison: a dense classifier trained from scratch, without
# any autoencoder pre-training.
kernel_initializer='glorot_uniform'
optimizer=Adam(learning_rate=0.0001)
epochs=20

# Flatten -> Dense(256, tanh) -> Dense(128, tanh) -> Dense(43, softmax).
model = tf.keras.models.Sequential([
    Flatten(input_shape=TARGET_SIZE + (3,)),
    Dense(256, activation='tanh', kernel_initializer=kernel_initializer),
    Dense(128, activation='tanh', kernel_initializer=kernel_initializer),
    Dense(43, activation='softmax')
])

train_model(model, kernel_initializer, optimizer, epochs)

Тренируем первый слой автокодировщика:

In [None]:
# Flatten every training image into one vector of FLATTEN_SIZE values; this is
# the input (and target) of the first dense autoencoder.
flatX = X_train.reshape((X_train.shape[0], FLATTEN_SIZE))

In [None]:
# First autoencoder of the stack: FLATTEN_SIZE -> 256 -> FLATTEN_SIZE.
kernel_initializer='glorot_uniform'
optimizer=Adam(learning_rate=0.0001)
epochs = 20

input1 = Input(shape=(FLATTEN_SIZE,))
# The encoding layer object is kept in its own variable because later cells
# apply it again when assembling the classifier.
first_dense_l = Dense(256, activation='tanh', kernel_initializer=kernel_initializer)
first_dense = first_dense_l(input1)
# Decoding layer, needed only while training this autoencoder.
reverse_first_dense_l = Dense(FLATTEN_SIZE, activation='tanh', kernel_initializer=kernel_initializer)
output = reverse_first_dense_l(first_dense)

ae_model = Model(input1, output)

train_ae_stack_layer(ae_model, kernel_initializer, optimizer, epochs, flatX)

Тренируем второй слой автокодировщика:

In [None]:
# Get the outputs of the previous (first encoding) layer.
# The codes are computed from the flattened X_train rather than from the
# current value of flatX, so re-running this cell does not feed already
# encoded 256-d vectors into a model that expects FLATTEN_SIZE inputs.
flatX = Model(input1, first_dense).predict(X_train.reshape((X_train.shape[0], FLATTEN_SIZE)))

In [None]:
# Second autoencoder of the stack: 256 -> 128 -> 256, trained on the codes
# produced by the first encoding layer.
# The hyper-parameters are defined here, as in the other training cells, so
# that this model gets its own optimizer instance instead of reusing the one
# that already holds state from training the first autoencoder.
kernel_initializer='glorot_uniform'
optimizer=Adam(learning_rate=0.0001)
epochs = 20

input2 = Input(shape=(first_dense_l.output_shape[1],))
# The encoding layer object is kept in its own variable because later cells
# apply it again when assembling the classifier.
second_dense_l = Dense(128, activation='tanh', kernel_initializer=kernel_initializer)
second_dense = second_dense_l(input2)
# Decoding layer, needed only while training this autoencoder.
reverse_second_dense = Dense(256, activation='tanh', kernel_initializer=kernel_initializer)(second_dense)

ae_model = Model(input2, reverse_second_dense)

train_ae_stack_layer(ae_model, kernel_initializer, optimizer, epochs, flatX)

Склеиваем кодирующие слои, отбрасываем декодирующие и получаем нейронную сеть.

In [None]:
# Chain the two encoding layer objects (first_dense_l, second_dense_l) that
# were part of the autoencoders above behind a new image input; the decoding
# layers are not used here.
inputnn = Input(shape=TARGET_SIZE + (3,))
flatten = Flatten(input_shape=TARGET_SIZE + (3,))(inputnn)
first_dense = first_dense_l(flatten)
second_dense = second_dense_l(first_dense)

Тренируем сеть, предварительно настроенную автокодировщиком.

In [None]:
# Classifier built on the encoder stack assembled in the previous cell.
# kernel_initializer is only echoed in the report printed by train_model; it
# is not passed to the new output layer.
kernel_initializer='glorot_uniform'
optimizer=Adam(learning_rate=0.0001)
epochs=20

# New 43-way softmax output on top of the second encoding layer.
output = Dense(43, activation='softmax')(second_dense)
model = Model(inputnn, output)

train_model(model, kernel_initializer, optimizer, epochs)

Попробуем натренировать все слои автокодировщика одновременно.

In [None]:
# Joint training: all encoding and decoding layers in a single autoencoder,
# image -> flatten -> 256 -> 128 -> 256 -> FLATTEN_SIZE -> image.
kernel_initializer='glorot_uniform'
optimizer=Adam(learning_rate=0.0001)
epochs=20

inputae = Input(shape=TARGET_SIZE + (3,))
flatten = Flatten(input_shape=TARGET_SIZE + (3,))(inputae)
# New encoding layer objects: these rebind first_dense_l / second_dense_l, so
# the layers trained one by one above are no longer referenced by these names.
first_dense_l = Dense(256, activation='tanh', kernel_initializer=kernel_initializer)
first_dense = first_dense_l(flatten)
second_dense_l = Dense(128, activation='tanh', kernel_initializer=kernel_initializer)
second_dense = second_dense_l(first_dense)
# Decoder.
reverse_second_dense = Dense(256, activation='tanh', kernel_initializer=kernel_initializer)(second_dense)
reverse_first_dense = Dense(FLATTEN_SIZE, activation='tanh', kernel_initializer=kernel_initializer)(reverse_second_dense)
# Back to image shape so the output can be compared with the input batches
# that ae_generator yields as targets.
output = Reshape(target_shape = TARGET_SIZE + (3,))(reverse_first_dense)

ae_model = Model(inputae, output)
train_autoencoder(ae_model, kernel_initializer, optimizer, epochs)

In [None]:
# Rebuild the encoder path from the same input with the two encoding layer
# objects of the jointly trained autoencoder; the decoder is left out.
flatten = Flatten(input_shape=TARGET_SIZE + (3,))(inputae)
first_dense = first_dense_l(flatten)
second_dense = second_dense_l(first_dense)

In [None]:
# Classifier on top of the jointly pre-trained encoder.
# kernel_initializer is only echoed in the report printed by train_model; it
# is not passed to the new output layer.
kernel_initializer='glorot_uniform'
optimizer=Adam(learning_rate=0.0001)
epochs=20

# New 43-way softmax output on top of the second encoding layer.
output = Dense(43, activation='softmax')(second_dense)
model = Model(inputae, output)

train_model(model, kernel_initializer, optimizer, epochs)

Нет никакой выгоды от обучения каждого слоя отдельно, что соответствует результатам статьи *Is Joint Training Better for Deep Auto-Encoders? Yingbo Zhou, Devansh Arpit, Ifeoma Nwogu, Venu Govindaraju, 2015*.

Оказывается, что также нет никакой выгоды от предварительной настройки параметров с помощью автокодировщика.

Попробуем ещё и свёрточную нейронную сеть. Для обучения такого автокодировщика нам понадобится развёрточный слой Conv2DTranspose. Не будем использовать регуляризацию и послойное обучение.

In [None]:
# Baseline convolutional classifier trained from scratch, for comparison with
# the autoencoder-pre-trained version further below.
# kernel_initializer is only echoed in the report printed by train_model; it
# is not passed to the layers of this model.
kernel_initializer='glorot_uniform'
optimizer=Adam(learning_rate=0.0005)
epochs=20

# Only the first layer of a Sequential model declares the input shape; the
# `input_shape` arguments previously repeated on the 2nd and 3rd convolution
# had no effect and were misleading, so they are dropped.
model = tf.keras.models.Sequential([
    tf.keras.layers.Conv2D(128, (3, 3), activation='relu', input_shape=TARGET_SIZE + (3,)),
    tf.keras.layers.MaxPooling2D(2, 2),
    tf.keras.layers.Conv2D(256, (3, 3), activation='relu'),
    tf.keras.layers.MaxPooling2D(2, 2),
    tf.keras.layers.Conv2D(512, (3, 3), activation='relu'),
    tf.keras.layers.MaxPooling2D(2, 2),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(43, activation='softmax')
])

train_model(model, kernel_initializer, optimizer, epochs)

In [None]:
from keras import backend as K
from tensorflow.keras.layers import Layer


class MaxPoolingWithArgmax2D(Layer):
    """2D max pooling that also returns the positions of the selected maxima.

    Wraps ``tf.nn.max_pool_with_argmax`` (called with
    ``include_batch_in_index=True``) and returns both of its results so that a
    later layer can place values back at the positions of the maxima.

    Args:
        pool_size: (height, width) of the pooling window.
        strides: (vertical, horizontal) stride of the pooling window.
        padding: 'same' or 'valid' (case-insensitive).
    """

    def __init__(self, pool_size=(2, 2), strides=(2, 2), padding='same', **kwargs):
        super(MaxPoolingWithArgmax2D, self).__init__(**kwargs)
        self.padding = padding
        self.pool_size = pool_size
        self.strides = strides

    def call(self, inputs, **kwargs):
        """Return ``[pooled, argmax]`` for a 4-D input tensor."""
        # The TF op takes 4-element window/stride lists; the batch and channel
        # entries are fixed to 1.
        ksize = [1, self.pool_size[0], self.pool_size[1], 1]
        strides = [1, self.strides[0], self.strides[1], 1]
        output, argmax = tf.nn.max_pool_with_argmax(inputs, ksize=ksize, 
                                                    strides=strides,
                                                    padding=self.padding.upper(),
                                                    include_batch_in_index=True)

        return [output, argmax]

    def compute_output_shape(self, input_shape):
        """Return the (identical) shapes of the pooled tensor and the argmax.

        The spatial sizes follow the configured strides and padding instead of
        assuming a fixed factor of 2 with floor division.
        """
        # Normalise to plain ints / None so the arithmetic below also works
        # when a TensorShape is passed in.
        dims = tf.TensorShape(input_shape).as_list()
        same_padding = self.padding.upper() == 'SAME'

        def pooled(dim, window, stride):
            if dim is None:
                return None
            if same_padding:
                return -(-dim // stride)  # ceil(dim / stride)
            return (dim - window) // stride + 1

        output_shape = (dims[0],
                        pooled(dims[1], self.pool_size[0], self.strides[0]),
                        pooled(dims[2], self.pool_size[1], self.strides[1]),
                        dims[3])
        return [output_shape, output_shape]

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super(MaxPoolingWithArgmax2D, self).get_config()
        config.update({'pool_size': self.pool_size,
                       'strides': self.strides,
                       'padding': self.padding})
        return config

class MaxUnpooling2D(Layer):
    """Scatter values back to the positions recorded by a max-pooling argmax.

    The layer is called with a sequence of three tensors
    ``(pool, ind, prev_tensor)``:

    * ``pool`` - the tensor to unpool,
    * ``ind`` - flat indices into the output, one per element of ``pool``,
    * ``prev_tensor`` - a tensor whose height and width define the spatial
      size of the output.

    The output has the batch and channel size of ``pool`` and the height and
    width of ``prev_tensor``; positions not referenced by ``ind`` are zero.

    Args:
        size: (vertical, horizontal) upsampling factors. Upsampling ``pool``
            by these factors must give the output size, because the upsampled
            tensor is reshaped to the flattened output.
    """

    def __init__(self, size=(2, 2), **kwargs):
        super(MaxUnpooling2D, self).__init__(**kwargs)
        self.size = size

    def call(self, inputs):
        pool, ind, prev_tensor = inputs[0], inputs[1], inputs[2]
        # No variables are created here, so a plain name scope is enough for
        # grouping the ops; unlike tf.variable_scope it is also available
        # outside the TF1 API.
        with tf.name_scope(self.name):
            input_shape = tf.shape(pool)
            o_shape = tf.shape(prev_tensor)
            output_shape = [input_shape[0], o_shape[1], o_shape[2], input_shape[3]]
            flat_input_size = tf.reduce_prod(input_shape)
            flat_output_size = tf.reduce_prod(output_shape)
            
            # Repeat every value over its window, then read the upsampled
            # tensor at the recorded indices. The backend bundled with
            # tf.keras is used so that the layer does not mix in the
            # standalone `keras` package.
            upsampled = tf.keras.backend.repeat_elements(pool, self.size[0], axis=1)
            upsampled = tf.keras.backend.repeat_elements(upsampled, self.size[1], axis=2)
            upsampled = tf.reshape(upsampled, [flat_output_size])
            indices = tf.reshape(ind, [flat_input_size, 1])
            gathered_updates = tf.gather_nd(upsampled, indices)
            # Write the gathered values into a zero tensor of the output size.
            ret = tf.scatter_nd(indices, gathered_updates, shape=tf.cast([flat_output_size], tf.int64))
            ret = tf.reshape(ret, output_shape)

            # Restore the static shape information lost by the dynamic reshape.
            set_input_shape = pool.get_shape()
            prev_tensor_shape = prev_tensor.get_shape()

            set_output_shape = [set_input_shape[0], prev_tensor_shape[1], prev_tensor_shape[2], set_input_shape[3]]
            ret.set_shape(set_output_shape)

            return ret

    def compute_output_shape(self, input_shape):
        """Return the shape of the indices tensor scaled by ``size`` spatially."""
        inds_shape = input_shape[1]
        return inds_shape[0], inds_shape[1] * self.size[0], inds_shape[2] * self.size[1], inds_shape[3]

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super(MaxUnpooling2D, self).get_config()
        config.update({'size': self.size})
        return config


In [None]:
# Encoder: three Conv2D + max-pooling stages. Every pooling layer also returns
# the argmax indices, which the decoder needs to undo the pooling.
inputae = Input(shape=TARGET_SIZE + (3,))
first_conv_l = Conv2D(128, (3, 3), activation='relu')
first_conv = first_conv_l(inputae)
(first_mp, first_mp_inds) = MaxPoolingWithArgmax2D((2, 2))(first_conv)
second_conv_l = Conv2D(256, (3, 3), activation='relu')
second_conv = second_conv_l(first_mp)
(second_mp, second_mp_inds) = MaxPoolingWithArgmax2D((2, 2))(second_conv)
third_conv_l = Conv2D(512, (3, 3), activation='relu')
third_conv = third_conv_l(second_mp)
third_mp_l = MaxPoolingWithArgmax2D((2, 2))
(third_mp, third_mp_inds) = third_mp_l(third_conv)

# Decoder: mirrors the encoder in reverse order. Each MaxUnpooling2D receives
# (tensor to unpool, argmax indices of the matching pooling stage, the tensor
# that was fed to that pooling stage); each Conv2DTranspose mirrors one Conv2D.
# NOTE(review): the spatial sizes only line up if the layers use the Keras
# default padding/strides - confirm with ae_model.summary().
reverse_third_mp = MaxUnpooling2D((2, 2))((third_mp, third_mp_inds, third_conv))
reverse_third_conv = Conv2DTranspose(256, (3, 3), activation='relu')(reverse_third_mp)
reverse_second_mp = MaxUnpooling2D((2, 2))((reverse_third_conv, second_mp_inds, second_conv))
reverse_second_conv = Conv2DTranspose(128, (3, 3), activation='relu')(reverse_second_mp)
reverse_first_mp = MaxUnpooling2D((2, 2))((reverse_second_conv, first_mp_inds, first_conv))
# Final layer has 3 filters, matching the 3 channels of the input images.
reverse_first_conv = Conv2DTranspose(3, (3, 3), activation='relu')(reverse_first_mp)
output = reverse_first_conv

In [None]:
# Train the convolutional autoencoder.
# kernel_initializer is only echoed in the report printed by train_autoencoder;
# the conv layers above were created without an explicit initializer.
kernel_initializer = 'glorot_uniform'
optimizer=Adam(learning_rate=0.00001)
epochs=50

ae_model = Model(inputae, output)
# Printed here before training; train_autoencoder prints the summary again
# (through custom_summary) once training has finished.
ae_model.summary()
train_autoencoder(ae_model, kernel_initializer, optimizer, epochs)

In [None]:
# Classifier on top of the convolutional encoder trained as part of ae_model.
# kernel_initializer is only echoed in the report printed by train_model.
kernel_initializer='glorot_uniform'
optimizer=Adam(learning_rate=0.0005)
epochs=20

# Flatten the output of the last pooling stage and add a 43-way softmax.
flatten = Flatten()(third_mp)
output = Dense(43, activation='softmax')(flatten)
model = Model(inputae, output)

train_model(model, kernel_initializer, optimizer, epochs)