In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2

import tensorflow as tf
from keras import backend as K
from keras.models import Model, model_from_json, load_model
from keras.layers import Input, Conv2D, MaxPooling2D, Reshape, Bidirectional, LSTM, Dense, Lambda, Activation, BatchNormalization, Dropout
from tensorflow.keras.optimizers import Adam

In [2]:
import os

# Set TF_FORCE_GPU_ALLOW_GROWTH for this process. Per the TensorFlow GPU guide
# this makes TensorFlow allocate GPU memory incrementally instead of all at once.
os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "true"

In [3]:
# Label tables for the two splits. Later cells read the 'FILENAME' and
# 'IDENTITY' columns of both frames and address rows with .loc[i] for integer i.
train = pd.read_csv('preprocessed_train.csv')
valid = pd.read_csv('preprocessed_valid.csv')

In [None]:
# Number of samples loaded from disk per chunk: the training loop fits the
# model on one (train chunk, validation chunk) pair at a time.
train_size, valid_size = 4096, 512

In [None]:
# Shuffled row labels (0 .. len-1) that define which samples land in which chunk.
# The shuffle is seeded so that chunk membership is identical after a kernel
# restart; a private Generator is used so NumPy's global random state is untouched.
SHUFFLE_SEED = 42
_shuffle_rng = np.random.default_rng(SHUFFLE_SEED)
num_train = _shuffle_rng.permutation(len(train))
num_valid = _shuffle_rng.permutation(len(valid))

