# CRNN

In [1]:
# Configuration

# Height and width (in pixels) that every image is cropped/padded to before it enters the network
H = 60 
W = 284

# Pool = 4
# PoolSize = 2
# LastFilter = 256

UnitClass = 30 # Size of the final softmax layer; the label alphabet has 29 characters, so the extra class is presumably the CTC blank -- TODO confirm
MaxInputCharLen = 24 # Width of the padded label matrix, i.e. the longest name (in characters) a label row can hold
MaxPredictedCharLen = 64 # Max label length of predicted character (not referenced by the code visible in this notebook)

BATCH_SIZE = 375 # Samples per gradient step; with TRAIN_SIZE = 30000 this is 80 steps per epoch
EPOCH = 50 # Number of epochs passed to model.fit
TRAIN_SIZE = 30000 # Number of rows of the training frame that are used
VALID_SIZE = 3000 # Number of rows of the validation frame that are used
TEST_SIZE = 100 # Number of rows of the test frame that are used

## Variables

In [2]:
# Define variable for directory
def image_dir(base_img):
    """Return the image directories of the three dataset splits.

    params:
        base_img -> root directory that contains the ``train_v2``,
                    ``test_v2`` and ``validation_v2`` folders
    returns:
        (train_dir, test_dir, valid_dir) as path strings
    """
    # Local import: this cell runs before the notebook's main import cell.
    import os

    # os.path.join uses the platform separator, so the result is identical to
    # the former hard-coded backslash form on Windows and valid elsewhere too.
    train_dir = os.path.join(base_img, 'train_v2', 'train')
    test_dir = os.path.join(base_img, 'test_v2', 'test')
    valid_dir = os.path.join(base_img, 'validation_v2', 'validation')

    return train_dir, test_dir, valid_dir

def csv_dir(base_csv):
    """Return the CSV label-file paths of the three dataset splits.

    params:
        base_csv -> directory (string) that holds the ``written_name_*_v2.csv`` files
    returns:
        (train_csv_dir, test_csv_dir, valid_csv_dir) as path strings
    """
    file_suffixes = (
        '/written_name_train_v2.csv',
        '/written_name_test_v2.csv',
        '/written_name_validation_v2.csv',
    )
    # Same order as the return contract: train, test, validation.
    return tuple(base_csv + suffix for suffix in file_suffixes)

## Dataset

In [3]:
import pandas as pd
import numpy as np

# DATASET
def read_csv_dir(train, test, valid):
    """Load the three split CSV files.

    params:
        train, test, valid -> paths of the train / test / validation CSV files
    returns:
        (df_train, df_test, df_valid) as pandas DataFrames
    """
    # Read in the same order the frames are returned.
    frames = [pd.read_csv(csv_path) for csv_path in (train, test, valid)]
    return tuple(frames)

# Check nan data in dataframe
def nan_data(dataframe, *args):
    """Report, for each DataFrame, whether any of the given columns holds NaN.

    params:
        dataframe -> iterable of pandas DataFrames to inspect
        *args -> column labels to check in every DataFrame
    returns:
        list of bool, one entry per DataFrame: True when at least one of the
        listed columns contains a missing value, False otherwise (also False
        when no labels are given)
    """
    # Only the requested columns are scanned, and any() stops at the first
    # column that contains a missing value.
    return [
        any(bool(data[column].isna().any()) for column in args)
        for data in dataframe
    ]

# Drop nan data
def drop_nan(dataframe, axis=0, inplace=False):
    """Remove rows (axis=0) or columns (axis=1) that contain missing values.

    params:
        dataframe -> pandas DataFrame to clean
        axis -> axis forwarded to ``DataFrame.dropna``
        inplace -> when truthy, modify ``dataframe`` itself
    returns:
        the cleaned copy, or None when ``inplace`` is truthy
    """
    if inplace:
        # The caller's frame is mutated; nothing is handed back.
        dataframe.dropna(axis=axis, inplace=inplace)
        return None

    cleaned = dataframe.dropna(axis=axis, inplace=inplace)
    return cleaned

# Reset index of data in dataframe
def _reset_index_(dataframe, inplace=False, drop=False):
    if not inplace:
        df = dataframe.reset_index(inplace=inplace, drop=drop)
        return df
    else:
        dataframe.reset_index(inplace=inplace, drop=drop)
        return None 

