In [1]:
#Setup
import os
from sklearn.model_selection import KFold
import tensorflow as tf
import keras
from keras.layers import Input, Dense, Activation, Bidirectional, Dropout, LSTM, MaxPooling2D
from keras import applications
from keras.layers import Reshape, Lambda, BatchNormalization
from keras.layers.merge import add, concatenate
from keras.models import Sequential, Model
from keras.optimizers import Adam, Adadelta
from keras.callbacks import EarlyStopping, LearningRateScheduler, ModelCheckpoint, TensorBoard, ReduceLROnPlateau
from keras import backend as K
import numpy as np
import json
from keras.preprocessing import image
from keras.applications.vgg16 import preprocess_input
import itertools
import random
import editdistance
# Seed both global RNGs this notebook has imported so a fresh
# "Restart & Run All" starts from the same random state.
# NOTE(review): TensorFlow's own RNG is not seeded here.
SEED = 2020
random.seed(SEED)
np.random.seed(SEED)
print("Setup Complete")

Setup Complete


Using TensorFlow backend.


In [2]:
#Configuration: every tunable value used by the cells below
SIZE = (100, 32)            # (width, height); unpacked as img_w, img_h by the generators
ADAM_LR = 0.001             # learning rate for the Adam optimizer
EPOCHS = 20
BATCH_SIZE = 3
FINETUNE = False
# Width reduction of the truncated VGG16 built by maxpooling(): four 2x2 pools
# remain after the last layer is dropped, i.e. a factor of 16. This matches the
# "reshape (None, 6, 1024)" row of the model summary for a 100 px wide input.
# The previous value of 32 gave input_length = 100 // 32 - 2 = 1, leaving the
# CTC loss a single time step to align every label against.
DOWNSAMPLE_FACTOR = 16
LETTERS = "0123456789ABCDEFGHIJ,"
NUM_CLASS = len(LETTERS) + 1    # one extra class on top of the alphabet (CTC blank)
# NOTE(review): a 100 px wide input leaves only 4 usable time steps, fewer than
# MAX_LENGTH; confirm the labels in this dataset are short enough or widen SIZE.
MAX_LENGTH = 9              # width of the padded label matrix
IMG_PATH = r"Dataset/easy_samples"
LABEL_PATH = r"Dataset/easy_samples.json"

In [3]:
#modify pooling layer
def maxpooling(base_model):
    """Rebuild ``base_model`` as a Sequential model named ``'vgg16'``.

    The last layer of ``base_model`` is dropped. Every remaining layer whose
    name contains ``'pool'`` is replaced by a new 2x2 ``MaxPooling2D`` carrying
    the same name; all other layers are reused as-is (the same layer objects).

    Args:
        base_model: model exposing a ``layers`` list of named layers.

    Returns:
        The newly assembled Sequential model.
    """
    rebuilt = Sequential(name='vgg16')
    for src_layer in base_model.layers[:-1]:
        is_pool = 'pool' in src_layer.name
        rebuilt.add(
            MaxPooling2D(pool_size=(2, 2), name=src_layer.name) if is_pool else src_layer
        )
    return rebuilt

In [4]:
def text_to_labels(text):
    """Encode ``text`` as a list of class indices (positions within LETTERS).

    Raises:
        ValueError: if a character of ``text`` does not occur in LETTERS.
    """
    return [LETTERS.index(char) for char in text]
def labels_to_text(labels):
    """Decode an iterable of class indices back into a string.

    Indices outside ``[0, len(LETTERS))`` contribute nothing to the result.
    That covers the extra class beyond the alphabet and also negative values,
    which would otherwise wrap around to the end of LETTERS through Python's
    negative indexing.

    Args:
        labels: iterable of integer class indices.

    Returns:
        The decoded string (empty for empty input).
    """
    alphabet_size = len(LETTERS)
    return ''.join(LETTERS[idx] for idx in labels if 0 <= idx < alphabet_size)
