In [1]:
import matplotlib.pyplot as plt
import numpy as np
import random
import cv2
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
from keras.layers.merge import Concatenate
from keras.layers.core import Lambda
from keras.models import Model
from keras.models import load_model
from keras.layers import Input
import tensorflow as tf
import os
import skimage
from keras.models import Sequential
from keras.layers import Convolution2D, MaxPooling2D
from keras.layers import Activation, Dropout, Flatten, Dense
from keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img
from keras.applications.vgg16 import preprocess_input
from keras.applications.vgg16 import VGG16
from keras.layers import GRU

  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


## Toast data

In [2]:
# Alphabet of recognisable symbols; a symbol's label is its index in this string.
characters = '0123456789+-*/=()'

# Image geometry (pixels) and the fixed number of symbols per rendered sequence.
width = 400
height = 80
n_len = 14
# One class per symbol plus one extra class (presumably the CTC blank — TODO confirm).
n_class = len(characters) + 1

In [3]:
# Shared Keras image generator; the only transformation configured here is
# rescaling pixel values by 1/255.
datagen = ImageDataGenerator(rescale=1. / 255)

In [4]:
# Module-level holders for the training / validation label arrays; both start unset.
YY = YY_val = None

In [5]:
def generate():
    """
    Build a random arithmetic expression from three digits and two operators.

    :return: a string shaped like ``dodod``, ``(dod)od`` or ``do(dod)`` where
             ``d`` is a digit and ``o`` is one of ``+-*/``
    """
    digits = '0123456789'
    operators = '+-*/'
    templates = ['{}{}{}{}{}', '({}{}{}){}{}', '{}{}({}{}{})']

    # Even slots hold digits, odd slots hold operators.
    tokens = []
    for position in range(5):
        pool = operators if position % 2 else digits
        tokens.append(random.choice(pool))

    layout = random.choice(templates)
    return layout.format(*tokens)

def get_img_by_char(char, base_path='/Users/imperatore/tmp/pre_ocr'):
    """
    Load one randomly chosen grayscale sample image for a single symbol.

    Samples are looked up in ``<base_path>/<label>``: digits use the digit
    itself as the label, the symbols ``+ - * / = ( )`` use 10..16.

    :param char: the symbol to fetch (a digit or one of ``+-*/=()``)
    :param base_path: root directory holding one sub-directory per label
    :return: the image as returned by ``cv2.imread(path, cv2.IMREAD_GRAYSCALE)``
    :raises ValueError: if the label directory holds no usable sample file
    :raises IOError: if the chosen file cannot be decoded as an image
    """
    opdict = {'+': 10, '-': 11, '*': 12, '/': 13, '=': 14, '(': 15, ')': 16}
    label = opdict.get(char, char)
    folder = os.path.join(base_path, str(label))

    # Ignore hidden entries (e.g. ``.DS_Store``): they are not samples.
    # Sorting makes the pick reproducible for a seeded ``random``.
    candidates = sorted(name for name in os.listdir(folder) if not name.startswith('.'))
    if not candidates:
        raise ValueError('no sample images found in {}'.format(folder))

    image_path = os.path.join(folder, random.choice(candidates))
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    # cv2.imread signals failure by returning None instead of raising.
    if image is None:
        raise IOError('could not read image {}'.format(image_path))
    return image

def get_sequence_img(chars):
    """
    Render a symbol sequence as one fixed-size grayscale image.

    A random sample is fetched for every symbol through ``get_img_by_char``,
    the samples are joined left to right, and the strip is resized with
    ``cv2.resize`` to the module-level ``(width, height)``.

    :param chars: non-empty sequence of symbols
    :return: the resized image
    :raises ValueError: if ``chars`` is empty
    """
    if len(chars) == 0:
        raise ValueError('chars must contain at least one symbol')

    # Join all glyphs in a single hstack instead of re-copying the growing
    # strip once per symbol.
    strip = np.hstack([get_img_by_char(symbol) for symbol in chars])

    # Use the shared geometry constants so the result always fits the buffers
    # that gen() / gen_single() allocate from the same values.
    return cv2.resize(strip, (width, height))

