In [None]:
import re
from pathlib import Path
import os
import json
from typing import List
from glob import glob

from tqdm import tqdm
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import linregress
from skimage.measure import block_reduce
from scipy.signal import find_peaks

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import load_model

In [None]:
# Seed the NumPy and TensorFlow global random generators with one shared value.
SEED = 0
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [None]:
images_path = Path('/kaggle/input/sroie-single-line-syn-upd/syn_img/syn_img')
labels_path = Path('/kaggle/input/sroie-single-line-syn-upd/text/text')

# Each label file `<stem>.json` is paired with images `<stem>_0.jpg` .. `<stem>_4.jpg`.
IMAGES_PER_LABEL = 5

images = []
labels = []
unreadable = 0

# Sort the matches: glob order depends on the filesystem, and an unstable
# order would make the seeded train/validation shuffle non-reproducible.
label_files = sorted(glob(str(labels_path / '*' / '*.json')))

for label_file in tqdm(label_files):
    label_file = Path(label_file)
    subfolder = label_file.parent.name
    stem = label_file.stem

    # JSON text is UTF-8; do not rely on the platform's default encoding.
    with open(label_file, 'r', encoding='utf-8') as f:
        label = json.load(f)

    for i in range(IMAGES_PER_LABEL):
        image_file = images_path / subfolder / f'{stem}_{i}.jpg'
        image = cv.imread(str(image_file), cv.IMREAD_GRAYSCALE)

        # cv.imread signals a missing/corrupt file by returning None instead
        # of raising; skip it here so the later resize step does not crash.
        if image is None:
            unreadable += 1
            continue

        images.append(image)
        labels.append(label)


print("Number of images found: ", len(images))
if unreadable:
    print("Number of unreadable images skipped: ", unreadable)

In [None]:
# Replace the look-alike 'В' (presumably the Cyrillic capital Ve — confirm)
# with the Latin 'B' in every label, updating the list in place.
for idx, text in enumerate(labels):
    labels[idx] = text.replace('В', 'B')

In [None]:
# Vocabulary: every distinct character that occurs in any label, in sorted order.
unique_chars = {ch for text in labels for ch in text}
characters = sorted(unique_chars)

print("Number of unique characters: ", len(characters))
print("Characters present: ", characters)

In [None]:
# Show the first loaded image with its label as the title.
fig, axis = plt.subplots()
axis.axis('off')
axis.set_title(labels[0])
axis.imshow(images[0], cmap='gray')
plt.show()

In [None]:
# Distribution of label lengths (in characters) across the dataset.
len_labels = list(map(len, labels))

fig, axis = plt.subplots()
axis.hist(len_labels, bins=50)
axis.grid()
plt.show()

In [None]:
# Fixed length every label is forced to. This is a chosen cap, not a value
# measured from the data: longer labels are truncated.
max_label_len = 32
print('Maximum length of labels:', max_label_len)

# Cut each label to at most `max_label_len` characters, then right-pad with
# spaces so all labels end up exactly `max_label_len` long.
# NOTE(review): `characters` is built from the unpadded labels, so the padding
# space is only in the vocabulary if some label already contains one — confirm.
for idx, text in enumerate(labels):
    labels[idx] = text[:max_label_len].ljust(max_label_len)

In [None]:
# Both lookups share the same vocabulary and use no mask token.
StringLookup = layers.experimental.preprocessing.StringLookup
_lookup_kwargs = dict(vocabulary=characters, mask_token=None)

# Forward lookup: character -> integer id
char_to_num = StringLookup(**_lookup_kwargs)

# Inverse lookup: integer id -> character
num_to_char = StringLookup(invert=True, **_lookup_kwargs)