def crop_image(image, dim=(64,128)):
    """Fit a 2-D image into a ``dim``-shaped white canvas.

    The image is cut to at most ``dim[0]`` rows and ``dim[1]`` columns
    (keeping the top-left corner) and pasted into the top-left corner of an
    array filled with 255; any area it does not cover stays 255.

    params:
        image -> 2-D array (unpacking ``image.shape`` fails for other ranks)
        dim -> (height, width) of the returned array
    returns:
        float array of shape ``dim``
    """
    src_h, src_w = image.shape
    max_h, max_w = dim[0], dim[1]

    clipped = image
    if src_h > max_h:
        clipped = clipped[:max_h, :]
    if src_w > max_w:
        clipped = clipped[:, :max_w]

    # White background (value 255 everywhere).
    canvas = np.ones(dim) * 255

    # The slice bounds use the original size; slicing clamps them to the
    # canvas, so the target region always matches the clipped image.
    canvas[:src_h, :src_w] = clipped

    return canvas

# Label the character of the input name to num or vice versa
def label_name(name, name_to_num=True):
    """Convert a name to its numeric character codes, or codes back to a name.

    The code of a character is its position in ``alphabets`` (A-Z, apostrophe,
    hyphen, space -> 0..28).

    params:
        name -> a string when ``name_to_num`` is True, otherwise an iterable
                of numeric codes
        name_to_num -> direction of the conversion
    returns:
        name_to_num=True  -> numpy array with one code per character of the
                             upper-cased name; characters outside the
                             alphabet map to -1 (``str.find`` result)
        name_to_num=False -> the decoded string; decoding stops at the first
                             negative code
    raises:
        IndexError when a code is >= the alphabet length
    """
    alphabets = u"ABCDEFGHIJKLMNOPQRSTUVWXYZ'- "

    if name_to_num:
        return np.array([alphabets.find(char) for char in name.upper()])

    chars = []
    for num in name:
        num = int(num)  # accept float codes (e.g. rows of a float label matrix)
        if num < 0:
            # Negative codes are padding, not characters. Indexing with -1
            # would silently yield the last alphabet entry (a space).
            break
        chars.append(alphabets[num])
    return "".join(chars)

# Funct to label each char of the name in dataframe
def label(df, size, max_char_len, label_base):
    """Build the padded label matrix and the label-length vector.

    params:
        df -> DataFrame addressed by row labels 0..size-1
        size -> number of rows to encode
        max_char_len -> number of columns of the label matrix
        label_base -> name of the column that holds the names
    returns:
        y_ -> float array (size, max_char_len); row i starts with the codes
              of name i, the remaining cells are -1
        y_len -> float array (size, 1) holding the length of each name
    """
    # Every cell starts as padding (-1).
    y_ = np.full([size, max_char_len], -1.0)
    y_len = np.zeros([size, 1])

    for row in range(size):
        name = df.loc[row, label_base]
        name_len = len(name)
        y_len[row] = name_len
        # Only the leading name_len cells are overwritten.
        y_[row, :name_len] = label_name(name=name)

    return y_, y_len
        

## CRNN

In [4]:
import os
import cv2
import math
import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf
from keras import initializers
from keras import backend as K
from keras.models import Model
from keras.optimizers import SGD
from keras.layers import Input, Conv2D, MaxPooling2D, Reshape, Bidirectional, LSTM, Dense, Lambda, Activation, BatchNormalization, Dropout, Lambda

# BatchNormalization: a technique for training very deep neural networks that normalizes the inputs to a layer over each mini-batch. This stabilizes the learning process and can substantially reduce the number of training epochs required.
# https://towardsdatascience.com/batch-normalisation-in-deep-neural-network-ce65dd9e8dbf


# target_shape_y = int(conf.H / math.pow(conf.PoolSize, conf.Pool))
# target_shape_x = int(conf.W / math.pow(conf.PoolSize, conf.Pool))*conf.LastFilter

# Network input: an H x W image with a single channel. Defined at module level
# because both build_model() and CRNN() reference this same tensor.
inputs = Input(shape=(H, W, 1), name='INPUT')