def gen(batch_size=128, gene=4):
    """
    Endlessly yield CTC training batches of rendered random symbol sequences.

    Each round renders ``batch_size`` random strings of ``n_len`` symbols into
    single-channel images, then pulls ``gene`` chunks from
    ``datagen.flow`` over those same images and stacks them.

    :param batch_size: number of images rendered per round
    :param gene: number of ``datagen.flow`` chunks stacked per yielded batch
    :return: generator of ``([images, labels, input_length, label_length], dummy_target)``
             with ``batch_size * gene`` rows each

    NOTE(review): ``input_length`` is filled with ``rnn_length`` while
    ``ctc_lambda_func`` drops two positions before scoring — confirm the two agree.
    """
    X = np.zeros((batch_size, width, height, 1), dtype=np.uint8)
    y = np.zeros((batch_size, n_len), dtype=np.uint8)
    rows_out = batch_size * gene

    while True:
        for row in range(batch_size):
            random_str = ''.join(random.choice(characters) for _ in range(n_len))
            rendered = np.array(get_sequence_img(random_str))
            # Add a channel axis, then swap the first two axes to match X's layout.
            rendered = rendered.reshape(rendered.shape[0], rendered.shape[1], 1)
            X[row] = rendered.transpose(1, 0, 2)
            y[row] = [characters.find(symbol) for symbol in random_str]

        image_chunks = []
        label_chunks = []
        for batch in datagen.flow(X, y, batch_size=batch_size):
            image_chunks.append(batch[0])
            label_chunks.append(batch[1])
            if len(image_chunks) >= gene:
                break

        XX = np.concatenate(image_chunks, axis=0)
        yy = np.concatenate(label_chunks, axis=0)
        yield [XX, yy, np.ones(rows_out) * rnn_length, np.ones(rows_out) * n_len], np.ones(rows_out)


def gen_single(batch_size=128, gene=4, flag=None):
    """
    Endlessly yield 3-channel image batches and record their labels globally.

    Each round renders ``batch_size`` random strings of ``n_len`` symbols,
    replicates the gray channel three times, pulls ``gene`` chunks from
    ``datagen.flow`` and yields only the stacked images.

    The matching labels are appended to the module-level ``YY`` (when ``flag``
    is None) or ``YY_val`` (any other ``flag``), in the order the batches are
    produced. Reset the relevant holder to ``None`` before starting a fresh
    extraction run.

    NOTE(review): Keras generator utilities may pull batches ahead of what
    they consume; if so, trim the label holder to the number of predicted rows.

    :param batch_size: number of images rendered per round
    :param gene: number of ``datagen.flow`` chunks stacked per yielded batch
    :param flag: None for the training stream, anything else for validation
    :return: generator of image arrays with ``batch_size * gene`` rows
    """
    # Without this declaration the assignments below would only bind locals
    # and the module-level holders would stay None.
    global YY, YY_val

    X = np.zeros((batch_size, width, height, 3), dtype=np.uint8)
    y = np.zeros((batch_size, n_len), dtype=np.uint8)

    while True:
        for row in range(batch_size):
            random_str = ''.join(random.choice(characters) for _ in range(n_len))
            gray = np.array(get_sequence_img(random_str))
            gray = gray.reshape(gray.shape[0], gray.shape[1], 1)
            # Repeat the single channel three times, then swap the first two
            # axes to match X's layout.
            X[row] = np.repeat(gray, 3, axis=2).transpose(1, 0, 2)
            y[row] = [characters.find(symbol) for symbol in random_str]

        image_chunks = []
        label_chunks = []
        for batch in datagen.flow(X, y, batch_size=batch_size):
            image_chunks.append(batch[0])
            label_chunks.append(batch[1])
            if len(image_chunks) >= gene:
                break

        XX = np.concatenate(image_chunks, axis=0)
        yy = np.concatenate(label_chunks, axis=0)

        if flag is None:
            YY = yy if YY is None else np.concatenate([YY, yy], axis=0)
        else:
            YY_val = yy if YY_val is None else np.concatenate([YY_val, yy], axis=0)
        yield XX

# Evaluator

In [6]:
from keras import backend as K

def ctc_lambda_func(args):
    """
    Compute the CTC batch cost; intended to be wrapped in a Keras ``Lambda`` layer.

    :param args: sequence ``(y_pred, labels, input_length, label_length)``
    :return: ``K.ctc_batch_cost`` evaluated on ``y_pred`` with its first two
             positions along axis 1 removed
    """
    predictions, labels, input_length, label_length = args
    # Skip the first two positions along axis 1 before scoring.
    trimmed = predictions[:, 2:, :]
    return K.ctc_batch_cost(labels, trimmed, input_length, label_length)