In [None]:
def split_data(images, labels, train_size=0.9, shuffle=True):
    """Split paired images and labels into training and validation subsets.

    Args:
        images: Sequence or array of samples, indexed along the first axis.
        labels: Sequence or array of labels, one per entry of `images`.
        train_size: Fraction of the samples assigned to the training subset.
        shuffle: If True, permute the samples (using NumPy's global random
            state) before splitting; otherwise keep the input order.

    Returns:
        Tuple `(x_train, x_valid, y_train, y_valid)` of NumPy arrays.

    Raises:
        ValueError: If `images` and `labels` differ in length.
    """
    # Index arrays only work on ndarrays; accept plain lists as well.
    images = np.asarray(images)
    labels = np.asarray(labels)

    size = len(images)
    if len(labels) != size:
        # A mismatch would otherwise silently pair samples with wrong labels.
        raise ValueError(
            f"images and labels must have the same length, got {size} and {len(labels)}"
        )

    indices = np.arange(size)
    if shuffle:
        np.random.shuffle(indices)

    # Number of samples that go to the training subset (rounded down).
    train_samples = int(size * train_size)
    train_idx, valid_idx = indices[:train_samples], indices[train_samples:]

    x_train, y_train = images[train_idx], labels[train_idx]
    x_valid, y_valid = images[valid_idx], labels[valid_idx]
    return x_train, x_valid, y_train, y_valid

In [None]:
# Target size every image is resized to before training.
img_width = 256
img_height = 32

# The size tuple is passed to cv.resize in (width, height) order.
target_size = (img_width, img_height)
images = [cv.resize(picture, target_size) for picture in images]

In [None]:
# Stack images and labels into arrays and split them into train/validation sets
# (split_data's defaults: 90% training, shuffled).
x_train, x_valid, y_train, y_valid = split_data(
    images=np.array(images),
    labels=np.array(labels),
)

In [None]:
def encode_single_sample(image, label):
    """Turn one (image, label) pair into the dict of inputs the model consumes.

    Args:
        image: 2-D grayscale image tensor (indexed by its first two axes).
        label: Scalar string tensor holding the UTF-8 label text.

    Returns:
        Dict with key "image" (float32 tensor with the first two axes swapped
        and a trailing channel axis) and key "label" (per-character ids from
        `char_to_num`).
    """
    rows, cols = image.shape[0], image.shape[1]

    # Add a trailing channel axis of size 1.
    pixels = tf.reshape(image, [rows, cols, 1])
    # Convert to float32; integer inputs are rescaled into [0, 1].
    pixels = tf.image.convert_image_dtype(pixels, tf.float32)
    # Swap the first two axes so the image width becomes the leading (time) axis.
    pixels = tf.transpose(pixels, perm=[1, 0, 2])

    # Split the label into characters and map each one to its integer id.
    chars = tf.strings.unicode_split(label, input_encoding="UTF-8")
    token_ids = char_to_num(chars)

    # The model takes two named inputs, so return them keyed by input name.
    return {"image": pixels, "label": token_ids}

In [None]:
# Batch size for training and validation
batch_size = 32


def _make_dataset(samples, targets):
    """Build a batched, prefetched tf.data pipeline over (samples, targets)."""
    autotune = tf.data.experimental.AUTOTUNE
    pipeline = tf.data.Dataset.from_tensor_slices((samples, targets))
    pipeline = pipeline.map(encode_single_sample, num_parallel_calls=autotune)
    pipeline = pipeline.batch(batch_size)
    return pipeline.prefetch(buffer_size=autotune)


# Create the Dataset objects; both splits go through the identical pipeline.
train_dataset = _make_dataset(x_train, y_train)
validation_dataset = _make_dataset(x_valid, y_valid)

In [None]:
# Visualize the data: the first 16 samples of the first training batch, 4 x 4.

_, ax = plt.subplots(4, 4, figsize=(10, 5))
for batch in train_dataset.take(1):
    imgs, lbls = batch["image"], batch["label"]
    # ravel() walks the 4x4 axes grid row by row, one axis per sample.
    for idx, axis in enumerate(ax.ravel()):
        pixels = (imgs[idx] * 255).numpy().astype("uint8")
        text = tf.strings.reduce_join(num_to_char(lbls[idx])).numpy().decode("utf-8")
        # Images are stored width-first; transpose back for display.
        axis.imshow(pixels[:, :, 0].T, cmap="gray")
        axis.set_title(text, size=6)
        axis.axis("off")
plt.show()