# Six convolution layers followed by two recurrent layers; the RNN variant used is a bidirectional LSTM.
def build_model():
    """Assemble the CNN + BiLSTM graph on the module-level ``inputs`` tensor.

    Layout: three stages of two (Conv2D 1x1 -> BatchNormalization -> ReLU)
    units with 64, 128 and 256 filters, each stage followed by max pooling
    (the last two also by Dropout 0.3); then Reshape, Dense(64), two
    bidirectional LSTM(256) layers and a Dense(UnitClass) + softmax head.

    A throw-away ``Model`` is built only to print ``model.summary()``.

    returns:
        the softmax output tensor (not a Model)
    """
    def _conv_bn_relu(tensor, filters, name):
        # One convolution unit; layers are created in the order Conv2D, BatchNormalization, Activation.
        tensor = Conv2D(filters, kernel_size=(1,1), padding='same', kernel_initializer='he_normal', name=name)(tensor)
        tensor = BatchNormalization()(tensor)
        return Activation('relu')(tensor)

    # CNN
    x = _conv_bn_relu(inputs, 64, 'CONV1')
    x = _conv_bn_relu(x, 64, 'CONV2')
    x = MaxPooling2D(strides=(2,2), name='POOL1')(x)

    x = _conv_bn_relu(x, 128, 'CONV3')
    x = _conv_bn_relu(x, 128, 'CONV4')
    x = MaxPooling2D(strides=(2,2), name='POOL2')(x)
    x = Dropout(0.3)(x)

    x = _conv_bn_relu(x, 256, 'CONV5')
    x = _conv_bn_relu(x, 256, 'CONV6')
    # Pools along the width only.
    x = MaxPooling2D(pool_size=(1,2), strides=(1,2), name='POOL3')(x)
    x = Dropout(0.3)(x)

    # RNN
    # With the configured 60x284 input the feature map here should be
    # 15 x 35 x 256 (= 3840 * 35 values).
    # NOTE(review): this reshape makes 3840 the sequence axis and 35 the
    # feature axis -- confirm that is intended rather than (35, 3840). Changing
    # it also requires adjusting the CTC input length used by the training script.
    x = Reshape(target_shape=((3840, 35)))(x)
    x = Dense(64, activation='relu', kernel_initializer='he_normal', name='DENSE1')(x)

    x = Bidirectional(LSTM(256, return_sequences=True, name='LSTM'))(x)
    x = Bidirectional(LSTM(256, return_sequences=True, name='LSTM2'))(x)

    # Per-time-step class scores.
    x = Dense(UnitClass, kernel_initializer='he_normal')(x)
    y_pred = Activation('softmax', name='softmax')(x)

    # Built only so the architecture can be printed.
    model = Model(inputs=inputs, outputs=y_pred)
    model.summary()

    return y_pred

# Training a plain RNN for sequence labelling would need pre-segmented data (each
# label character aligned to its location in the input image) plus post-processing
# to turn the per-step probability matrix into the actual label sequence.
# Connectionist Temporal Classification (CTC) takes care of both.
def ctc_loss_func(args):
    """Compute the batch CTC cost; intended to be wrapped in a ``Lambda`` layer.

    params:
        args -> sequence (y_pred, y_true, input_len, label_len)
    returns:
        the result of ``K.ctc_batch_cost`` for the trimmed predictions
    """
    predictions, y_true, input_len, label_len = args
    # Skip the first two time steps: per the original author, the first couple
    # of RNN outputs tend to be garbage.
    trimmed = predictions[:, 2:, :]
    return K.ctc_batch_cost(y_true, trimmed, input_len, label_len)

def sgd_optimizer():
    """Return the SGD optimizer used for training.

    Settings: learning rate 0.01, decay 1e-6, Nesterov momentum 0.9 and
    gradient-norm clipping at 5.
    """
    # `learning_rate` replaces the deprecated `lr` keyword, which triggers a
    # deprecation warning when the optimizer is constructed.
    return SGD(learning_rate=0.01, decay=1e-6, momentum=0.9, nesterov=True, clipnorm=5)

# Create new input label for ctc
def input_label():
    """Create the three extra ``Input`` tensors consumed by the CTC loss.

    returns:
        labels -> float32 input with MaxInputCharLen values per sample
        input_len -> int64 input with one value per sample
        label_len -> int64 input with one value per sample
    """
    labels = Input(shape=[MaxInputCharLen], dtype='float32')
    # The two length inputs share the same spec; created in the order input_len, label_len.
    length_spec = dict(shape=[1], dtype='int64')
    input_len = Input(**length_spec)
    label_len = Input(**length_spec)
    return labels, input_len, label_len

