<a href="https://colab.research.google.com/github/yazeedMohi/SCAR-Scene-Captured-Arabic-Recognition/blob/master/SCAR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **SCAR (Scene Calibrated Arabic Recognizer)**

## Mount Drive

In [0]:
from google.colab import drive
drive.mount('/content/drive')
%tensorflow_version 2.1

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


# Run Here

In [0]:
main1(image="/content/drive/My Drive/data/demo/photo5.jpg",localizer = "east",  recognizer = "yazeed")

# Config

In [0]:
import os
class CFG:
    def __init__(self):
        self.initial_epoch = 0
        self.validation_split_ratio = 0.1
        self.max_train_img_size = int(736)
        self.max_predict_img_size = int(1536)
        assert self.max_train_img_size in [256, 384, 512, 640, 736], \
            'max_train_img_size must in [256, 384, 512, 640, 736]'
        if self.max_train_img_size == 256:
            self.batch_size = 8
        elif self.max_train_img_size == 384:
            self.batch_size = 4
        elif self.max_train_img_size == 512:
            self.batch_size = 2
        else:
            self.batch_size = 1
        
        self.origin_image_dir_name = 'Images/Train/'
        self.origin_txt_dir_name = 'GroundTruths/Train/'
        self.origin_image_dir_name_test = 'Images/Test/'
        self.origin_txt_dir_name_test = 'GroundTruths/Test/'
        
        # in paper it's 0.3, maybe to large to this problem
        self.shrink_ratio = 0.2
        # pixels between 0.2 and 0.6 are side pixels
        self.shrink_side_ratio = 0.6
        self.epsilon = 1e-4

        self.num_channels = 3
        self.feature_layers_range = range(5, 1, -1)
        self.feature_layers_num = len(self.feature_layers_range)
        self.pixel_size = 2 ** self.feature_layers_range[-1]
        self.locked_layers = False

        
        
        self.pixel_threshold = 0.9
        self.side_vertex_pixel_threshold = 0.9
        self.trunc_threshold = 0.1
        self.predict_cut_text_line = True
        self.predict_write_txt = False

        self.greedy=False
        self.beam_width=10
        self.top_paths=1
cfg = CFG()

# Main

In [0]:
import argparse
import cv2
import h5py
import os
import string
import datetime

def main1(image,localizer = None,loc_source = "icpr", recognizer = None,rec_source="khattx", replacement = False):

    if(localizer):
        target_path = os.path.join("/content/drive/My Drive/", "output", loc_source, localizer, "checkpoint_weights.hdf5")
        loc_model = NNModel_loc(architecture=localizer)
        loc_model.compile()
        loc_model.load_checkpoint(target=target_path)

        print('3')
    if(recognizer):
        if(rec_source.find("khatt") != -1):
            arabic = True
            # Best size for KHATT & KHATTX
            input_size = (1024, 64, 1)
            charset_base = 'ءآأإابتةثجحخدذرزسشصضطظعغفقكلمنؤهويىئ0123456789@:,.?!"()//\=-_#%$^&*+ '
        else:
            arabic = False
            input_size = (1024, 128, 1)
            charset_base = string.printable[:95]
        max_text_length = 128
        tokenizer = Tokenizer(chars=charset_base, max_text_length=max_text_length)
        target_path = os.path.join("/content/drive/My Drive/", "output", rec_source, recognizer, "checkpoint_weights.hdf5")
        rec_model = NNModel_rec(architecture=recognizer,
                                input_size=input_size,
                                vocab_size=tokenizer.vocab_size,
                                top_paths=10)
        rec_model.compile()
        rec_model.load_checkpoint(target=target_path) 
    
    input_images = []
    segmented = None

    


    if(localizer):
        print("Localizing...")
        input_images, segmented = predict_image(loc_model.model, image, cfg.pixel_threshold)
        cv2_imshow(segmented)
    else:
        img = preprocess(cv2.imread(image), input_size=input_size)
        print(img.shape)
        input_images = [img]
    if(recognizer):
        print("Recognizing...")
        print(input_images[0].shape)
        print(np.asarray(input_images).shape)
        predicts, _ = predict_text(model = rec_model.model,x=input_images, ctc_decode=True)
        predicts = [tokenizer.decode(x[0]) for x in predicts]
        for pd, i in zip(predicts,range(len(predicts))):
          cv2_imshow(adjust_to_see(input_images[i]))
          print(str(pd))

# Localization

## Model

In [0]:

"""
NNModel Class
The NNModel class use Tensorflow Keras module.

x is the input features and y the labels.
"""
from tensorflow.keras import Input, Model
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.layers import Concatenate, Conv2D, UpSampling2D, BatchNormalization
from tensorflow.keras.optimizers import Adam

from tensorflow.keras.callbacks import CSVLogger, TensorBoard, ModelCheckpoint
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

import tensorflow as tf