In [7]:
def evaluate(batch_size=128, steps=10):
    """
    Estimate whole-sequence accuracy of ``base_model`` on freshly generated data.

    Draws ``steps`` batches from ``gen(batch_size)``, decodes the predictions
    with ``K.ctc_decode`` (after dropping the first two positions along
    axis 1) and counts a sample as correct only when all ``n_len`` labels
    match. A batch whose decoded output is narrower than ``n_len`` adds nothing.

    :param batch_size: forwarded to ``gen``
    :param steps: number of batches to score
    :return: summed per-batch accuracy divided by ``steps``
    """
    batches = gen(batch_size)
    accumulated = 0
    for _step in range(steps):
        (X_test, y_test, _in_len, _lab_len), _targets = next(batches)
        probs = base_model.predict(X_test)[:, 2:, :]
        n_samples, n_steps = probs.shape[0], probs.shape[1]
        decoded = K.ctc_decode(probs, input_length=np.ones(n_samples) * n_steps)[0][0]
        out = K.get_value(decoded)[:, :n_len]
        if out.shape[1] != n_len:
            continue
        accumulated += (y_test == out).all(axis=1).mean()
    return accumulated / steps

In [8]:
from keras.callbacks import *

class Evaluator(Callback):
    """
    Keras callback that records sequence accuracy after every epoch.

    ``accs`` collects one percentage per finished epoch, computed as
    ``evaluate(steps=20) * 100``.
    """

    def __init__(self):
        # Let the Keras base class initialise its own attributes first.
        super().__init__()
        self.accs = []

    def on_epoch_end(self, epoch, logs=None):
        """Score 20 generated batches, then store and print the accuracy."""
        acc = evaluate(steps=20) * 100
        self.accs.append(acc)
        print('')
        print('acc: %f%%' % acc)


evaluator = Evaluator()

# VGG16 bottleneck

In [9]:
# VGG16 without its classifier head (include_top=False), loaded with the
# 'imagenet' weights; used below through predict_generator to produce features.
model = VGG16(weights='imagenet', include_top=False)
# Alternative kept for reference: load the same kind of weights from a local file.
# model = VGG16(weights='./vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5', include_top=False)

In [10]:
# Inspect the last layer of the VGG16 base (a MaxPooling2D per the recorded output).
model.layers[-1]

<keras.layers.pooling.MaxPooling2D at 0x1a299b1978>

In [11]:
# conv_shape = model.output.get_shape().as_list()
# Run 10 generator steps through VGG16. Each step of gen_single(32) carries
# 32 * gene (default 4) = 128 images, i.e. 1280 rows in total — consistent with
# the feature shape printed further down.
pred = model.predict_generator(gen_single(32), 10)

In [12]:
# Validation features: 8 generator steps through VGG16. A non-None flag is
# passed because gen_single's label bookkeeping branches on `flag`
# (None -> training labels, anything else -> validation labels).
val_pred = model.predict_generator(gen_single(32, flag='val'), 8)

In [11]:
# Reload cached features from disk, replacing any in-memory pred / val_pred.
# NOTE(review): this cell reuses execution count 11 and sits before the cell
# that writes these files with np.save, so on a fresh kernel the .npy files
# must already exist — confirm the intended cell order.
pred = np.load('pred.npy')
val_pred = np.load('pred_val.npy')

In [12]:
# Derive the recurrent input geometry from the feature array: axis 1 is taken
# as the sequence length, axes 2 and 3 are multiplied into one feature size.
conv_shape = pred.shape
rnn_length, rnn_dimen = conv_shape[1], conv_shape[2] * conv_shape[3]
print(conv_shape, rnn_length, rnn_dimen)

(1280, 12, 2, 512) 12 1024


In [14]:
# Cache both feature arrays in the working directory; these are the files the
# np.load cell reads back.
np.save('pred.npy', pred)
np.save('pred_val.npy', val_pred)

# NOTE(review): this fragment has no cell marker of its own and appears to be a
# reference architecture. It uses names that are not defined or imported in the
# visible notebook at this point (the incoming `x`, `input_tensor`, `rnn_size`,
# `add`, `concatenate`, `BatchNormalization`), so it will not run as-is.
# It is the only place `base_model` (used by evaluate()) is assigned.
x = Dense(128, kernel_initializer='he_normal')(x)
x = BatchNormalization()(x)
x = Activation('relu')(x)

# First recurrent stage: a forward and a backward GRU over the same input, summed.
gru_1 = GRU(rnn_size, return_sequences=True, kernel_initializer='he_normal', name='gru1')(x)
gru_1b = GRU(rnn_size, return_sequences=True, go_backwards=True, kernel_initializer='he_normal', 
             name='gru1_b')(x)