# This func to define a loss function
def loss(y_pred, labels, input_len, label_len, name):
    """Wrap ``ctc_loss_func`` in a ``Lambda`` layer and apply it.

    params:
        y_pred, labels, input_len, label_len -> tensors passed, in this
            order, as the single list argument of ``ctc_loss_func``
        name -> name given to the Lambda layer
    returns:
        the output tensor of the Lambda layer
    """
    ctc_inputs = [y_pred, labels, input_len, label_len]
    ctc_layer = Lambda(ctc_loss_func, output_shape=(1,), name=name)
    return ctc_layer(ctc_inputs)

# Define crnn model with ctc implementation as output
    # CTC is to interpret outputs of RNN as a probability distribution over all possible label sequences.
def CRNN():
    """Build the training model whose output is the CTC loss.

    returns:
        model -> Model taking [inputs, labels, input_len, label_len] and
                 producing the output of the 'ctc' Lambda layer
        y_pred -> the softmax output tensor returned by ``build_model``
    """
    # Output tensor of the recognition network.
    y_pred = build_model()

    # Extra inputs needed only to evaluate the CTC loss.
    labels, input_len, label_len = input_label()
    ctc = loss(y_pred, labels, input_len, label_len, name='ctc')

    training_inputs = [inputs, labels, input_len, label_len]
    model = Model(inputs=training_inputs, outputs=ctc)
    return model, y_pred

def train(model, x_train, x_valid, y_train, y_valid, train_input_len, train_label_len, train_output, valid_input_len, valid_label_len, valid_output, opt, y_pred):
    """Compile ``model`` for CTC training and fit it.

    The 'ctc' output of the model already is the loss value, so the compiled
    loss simply returns the model output unchanged.

    params:
        model -> training model with an output named 'ctc'
        x_* / y_* / *_input_len / *_label_len -> the four model inputs for
            the training and validation sets
        train_output, valid_output -> target arrays handed to ``fit``
        opt -> optimizer passed to ``compile``
        y_pred -> not used by this function
    returns:
        the object returned by ``model.fit`` (epochs=EPOCH, batch_size=BATCH_SIZE)
    """
    # The lambda's own y_pred parameter shadows the function argument of the same name.
    model.compile(loss={'ctc': lambda y_true, y_pred: y_pred}, optimizer=opt)

    train_inputs = [x_train, y_train, train_input_len, train_label_len]
    valid_inputs = [x_valid, y_valid, valid_input_len, valid_label_len]

    return model.fit(
        x=train_inputs,
        y=train_output,
        validation_data=(valid_inputs, valid_output),
        epochs=EPOCH,
        batch_size=BATCH_SIZE,
    )

def validation(model, x_, size):
    """Predict on ``x_`` and decode the first ``size`` results to strings.

    params:
        model -> object exposing ``predict``
        x_ -> input passed to ``model.predict``
        size -> number of decoded rows to convert
    returns:
        list of ``size`` decoded names
    """
    pred = model.predict(x_)

    # Every sample uses the full number of predicted time steps.
    n_samples, n_steps = pred.shape[0], pred.shape[1]
    input_len = np.ones(n_samples) * n_steps
    decoded = K.get_value(K.ctc_decode(pred, input_length=input_len)[0][0])

    return [label_name(decoded[idx], name_to_num=False) for idx in range(size)]

def validation_accuracy_metrics(data, preds, size, features):
    """Return the percentage of characters predicted correctly.

    For each of the first ``size`` rows the true name and the predicted name
    are compared position by position over the length of the shorter one;
    the number of matches is divided by the total number of true characters.

    params:
        data -> DataFrame addressed by row labels 0..size-1
        preds -> sequence of predicted names, aligned with the rows of ``data``
        size -> number of rows to evaluate
        features -> name of the column that holds the true names
    returns:
        accuracy in percent as a float (0.0 when there are no characters)
    """
    corr_char = 0   # characters that match at the same position
    total_char = 0  # characters in the true names
    for i in range(size):
        true_name = data.loc[i, features]
        pred_name = preds[i]
        total_char += len(true_name)
        # Compare the strings themselves (not their lengths); zip stops at
        # the shorter of the two.
        corr_char += sum(t == p for t, p in zip(true_name, pred_name))

    print('total character : ', total_char)
    if total_char == 0:
        # Nothing to score; avoid dividing by zero.
        return 0.0
    char_predict_accuracy = float(corr_char*100/total_char)
    return char_predict_accuracy