##SET DEFS
class NNModel_loc:

    def __init__(self,
                 architecture="east",
                 num_channels=3,
                 locked_layers=False,
                 feature_layers_range=range(5, 1, -1),
                 feature_layers_num=4,
                 learning_rate=1e-3
                ):
        """
        Initialization of a NN Model.

        parameters:
            architecture: option of the architecture model to build and compile.
        """

        self.architecture = globals()[architecture]
        self.num_channels = num_channels
        self.locked_layers = locked_layers
        self.feature_layers_range = feature_layers_range
        self.feature_layers_num = feature_layers_num
        self.learning_rate = learning_rate
        
        self.model= None

    def summary(self, output=None, target=None):
        """Show/Save model structure (summary)"""

        self.model.summary()

        if target is not None:
            os.makedirs(output, exist_ok=True)

            """with open(os.path.join(output, target), "w") as f:
                with redirect_stdout(f):
                    self.model.summary()"""

    def load_checkpoint(self, target):
        """ Load a model with checkpoint file"""

        if os.path.isfile(target):
            if self.model is None:
                self.compile()

            self.model.load_weights(target)

    def get_callbacks(self, logdir, checkpoint, monitor="val_loss", verbose=0):
        """Setup the list of callbacks for the model"""

        callbacks = [
            CSVLogger(
                filename=os.path.join(logdir, "epochs.log"),
                separator=";",
                append=True),
            TensorBoard(
                log_dir=logdir,
                write_graph=True,
                write_images=False,
                update_freq="epoch"),
            ModelCheckpoint(
                filepath=checkpoint,
                monitor=monitor,
                save_best_only=True,
                save_weights_only=True,
                verbose=verbose),
            EarlyStopping(
                monitor=monitor,
                min_delta=1e-8,
                patience=20,
                restore_best_weights=True,
                verbose=verbose),
            ReduceLROnPlateau(
                monitor=monitor,
                min_delta=1e-8,
                factor=0.2,
                patience=15,
                verbose=verbose)
        ]


        return callbacks
    def compile(self, learning_rate=None):
        """
        Configures the NN Model for training/predict.

        optimizer: optimizer for training
        """

        # define inputs, outputs and optimizer of the chosen architecture
        outs = self.architecture(self.num_channels, self.locked_layers, self.feature_layers_range, self.feature_layers_num, self.learning_rate)
        inputs, outputs, optimizer = outs

        # create and compile
        self.model = Model(inputs=inputs, outputs=outputs)
        self.model.compile(optimizer=optimizer, loss=self.quad_loss)

    def fit(self,
            x=None,
            y=None,
            batch_size=None,
            epochs=1,
            verbose=1,
            callbacks=None,
            validation_split=0.0,
            validation_data=None,
            shuffle=True,
            class_weight=None,
            sample_weight=None,
            initial_epoch=0,
            steps_per_epoch=None,
            validation_steps=None,
            validation_freq=1,
            max_queue_size=10,
            workers=1,
            use_multiprocessing=False,
            checkpoint=""):
        """
        Model training on data yielded (fit_generator function has support to generator).
        A fit_generator() abstration function of TensorFlow Keras.

        Provide x parameter, a generator of the form: yielding x, y.

        return: A history object
        """

        out = self.model.fit_generator(generator=x,
                           steps_per_epoch=steps_per_epoch,
                           epochs=epochs,
                           validation_data=validation_data,
                           validation_steps=validation_steps,
                           verbose=1,
                           initial_epoch=initial_epoch,
                           callbacks=callbacks)
        return out

    def predict(self,
                x,
                batch_size=None,
                verbose=0,
                steps=1,
                callbacks=None,
                max_queue_size=10,
                workers=1,
                use_multiprocessing=False):
        pass

    @staticmethod
    def quad_loss(y_true, y_pred):
        
        epsilon = 1e-6
        lambda_side_vertex_coord_loss = 1.0
        lambda_side_vertex_code_loss = 1.0
        lambda_inside_score_loss = 4.0
        def smooth_l1_loss(prediction_tensor, target_tensor, weights):
            n_q = tf.reshape(quad_norm(target_tensor), tf.shape(weights))
            diff = prediction_tensor - target_tensor
            abs_diff = tf.abs(diff)
            abs_diff_lt_1 = tf.less(abs_diff, 1)
            pixel_wise_smooth_l1norm = (tf.reduce_sum(
                tf.where(abs_diff_lt_1, 0.5 * tf.square(abs_diff), abs_diff - 0.5),
                axis=-1) / n_q) * weights
            return pixel_wise_smooth_l1norm


        def quad_norm(g_true):
            shape = tf.shape(g_true)
            delta_xy_matrix = tf.reshape(g_true, [-1, 2, 2])
            diff = delta_xy_matrix[:, 0:1, :] - delta_xy_matrix[:, 1:2, :]
            square = tf.square(diff)
            distance = tf.sqrt(tf.reduce_sum(square, axis=-1))
            distance *= 4.0
            distance += epsilon
            return tf.reshape(distance, shape[:-1])
        
        
        # loss for inside_score
        logits = y_pred[:, :, :, :1]
        labels = y_true[:, :, :, :1]
        # balance positive and negative samples in an image
        beta = 1 - tf.reduce_mean(labels)
        # first apply sigmoid activation
        predicts = tf.nn.sigmoid(logits)
        # log +epsilon for stable cal
        inside_score_loss = tf.reduce_mean(
            -1 * (beta * labels * tf.math.log(predicts + epsilon) +
                  (1 - beta) * (1 - labels) * tf.math.log(1 - predicts + epsilon)))
        inside_score_loss *= lambda_inside_score_loss

        # loss for side_vertex_code
        vertex_logits = y_pred[:, :, :, 1:3]
        vertex_labels = y_true[:, :, :, 1:3]
        vertex_beta = 1 - (tf.reduce_mean(y_true[:, :, :, 1:2])
                           / (tf.reduce_mean(labels) + epsilon))
        vertex_predicts = tf.nn.sigmoid(vertex_logits)
        pos = -1 * vertex_beta * vertex_labels * tf.math.log(vertex_predicts +
                                                        epsilon)
        neg = -1 * (1 - vertex_beta) * (1 - vertex_labels) * tf.math.log(
            1 - vertex_predicts + epsilon)
        positive_weights = tf.cast(tf.equal(y_true[:, :, :, 0], 1), tf.float32)
        side_vertex_code_loss = \
            tf.reduce_sum(tf.reduce_sum(pos + neg, axis=-1) * positive_weights) / (
                    tf.reduce_sum(positive_weights) + epsilon)
        side_vertex_code_loss *= lambda_side_vertex_code_loss

        # loss for side_vertex_coord delta
        g_hat = y_pred[:, :, :, 3:]
        g_true = y_true[:, :, :, 3:]
        vertex_weights = tf.cast(tf.equal(y_true[:, :, :, 1], 1), tf.float32)
        pixel_wise_smooth_l1norm = smooth_l1_loss(g_hat, g_true, vertex_weights)
        side_vertex_coord_loss = tf.reduce_sum(pixel_wise_smooth_l1norm) / (
                tf.reduce_sum(vertex_weights) + epsilon)
        side_vertex_coord_loss *= lambda_side_vertex_coord_loss
        
        return inside_score_loss + side_vertex_code_loss + side_vertex_coord_loss



"""
Networks to the text localization
"""

def east(num_channels, locked_layers, feature_layers_range, feature_layers_num, learning_rate):
    input_img = Input(name='input_img',
                               shape=(None, None, num_channels),
                               dtype='float32')
    vgg16 = VGG16(input_tensor=input_img,
                  weights='imagenet',
                  include_top=False)
    if locked_layers:
        # locked first two conv layers
        locked_layers = [vgg16.get_layer('block1_conv1'),
                         vgg16.get_layer('block1_conv2')]
        for layer in locked_layers:
            layer.trainable = False
    f = [vgg16.get_layer('block%d_pool' % i).output
              for i in feature_layers_range]
    f.insert(0, None)
    diff = feature_layers_range[0] - feature_layers_num
    
    def g(i):
        # i+diff in cfg.feature_layers_range
        assert i + diff in feature_layers_range, \
            ('i=%d+diff=%d not in ' % (i, diff)) + \
            str(feature_layers_range)
        if i == feature_layers_num:
            bn = BatchNormalization()(h(i))
            cn = Conv2D(32, 3, activation='relu', padding='same')(bn)
            return cn
        else:
            return UpSampling2D((2, 2))(h(i))

    def h(i):
        # i+diff in cfg.feature_layers_range
        assert i + diff in feature_layers_range, \
            ('i=%d+diff=%d not in ' % (i, diff)) + \
            str(feature_layers_range)
        if i == 1:
            return f[i]
        else:
            concat = Concatenate(axis=-1)([g(i - 1), f[i]])
            bn1 = BatchNormalization()(concat)
            conv_1 = Conv2D(128 // 2 ** (i - 2), 1,
                            activation='relu', padding='same',)(bn1)
            bn2 = BatchNormalization()(conv_1)
            conv_3 = Conv2D(128 // 2 ** (i - 2), 3,
                            activation='relu', padding='same',)(bn2)
            return conv_3

    before_output = g(feature_layers_num)
    inside_score = Conv2D(1, 1, padding='same', name='inside_score'
                          )(before_output)
    side_v_code = Conv2D(2, 1, padding='same', name='side_vertex_code'
                         )(before_output)
    side_v_coord = Conv2D(4, 1, padding='same', name='side_vertex_coord'
                          )(before_output)
    east_detect = Concatenate(axis=-1,
                              name='east_detect')([inside_score,
                                                   side_v_code,
                                                   side_v_coord])
    
    optimizer = Adam(lr=learning_rate, decay=5e-4)
    
    return (input_img, east_detect, optimizer)

## Predict Image

In [0]:
import argparse

import numpy as np
from PIL import Image, ImageDraw
from PIL import ImageQt
from keras.preprocessing import image
from keras.applications.vgg16 import preprocess_input

from google.colab.patches import cv2_imshow


def sigmoid(x):
    """`y = 1 / (1 + exp(-x))`"""
    return 1 / (1 + np.exp(-x))


def cut_text_line(geo, scale_ratio_w, scale_ratio_h, im_array, img_path, s):
    geo /= [scale_ratio_w, scale_ratio_h]
    p_min = np.amin(geo, axis=0)
    p_max = np.amax(geo, axis=0)
    min_xy = p_min.astype(int)
    max_xy = p_max.astype(int) + 2
    sub_im_arr = im_array[min_xy[1]:max_xy[1], min_xy[0]:max_xy[0], :].copy()
    for m in range(min_xy[1], max_xy[1]):
        for n in range(min_xy[0], max_xy[0]):
            if not point_inside_of_quad(n, m, geo, p_min, p_max):
                sub_im_arr[m - min_xy[1], n - min_xy[0], :] = 255
    sub_im = image.array_to_img(sub_im_arr, scale=False)
    
    return binarize(sub_im_arr)
    
    #sub_im.save(img_path + '_subim%d.jpg' % s)


