In [4]:
!pip install fastwer

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import tensorflow as tf
from tensorflow.keras.layers.experimental.preprocessing import StringLookup
from tensorflow import keras

import matplotlib.pyplot as plt
import os
import glob
import cv2

import fastwer #for calculating CER & WER

In [5]:
# Seed NumPy and TensorFlow with the same value so runs are repeatable.
SEED = 63
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [6]:
# Load the three label CSVs; rows with missing values are discarded and the
# index of each frame is renumbered from 0.
train = (
    pd.read_csv('../input/handwriting-recognition/written_name_train_v2.csv')
    .dropna()
    .reset_index(drop=True)
)
val = (
    pd.read_csv('../input/handwriting-recognition/written_name_validation_v2.csv')
    .dropna()
    .reset_index(drop=True)
)
test = (
    pd.read_csv('../input/handwriting-recognition/written_name_test_v2.csv')
    .dropna()
    .reset_index(drop=True)
)

In [7]:
# Bounds (inclusive) on the number of characters allowed in a training name.
max_name = 16
min_name = 2

train['Length'] = train['IDENTITY'].apply(lambda x: len(str(x)))

# Rows that are too long are kept aside (original casing) for inspection.
train21 = train[train['Length'] > max_name]

# Select the in-range rows with a single mask and take an explicit copy, so the
# column assignment below writes to an independent frame instead of a slice of
# the original (which would raise pandas' SettingWithCopyWarning).
train = train[train['Length'].between(min_name, max_name)].copy()
train['IDENTITY'] = train['IDENTITY'].str.upper()

# Validation and test labels are upper-cased too, but not length-filtered.
val['IDENTITY'] = val['IDENTITY'].str.upper()
test['IDENTITY'] = test['IDENTITY'].str.upper()

In [8]:
# Inspect the training rows whose name is longer than `max_name` characters.
train21

In [9]:
# Show the training rows whose label is the literal string 'UNREADABLE'.
train[train['IDENTITY'] == 'UNREADABLE']

In [10]:
# Remove samples whose transcription is the literal 'UNREADABLE'.
train = train[train['IDENTITY'] != 'UNREADABLE']
val = val[val['IDENTITY'] != 'UNREADABLE']
# NOTE: `test` is not filtered here.

In [11]:
# Upper bounds on how many rows of each split are kept (first rows win).
max_data_train = 100000
max_data_val = 10000
train = train.head(max_data_train)
val = val.head(max_data_val)

In [12]:
# Report how many rows survived filtering and capping.
print(f"Total training samples: {len(train)}")
print(f"Total validation samples: {len(val)}")

In [13]:
# retrieve dataframe dataset
base_path = '../input/handwriting-recognition'

def get_image_paths_and_labels(df, status):
    """Collect usable image paths and their labels for one data split.

    Args:
        df: frame with a 'FILENAME' and an 'IDENTITY' column.
        status: split name; images are looked up under
            ``<base_path>/<status>_v2/<status>/``.

    Returns:
        ``(paths, labels)`` - two parallel lists covering only the rows whose
        image file exists and is non-empty. Rows that are skipped are NOT
        removed from ``df``, so the lists can be shorter than the frame.
    """
    data_path = os.path.join(base_path, status + '_v2', status)
    paths = []
    corrected_samples = []

    file_names = df['FILENAME'].to_numpy()
    identities = df['IDENTITY'].to_numpy()
    for file_name, identity in zip(file_names, identities):
        img_path = os.path.join(data_path, file_name)
        # os.path.getsize raises on a missing file, so test for existence
        # first; missing and zero-byte files are both skipped.
        if os.path.isfile(img_path) and os.path.getsize(img_path) > 0:
            paths.append(img_path)
            corrected_samples.append(identity)

    return paths, corrected_samples

In [14]:
# Resolve image paths and labels for the three splits in one pass.
(
    (train_img_paths, train_labels),
    (val_img_paths, val_labels),
    (test_img_paths, test_labels),
) = (
    get_image_paths_and_labels(frame, split)
    for frame, split in ((train, 'train'), (val, 'validation'), (test, 'test'))
)

In [15]:
# Get character
# Sorted list of every distinct character appearing in the training labels.
characters = sorted({char for label in train_labels for char in label})
print("Vocab size: ", len(characters))
print(characters)

In [16]:
# Building character vocabulary
AUTOTUNE = tf.data.AUTOTUNE