In [None]:
class CTCLayer(layers.Layer):
    """Pass-through layer that registers the CTC loss on the model.

    Called with `(y_true, y_pred)`, it computes the batch CTC cost, attaches it
    with `add_loss`, and returns `y_pred` unchanged.
    """

    def __init__(self, name=None, **kwargs):
        # Initialise the base layer exactly once, with the name and any other
        # base-layer options together, so the requested name is kept.
        super().__init__(name=name, **kwargs)
        self.loss_fn = keras.backend.ctc_batch_cost

    def get_config(self):
        """Return the base layer config; this layer adds no options of its own."""
        config = super().get_config()
        return config

    @classmethod
    def from_config(cls, config):
        """Recreate the layer from a config produced by `get_config`."""
        return cls(**config)

    def call(self, y_true, y_pred):
        """Add the CTC loss for this batch and return the predictions.

        Args:
            y_true: Label tensor; axis 0 is the batch, axis 1 the label length.
            y_pred: Prediction tensor; axis 1 is the number of time steps.

        Returns:
            `y_pred`, unchanged.
        """
        batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
        input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

        # Every sample in the batch is given the full time-step count and the
        # full label width, as (batch, 1) columns.
        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")

        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)

        # The layer's output is just the predictions it was given.
        return y_pred

In [None]:
def build_model():
    """Build and compile the CNN + bidirectional-LSTM OCR model with a CTC loss.

    Reads the module-level `img_width`, `img_height` and `char_to_num`.

    Returns:
        A compiled `keras.Model` named "ocr_model_v1" with two inputs
        ("image" and "label"). The loss is added by the `CTCLayer`, so
        `compile` is called without a loss argument.
    """
    # Inputs to the model
    input_img = layers.Input(
        shape=(img_width, img_height, 1), name="image", dtype="float32"
    )
    # The label length is left unspecified so the input accepts whatever
    # length the data pipeline produces; CTCLayer reads it at run time.
    label_input = layers.Input(name="label", shape=(None,), dtype="float32")

    # First conv block
    x = layers.Conv2D(
        32,
        (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="conv1",
    )(input_img)
    x = layers.MaxPooling2D((2, 2), name="pool1")(x)

    # Second conv block
    x = layers.Conv2D(
        64,
        (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="conv2",
    )(x)
    x = layers.MaxPooling2D((2, 2), name="pool2")(x)

    # Two 2x2 max-pools shrink both spatial axes by 4, and the last conv has
    # 64 filters. Fold height and channels together so each of the
    # `img_width // 4` columns becomes one time step for the RNN.
    new_shape = ((img_width // 4), (img_height // 4) * 64)
    x = layers.Reshape(target_shape=new_shape, name="reshape")(x)
    x = layers.Dense(64, activation="relu", name="dense1")(x)
    x = layers.Dropout(0.2, name="dropout")(x)

    # RNNs
    x = layers.Bidirectional(
        layers.LSTM(128, return_sequences=True, dropout=0.25), name="bidirectional1"
    )(x)
    x = layers.Bidirectional(
        layers.LSTM(64, return_sequences=True, dropout=0.25), name="bidirectional2"
    )(x)

    # Output layer: one class per vocabulary entry plus one extra class.
    x = layers.Dense(
        len(char_to_num.get_vocabulary()) + 1, activation="softmax", name="dense2"
    )(x)

    # Add CTC layer for calculating CTC loss at each step
    output = CTCLayer(name="ctc_loss")(label_input, x)

    # Define the model
    model = keras.models.Model(
        inputs=[input_img, label_input], outputs=output, name="ocr_model_v1"
    )
    # Adam with default settings; the loss comes from CTCLayer.add_loss.
    opt = keras.optimizers.Adam()
    model.compile(optimizer=opt)
    return model


# Get the model
model = build_model()
model.summary()

In [None]:
#                   Training

epochs = 10
# Train the model
history = model.fit(
    train_dataset,
    epochs=epochs,
    validation_data=validation_dataset,
    verbose=1
)

# Derive the artifact names from `epochs` so they stay accurate if it changes.
model.save(f'sroie-single-line-syn-{epochs}.h5')

# Coerce every metric value to a plain float: json cannot serialise NumPy scalars.
history_record = {
    metric: [float(value) for value in values]
    for metric, values in history.history.items()
}
with open(f'history-{epochs}.json', 'w') as f:
    json.dump(history_record, f, indent=4)