def predict_image(detector, img_path, pixel_threshold, quiet=True,gen=None):
    #im2,out = gen.sample()
    results = []
    
    img = image.load_img(img_path)
    #img = Image.open(img_path)
    #img = image.array_to_img(im2)
    d_wight, d_height = resize_image(img, cfg.max_predict_img_size)
    img = img.resize((d_wight, d_height), Image.NEAREST).convert('RGB')
    img = image.img_to_array(img)
    
    img = preprocess_input(img, mode='tf')

    b, g, r = img[:,:,0],img[:,:,1],img[:,:,2]
    img = np.dstack([r,g,b])
    x = np.expand_dims(img, axis=0)
    print(x.shape)
    y = detector.predict(x)
    #y = out
    y = np.squeeze(y, axis=0)
    print(y.shape)#,y[:,:,0],y[:,:,1],y[:,:,2],y[:,:,3],y[:,:,4],y[:,:,5],y[:,:,6])
    y[:, :, :3] = sigmoid(y[:, :, :3])
    """opt = np.get_printoptions()
    np.set_printoptions(threshold=np.inf)
    #print(np.asarray(y[:,:,1]))
    print(np.asarray(y[:,:,2]))
    np.set_printoptions(**opt)"""
    cond = np.greater_equal(y[:, :, 0],sigmoid(pixel_threshold))
    activation_pixels = np.where(cond)
    quad_scores, quad_after_nms = nms(y, activation_pixels)
    with Image.open(img_path) as im:
        #im = im2
        im = image.array_to_img(img)
        im_array = image.img_to_array(im.convert('RGB'))
        d_wight, d_height = resize_image(im, cfg.max_predict_img_size)
        scale_ratio_w = d_wight / im.width
        scale_ratio_h = d_height / im.height
        im = im.resize((d_wight, d_height), Image.NEAREST).convert('RGB')
        quad_im = im.copy()
        draw = ImageDraw.Draw(im)
        for i, j in zip(activation_pixels[0], activation_pixels[1]):
            px = (j + 0.5) * cfg.pixel_size
            py = (i + 0.5) * cfg.pixel_size
            line_width, line_color = 1, 'red'
            if y[i, j, 1] >= sigmoid(cfg.side_vertex_pixel_threshold):
                if y[i, j, 2] < sigmoid(cfg.trunc_threshold):
                    line_width, line_color = 2, 'yellow'
                elif y[i, j, 2] >= sigmoid(1-cfg.trunc_threshold):
                    line_width, line_color = 2, 'green'
            draw.line([(px - 0.5 * cfg.pixel_size, py - 0.5 * cfg.pixel_size),
                       (px + 0.5 * cfg.pixel_size, py - 0.5 * cfg.pixel_size),
                       (px + 0.5 * cfg.pixel_size, py + 0.5 * cfg.pixel_size),
                       (px - 0.5 * cfg.pixel_size, py + 0.5 * cfg.pixel_size),
                       (px - 0.5 * cfg.pixel_size, py - 0.5 * cfg.pixel_size)],
                      width=line_width, fill=line_color)
        quad_draw = ImageDraw.Draw(quad_im)
        txt_items = []
        for score, geo, s in zip(quad_scores, quad_after_nms,
                                 range(len(quad_scores))):
            if np.amin(score) > 0:
                quad_draw.line([tuple(geo[0]),
                                tuple(geo[1]),
                                tuple(geo[2]),
                                tuple(geo[3]),
                                tuple(geo[0])], width=2, fill='red')
                if cfg.predict_cut_text_line:
                    results.append(cut_text_line(geo, scale_ratio_w, scale_ratio_h, im_array,
                                  img_path, s))
                    
                rescaled_geo = geo / [scale_ratio_w, scale_ratio_h]
                rescaled_geo_list = np.reshape(rescaled_geo, (8,)).tolist()
                txt_item = ','.join(map(str, rescaled_geo_list))
                txt_items.append(txt_item + '\n')
            elif not quiet:
                print('quad invalid with vertex num less then 4.')
        
        return results, image.img_to_array(quad_im.convert('RGB'))
        """if cfg.predict_write2txt and len(txt_items) > 0:
            with open(img_path[:-4] + '.txt', 'w') as f_txt:
                f_txt.writelines(txt_items)"""


def predict_txt(east_detect, img_path, txt_path, pixel_threshold, quiet=False):
    img = image.load_img(img_path)
    d_wight, d_height = resize_image(img, cfg.max_predict_img_size)
    scale_ratio_w = d_wight / img.width
    scale_ratio_h = d_height / img.height
    img = img.resize((d_wight, d_height), Image.NEAREST).convert('RGB')
    img = image.img_to_array(img)
    img = preprocess_input(img, mode='tf')
    x = np.expand_dims(img, axis=0)
    y = east_detect.predict(x)

    y = np.squeeze(y, axis=0)
    y[:, :, :3] = sigmoid(y[:, :, :3])
    cond = np.greater_equal(y[:, :, 0], pixel_threshold)
    activation_pixels = np.where(cond)
    quad_scores, quad_after_nms = nms(y, activation_pixels)

    txt_items = []
    for score, geo in zip(quad_scores, quad_after_nms):
        if np.amin(score) > 0:
            rescaled_geo = geo / [scale_ratio_w, scale_ratio_h]
            rescaled_geo_list = np.reshape(rescaled_geo, (8,)).tolist()
            txt_item = ','.join(map(str, rescaled_geo_list))
            txt_items.append(txt_item + '\n')
        elif not quiet:
            print('quad invalid with vertex num less then 4.')
    if cfg.predict_write2txt and len(txt_items) > 0:
        with open(txt_path, 'w') as f_txt:
            f_txt.writelines(txt_items)


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--path', '-p',
                        default='demo/012.png',
                        help='image path')
    parser.add_argument('--threshold', '-t',
                        default=cfg.pixel_threshold,
                        help='pixel activation threshold')
    return parser.parse_args()

## Preprocessing

In [0]:
def batch_reorder_vertexes(xy_list_array):
    reorder_xy_list_array = np.zeros_like(xy_list_array)
    for xy_list, i in zip(xy_list_array, range(len(xy_list_array))):
        reorder_xy_list_array[i] = reorder_vertexes(xy_list)
    return reorder_xy_list_array


def reorder_vertexes(xy_list):
    reorder_xy_list = np.zeros_like(xy_list)
    # determine the first point with the smallest x,
    # if two has same x, choose that with smallest y,
    ordered = np.argsort(xy_list, axis=0)
    xmin1_index = ordered[0, 0]
    xmin2_index = ordered[1, 0]
    if xy_list[xmin1_index, 0] == xy_list[xmin2_index, 0]:
        if xy_list[xmin1_index, 1] <= xy_list[xmin2_index, 1]:
            reorder_xy_list[0] = xy_list[xmin1_index]
            first_v = xmin1_index
        else:
            reorder_xy_list[0] = xy_list[xmin2_index]
            first_v = xmin2_index
    else:
        reorder_xy_list[0] = xy_list[xmin1_index]
        first_v = xmin1_index
    # connect the first point to others, the third point on the other side of
    # the line with the middle slope
    others = list(range(4))
    others.remove(first_v)
    k = np.zeros((len(others),))
    for index, i in zip(others, range(len(others))):
        k[i] = (xy_list[index, 1] - xy_list[first_v, 1]) \
                    / (xy_list[index, 0] - xy_list[first_v, 0] + cfg.epsilon)
    k_mid = np.argsort(k)[1]
    third_v = others[k_mid]
    reorder_xy_list[2] = xy_list[third_v]
    # determine the second point which on the bigger side of the middle line
    others.remove(third_v)
    b_mid = xy_list[first_v, 1] - k[k_mid] * xy_list[first_v, 0]
    second_v, fourth_v = 0, 0
    for index, i in zip(others, range(len(others))):
        # delta = y - (k * x + b)
        delta_y = xy_list[index, 1] - (k[k_mid] * xy_list[index, 0] + b_mid)
        if delta_y > 0:
            second_v = index
        else:
            fourth_v = index
    reorder_xy_list[1] = xy_list[second_v]
    reorder_xy_list[3] = xy_list[fourth_v]
    # compare slope of 13 and 24, determine the final order
    k13 = k[k_mid]
    k24 = (xy_list[second_v, 1] - xy_list[fourth_v, 1]) / (
                xy_list[second_v, 0] - xy_list[fourth_v, 0] + cfg.epsilon)
    if k13 < k24:
        tmp_x, tmp_y = reorder_xy_list[3, 0], reorder_xy_list[3, 1]
        for i in range(2, -1, -1):
            reorder_xy_list[i + 1] = reorder_xy_list[i]
        reorder_xy_list[0, 0], reorder_xy_list[0, 1] = tmp_x, tmp_y
    return reorder_xy_list


def resize_image(im, max_img_size=cfg.max_train_img_size):
    im_width = np.minimum(im.width, max_img_size)
    if im_width == max_img_size < im.width:
        im_height = int((im_width / im.width) * im.height)
    else:
        im_height = im.height
    o_height = np.minimum(im_height, max_img_size)
    if o_height == max_img_size < im_height:
        o_width = int((o_height / im_height) * im_width)
    else:
        o_width = im_width
    d_wight = o_width - (o_width % 32)
    d_height = o_height - (o_height % 32)
    return d_wight, d_height