# Forward lookup: character -> integer id.
char_to_num = StringLookup(mask_token=None, vocabulary=list(characters))

# Reverse lookup: integer id -> character, built from the forward layer's
# own vocabulary so both directions agree.
num_to_char = StringLookup(
    invert=True, mask_token=None, vocabulary=char_to_num.get_vocabulary()
)

In [17]:
def distortion_free_resize(image, img_size):
    """Resize `image` with padding, then transpose and mirror it.

    Args:
        image: rank-3 image tensor (the transpose below swaps its first two axes).
        img_size: ``(width, height)`` pair giving the padded target size.

    Returns:
        The padded image with its first two axes swapped and then flipped
        left-to-right.
    """
    target_width, target_height = img_size
    padded = tf.image.resize_with_pad(image, target_height, target_width)
    swapped = tf.transpose(padded, perm=[1, 0, 2])
    return tf.image.flip_left_right(swapped)

In [18]:
# Number of samples per dataset batch (also used to size the evaluation slices).
batch_size = 64
# Filler value appended to label vectors by vectorize_label.
padding_token = 99
# Default (width, height) handed to the image preprocessing functions.
image_width = 256
image_height = 64
# Length every label vector is padded to; decoded predictions are cut to it.
max_len = 32


def preprocess_image(image_path, img_size=(image_width, image_height)):
    """Read an image file and return it as a resized float32 tensor.

    The file is decoded to a single channel, passed through
    ``distortion_free_resize`` and divided by 255.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_png(raw_bytes, 1)
    resized = distortion_free_resize(decoded, img_size)
    return tf.cast(resized, tf.float32) / 255.0


def vectorize_label(label):
    """Turn a label string into a fixed-length vector of character ids.

    The string is split into UTF-8 characters, mapped through ``char_to_num``
    and right-padded with ``padding_token`` up to ``max_len`` entries.

    Labels longer than ``max_len`` are clipped to their first ``max_len``
    characters; without the clip the pad amount would be negative and
    ``tf.pad`` would fail for such a sample.
    """
    label = char_to_num(tf.strings.unicode_split(label, input_encoding="UTF-8"))
    label = label[:max_len]
    pad_amount = max_len - tf.shape(label)[0]
    return tf.pad(label, paddings=[[0, pad_amount]], constant_values=padding_token)


def process_images_labels(image_path, label):
    """Map one (path, label string) pair to the dict fed to the model."""
    return {
        "image": preprocess_image(image_path),
        "label": vectorize_label(label),
    }


def prepare_dataset(image_paths, labels):
    """Build the batched, cached and prefetched dataset for one split.

    Samples keep the order of ``image_paths``; no shuffling is applied.
    """
    samples = tf.data.Dataset.from_tensor_slices((image_paths, labels))
    samples = samples.map(process_images_labels, num_parallel_calls=AUTOTUNE)
    batches = samples.batch(batch_size)
    return batches.cache().prefetch(AUTOTUNE)

In [19]:
# One dataset per split, built from the path/label lists collected earlier.
train_ds, val_ds, test_ds = (
    prepare_dataset(split_paths, split_labels)
    for split_paths, split_labels in (
        (train_img_paths, train_labels),
        (val_img_paths, val_labels),
        (test_img_paths, test_labels),
    )
)

In [20]:
# Display the batched training dataset object.
train_ds

In [21]:
# Preview the first 16 samples of the first training batch with their labels.
for data in train_ds.take(1):
    images, labels = data["image"], data["label"]

    _, axes = plt.subplots(4, 4, figsize=(15, 8))

    for i, panel in enumerate(axes.flat):
        # Reverse the transpose + flip applied by distortion_free_resize.
        img = tf.transpose(tf.image.flip_left_right(images[i]), perm=[1, 0, 2])
        img = (img * 255.0).numpy().clip(0, 255).astype(np.uint8)[:, :, 0]

        # Keep only the non-padding ids, then map them back to characters.
        label = labels[i]
        kept = tf.gather(label, tf.where(tf.math.not_equal(label, padding_token)))
        text = tf.strings.reduce_join(num_to_char(kept)).numpy().decode("utf-8")

        panel.imshow(img, cmap="gray")
        panel.set_title(text)
        panel.axis("off")


plt.show()

In [22]:
class CTCLayer(keras.layers.Layer):
    """Pass-through layer that registers the CTC loss on the model."""

    def __init__(self, name=None):
        super().__init__(name=name)
        self.loss_fn = keras.backend.ctc_batch_cost

    def call(self, y_true, y_pred):
        """Add the CTC loss for this batch and return `y_pred` unchanged."""
        n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
        per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")

        # Every sample reports the full time dimension of y_pred and the full
        # (padded) width of y_true as its lengths.
        input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64") * per_sample
        label_length = tf.cast(tf.shape(y_true)[1], dtype="int64") * per_sample

        self.add_loss(self.loss_fn(y_true, y_pred, input_length, label_length))

        # The predictions themselves are the layer output.
        return y_pred


def build_model():
    """Assemble and compile the CTC handwriting-recognition network.

    Two Conv2D / MaxPooling2D / Dropout stages feed a Dense projection and two
    bidirectional LSTMs. The softmax layer "dense2" emits per-timestep class
    probabilities and a ``CTCLayer`` attaches the loss.

    Returns:
        The compiled model "handwriting_recognizer", taking the "image" and
        "label" inputs.
    """
    layers = keras.layers

    # Inputs to the model
    input_img = keras.Input(shape=(image_width, image_height, 1), name="image")
    labels = layers.Input(name="label", shape=(None,))

    # Convolutional stages: (conv name, pool name, filter count).
    x = input_img
    for conv_name, pool_name, n_filters in (
        ("Conv1", "pool1", 32),
        ("Conv2", "pool2", 64),
    ):
        x = layers.Conv2D(
            n_filters,
            (3, 3),
            activation="relu",
            kernel_initializer="he_normal",
            padding="same",
            name=conv_name,
        )(x)
        x = layers.MaxPooling2D((2, 2), name=pool_name)(x)
        x = layers.Dropout(0.3)(x)

    # Two 2x2 poolings shrink both spatial axes by 4 and the last conv stage
    # has 64 filters, so fold the second axis and the channels together to get
    # one feature vector per remaining step along the first axis.
    new_shape = ((image_width // 4), (image_height // 4) * 64)
    x = layers.Reshape(target_shape=new_shape, name="reshape")(x)
    x = layers.Dense(64, activation="relu", name="dense1")(x)
    x = layers.Dropout(0.2)(x)

    # Recurrent stages: (units, input dropout).
    for n_units, drop_rate in ((128, 0.2), (64, 0.25)):
        x = layers.Bidirectional(
            layers.LSTM(n_units, return_sequences=True, dropout=drop_rate)
        )(x)

    # +2 is to account for the two special tokens introduced by the CTC loss.
    # The recommendation comes here: https://git.io/J0eXP.
    n_classes = len(char_to_num.get_vocabulary()) + 2
    x = layers.Dense(n_classes, activation="softmax", name="dense2")(x)

    # The CTC layer registers the loss and passes the predictions through.
    output = CTCLayer(name="ctc_loss")(labels, x)

    model = keras.models.Model(
        inputs=[input_img, labels], outputs=output, name="handwriting_recognizer"
    )

    opt = keras.optimizers.SGD(
        learning_rate=0.002,
        decay=1e-6,
        momentum=0.9,
        nesterov=True,
        clipnorm=5,
    )
    # No loss is passed to compile(): it is added by the CTC layer.
    model.compile(optimizer=opt)
    return model


# Get the model.
# Build one instance and print its layer-by-layer summary.
model = build_model()
model.summary()

In [23]:
# Materialise the validation batches as two parallel lists of tensors.
# NOTE: this rebinds `val_labels`, which until now held the string labels
# returned by get_image_paths_and_labels.
val_images, val_labels = [], []

for batch in val_ds:
    val_images.append(batch["image"])
    val_labels.append(batch["label"])

In [24]:
# val_labels[:10]

In [25]:
def calculate_edit_distance(labels, predictions):
    """Mean (unnormalised) edit distance between labels and greedy CTC decodes.

    Args:
        labels: batch of label vectors padded with ``padding_token``.
        predictions: batch of per-timestep class probabilities.

    Returns:
        Scalar tensor: the edit distance averaged over the batch.

    Padding is excluded from both sequences before comparing them.
    ``tf.sparse.from_dense`` only drops zeros, so it would keep the label
    filler (``padding_token``) and the decoder filler (-1) as if they were
    characters and count every padded position as an edit.
    """

    def _to_sparse(dense, ignore_value):
        # Sparse view of `dense` that omits every entry equal to `ignore_value`.
        dense = tf.cast(dense, tf.int64)
        keep = tf.where(tf.not_equal(dense, ignore_value))
        return tf.SparseTensor(
            indices=keep,
            values=tf.gather_nd(dense, keep),
            dense_shape=tf.shape(dense, out_type=tf.int64),
        )

    sparse_labels = _to_sparse(labels, padding_token)

    # Greedy-decode using the full time dimension for every sample, then cut
    # the result to at most max_len symbols.
    input_len = np.ones(predictions.shape[0]) * predictions.shape[1]
    predictions_decoded = keras.backend.ctc_decode(
        predictions, input_length=input_len, greedy=True
    )[0][0][:, :max_len]
    # ctc_decode fills unused positions with -1.
    sparse_predictions = _to_sparse(predictions_decoded, -1)

    # Compute individual edit distances and average them out.
    edit_distances = tf.edit_distance(
        sparse_predictions, sparse_labels, normalize=False
    )
    return tf.reduce_mean(edit_distances)


class EditDistanceCallback(keras.callbacks.Callback):
    """Print the mean edit distance over the validation batches after each epoch.

    Reads the module-level ``val_images`` / ``val_labels`` lists.
    """

    def __init__(self, pred_model):
        super().__init__()
        self.prediction_model = pred_model

    def on_epoch_end(self, epoch, logs=None):
        # One mean distance per validation batch.
        edit_distances = [
            calculate_edit_distance(
                batch_labels, self.prediction_model.predict(batch_images)
            ).numpy()
            for batch_images, batch_labels in zip(val_images, val_labels)
        ]

        print(
            f"Mean edit distance for epoch {epoch + 1}: {np.mean(edit_distances):.4f}"
        )

In [26]:
# Add early stopping
# Watches `val_loss`, allows 5 epochs without improvement, and restores the
# best weights when it stops training.
es = keras.callbacks.EarlyStopping(
    restore_best_weights=True,
    patience=5,
    monitor='val_loss',
)

In [27]:
epochs = 30  # To get good results this should be at least 50.

model = build_model()
# Inference model: same layers, from the image input up to the softmax output.
prediction_model = keras.models.Model(
    model.get_layer(name="image").input, model.get_layer(name="dense2").output
)
edit_distance_callback = EditDistanceCallback(prediction_model)

if not os.path.isfile('prediction_model_ocr.h5'):
    # Train the model. The early-stopping callback `es` is passed in as well;
    # it was previously constructed but never handed to fit().
    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=epochs,
        callbacks=[edit_distance_callback, es],
    )
else:
    # A saved prediction model exists, so training is skipped. Load it;
    # otherwise `prediction_model` would keep its freshly initialised weights
    # and every later evaluation cell would score an untrained network.
    prediction_model = keras.models.load_model(
        'prediction_model_ocr.h5', compile=False
    )
    history = None

In [41]:
# Persist the prediction model once; an existing file is left untouched.
existing_files = os.listdir('./')
if 'prediction_model_ocr.h5' not in existing_files:
    prediction_model.save('prediction_model_ocr.h5')

In [29]:
# A utility function to decode the output of the network.
def decode_batch_predictions(pred):
    """Greedy-CTC-decode a batch of network outputs into a list of strings."""
    seq_lengths = np.ones(pred.shape[0]) * pred.shape[1]
    # Use greedy search. For complex tasks, you can use beam search.
    decoded = keras.backend.ctc_decode(pred, input_length=seq_lengths, greedy=True)[0][0]
    decoded = decoded[:, :max_len]

    def _to_text(row):
        # Drop the -1 filler entries, then map ids back to characters.
        kept = tf.gather(row, tf.where(tf.math.not_equal(row, -1)))
        return tf.strings.reduce_join(num_to_char(kept)).numpy().decode("utf-8")

    return [_to_text(row) for row in decoded]


#  Let's check results on some test samples.
for batch in test_ds.take(1):
    batch_images = batch["image"]
    _, axes = plt.subplots(4, 4, figsize=(15, 8))

    pred_texts = decode_batch_predictions(prediction_model.predict(batch_images))

    for i, panel in enumerate(axes.flat):
        # Reverse the transpose + flip applied by distortion_free_resize.
        img = tf.transpose(tf.image.flip_left_right(batch_images[i]), perm=[1, 0, 2])
        img = (img * 255.0).numpy().clip(0, 255).astype(np.uint8)[:, :, 0]

        panel.imshow(img, cmap="gray")
        panel.set_title(f"Prediction: {pred_texts[i]}")
        panel.axis("off")

plt.show()

In [35]:
# Renumber the row index of each frame from 0.
train = train.reset_index(drop=True)
test = test.reset_index(drop=True)
val = val.reset_index(drop=True)

In [36]:
# Score the model on the first `number_of_batches` training batches.
number_of_batches = 100

# Decode batch by batch and collect the texts in dataset order.
predictions = []
for batch in train_ds.take(number_of_batches):
    preds = prediction_model.predict(batch["image"])
    predictions.extend(decode_batch_predictions(preds))

# Write the predictions onto the leading rows by position.
# NOTE(review): assumes train_ds yields samples in the same order and count as
# the rows of `train`, i.e. no file was skipped when the image paths were
# collected - confirm.
train["PREDICTION"] = ""
train.iloc[:len(predictions), train.columns.get_loc("PREDICTION")] = predictions

# Explicit copy so the metric columns are added to an independent frame rather
# than to a slice of `train`.
train_output = train.iloc[:batch_size * number_of_batches].copy()
train_output["CER"] = train_output.apply(
    lambda row: fastwer.score_sent(row["PREDICTION"], row["IDENTITY"], char_level=True),
    axis=1,
)
train_output["WER"] = train_output.apply(
    lambda row: fastwer.score_sent(row["PREDICTION"], row["IDENTITY"], char_level=False),
    axis=1,
)
display(train_output)
# These averages are over training data, so label them as such.
print("CER_train_average: ", train_output["CER"].mean())
print("WER_train_average: ", train_output["WER"].mean())

In [39]:
# Score the model on the first `number_of_batches` validation batches.
number_of_batches = 50

# Decode batch by batch and collect the texts in dataset order.
predictions = []
for batch in val_ds.take(number_of_batches):
    preds = prediction_model.predict(batch["image"])
    predictions.extend(decode_batch_predictions(preds))

# Write the predictions onto the leading rows by position.
# NOTE(review): assumes val_ds yields samples in the same order and count as
# the rows of `val`, i.e. no file was skipped when the image paths were
# collected - confirm.
val["PREDICTION"] = ""
val.iloc[:len(predictions), val.columns.get_loc("PREDICTION")] = predictions

# Explicit copy so the metric columns are added to an independent frame rather
# than to a slice of `val`.
val_output = val.iloc[:batch_size * number_of_batches].copy()
val_output["CER"] = val_output.apply(
    lambda row: fastwer.score_sent(row["PREDICTION"], row["IDENTITY"], char_level=True),
    axis=1,
)
val_output["WER"] = val_output.apply(
    lambda row: fastwer.score_sent(row["PREDICTION"], row["IDENTITY"], char_level=False),
    axis=1,
)
display(val_output)
# These averages are over validation data, so label them as such.
print("CER_val_average: ", val_output["CER"].mean())
print("WER_val_average: ", val_output["WER"].mean())

In [40]:
# Score the model on the first `number_of_batches` test batches.
number_of_batches = 50

# Decode batch by batch and collect the texts in dataset order.
predictions = []
for batch in test_ds.take(number_of_batches):
    preds = prediction_model.predict(batch["image"])
    predictions.extend(decode_batch_predictions(preds))

# Write the predictions onto the leading rows by position.
# NOTE(review): assumes test_ds yields samples in the same order and count as
# the rows of `test`, i.e. no file was skipped when the image paths were
# collected - confirm.
test["PREDICTION"] = ""
test.iloc[:len(predictions), test.columns.get_loc("PREDICTION")] = predictions

# Explicit copy so the metric columns are added to an independent frame rather
# than to a slice of `test`.
test_output = test.iloc[:batch_size * number_of_batches].copy()
test_output["CER"] = test_output.apply(
    lambda row: fastwer.score_sent(row["PREDICTION"], row["IDENTITY"], char_level=True),
    axis=1,
)
test_output["WER"] = test_output.apply(
    lambda row: fastwer.score_sent(row["PREDICTION"], row["IDENTITY"], char_level=False),
    axis=1,
)
display(test_output)
print("CER_test_average: ", test_output["CER"].mean())
print("WER_test_average: ", test_output["WER"].mean())