In [None]:
def get_train(idx, batch_size):
    """Load the idx-th chunk of training images.

    Args:
        idx: Zero-based chunk number within the shuffled ``num_train`` order.
        batch_size: Number of images per chunk.

    Returns:
        Array reshaped to ``(-1, 256, 64, 1)``. It holds fewer than
        ``batch_size`` images when the chunk runs past the end of ``num_train``.

    Raises:
        FileNotFoundError: If OpenCV cannot read one of the image files.
    """
    start = idx * batch_size
    indexes = num_train[start : start + batch_size]

    x = []
    for i in indexes:
        img_dir = 'processed_train/' + train.loc[i, 'FILENAME']
        image = cv2.imread(img_dir, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread does not raise on a missing/corrupt file; it returns None,
            # which would otherwise only surface later as an opaque reshape error.
            raise FileNotFoundError(f"Could not read image: {img_dir}")
        x.append(image)

    # NOTE(review): reshape only reinterprets the buffer, it does not transpose;
    # this assumes every file is stored as 256 rows x 64 columns -- confirm.
    return np.array(x).reshape(-1, 256, 64, 1)

In [None]:
def get_valid(idx, batch_size):
    """Load the idx-th chunk of validation images.

    Args:
        idx: Zero-based chunk number within the shuffled ``num_valid`` order.
        batch_size: Number of images per chunk.

    Returns:
        Array reshaped to ``(-1, 256, 64, 1)``. It holds fewer than
        ``batch_size`` images when the chunk runs past the end of ``num_valid``.

    Raises:
        FileNotFoundError: If OpenCV cannot read one of the image files.
    """
    start = idx * batch_size
    indexes = num_valid[start : start + batch_size]

    x = []
    for i in indexes:
        img_dir = 'processed_valid/' + valid.loc[i, 'FILENAME']
        image = cv2.imread(img_dir, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread does not raise on a missing/corrupt file; it returns None,
            # which would otherwise only surface later as an opaque reshape error.
            raise FileNotFoundError(f"Could not read image: {img_dir}")
        x.append(image)

    # NOTE(review): reshape only reinterprets the buffer, it does not transpose;
    # this assumes every file is stored as 256 rows x 64 columns -- confirm.
    return np.array(x).reshape(-1, 256, 64, 1)

In [4]:
# Characters the recogniser can emit; a character's position in this string
# is the integer class id used by label_to_num / num_to_label.
alphabets = "ABCDEFGHIJKLMNOPQRSTUVWXYZ-'` "
# Width of the padded label arrays built for CTC training.
max_str_len = 34
# Length of the time axis of the network output.
num_of_timestamps = 64
# Size of the softmax layer: one class per character plus one extra
# (presumably the CTC blank -- confirm).
num_of_characters = len(alphabets) + 1

def label_to_num(label):
    """Encode a text label as an array of positions in ``alphabets``.

    Characters that do not occur in ``alphabets`` are encoded as -1, because
    that is what ``str.find`` returns for a miss.
    """
    return np.array([alphabets.find(symbol) for symbol in label])

def num_to_label(num):
    """Decode a sequence of class ids back into text.

    Decoding stops at the first -1 (the value the decoder uses for
    blank/padding); everything before it is looked up in ``alphabets``.
    """
    chars = []
    for code in num:
        if code == -1:  # CTC Blank
            break
        chars.append(alphabets[code])
    return "".join(chars)

In [None]:
def train_labels(idx, batch_size):
    """Build the CTC target arrays for the idx-th chunk of training samples.

    Args:
        idx: Zero-based chunk number within the shuffled ``num_train`` order.
        batch_size: Number of samples per chunk.

    Returns:
        Tuple ``(y, label_len, input_len, output)`` with one row per selected
        sample:
          * ``y``: encoded labels, right-padded with -1 to ``max_str_len``.
          * ``label_len``: true length of each label.
          * ``input_len``: ``num_of_timestamps - 2`` for every sample.
          * ``output``: zeros, used as the dummy fit target.
    """
    start = idx * batch_size
    indexes = num_train[start : start + batch_size]
    # Size the arrays by the samples actually selected (not by batch_size) so a
    # short final chunk stays row-aligned with the images from get_train.
    n_samples = len(indexes)

    y = np.ones([n_samples, max_str_len]) * -1
    label_len = np.zeros([n_samples, 1])
    # Two fewer than the model's time steps: ctc_lambda_func slices y_pred[:, 2:, :].
    input_len = np.ones([n_samples, 1]) * (num_of_timestamps-2)
    output = np.zeros([n_samples])

    for index, i in enumerate(indexes):
        identity = train.loc[i, 'IDENTITY']
        label_len[index] = len(identity)
        y[index, 0:len(identity)] = label_to_num(identity)

    return y, label_len, input_len, output

In [None]:
def valid_labels(idx, batch_size):
    """Build the CTC target arrays for the idx-th chunk of validation samples.

    Args:
        idx: Zero-based chunk number within the shuffled ``num_valid`` order.
        batch_size: Number of samples per chunk.

    Returns:
        Tuple ``(y, label_len, input_len, output)`` with one row per selected
        sample:
          * ``y``: encoded labels, right-padded with -1 to ``max_str_len``.
          * ``label_len``: true length of each label.
          * ``input_len``: ``num_of_timestamps - 2`` for every sample.
          * ``output``: zeros, used as the dummy fit target.
    """
    start = idx * batch_size
    indexes = num_valid[start : start + batch_size]
    # Size the arrays by the samples actually selected (not by batch_size) so a
    # short final chunk stays row-aligned with the images from get_valid.
    n_samples = len(indexes)

    y = np.ones([n_samples, max_str_len]) * -1
    label_len = np.zeros([n_samples, 1])
    # Two fewer than the model's time steps: ctc_lambda_func slices y_pred[:, 2:, :].
    input_len = np.ones([n_samples, 1]) * (num_of_timestamps-2)
    output = np.zeros([n_samples])

    for index, i in enumerate(indexes):
        identity = valid.loc[i, 'IDENTITY']
        label_len[index] = len(identity)
        y[index, 0:len(identity)] = label_to_num(identity)

    return y, label_len, input_len, output

In [5]:
# Recognition network: three conv blocks -> per-time-step dense -> two
# bidirectional LSTMs -> per-time-step softmax over num_of_characters classes.
# Shape comments below exclude the batch axis.
input_data = Input(shape=(256, 64, 1), name='input')

# Conv block 1: 'same' padding keeps 256x64; the 2x2 pool halves both axes -> (128, 32, 32).
inner = Conv2D(32, (3, 3), padding='same', name='conv1', kernel_initializer='he_normal')(input_data)  
inner = BatchNormalization()(inner)
inner = Activation('relu')(inner)
inner = MaxPooling2D(pool_size=(2, 2), name='max1')(inner)

# Conv block 2: 2x2 pool again -> (64, 16, 64), followed by dropout.
inner = Conv2D(64, (3, 3), padding='same', name='conv2', kernel_initializer='he_normal')(inner)
inner = BatchNormalization()(inner)
inner = Activation('relu')(inner)
inner = MaxPooling2D(pool_size=(2, 2), name='max2')(inner)
inner = Dropout(0.3)(inner)

# Conv block 3: the (1, 2) pool halves only the second spatial axis, so the
# first axis stays at 64 (= num_of_timestamps) -> (64, 8, 128).
inner = Conv2D(128, (3, 3), padding='same', name='conv3', kernel_initializer='he_normal')(inner)
inner = BatchNormalization()(inner)
inner = Activation('relu')(inner)
inner = MaxPooling2D(pool_size=(1, 2), name='max3')(inner)
inner = Dropout(0.3)(inner)

# Flatten the last two axes (8 * 128 = 1024) into one feature vector per time
# step, then project each step down to 64 features.
inner = Reshape(target_shape=((64, 1024)), name='reshape')(inner)
inner = Dense(64, activation='relu', kernel_initializer='he_normal', name='dense1')(inner)

# Sequence modelling over the 64 time steps; return_sequences=True keeps one output per step.
inner = Bidirectional(LSTM(256, return_sequences=True, name='lstminner1'), name = 'lstm1')(inner) # try gru instead of lstm
inner = Bidirectional(LSTM(256, return_sequences=True, name='lstminner2'), name = 'lstm2')(inner)

# Per-time-step class scores, normalised with softmax.
inner = Dense(num_of_characters, kernel_initializer='he_normal',name='dense2')(inner)
y_pred = Activation('softmax', name='softmax')(inner)

# Inference model: image in, per-time-step class probabilities out.
model = Model(inputs=input_data, outputs=y_pred)
# model.summary()

In [6]:
def ctc_lambda_func(args):
    """Compute the batch CTC cost; meant to be wrapped in a ``Lambda`` layer.

    ``args`` is the sequence ``(y_pred, labels, input_length, label_length)``.
    The first two time steps of ``y_pred`` are dropped before the cost is
    computed, so ``input_length`` must describe the shortened sequence.
    """
    predictions, truth, pred_lengths, truth_lengths = args
    trimmed = predictions[:, 2:, :]
    return K.ctc_batch_cost(truth, trimmed, pred_lengths, truth_lengths)

In [7]:
# Extra inputs that exist only for training: the padded ground-truth labels and
# the two length vectors that K.ctc_batch_cost needs.
labels = Input(shape=(max_str_len,), dtype='float32', name='gtruth_labels')
input_length = Input(shape=(1,), dtype='int64', name='input_length')
label_length = Input(shape=(1,), dtype='int64', name='label_length')

# Training model: shares every layer with `model`, but its output is the CTC
# cost computed by ctc_lambda_func instead of the softmax probabilities.
ctc_layer = Lambda(ctc_lambda_func, output_shape=(1,), name='ctc')
ctc_loss = ctc_layer([y_pred, labels, input_length, label_length])
model_final = Model(
    inputs=[input_data, labels, input_length, label_length],
    outputs=ctc_loss,
)

In [None]:
# The 'ctc' output of model_final already is the loss value, so the Keras loss
# simply passes y_pred through and ignores the dummy y_true.
# NOTE(review): 'accuracy' is computed between that loss value and the all-zero
# dummy target, so it is not a recognition accuracy -- confirm it is wanted.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
model_final.compile(
    optimizer=optimizer,
    loss={'ctc': lambda y_true, y_pred: y_pred},
    metrics=['accuracy'],
)

In [None]:
from keras.callbacks import EarlyStopping
from keras.callbacks import ModelCheckpoint

In [None]:
# Stop a fit() call once val_loss has gone 15 epochs without improving.
es = EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=15,
    verbose=1,
)

# Keep the weights with the highest val_accuracy in model.h5 ...
mc = ModelCheckpoint(
    'model.h5',
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)

# ... and the weights with the lowest val_loss in model_loss.h5.
mc2 = ModelCheckpoint(
    'model_loss.h5',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)

In [None]:
# Chunked training: the data set is fitted one (train chunk, validation chunk)
# pair at a time, and the optimizer state is written to disk after every chunk.
path = 'optimizer'
# np.save does not create missing directories; without this the first save
# would fail only after a full fit() call had already finished.
os.makedirs(path, exist_ok=True)

# Only full training chunks are used (floor division).
num_iterations = int(np.floor(len(train) / train_size))
# Number of full validation chunks. The validation chunk index wraps around so
# it never runs past the end of the validation set when training has more chunks.
num_valid_chunks = max(1, len(valid) // valid_size)

for idx in range(num_iterations):
    valid_idx = idx % num_valid_chunks

    train_x = get_train(idx, train_size)
    valid_x = get_valid(valid_idx, valid_size)
    train_y, train_label_len, train_input_len, train_output = train_labels(idx, train_size)
    valid_y, valid_label_len, valid_input_len, valid_output = valid_labels(valid_idx, valid_size)

    # myModel is overwritten on every chunk, so afterwards it only holds the
    # history of the last fit() call.
    myModel = model_final.fit(x=[train_x, train_y, train_input_len, train_label_len], y=train_output,
                validation_data=([valid_x, valid_y, valid_input_len, valid_label_len], valid_output),
                epochs=120, shuffle=True, callbacks=[es, mc, mc2])

    # The optimizer weights are a list of differently-shaped arrays. Pack them
    # into an explicit object array element by element: recent NumPy versions
    # refuse to build an array from a ragged list implicitly.
    opt_weights = optimizer.get_weights()
    opt_weights_arr = np.empty(len(opt_weights), dtype=object)
    for k, weight in enumerate(opt_weights):
        opt_weights_arr[k] = weight
    np.save(f'{path}/optimizer.npy', opt_weights_arr)

    print("Saved model to disk, idx:", idx)

In [None]:
# loading optimizer

# optimizer = tf.keras.optimizers.Adam()

# # Load the optimizer weights
# opt_weights = np.load(f'{path}/optimizer.npy', allow_pickle=True)

# # Train a dummy record
# # I'm using the universal sentence encoder which requires a string as input
# with tf.GradientTape() as tape:
#     # predict a dummy record
#     tmp = model('')
#     # create a dummy loss
#     loss = tf.reduce_mean((tmp - tmp)**2)

# # calculate the gradients and apply them
# # the gradients should be near 0
# gradients = tape.gradient(loss, model.trainable_variables)
# optimizer.apply_gradients(zip(gradients, model.trainable_variables))

# # set the weights
# optimizer.set_weights(opt_weights)

In [8]:
# loading a model

# json_file = open('model.json', 'r')
# loaded_model_json = json_file.read()
# json_file.close()
# model = model_from_json(loaded_model_json)
# # load weights into new model

# 'model.h5' is the file the `mc` ModelCheckpoint writes (weights only, best val_accuracy).
# NOTE(review): those weights are saved from `model_final` but loaded into `model`;
# presumably this works because the extra CTC inputs/Lambda carry no weights -- confirm.
model.load_weights("model.h5")

# print("Loaded model from disk")

In [None]:
# Training curves of the most recent fit() call (myModel holds its History).
# Each figure is saved BEFORE plt.show(): once a figure has been shown, pyplot
# may no longer have it as the current figure, and savefig would then write an
# empty image.

# plot the loss
fig_loss, ax_loss = plt.subplots()
ax_loss.plot(myModel.history['loss'], label='train loss')
ax_loss.plot(myModel.history['val_loss'], label='val loss')
ax_loss.set(title='Loss', xlabel='epoch', ylabel='loss')
ax_loss.legend()
fig_loss.savefig('LossVal_loss')
plt.show()

# plot the accuracy
fig_acc, ax_acc = plt.subplots()
ax_acc.plot(myModel.history['accuracy'], label='train acc')
ax_acc.plot(myModel.history['val_accuracy'], label='val acc')
ax_acc.set(title='Accuracy', xlabel='epoch', ylabel='accuracy')
ax_acc.legend()
fig_acc.savefig('AccVal_acc')
plt.show()

In [9]:
# Load every validation image, in table order, for the final evaluation.
valid_x = []

for i in range(len(valid)):
    img_dir = 'processed_valid/' + valid.loc[i, 'FILENAME']
    image = cv2.imread(img_dir, cv2.IMREAD_GRAYSCALE)
    if image is None:
        # cv2.imread returns None instead of raising when a file cannot be read;
        # fail here with the offending path rather than at the reshape below.
        raise FileNotFoundError(f"Could not read image: {img_dir}")
    valid_x.append(image)

# NOTE(review): reshape does not transpose; assumes files are stored as
# 256 rows x 64 columns -- confirm.
valid_x = np.array(valid_x).reshape(-1, 256, 64, 1)

In [10]:
# Per-time-step class probabilities for every validation image.
preds = model.predict(valid_x)

# Greedy CTC decoding; every sample is decoded over the full time axis of preds.
# NOTE(review): training scores only y_pred[:, 2:, :] (see ctc_lambda_func), but
# decoding here includes the first two time steps -- confirm this is intended.
sequence_lengths = np.ones(preds.shape[0]) * preds.shape[1]
decoded_candidates = K.ctc_decode(preds, input_length=sequence_lengths, greedy=True)[0]
decoded = K.get_value(decoded_candidates[0])

# Turn each decoded id sequence back into text.
prediction = [num_to_label(decoded[row]) for row in range(len(valid_x))]

In [11]:
# Compare predictions with the ground truth.
#   * character score: position-by-position matches over the overlapping prefix
#     of prediction and truth, divided by the total number of true characters
#   * word score: exact string matches
y_true = valid.loc[0:len(valid_x), 'IDENTITY']
correct_char = 0
total_char = 0
correct = 0

for sample in range(len(valid_x)):
    predicted, expected = prediction[sample], y_true[sample]
    total_char += len(expected)

    # zip stops at the shorter string, i.e. after min(len(expected), len(predicted)) positions
    correct_char += sum(1 for want, got in zip(expected, predicted) if want == got)

    if predicted == expected:
        correct += 1

print('Correct characters predicted : %.2f%%' %(correct_char*100/total_char))
print('Correct words predicted      : %.2f%%' %(correct*100/len(valid_x)))

Correct characters predicted : 80.40%
Correct words predicted      : 67.27%