def ctc_lambda(args):
    """Compute the batch CTC cost; intended as the body of a Keras Lambda layer.

    Args:
        args: sequence of exactly four items, in order:
            (y_pred, labels, input_length, label_length).

    Returns:
        Whatever ``K.ctc_batch_cost`` returns for the trimmed predictions.
    """
    softmax_out, target_labels, pred_lengths, target_lengths = args
    # Discard the first two time steps before computing the loss.
    # NOTE(review): presumably because the earliest RNN outputs are unreliable
    # -- confirm. input_length supplied by callers must account for this trim.
    trimmed = softmax_out[:, 2:, :]
    return K.ctc_batch_cost(target_labels, trimmed, pred_lengths, target_lengths)

In [6]:
class DataGenerator:
    """Loads a set of images into memory and yields batches for CTC training.

    Typical use: construct, call ``build_data()`` once to read the images,
    then pass ``next_batch()`` to ``fit_generator``.
    """

    def __init__(self,img_dirpath, labels_path, img_w, img_h,idxs, training = True, n_eraser=5):
        """Index the image directory and allocate the image buffer.

        Args:
            img_dirpath: directory containing the image files.
            labels_path: JSON file mapping image file name -> label text,
                or None when no labels are available.
            img_w: image width in pixels.
            img_h: image height in pixels.
            idxs: positions (into the sorted directory listing) of the files
                this generator should use, or None to use every file.
            training: stored on the instance; not otherwise used here.
            n_eraser: accepted for compatibility; not used by this class.
        """
        self.img_w = img_w
        self.img_h = img_h
        self.idxs = idxs
        self.img_dirpath = img_dirpath
        if labels_path is not None:
            # Context manager so the label file handle is closed right away.
            with open(labels_path) as label_file:
                self.labels = json.load(label_file)
        else:
            self.labels = None
        # os.listdir returns entries in arbitrary order; sort so that positional
        # indices in `idxs` always refer to the same files.
        self.img_dir = sorted(os.listdir(self.img_dirpath))
        if self.idxs is not None:
            self.img_dir = [self.img_dir[idx] for idx in self.idxs]
        self.n = len(self.img_dir)
        self.indexes = list(range(self.n))
        self.cur_index = 0
        # Images are held as float16 and upcast to float32 when sampled.
        self.imgs = np.zeros((self.n, self.img_h, self.img_w, 3), dtype=np.float16)
        self.training = training
        self.texts = []

    def build_data(self):
        """Read every selected image into ``self.imgs`` and collect its label.

        Files without a label source (``labels_path`` was None) get an empty
        string as their text.

        Raises:
            KeyError: if labels were loaded but lack an entry for a file.
        """
        print(self.n, "Image Loading....",  self.img_dirpath)
        # Start from a clean list so a repeated call does not duplicate texts.
        self.texts = []
        for i, img_file in enumerate(self.img_dir):
            # Join with a separator; plain concatenation only worked when the
            # directory string happened to end with one.
            img_path = os.path.join(self.img_dirpath, img_file)
            # Load at the same (height, width) the buffer was allocated with.
            img = image.load_img(img_path, target_size=(self.img_h, self.img_w))
            img = image.img_to_array(img)
            img = preprocess_input(img).astype(np.float16)
            self.imgs[i] = img
            if self.labels is not None:
                self.texts.append(self.labels[img_file])
            else:
                self.texts.append('')
        print("Done!")

    def next_sample(self):
        """Return the next ``(image_float32, text)`` pair.

        Samples are served in index order on the first pass; each time the end
        is reached the order is reshuffled and iteration restarts, so every
        sample is visited exactly once per pass.
        """
        if self.cur_index >= self.n:
            self.cur_index = 0
            random.shuffle(self.indexes)
        sample_idx = self.indexes[self.cur_index]
        self.cur_index += 1
        return self.imgs[sample_idx].astype(np.float32), self.texts[sample_idx]

    def next_batch(self):
        """Yield ``(inputs, outputs)`` batches forever.

        ``inputs`` holds 'the_inputs' (images transposed to width-major),
        'the_labels' (label indices, zero-padded to MAX_LENGTH),
        'input_length' and 'label_length'. ``outputs`` holds a zero array
        under 'ctc'.
        """
        while True:
            X_data = np.zeros([BATCH_SIZE, self.img_w, self.img_h, 3], dtype=np.float32)
            Y_data = np.zeros([BATCH_SIZE, MAX_LENGTH], dtype=np.float32)
            # Time steps offered to CTC: downsampled width minus the 2 steps
            # that ctc_lambda slices off. DOWNSAMPLE_FACTOR has to match the
            # width reduction of the CNN actually in use.
            input_length= np.ones((BATCH_SIZE, 1), dtype=np.float32) * (self.img_w//DOWNSAMPLE_FACTOR - 2)
            label_length = np.zeros((BATCH_SIZE, 1), dtype=np.float32)
            for i in range(BATCH_SIZE):
                img, text = self.next_sample()
                # (height, width, channels) -> (width, height, channels)
                img = img.transpose((1, 0, 2))
                X_data[i] = img
                Y_data[i, :len(text)] = text_to_labels(text)
                label_length[i] = len(text)
            inputs={
                'the_inputs': X_data,
                'the_labels': Y_data,
                'input_length': input_length,
                'label_length': label_length
            }
            outputs={'ctc': np.zeros([BATCH_SIZE])}
            yield (inputs, outputs)

In [9]:
#Build model
def build_model(input_shape, training, finetune):
    """Assemble the CNN + bidirectional-LSTM network with a CTC loss head.

    Args:
        input_shape: shape of one input image; callers pass (width, height, 3).
        training: when truthy, return the loss-producing training model plus a
            prediction function; otherwise return the prediction model only.
        finetune: whether the layers of the convolutional base are trainable.

    Returns:
        ``(model, y_func)`` when ``training`` is truthy, where ``model`` takes
        [image, labels, input_length, label_length] and outputs the CTC loss
        and ``y_func`` maps [image] -> [softmax predictions].
        A single ``Model`` mapping image -> softmax predictions otherwise.
    """
    #build cnn layer
    inputs = Input(name="the_inputs", shape=input_shape, dtype='float32')
    base_model = applications.VGG16(weights='imagenet', include_top=False)
    base_model = maxpooling(base_model)
    inner = base_model(inputs)
    # Keep the first spatial axis as the sequence axis and flatten the rest
    # into one feature vector per step.
    inner = Reshape(target_shape=(int(inner.shape[1]), -1), name='reshape')(inner)
    inner = Dense(512, activation='relu', kernel_initializer='he_normal', name='dense1')(inner)
    inner = Dropout(0.25)(inner)
    lstm1 = Bidirectional(LSTM(512, return_sequences=True, kernel_initializer='he_normal', name='lstm1', dropout=0.25, recurrent_dropout=0.25))(inner)
    # Second recurrent layer gets its own name; it previously reused 'lstm1'.
    lstm2 = Bidirectional(LSTM(512, return_sequences=True, kernel_initializer='he_normal', name='lstm2', dropout=0.25, recurrent_dropout=0.25))(lstm1)
    y_pred = Dense(NUM_CLASS, activation='softmax', kernel_initializer='he_normal')(lstm2)
    # Label width must equal the padded label matrix the data generator emits
    # (MAX_LENGTH columns); the former LABEL_LEN name was never defined.
    labels= Input(name='the_labels', shape=[MAX_LENGTH], dtype='float32')
    input_length = Input(name='input_length', shape=[1], dtype='int64')
    label_length = Input(name='label_length', shape=[1], dtype='int64')
    loss = Lambda(ctc_lambda, output_shape=(1,), name='ctc')([y_pred, labels, input_length, label_length])
    # Freeze or unfreeze the convolutional base; normalise to a real bool so
    # callers passing 0/1 behave the same as False/True.
    base_trainable = bool(finetune)
    for layer in base_model.layers:
        layer.trainable = base_trainable
    if training:
        y_func = K.function([inputs], [y_pred])
        # Build the training model once and reuse it for both summary and return.
        train_model = Model(inputs=[inputs, labels, input_length, label_length], outputs=loss)
        train_model.summary()
        return train_model, y_func
    return Model(inputs=[inputs], outputs = y_pred)

In [10]:
#Smoke test: build the training model once so its summary is printed.
#Uses the FINETUNE flag from the configuration cell instead of a literal 0.
model, y_func = build_model((*SIZE, 3), training=True, finetune=FINETUNE)

Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
the_inputs (InputLayer)         (None, 100, 32, 3)   0                                            
__________________________________________________________________________________________________
vgg16 (Sequential)              multiple             14714688    the_inputs[0][0]                 
__________________________________________________________________________________________________
reshape (Reshape)               (None, 6, 1024)      0           vgg16[1][0]                      
__________________________________________________________________________________________________
dense1 (Dense)                  (None, 6, 512)       524800      reshape[0][0]                  

In [11]:
#Training with kfold
def train_kfold(idx, kfold, imgpath, labelpath, finetune):
    """Train one fold and checkpoint the best weights by validation loss.

    Args:
        idx: index of the fold to train; also used in the weight file name.
        kfold: sequence of ``(train_indices, valid_indices)`` pairs.
        imgpath: directory containing the images.
        labelpath: JSON file mapping image file name -> label text.
        finetune: when truthy, the convolutional base is trainable and the
            weights previously saved for this fold are loaded before training.
    """
    sess = tf.Session()
    K.set_session(sess)
    model, _ = build_model((*SIZE, 3), training=True, finetune=finetune)
    # Learning rate comes from the ADAM_LR constant (LR was never defined).
    optimizer = Adam(lr = ADAM_LR)
    # The 'ctc' Lambda layer already outputs the loss value, so the Keras loss
    # function simply passes that output through.
    model.compile(loss={'ctc': lambda y_true, y_pred: y_pred}, optimizer=optimizer)
    train_idx, valid_idx = kfold[idx]
    train_generator = DataGenerator(imgpath, labelpath, *SIZE, train_idx, True)
    train_generator.build_data()
    # Validate on the held-out indices. Using train_idx here would make
    # val_loss (and therefore checkpointing/early stopping) measure training data.
    valid_generator = DataGenerator(imgpath, labelpath, *SIZE, valid_idx, False)
    valid_generator.build_data()
    weight_path = 'model/pre_weight_%d.h5'%idx
    # ModelCheckpoint needs the target directory to exist.
    os.makedirs(os.path.dirname(weight_path), exist_ok=True)
    ckp = ModelCheckpoint(weight_path, monitor = 'val_loss', verbose=1, save_best_only=True, save_weights_only=True)
    earlystop = EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=0, mode='min')
    if finetune:
        print('load pretrain model')
        model.load_weights(weight_path)
    # Whole batches per epoch; never fewer than one step, since the generators
    # cycle indefinitely and a step count of 0 is not meaningful.
    train_steps = max(1, len(train_idx) // BATCH_SIZE)
    valid_steps = max(1, len(valid_idx) // BATCH_SIZE)
    model.fit_generator(generator=train_generator.next_batch(),
                        steps_per_epoch=train_steps,
                        epochs=EPOCHS,
                        callbacks=[ckp, earlystop],
                        validation_data = valid_generator.next_batch(),
                        validation_steps=valid_steps)

In [12]:
#Training
def train():
    """Run 5-fold cross-validated training over every file in IMG_PATH."""
    nsplits = 5
    file_indices = np.arange(len(os.listdir(IMG_PATH)))
    # random_state only takes effect together with shuffle=True; without it
    # recent scikit-learn versions raise ValueError and older ones ignore the seed.
    splitter = KFold(nsplits, shuffle=True, random_state=2020)
    kfold = list(splitter.split(file_indices))
    for idx in range(nsplits):
        train_kfold(idx, kfold, IMG_PATH, LABEL_PATH, finetune=False)