def testing(model, test_dir, df, X_test, size):
    """Plot the first ``size`` test images, each titled with its predicted name.

    params:
        model -> object exposing ``predict``
        test_dir -> directory that holds the test images
        df -> DataFrame with a 'FILENAME' column, addressed by row labels 0..size-1
        X_test -> preprocessed inputs; row i must correspond to row i of ``df``
        size -> number of images to show
    """
    # Predict and decode once for the whole set instead of once per image.
    pred = model.predict(X_test)
    input_len = np.ones(pred.shape[0]) * pred.shape[1]
    decoded = K.get_value(K.ctc_decode(pred, input_length=input_len)[0][0]) # decode the predicted names

    # Keep the 2 x 5 layout for up to 10 images and add rows beyond that.
    cols = 5
    rows = max(2, (size + cols - 1) // cols)

    plt.figure(figsize=(20, 12))
    for i in range(size):
        # show test image
        img_dir = os.path.join(test_dir, df.loc[i, 'FILENAME'])
        img = cv2.imread(img_dir, cv2.IMREAD_GRAYSCALE)
        plt.subplot(rows, cols, i+1)
        plt.imshow(img)

        # Title each image with its own prediction (row i, not row 0).
        plt.title(label_name(decoded[i], name_to_num=False)) # Convert the decoded to be name string
        

## Handwriting

In [5]:
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 11 11:21:40 2022

@author: user
"""

import cv2
import os
import numpy as np

def IMGCrop(path, size, data, dim=None):
    """Load, crop/pad and scale the first ``size`` images listed in ``data``.

    params:
        path -> directory that holds the image files
        size -> number of rows of ``data`` to load
        data -> DataFrame with a 'FILENAME' column, addressed by row labels 0..size-1
        dim -> (height, width) of each output image; defaults to the
               configured (H, W)
    returns:
        numpy array of shape (size, dim[0], dim[1]) with values scaled by 1/255
    raises:
        FileNotFoundError when an image cannot be read
    """
    if dim is None:
        dim = (H, W)

    img_arr = []
    for i in range(size):
        img_dir = os.path.join(path, data.loc[i, 'FILENAME'])
        img = cv2.imread(img_dir, cv2.IMREAD_GRAYSCALE) # Grayscale channel 1
        if img is None:
            # cv2.imread signals a missing/unreadable file by returning None.
            raise FileNotFoundError('Could not read image: %s' % img_dir)
        img = crop_image(img, dim=dim)
        img = img/255.
        img_arr.append(img)

    img_arr = np.array(img_arr)
    return img_arr
        
if __name__=="__main__":
    
    # NOTE(review): absolute, machine-specific dataset location; adjust before running elsewhere.
    BASE = 'C:\\Clarenti\\Data\\Project\\ML\\Program\\Dataset\\Recognition\\Handwriting'
    
    # DEFINE VARIABLES
    train_dir, test_dir, valid_dir = image_dir(BASE)
    train_csv_dir, test_csv_dir, valid_csv_dir = csv_dir(BASE)
    
    # READ CSV
    df_train, df_test, df_valid = read_csv_dir(train_csv_dir, test_csv_dir, valid_csv_dir)
    print(df_train.shape)
    print(df_valid.shape)
    
    # CHECK FOR NAN DATA
    is_nan = nan_data([df_train, df_valid], 'FILENAME', 'IDENTITY')
    print({'train_nan':is_nan[0], 'valid_nan':is_nan[1]})
    
    # DROP NAN
    drop_nan(df_train, inplace=True)
    drop_nan(df_valid, inplace=True)
    
    # REMOVE INVALID DATA
    df_train = df_train[df_train['IDENTITY']!='UNREADABLE']
    df_valid = df_valid[df_valid['IDENTITY']!='UNREADABLE']
    
    print('CURRENT TRAIN : ', df_train.shape)
    print('CURRENT_VALID : ', df_valid.shape)
    
    # RESET INDEX ON DATAFRAME AFTER FILTERING
    # (later steps address rows by the labels 0..size-1)
    _reset_index_(df_train, inplace=True, drop=True)
    _reset_index_(df_valid, inplace=True, drop=True)
    
    print(df_train.tail(5))
   
    #==========================================================    
    # IMAGE PREPROCESSING
    
    # CROP IMAGE
    X_train = IMGCrop(train_dir, TRAIN_SIZE, df_train)
    X_valid = IMGCrop(valid_dir, VALID_SIZE, df_valid)
    X_test = IMGCrop(test_dir, TEST_SIZE, df_test)

    print('CURRENT TRAIN : ', X_train.shape)
    print('CURRENT VALID : ', X_valid.shape)
    
    # RESHAPE
    # Add the channel axis expected by the network input; -1 lets NumPy infer the number of samples.
    X_train = np.array(X_train).reshape(-1, H, W, 1)
    X_valid = np.array(X_valid).reshape(-1, H, W, 1)
    X_test = np.array(X_test).reshape(-1, H, W, 1)
    
    print('CURRENT TRAIN AFTER RESHAPE : ', X_train.shape)
    print('CURRENT VALID AFTER RESHAPE : ', X_valid.shape)
    
    
    # =========================================================
    # BUILD AND TRAIN MODEL
    model, pred = CRNN()
    opt = sgd_optimizer()
    
    # ===================================================
    y_train, y_train_len = label(df_train, TRAIN_SIZE, MaxInputCharLen, 'IDENTITY')
    y_valid, y_valid_len = label(df_valid, VALID_SIZE, MaxInputCharLen, 'IDENTITY')
    
    # Number of prediction time steps handed to the CTC loss for every sample.
    # NOTE(review): 62 looks like MaxPredictedCharLen - 2 (ctc_loss_func drops
    # two steps) -- confirm it matches the sequence length the network produces.
    ctc_input_len = 62
    train_input_len = np.ones([TRAIN_SIZE, 1]) * ctc_input_len
    valid_input_len = np.ones([VALID_SIZE, 1]) * ctc_input_len
    # Dummy targets: the model output already is the loss value.
    train_output = np.zeros([TRAIN_SIZE])
    valid_output = np.zeros([VALID_SIZE])
    
    # FIT
    hist = train(model, X_train, X_valid, y_train, y_valid, train_input_len, y_train_len, train_output, valid_input_len, y_valid_len, valid_output, opt, pred)
    
    # Inference model: the same (now trained) layers, ending at the softmax
    # output instead of the CTC loss. `hist` is the return value of model.fit
    # (the training history), not a model, so it cannot be used for prediction.
    pred_model = Model(inputs=inputs, outputs=pred)
    
    # ==========================================================
    # VALIDATION
    valid_preds = validation(pred_model, X_valid, VALID_SIZE)
    valid_acc = validation_accuracy_metrics(df_valid, valid_preds, VALID_SIZE, 'IDENTITY')
    print('correct chars predicted : %f' %(valid_acc))
    
    # TESTING
    testing(pred_model, test_dir, df_test, X_test, 10)
    # TODO: character accuracy on the test split is not computed yet.


(330961, 2)
(41370, 2)
{'train_nan': True, 'valid_nan': True}
CURRENT TRAIN :  (330294, 2)
CURRENT_VALID :  (41280, 2)
                FILENAME       IDENTITY
330289  TRAIN_330957.jpg          LENNY
330290  TRAIN_330958.jpg        TIFFANY
330291  TRAIN_330959.jpg  COUTINHO DESA
330292  TRAIN_330960.jpg         MOURAD
330293  TRAIN_330961.jpg        HELOISE
CURRENT TRAIN :  (30000, 60, 284)
CURRENT VALID :  (3000, 60, 284)
CURRENT TRAIN AFTER RESHAPE :  (30000, 60, 284, 1)
CURRENT VALID AFTER RESHAPE :  (3000, 60, 284, 1)
Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 INPUT (InputLayer)          [(None, 60, 284, 1)]      0         
                                                                 
 CONV1 (Conv2D)              (None, 60, 284, 64)       128       
                                                                 
 batch_normalization (BatchN  (None, 60, 284, 64)      256    

  super(SGD, self).__init__(name, **kwargs)