def point_inside_of_quad(px, py, quad_xy_list, p_min, p_max):
    if (p_min[0] <= px <= p_max[0]) and (p_min[1] <= py <= p_max[1]):
        xy_list = np.zeros((4, 2))
        xy_list[:3, :] = quad_xy_list[1:4, :] - quad_xy_list[:3, :]
        xy_list[3] = quad_xy_list[0, :] - quad_xy_list[3, :]
        yx_list = np.zeros((4, 2))
        yx_list[:, :] = quad_xy_list[:, -1:-3:-1]
        a = xy_list * ([py, px] - yx_list)
        b = a[:, 0] - a[:, 1]
        if np.amin(b) >= 0 or np.amax(b) <= 0:
            return True
        else:
            return False
    else:
        return False


def point_inside_of_nth_quad(px, py, xy_list, shrink_1, long_edge):
    nth = -1
    vs = [[[0, 0, 3, 3, 0], [1, 1, 2, 2, 1]],
          [[0, 0, 1, 1, 0], [2, 2, 3, 3, 2]]]
    for ith in range(2):
        quad_xy_list = np.concatenate((
            np.reshape(xy_list[vs[long_edge][ith][0]], (1, 2)),
            np.reshape(shrink_1[vs[long_edge][ith][1]], (1, 2)),
            np.reshape(shrink_1[vs[long_edge][ith][2]], (1, 2)),
            np.reshape(xy_list[vs[long_edge][ith][3]], (1, 2))), axis=0)
        p_min = np.amin(quad_xy_list, axis=0)
        p_max = np.amax(quad_xy_list, axis=0)
        if point_inside_of_quad(px, py, quad_xy_list, p_min, p_max):
            if nth == -1:
                nth = ith
            else:
                nth = -1
                break
    return nth


def shrink(xy_list, ratio=cfg.shrink_ratio):
    if ratio == 0.0:
        return xy_list, xy_list
    diff_1to3 = xy_list[:3, :] - xy_list[1:4, :]
    diff_4 = xy_list[3:4, :] - xy_list[0:1, :]
    diff = np.concatenate((diff_1to3, diff_4), axis=0)
    dis = np.sqrt(np.sum(np.square(diff), axis=-1))
    # determine which are long or short edges
    long_edge = int(np.argmax(np.sum(np.reshape(dis, (2, 2)), axis=0)))
    short_edge = 1 - long_edge
    # cal r length array
    r = [np.minimum(dis[i], dis[(i + 1) % 4]) for i in range(4)]
    # cal theta array
    diff_abs = np.abs(diff)
    diff_abs[:, 0] += cfg.epsilon
    theta = np.arctan(diff_abs[:, 1] / diff_abs[:, 0])
    # shrink two long edges
    temp_new_xy_list = np.copy(xy_list)
    shrink_edge(xy_list, temp_new_xy_list, long_edge, r, theta, ratio)
    shrink_edge(xy_list, temp_new_xy_list, long_edge + 2, r, theta, ratio)
    # shrink two short edges
    new_xy_list = np.copy(temp_new_xy_list)
    shrink_edge(temp_new_xy_list, new_xy_list, short_edge, r, theta, ratio)
    shrink_edge(temp_new_xy_list, new_xy_list, short_edge + 2, r, theta, ratio)
    return temp_new_xy_list, new_xy_list, long_edge


def shrink_edge(xy_list, new_xy_list, edge, r, theta, ratio=cfg.shrink_ratio):
    if ratio == 0.0:
        return
    start_point = edge
    end_point = (edge + 1) % 4
    long_start_sign_x = np.sign(
        xy_list[end_point, 0] - xy_list[start_point, 0])
    new_xy_list[start_point, 0] = \
        xy_list[start_point, 0] + \
        long_start_sign_x * ratio * r[start_point] * np.cos(theta[start_point])
    long_start_sign_y = np.sign(
        xy_list[end_point, 1] - xy_list[start_point, 1])
    new_xy_list[start_point, 1] = \
        xy_list[start_point, 1] + \
        long_start_sign_y * ratio * r[start_point] * np.sin(theta[start_point])
    # long edge one, end point
    long_end_sign_x = -1 * long_start_sign_x
    new_xy_list[end_point, 0] = \
        xy_list[end_point, 0] + \
        long_end_sign_x * ratio * r[end_point] * np.cos(theta[start_point])
    long_end_sign_y = -1 * long_start_sign_y
    new_xy_list[end_point, 1] = \
        xy_list[end_point, 1] + \
        long_end_sign_y * ratio * r[end_point] * np.sin(theta[start_point])