gru1_merged = add([gru_1, gru_1b])

# Second recurrent stage: forward and backward GRU outputs are concatenated.
gru_2 = GRU(rnn_size, return_sequences=True, kernel_initializer='he_normal', name='gru2')(gru1_merged)
gru_2b = GRU(rnn_size, return_sequences=True, go_backwards=True, kernel_initializer='he_normal', 
             name='gru2_b')(gru1_merged)
x = concatenate([gru_2, gru_2b])
x = Dropout(0.25)(x)
# Per-step softmax over n_class classes.
x = Dense(n_class, kernel_initializer='he_normal', activation='softmax')(x)
base_model = Model(input=input_tensor, output=x)

# Extra inputs consumed by ctc_lambda_func; the Lambda layer is named 'ctc'.
labels = Input(name='the_labels', shape=[n_len], dtype='float32')
input_length = Input(name='input_length', shape=(1,), dtype='int64')
label_length = Input(name='label_length', shape=(1,), dtype='int64')
loss_out = Lambda(ctc_lambda_func, name='ctc')([base_model.output, labels, input_length, label_length])

In [13]:
# Attempt to train a small head on the cached VGG16 features with a CTC loss.
# NOTE(review): the recorded run of this cell ends in the ValueError shown
# below it; the notes in this cell list the problems visible in the code.
md = Sequential()
# NOTE(review): the Dense layer is applied directly to (12, 2, 512) features,
# so the tensor reaching the 'ctc' layer keeps an extra axis (the traceback
# reports [?,10,2,18]); presumably the features need reshaping to
# (rnn_length, rnn_dimen) first — TODO confirm.
md.add(Dense(128, input_shape=(12, 2, 512), activation='relu', kernel_initializer='he_normal'))
# Second relu directly after a relu-activated Dense; it has no further effect.
md.add(Activation('relu'))

rnn_size = 128
# The four GRU layers below are constructed but never added to `md`.
gru_1 = GRU(rnn_size, return_sequences=True, kernel_initializer='he_normal', name='gru1')
gru_1b = GRU(rnn_size, return_sequences=True, kernel_initializer='he_normal', name='gru1_b')
# # gru1_merged = add([gru_1, gru_1_b])
# md.add(Concatenate([gru_1, gru_1b]))

gru_2 = GRU(rnn_size, return_sequences=True, kernel_initializer='he_normal', name='gru2')
gru_2b = GRU(rnn_size, return_sequences=True, kernel_initializer='he_normal', name='gru2_b')

md.add(Dropout(0.25))
md.add(Dense(n_class, kernel_initializer='he_normal', activation='softmax'))

# Extra inputs for ctc_lambda_func. `loss_out` is computed here but is not
# attached to `md` anywhere in this cell.
labels = Input(name='the_labels', shape=[n_len], dtype='float32')
input_length = Input(name='input_length', shape=(1,), dtype='int64')
label_length = Input(name='label_length', shape=(1,), dtype='int64')
loss_out = Lambda(ctc_lambda_func, name='ctc')([md.output, labels, input_length, label_length])

# NOTE(review): the loss dict is keyed by 'ctc', but `md` itself has no layer
# or output of that name — verify what Keras does with this mapping.
md.compile(loss={'ctc': lambda y_true, y_pred: y_pred}, optimizer='adam', metrics=['accuracy'])

# NOTE(review): `YY` is initialised to None at module level;
# `validation_data` and `validation_labels` are not defined in the visible
# cells; `nb_epoch` looks like the older Keras spelling of `epochs` — confirm
# against the installed version. Also, the printed feature shape gives 12
# sequence positions (10 after ctc_lambda_func trims two) against n_len = 14
# labels per sample; CTC presumably needs at least as many steps as labels —
# TODO confirm.
md.fit(pred, YY,
          nb_epoch=50, batch_size=32,
          validation_data=(validation_data, validation_labels))
md.save_weights('bottleneck_fc_model.h5')

ValueError: Dimension must be 4 but is 3 for 'ctc/transpose_2' (op: 'Transpose') with input shapes: [?,10,2,18], [3].

In [15]:
from keras.engine.input_layer import Input

__init__.py             mnist_test.ipynb        vgg16_bottleneck.ipynb
crnn.ipynb              pred.npy
crnn.py                 pred_val.npy