arabic = True
def adjust_to_see(img):
    """Rotate and transpose to image visualize (cv2 method or jupyter notebook)"""

    (h, w) = img.shape[:2]
    (cX, cY) = (w // 2, h // 2)

    M = cv2.getRotationMatrix2D((cX, cY), -90, 1.0)
    cos = np.abs(M[0, 0])
    sin = np.abs(M[0, 1])

    nW = int((h * sin) + (w * cos))
    nH = int((h * cos) + (w * sin))

    M[0, 2] += (nW / 2) - cX
    M[1, 2] += (nH / 2) - cY

    img = cv2.warpAffine(img, M, (nW + 1, nH + 1))
    img = cv2.warpAffine(img.transpose(), M, (nW, nH))
    if arabic:
      img = cv2.flip(img,1)
    return img


def augmentation(imgs,
                 rotation_range=0,
                 scale_range=0,
                 height_shift_range=0,
                 width_shift_range=0,
                 dilate_range=1,
                 erode_range=1):
    """Apply variations to a list of images (rotate, width and height shift, scale, erode, dilate)"""

    imgs = imgs.astype(np.float32)
    _, h, w = imgs.shape

    dilate_kernel = np.ones((int(np.random.uniform(1, dilate_range)),), np.uint8)
    erode_kernel = np.ones((int(np.random.uniform(1, erode_range)),), np.uint8)
    height_shift = np.random.uniform(-height_shift_range, height_shift_range)
    rotation = np.random.uniform(-rotation_range, rotation_range)
    scale = np.random.uniform(1 - scale_range, 1)
    width_shift = np.random.uniform(-width_shift_range, width_shift_range)

    trans_map = np.float32([[1, 0, width_shift * w], [0, 1, height_shift * h]])
    rot_map = cv2.getRotationMatrix2D((w // 2, h // 2), rotation, scale)

    trans_map_aff = np.r_[trans_map, [[0, 0, 1]]]
    rot_map_aff = np.r_[rot_map, [[0, 0, 1]]]
    affine_mat = rot_map_aff.dot(trans_map_aff)[:2, :]

    for i in range(len(imgs)):
        imgs[i] = cv2.warpAffine(imgs[i], affine_mat, (w, h), flags=cv2.INTER_NEAREST, borderValue=255)
        imgs[i] = cv2.erode(imgs[i], erode_kernel, iterations=1)
        imgs[i] = cv2.dilate(imgs[i], dilate_kernel, iterations=1)

    return imgs


def normalization(imgs):
    """Normalize list of images"""

    imgs = np.asarray(imgs).astype(np.float32)
    _, h, w = imgs.shape

    for i in range(len(imgs)):
        m, s = cv2.meanStdDev(imgs[i])
        imgs[i] = imgs[i] - m[0][0]
        imgs[i] = imgs[i] / s[0][0] if s[0][0] > 0 else imgs[i]
    
    return np.expand_dims(imgs, axis=-1)

def preprocess(img, input_size):
    """Make the process with the `input_size` to the scale resize"""
    
    if isinstance(img, str):
        img = cv2.imread(img, cv2.IMREAD_GRAYSCALE)

    if isinstance(img, tuple):
        image, boundbox = img
        img = cv2.imread(image, cv2.IMREAD_GRAYSCALE)

        for i in range(len(boundbox)):
            if isinstance(boundbox[i], float):
                total = len(img) if i < 2 else len(img[0])
                boundbox[i] = int(total * boundbox[i])

        img = np.asarray(img[boundbox[0]:boundbox[1], boundbox[2]:boundbox[3]], dtype=np.uint8)
    img=cv2.cvtColor(img,cv2.COLOR_BGR2GRAY).astype('uint8')
    wt, ht, _ = input_size
    h, w = np.asarray(img).shape[:2]
    f = max((w / wt), (h / ht))

    new_size = (max(min(wt, int(w / f)), 1), max(min(ht, int(h / f)), 1))
    img = cv2.resize(img, new_size)
    
    _, binary = cv2.threshold(img, 254, 255, cv2.THRESH_BINARY)

    # Not necessary for the KHATTX dataset, uncomment for IAM or RIMES.
    
    #if np.sum(img) * 0.8 > np.sum(binary):
    #    img = illumination_compensation(img)

    #img = remove_cursive_style(img)

    target = np.ones([ht, wt], dtype=np.uint8) * 255
    target[0:new_size[1], 0:new_size[0]] = img
    img = cv2.transpose(target)
    img = cv2.flip(img,0)
    return img



## NMS

In [0]:
# coding=utf-8
import numpy as np


def should_merge(region, i, j):
    neighbor = {(i, j - 1)}
    return not region.isdisjoint(neighbor)


def region_neighbor(region_set):
    region_pixels = np.array(list(region_set))
    j_min = np.amin(region_pixels, axis=0)[1] - 1
    j_max = np.amax(region_pixels, axis=0)[1] + 1
    i_m = np.amin(region_pixels, axis=0)[0] + 1
    region_pixels[:, 0] += 1
    neighbor = {(region_pixels[n, 0], region_pixels[n, 1]) for n in
                range(len(region_pixels))}
    neighbor.add((i_m, j_min))
    neighbor.add((i_m, j_max))
    return neighbor


def region_group(region_list):
    S = [i for i in range(len(region_list))]
    D = []
    while len(S) > 0:
        m = S.pop(0)
        if len(S) == 0:
            # S has only one element, put it to D
            D.append([m])
        else:
            D.append(rec_region_merge(region_list, m, S))
    return D


def rec_region_merge(region_list, m, S):
    rows = [m]
    tmp = []
    for n in S:
        if not region_neighbor(region_list[m]).isdisjoint(region_list[n]) or \
                not region_neighbor(region_list[n]).isdisjoint(region_list[m]):
            tmp.append(n)
    for d in tmp:
        S.remove(d)
    for e in tmp:
        rows.extend(rec_region_merge(region_list, e, S))
    return rows


def nms(predict, activation_pixels, threshold=cfg.side_vertex_pixel_threshold):
    region_list = []
    for i, j in zip(activation_pixels[0], activation_pixels[1]):
        merge = False
        for k in range(len(region_list)):
            if should_merge(region_list[k], i, j):
                region_list[k].add((i, j))
                merge = True
        if not merge:
            region_list.append({(i, j)})
    D = region_group(region_list)
    quad_list = np.zeros((len(D), 4, 2))
    score_list = np.zeros((len(D), 4))
    for group, g_th in zip(D, range(len(D))):
        total_score = np.zeros((4, 2))
        for row in group:
            for ij in region_list[row]:
                score = predict[ij[0], ij[1], 1]
                if score >= sigmoid(threshold):
                    ith_score = predict[ij[0], ij[1], 2:3]
                    if not (sigmoid(cfg.trunc_threshold) <= ith_score < 
                            sigmoid(1-cfg.trunc_threshold)):
                        ith = int(np.around(ith_score))
                        total_score[ith * 2:(ith + 1) * 2] += score
                        px = (ij[1] + 0.5) * cfg.pixel_size
                        py = (ij[0] + 0.5) * cfg.pixel_size
                        p_v = [px, py] + np.reshape(predict[ij[0], ij[1], 3:7],
                                              (2, 2))
                        quad_list[g_th, ith * 2:(ith + 1) * 2] += score * p_v
                    
                    #print(sigmoid(cfg.trunc_threshold), ith_score, str(sigmoid(1-cfg.trunc_threshold)))
        score_list[g_th] = total_score[:, 0]
        quad_list[g_th] /= (total_score + cfg.epsilon)
    return score_list, quad_list


## Binarizer

In [0]:

def binarize (img_path):
    Use_MedianFilter = True
    # Getting the un-binarized image.
    #raw_image=cv2.imread(img_path,0)
    raw_image = img_path
    if(len(raw_image.shape)!=2):
        grayScale=cv2.cvtColor(raw_image,cv2.COLOR_BGR2GRAY).astype('uint8')
    else:
        grayScale=raw_image.astype('uint8')

    #Getting the height and width of the Image.
    m,n = grayScale.shape
    #cv2_imshow(grayScale)
    #print(grayScale.shape)
    high_thresh, thresh_im = cv2.threshold(grayScale, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    lowThresh = 0.5*high_thresh
    
    #Applying Canny Edge Detector to the Image.
    canny_Image = cv2.Canny(grayScale,350,700)
   
    kernel = np.ones((3,3),np.uint8)
    # cv2.imshow('Canny',canny_Image)
    # cv2.waitKey(0)
    
    #Filling the Blobs Using the imFill Function.
    closing = imFill(canny_Image)
    #cv2.imshow('imfill image',closing) 
    #cv2.waitKey(0)

    #Finding Index Values in Image Where Pixels are White.  
    inds = np.where(closing==255)

    #Finding Index Values in Image Where Pixels are Black.  
    inds2 = np.where(closing==0)

    #Comparing  with original Image to obtain the pixel values for text.  
    pix_val = grayScale[inds]

    #Comparing  with original Image to obtain the pixel values for back.  
    pix_val_back = grayScale[inds2]
    
    #Taking Median of Text and back and then comparing them.
    median_text=np.median(pix_val,axis=0)
    median_back=np.median(pix_val_back,axis=0)
    if median_text>median_back:
        pre_binarize = cv2.bitwise_not(grayScale)
    else:
        pre_binarize=grayScale
 
    #Sharpen the Image.
    sharpen_kernel = np.array([[0,-0.9,0], [-1,4.9,-1], [0,-1,-0]])
    new_img = cv2.filter2D(pre_binarize, -1, sharpen_kernel)

    #Padding the image to remove unecessary lines. 
    #new_img = np.pad(pre_binarize,(9,9),"constant",constant_values=(255,255))

    img= apply_threshold(new_img,thresholder(new_img))

    if Use_MedianFilter:
        img = cv2.medianBlur(img,3)
  
    #name = os.path.basename(img_path)
    #img = cv2.cvtColor(img,cv2.COLOR_GRAY2RGB)

    wt, ht, _ = (1024,64,0)
    h, w = np.asarray(img).shape[:2]
    f = max((w / wt), (h / ht))

    new_size = (max(min(wt, int(w / f)), 1), max(min(ht, int(h / f)), 1))
    img = cv2.resize(img, new_size)

    target = np.ones([ht, wt], dtype=np.uint8) * 255
    target[0:new_size[1], 0:new_size[0]] = img
    img = cv2.transpose(target)
    img = cv2.flip(img,0)
    
    #np.expand_dims(img,-1)
    #img = cv2.cvtColor(img,cv2.COLOR_GRAY2RGB)
    return img

def thresholder(img, w_size=15,k=0.5):
    # Obtaining rows and cols
    rows, cols = img.shape
    i_rows, i_cols = rows + 1, cols + 1

    # Computing integral images
    integ = np.zeros((i_rows, i_cols), np.float)
    sqr_integral = np.zeros((i_rows, i_cols), np.float)

    integ[1:, 1:] = np.cumsum(np.cumsum(img.astype(np.float), axis=0), axis=1)
    sqr_img = np.square(img.astype(np.float))
    sqr_integral[1:, 1:] = np.cumsum(np.cumsum(sqr_img, axis=0), axis=1)

    # Defining grid
    x, y = np.meshgrid(np.arange(1, i_cols), np.arange(1, i_rows))

    # Obtaining local coordinates
    hw_size = w_size // 2
    x1 = (x - hw_size).clip(1, cols)
    x2 = (x + hw_size).clip(1, cols)
    y1 = (y - hw_size).clip(1, rows)
    y2 = (y + hw_size).clip(1, rows)

    # Obtaining local areas size
    l_size = (y2 - y1 + 1) * (x2 - x1 + 1)

    # Computing sums
    sums = (integ[y2, x2] - integ[y2, x1 - 1] -
            integ[y1 - 1, x2] + integ[y1 - 1, x1 - 1])
    sqr_sums = (sqr_integral[y2, x2] - sqr_integral[y2, x1 - 1] -
                sqr_integral[y1 - 1, x2] + sqr_integral[y1 - 1, x1 - 1])

    # Computing local means
    means = sums / l_size

    # Computing local standard deviation
    stds = np.sqrt(sqr_sums / l_size - np.square(means))

    # Computing min and max values
    max_std = np.max(stds)
    min_v = np.min(img)

    # Computing thresholds
    thresholds = ((1.0 - k) * means + k * min_v + k * stds /
                  max_std * (means - min_v))

    return thresholds

def imFill(im_in):
    
    # Threshold.
    # Set values equal to or above 220 to 0.
    # Set values below 220 to 255.
    
    # th, im_th = cv2.threshold(im_in, 220, 255, cv2.THRESH_BINARY_INV);
    th, im_th = cv2.threshold(im_in, 0, 128, cv2.THRESH_BINARY)

    # Copy the thresholded image.
    im_floodfill = im_th.copy()
    
    # Mask used to flood filling.
    # Notice the size needs to be 2 pixels than the image.
    h, w = im_th.shape[:2]
    mask = np.zeros((h+2, w+2), np.uint8)
    
    # Floodfill from point (0, 0)
    cv2.floodFill(im_floodfill, mask, (0,0), 255)
    
    # Invert floodfilled image
    im_floodfill_inv = cv2.bitwise_not(im_floodfill)

    # Combine the two images to get the foreground.
    im_out = im_th | im_floodfill_inv

    return im_out   
def apply_threshold(img, threshold=128, wp_val=255):
    #wp_val is the white pixel value.
    return ((img >= threshold) * wp_val).astype(np.uint8)
   


# Recognition

## Layers

In [0]:
"""
Gated implementations
    GatedConv2D: Introduce a Conv2D layer (same number of filters) to multiply with its sigmoid activation.
    FullGatedConv2D: Introduce a Conv2D to extract features (linear and sigmoid), making a full gated process.
                     This process will double number of filters to make one convolutional process.
"""

from tensorflow.keras import backend as K
from tensorflow.keras.layers import Layer, Conv2D, Multiply, Activation


class GatedConv2D(Conv2D):
    """Gated Convolutional Class"""

    def __init__(self, **kwargs):
        super(GatedConv2D, self).__init__(**kwargs)

    def call(self, inputs):
        """Apply gated convolution"""

        output = super(GatedConv2D, self).call(inputs)
        linear = Activation("linear")(inputs)
        sigmoid = Activation("sigmoid")(output)

        return Multiply()([linear, sigmoid])

    def get_config(self):
        """Return the config of the layer"""

        config = super(GatedConv2D, self).get_config()
        return config


"""
Tensorflow Keras layer implementation of the gated convolution.
    Args:
        filters (int): Number of output filters.
        kwargs: Other Conv2D keyword arguments.
"""


class FullGatedConv2D(Conv2D):
    """Gated Convolutional Class"""

    def __init__(self, filters, **kwargs):
        super(FullGatedConv2D, self).__init__(filters=filters * 2, **kwargs)
        self.nb_filters = filters

    def call(self, inputs):
        """Apply gated convolution"""

        output = super(FullGatedConv2D, self).call(inputs)
        linear = Activation("linear")(output[:, :, :, :self.nb_filters])
        sigmoid = Activation("sigmoid")(output[:, :, :, self.nb_filters:])

        return Multiply()([linear, sigmoid])

    def compute_output_shape(self, input_shape):
        """Compute shape of layer output"""

        output_shape = super(FullGatedConv2D, self).compute_output_shape(input_shape)
        return tuple(output_shape[:3]) + (self.nb_filters,)

    def get_config(self):
        """Return the config of the layer"""

        config = super(FullGatedConv2D, self).get_config()
        config['nb_filters'] = self.nb_filters
        del config['filters']
        return config

## Model

In [0]:
"""Handwritten Text Recognition Neural Network"""

import os
import numpy as np
import tensorflow as tf

from contextlib import redirect_stdout
from tensorflow.keras import backend as K
from tensorflow.keras import Model

from tensorflow.keras.callbacks import CSVLogger, TensorBoard, ModelCheckpoint
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.constraints import MaxNorm

from tensorflow.keras.layers import Conv2D, Bidirectional, LSTM, GRU, Dense
from tensorflow.keras.layers import Dropout, BatchNormalization, LeakyReLU, PReLU
from tensorflow.keras.layers import Input, Add, Activation, Lambda, MaxPooling2D, Reshape


"""
NNModel Class
The NNModel class use Tensorflow 2 Keras module for the use of the
Connectionist Temporal Classification (CTC) with the Hadwritten Text Recognition.

x is the input features and y the labels.
"""


class NNModel_rec:

    def __init__(self,
                 architecture,
                 input_size,
                 vocab_size,
                 greedy=False,
                 beam_width=10,
                 top_paths=1):
        """
        Initialization of a NN Model.

        parameters:
            architecture: option of the architecture model to build and compile
            greedy, beam_width, top_paths: Parameters of the CTC decoding
        """

        self.architecture = globals()[architecture]
        self.input_size = input_size
        self.vocab_size = vocab_size

        self.model = None
        self.greedy = greedy
        self.beam_width = beam_width
        self.top_paths = max(1, top_paths)
    def summary(self, output=None, target=None):
        """Show/Save model structure (summary)"""

        self.model.summary()

        if target is not None:
            os.makedirs(output, exist_ok=True)

            with open(os.path.join(output, target), "w") as f:
                with redirect_stdout(f):
                    self.model.summary()

    def load_checkpoint(self, target):
        """ Load a model with checkpoint file"""

        if os.path.isfile(target):
            if self.model is None:
                self.compile()

            self.model.load_weights(target)

    def get_callbacks(self, logdir, checkpoint, monitor="val_loss", verbose=0):
        """Setup the list of callbacks for the model"""

        callbacks = [
            CSVLogger(
                filename=os.path.join(logdir, "epochs.log"),
                separator=";",
                append=True),
            TensorBoard(
                log_dir=logdir,
                histogram_freq=10,
                profile_batch=0,
                write_graph=True,
                write_images=False,
                update_freq="epoch"),
            ModelCheckpoint(
                filepath=checkpoint,
                monitor=monitor,
                save_best_only=True,
                save_weights_only=True,
                verbose=verbose),
            EarlyStopping(
                monitor=monitor,
                min_delta=1e-8,
                patience=20, # Change this if the training keeps stopping, happened to me earlier.
                restore_best_weights=True,
                verbose=verbose),
            ReduceLROnPlateau(
                monitor=monitor,
                min_delta=1e-8,
                factor=0.2,
                patience=15,
                verbose=verbose)
        ]

        return callbacks

    def compile(self, learning_rate=None):
        """
        Configures the NN Model for training/predict.

        optimizer: optimizer for training
        """

        # define inputs, outputs and optimizer of the chosen architecture
        outs = self.architecture(self.input_size, self.vocab_size + 1, learning_rate)
        inputs, outputs, optimizer = outs

        # create and compile
        self.model = Model(inputs=inputs, outputs=outputs)
        self.model.compile(optimizer=optimizer, loss=self.ctc_loss_lambda_func)

    def fit(self,
            x=None,
            y=None,
            batch_size=None,
            epochs=1,
            verbose=1,
            callbacks=None,
            validation_split=0.0,
            validation_data=None,
            shuffle=True,
            class_weight=None,
            sample_weight=None,
            initial_epoch=0,
            steps_per_epoch=None,
            validation_steps=None,
            validation_freq=1,
            max_queue_size=10,
            workers=1,
            use_multiprocessing=False,
            **kwargs):
        """
        Model training on data yielded (fit function has support to generator).
        A fit() abstration function of TensorFlow 2.

        Provide x parameter of the form: yielding (x, y, sample_weight).

        return: A history object
        """

        out = self.model.fit(x=x, y=y, batch_size=batch_size, epochs=epochs, verbose=verbose,
                             callbacks=callbacks, validation_split=validation_split,
                             validation_data=validation_data, shuffle=shuffle,
                             class_weight=class_weight, sample_weight=sample_weight,
                             initial_epoch=initial_epoch, steps_per_epoch=steps_per_epoch,
                             validation_steps=validation_steps, validation_freq=validation_freq,
                             max_queue_size=max_queue_size, workers=workers,
                             use_multiprocessing=use_multiprocessing, **kwargs)
        return out

    def predict(self,
                x,
                batch_size=None,
                verbose=0,
                steps=1,
                callbacks=None,
                max_queue_size=10,
                workers=1,
                use_multiprocessing=False,
                ctc_decode=True):
        """
        Model predicting on data yielded (predict function has support to generator).
        A predict() abstration function of TensorFlow 2.

        Provide x parameter of the form: yielding [x].

        returns: raw data on `ctc_decode=False` or CTC decode on `ctc_decode=True` (both with probabilities)
        """

        self.model._make_predict_function()

        if verbose == 1:
            print("Model Predict")

        out = self.model.predict(x=x, batch_size=batch_size, verbose=verbose, steps=steps,
                                 callbacks=callbacks, max_queue_size=max_queue_size,
                                 workers=workers, use_multiprocessing=use_multiprocessing)

        if not ctc_decode:
            return np.log(out.clip(min=1e-8))

        steps_done = 0
        if verbose == 1:
            print("CTC Decode")
            progbar = tf.keras.utils.Progbar(target=steps)

        batch_size = int(np.ceil(len(out) / steps))
        input_length = len(max(out, key=len))

        predicts, probabilities = [], []

        while steps_done < steps:
            index = steps_done * batch_size
            until = index + batch_size

            x_test = np.asarray(out[index:until])
            x_test_len = np.asarray([input_length for _ in range(len(x_test))])

            decode, log = K.ctc_decode(x_test,
                                       x_test_len,
                                       greedy=self.greedy,
                                       beam_width=self.beam_width,
                                       top_paths=self.top_paths)

            probabilities.extend([np.exp(x) for x in log])
            decode = [[[int(p) for p in x if p != -1] for x in y] for y in decode]
            predicts.extend(np.swapaxes(decode, 0, 1))

            steps_done += 1
            if verbose == 1:
                progbar.update(steps_done)

        return (predicts, probabilities)

    @staticmethod
    def ctc_loss_lambda_func(y_true, y_pred):
        """Function for computing the CTC loss"""

        if len(y_true.shape) > 2:
            y_true = tf.squeeze(y_true)

        # y_pred.shape = (batch_size, string_length, alphabet_size_1_hot_encoded)
        # output of every model is softmax
        # so sum across alphabet_size_1_hot_encoded give 1
        #               string_length give string length
        input_length = tf.math.reduce_sum(y_pred, axis=-1, keepdims=False)
        input_length = tf.math.reduce_sum(input_length, axis=-1, keepdims=True)

        # y_true strings are padded with 0
        # so sum of non-zero gives number of characters in this string
        label_length = tf.math.count_nonzero(y_true, axis=-1, keepdims=True, dtype="int64")

        loss = K.ctc_batch_cost(y_true, y_pred, input_length, label_length)

        # average loss across all entries in the batch
        loss = tf.reduce_mean(loss)

        return loss


"""
Networks to the Handwritten Text Recognition Model
"""


def bluche(input_size, d_model, learning_rate):
    """
    Gated Convolucional Recurrent Neural Network by Bluche et al.

    Reference:
        Bluche, T., Messina, R.:
        Gated convolutional recurrent neural networks for multilingual handwriting recognition.
    """

    input_data = Input(name="input", shape=input_size)
    cnn = Reshape((input_size[0] // 2, input_size[1] // 2, input_size[2] * 4))(input_data)

    cnn = Conv2D(filters=8, kernel_size=(3,3), strides=(1,1), padding="same", activation="tanh")(cnn)

    cnn = Conv2D(filters=16, kernel_size=(2,4), strides=(2,4), padding="same", activation="tanh")(cnn)
    cnn = GatedConv2D(filters=16, kernel_size=(3,3), strides=(1,1), padding="same")(cnn)

    cnn = Conv2D(filters=32, kernel_size=(3,3), strides=(1,1), padding="same", activation="tanh")(cnn)
    cnn = GatedConv2D(filters=32, kernel_size=(3,3), strides=(1,1), padding="same")(cnn)

    cnn = Conv2D(filters=64, kernel_size=(2,4), strides=(2,4), padding="same", activation="tanh")(cnn)
    cnn = GatedConv2D(filters=64, kernel_size=(3,3), strides=(1,1), padding="same")(cnn)

    cnn = Conv2D(filters=128, kernel_size=(3,3), strides=(1,1), padding="same", activation="tanh")(cnn)
    cnn = MaxPooling2D(pool_size=(1,4), strides=(1,4), padding="valid")(cnn)

    shape = cnn.get_shape()
    blstm = Reshape((shape[1], shape[2] * shape[3]))(cnn)

    blstm = Bidirectional(LSTM(units=128, return_sequences=True))(blstm)
    blstm = Dense(units=128, activation="tanh")(blstm)

    blstm = Bidirectional(LSTM(units=128, return_sequences=True))(blstm)
    output_data = Dense(units=d_model, activation="softmax")(blstm)

    if learning_rate is None:
        learning_rate = 4e-4

    optimizer = tf.keras.optimizers.RMSprop(learning_rate=learning_rate)

    return (input_data, output_data, optimizer)


def puigcerver(input_size, d_model, learning_rate):
    """
    Convolutional Recurrent Neural Network by Puigcerver et al.

    Reference:
        Joan Puigcerver.
        Are multidimensional recurrent layers really necessary for handwritten text recognition?
    """

    input_data = Input(name="input", shape=input_size)

    cnn = Conv2D(filters=16, kernel_size=(3,3), strides=(1,1), padding="same")(input_data)
    cnn = BatchNormalization()(cnn)
    cnn = LeakyReLU(alpha=0.01)(cnn)
    cnn = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding="valid")(cnn)

    cnn = Conv2D(filters=32, kernel_size=(3,3), strides=(1,1), padding="same")(cnn)
    cnn = BatchNormalization()(cnn)
    cnn = LeakyReLU(alpha=0.01)(cnn)
    cnn = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding="valid")(cnn)

    cnn = Dropout(rate=0.2)(cnn)
    cnn = Conv2D(filters=48, kernel_size=(3,3), strides=(1,1), padding="same")(cnn)
    cnn = BatchNormalization()(cnn)
    cnn = LeakyReLU(alpha=0.01)(cnn)
    cnn = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding="valid")(cnn)

    cnn = Dropout(rate=0.2)(cnn)
    cnn = Conv2D(filters=64, kernel_size=(3,3), strides=(1,1), padding="same")(cnn)
    cnn = BatchNormalization()(cnn)
    cnn = LeakyReLU(alpha=0.01)(cnn)

    cnn = Dropout(rate=0.2)(cnn)
    cnn = Conv2D(filters=80, kernel_size=(3,3), strides=(1,1), padding="same")(cnn)
    cnn = BatchNormalization()(cnn)
    cnn = LeakyReLU(alpha=0.01)(cnn)

    shape = cnn.get_shape()
    blstm = Reshape((shape[1], shape[2] * shape[3]))(cnn)

    blstm = Bidirectional(LSTM(units=256, return_sequences=True, dropout=0.5))(blstm)
    blstm = Bidirectional(LSTM(units=256, return_sequences=True, dropout=0.5))(blstm)
    blstm = Bidirectional(LSTM(units=256, return_sequences=True, dropout=0.5))(blstm)
    blstm = Bidirectional(LSTM(units=256, return_sequences=True, dropout=0.5))(blstm)
    blstm = Bidirectional(LSTM(units=256, return_sequences=True, dropout=0.5))(blstm)

    blstm = Dropout(rate=0.5)(blstm)
    output_data = Dense(units=d_model, activation="softmax")(blstm)

    if learning_rate is None:
        learning_rate = 3e-4

    optimizer = tf.keras.optimizers.RMSprop(learning_rate=learning_rate)

    return (input_data, output_data, optimizer)

def yazeed(input_size, d_model, learning_rate):
    """
    Gated Convolutional Recurrent Neural Network by Yazeed Mohi-Eldeen Sabil.
    Based on the Flor et al architecture, modified some portions to fit the KHATT dataset. 
    """

    input_data = Input(name="input", shape=input_size)

    cnn = Conv2D(filters=16, kernel_size=(3,3), strides=(2,2), padding="same", kernel_initializer="he_uniform")(input_data)
    cnn = PReLU(shared_axes=[1,2])(cnn)
    cnn = BatchNormalization(renorm=True)(cnn)
    cnn = FullGatedConv2D(filters=16, kernel_size=(3,3), padding="same")(cnn)

    cnn = Conv2D(filters=32, kernel_size=(3,3), strides=(1,1), padding="same", kernel_initializer="he_uniform")(cnn)
    cnn = PReLU(shared_axes=[1,2])(cnn)
    cnn = BatchNormalization(renorm=True)(cnn)
    cnn = FullGatedConv2D(filters=32, kernel_size=(3,3), padding="same")(cnn)

    cnn = Conv2D(filters=40, kernel_size=(2,4), strides=(2,4), padding="same", kernel_initializer="he_uniform")(cnn)
    cnn = PReLU(shared_axes=[1,2])(cnn)
    cnn = BatchNormalization(renorm=True)(cnn)
    cnn = FullGatedConv2D(filters=40, kernel_size=(3,3), padding="same", kernel_constraint=MaxNorm(4, [0,1,2]))(cnn)
    cnn = Dropout(rate=0.2)(cnn)

    cnn = Conv2D(filters=48, kernel_size=(3,3), strides=(1,1), padding="same", kernel_initializer="he_uniform")(cnn)
    cnn = PReLU(shared_axes=[1,2])(cnn)
    cnn = BatchNormalization(renorm=True)(cnn)
    cnn = FullGatedConv2D(filters=48, kernel_size=(3,3), padding="same", kernel_constraint=MaxNorm(4, [0,1,2]))(cnn)
    cnn = Dropout(rate=0.2)(cnn)

    cnn = Conv2D(filters=56, kernel_size=(2,4), strides=(2,4), padding="same", kernel_initializer="he_uniform")(cnn)
    cnn = PReLU(shared_axes=[1,2])(cnn)
    cnn = BatchNormalization(renorm=True)(cnn)
    cnn = FullGatedConv2D(filters=56, kernel_size=(3,3), padding="same", kernel_constraint=MaxNorm(4, [0,1,2]))(cnn)
    cnn = Dropout(rate=0.2)(cnn)

    cnn = Conv2D(filters=64, kernel_size=(3,3), strides=(1,1), padding="same", kernel_initializer="he_uniform")(cnn)
    cnn = PReLU(shared_axes=[1,2])(cnn)
    cnn = BatchNormalization(renorm=True)(cnn)

    cnn = MaxPooling2D(pool_size=(1,2), strides=(1,2), padding="valid")(cnn)

    shape = cnn.get_shape()
    nb_units = shape[2] * shape[3]

    bgru = Reshape((shape[1], nb_units))(cnn)

    bgru = Bidirectional(GRU(units=nb_units, return_sequences=True, dropout=0.5))(bgru)
    bgru = Dense(units=nb_units * 2)(bgru)

    bgru = Bidirectional(GRU(units=nb_units, return_sequences=True, dropout=0.5))(bgru)
    output_data = Dense(units=d_model, activation="softmax")(bgru)

    if learning_rate is None:
        learning_rate = 5e-4

    optimizer = tf.keras.optimizers.RMSprop(learning_rate=learning_rate)

    return (input_data, output_data, optimizer)

## Predict Text

In [0]:
def predict_text(model,
              x,
              batch_size=None,
                verbose=0,
                steps=1,
                callbacks=None,
                max_queue_size=10,
                workers=1,
                use_multiprocessing=False,
                ctc_decode=True):
      """
      Model predicting on data yielded (predict function has support to generator).
      A predict() abstration function of TensorFlow 2.

      Provide x parameter of the form: yielding [x].

      returns: raw data on `ctc_decode=False` or CTC decode on `ctc_decode=True` (both with probabilities)
      """
      x = normalization(x)
      model._make_predict_function()

      
      print("Model Predict")
      out = model.predict(x=x, batch_size=batch_size, verbose=verbose, steps=steps,
                                callbacks=callbacks, max_queue_size=max_queue_size,
                                workers=workers, use_multiprocessing=use_multiprocessing)

      if not ctc_decode:
          return np.log(out.clip(min=1e-8))

      steps_done = 0
      if verbose == 1:
          print("CTC Decode")
          progbar = tf.keras.utils.Progbar(target=steps)

      batch_size = int(np.ceil(len(out) / steps))
      input_length = len(max(out, key=len))

      predicts, probabilities = [], []
      while steps_done < steps:
          index = steps_done * batch_size
          until = index + batch_size

          x_test = np.asarray(out[index:until])
          x_test_len = np.asarray([input_length for _ in range(len(x_test))])

          decode, log = K.ctc_decode(x_test,
                                      x_test_len,
                                      greedy=cfg.greedy,
                                      beam_width=cfg.beam_width,
                                      top_paths=cfg.top_paths)

          probabilities.extend([np.exp(x) for x in log])
          decode = [[[int(p) for p in x if p != -1] for x in y] for y in decode]
          predicts.extend(np.swapaxes(decode, 0, 1))

          steps_done += 1
          if verbose == 1:
              progbar.update(steps_done)
      return (predicts, probabilities)

## Tokenizer

In [0]:
import re
import cv2
import html
import string
import numpy as np

class Tokenizer():
    """Manages tokens functions and charset/dictionary properties"""

    def __init__(self, chars, max_text_length=128):
        self.PAD_TK, self.UNK_TK = "¶", "¤" # PAD = Blank white space before and after text, UNK = Unknown character
        self.chars = (self.PAD_TK + self.UNK_TK + chars)

        self.PAD = self.chars.find(self.PAD_TK)
        self.UNK = self.chars.find(self.UNK_TK)

        self.vocab_size = len(self.chars)
        self.maxlen = max_text_length
    def encode(self, text):
        """Encode text to vector"""

        #text = unicodedata.normalize("NFKD", text).encode("ASCII", "ignore").decode("ASCII")
        text = " ".join(text.split())
        encoded = []
        for item in text:
            index = self.chars.find(item)
            index = self.UNK if index == -1 else index
            encoded.append(index)
        
        return np.asarray(encoded)

    def decode(self, text):
        """Decode vector to text"""

        decoded = "".join([self.chars[int(x)] for x in text if x > -1])
        decoded = self.remove_tokens(decoded)
        decoded = text_standardize(decoded)

        return decoded

    def remove_tokens(self, text):
        """Remove tokens (PAD) from text"""

        return text.replace(self.PAD_TK, "")

RE_DASH_FILTER = re.compile(r'[\-\˗\֊\‐\‑\‒\–\—\⁻\₋\−\﹣\－]', re.UNICODE)
RE_APOSTROPHE_FILTER = re.compile(r'&#39;|[ʼ՚＇‘’‛❛❜ߴߵ`‵´ˊˋ{}{}{}{}{}{}{}{}{}]'.format(
    chr(768), chr(769), chr(832), chr(833), chr(2387),
    chr(5151), chr(5152), chr(65344), chr(8242)), re.UNICODE)
RE_RESERVED_CHAR_FILTER = re.compile(r'[¶¤«»]', re.UNICODE)
RE_LEFT_PARENTH_FILTER = re.compile(r'[\(\[\{\⁽\₍\❨\❪\﹙\（]', re.UNICODE)
RE_RIGHT_PARENTH_FILTER = re.compile(r'[\)\]\}\⁾\₎\❩\❫\﹚\）]', re.UNICODE)
RE_BASIC_CLEANER = re.compile(r'[^\w\s{}]'.format(re.escape(string.punctuation)), re.UNICODE)

LEFT_PUNCTUATION_FILTER = """!%&),.:;<=>?@\\]^_`|}~"""
RIGHT_PUNCTUATION_FILTER = """"(/<=>@[\\^_`{|~"""
NORMALIZE_WHITESPACE_REGEX = re.compile(r'[^\S\n]+', re.UNICODE)


def text_standardize(text):
    """Organize/add spaces around punctuation marks"""

    if text is None:
        return ""

    text = html.unescape(text).replace("\\n", "").replace("\\t", "")

    text = RE_RESERVED_CHAR_FILTER.sub("", text)
    text = RE_DASH_FILTER.sub("-", text)
    text = RE_APOSTROPHE_FILTER.sub("'", text)
    text = RE_LEFT_PARENTH_FILTER.sub("(", text)
    text = RE_RIGHT_PARENTH_FILTER.sub(")", text)
    text = RE_BASIC_CLEANER.sub("", text)

    text = text.lstrip(LEFT_PUNCTUATION_FILTER)
    text = text.rstrip(RIGHT_PUNCTUATION_FILTER)
    text = text.translate(str.maketrans({c: " "+str(c)+" " for c in string.punctuation}))
    text = NORMALIZE_WHITESPACE_REGEX.sub(" ", text.strip())

    return text


# Replacement