In [None]:
import pandas as pd
import skimage
import matplotlib.pyplot as plt
import numpy as np
import easyocr
import itertools

# Build the English EasyOCR reader.
# NOTE(review): `reader` is not referenced by any other cell visible here — confirm it is still needed.
reader = easyocr.Reader(['en']) # this needs to run only once to load the model into memory


from IPython.display import Image, display

# Pickle file read with `pd.read_pickle` when the annotations are loaded.
ANNOTATIONS_FILE = "annotations.pkl"
# Path prefix prepended to each row's 'image' value when an image is read.
IMAGE_PREFIX = "../data/train/images/"

In [None]:
# Load the pickled annotation table and keep only the rows whose
# 'chart-type' is 'vertical_bar'.
# NOTE: unpickling can execute arbitrary code — only load annotation files
# that come from a trusted source.
annotations = pd.read_pickle(ANNOTATIONS_FILE).loc[
    lambda frame: frame['chart-type'] == 'vertical_bar'
]
annotations.head()

In [None]:
from skimage.color import rgb2gray

# Size of the fixed canvas every tick-label crop is pasted onto, and of the
# model's image input.
# NOTE(review): `extract_labels` uses IMAGE_SIZE as a NumPy shape, i.e.
# (rows, cols), while it is unpacked here as (width, height). This is harmless
# while the canvas is square — confirm the axis order before using a
# non-square size.
IMAGE_SIZE = (128, 128)
img_width, img_height = IMAGE_SIZE

def extract_labels(row):
    """Crop every tick label out of one chart image and pad it to ``IMAGE_SIZE``.

    Parameters
    ----------
    row : mapping
        One annotation record. ``row['image']`` is appended to ``IMAGE_PREFIX``
        to locate the image file. ``row['text']`` is an iterable of dicts with
        the keys ``'role'``, ``'text'`` and ``'polygon'``, where the polygon is
        a dict holding the corner coordinates ``x0..x3`` / ``y0..y3``.

    Returns
    -------
    list of (numpy.ndarray, str)
        One ``(canvas, text)`` pair per entry whose role is ``'tick_label'``.
        ``canvas`` is a float array of shape ``IMAGE_SIZE`` filled with ones,
        with the grayscale crop pasted into its top-left corner.

    Notes
    -----
    * A crop larger than the canvas is truncated to its top-left
      ``IMAGE_SIZE`` region; previously such a crop raised a broadcasting
      ``ValueError``.
    * A zero-area polygon yields an all-ones canvas; previously a zero-height
      crop raised ``IndexError``.
    * Negative coordinates are clamped to 0 so they are not interpreted as
      "count from the end" by NumPy slicing.
    """
    image = rgb2gray(skimage.io.imread(IMAGE_PREFIX + row['image']))
    # IMAGE_SIZE is used as a NumPy shape here, so index 0 counts rows.
    canvas_rows, canvas_cols = IMAGE_SIZE

    labels = []
    for tick_label in row['text']:
        if tick_label['role'] != 'tick_label':
            continue

        polygon = tick_label['polygon']
        xs = [polygon[f'x{corner}'] for corner in range(4)]
        ys = [polygon[f'y{corner}'] for corner in range(4)]

        # Axis-aligned bounding box of the polygon, clamped to the image origin.
        x_min, x_max = max(min(xs), 0), max(max(xs), 0)
        y_min, y_max = max(min(ys), 0), max(max(ys), 0)

        # Crop, then cut anything that would not fit onto the canvas.
        sub_image = image[y_min:y_max, x_min:x_max][:canvas_rows, :canvas_cols]
        crop_rows, crop_cols = sub_image.shape[:2]

        output_image = np.ones(shape=IMAGE_SIZE, dtype=float)
        output_image[:crop_rows, :crop_cols] = sub_image
        labels.append((output_image, tick_label['text']))
    return labels

# Sample 100 charts reproducibly, extract the tick-label crops of each one and
# flatten the per-chart lists into a single list of (image, text) pairs.
examples = [
    example
    for chart_labels in annotations.sample(100, random_state=123).apply(extract_labels, 1).values
    for example in chart_labels
]

# Element-wise maximum of the image shapes, followed by the example count.
print(np.max(np.array([image.shape for image, _ in examples]), axis=0))
len(examples)

In [None]:
# Show the first extracted crop with its ground-truth text as the title.
for example in examples[:1]:
    image, text = example
    plt.imshow(image)
    plt.title(text)
    plt.show()

In [None]:
# Split the (image, text) pairs into two parallel tuples: images and label strings.
# Unpacking raises ValueError if `examples` is empty.
train_x, train_y = zip(*examples)

In [None]:
# Sorted list of every distinct character that occurs in the labels.
characters = sorted({char for label in train_y for char in label})

# Length (in characters) of the longest label; labels are padded up to this.
max_length = max(len(label) for label in train_y)

In [None]:
# Summary of the extracted tick-label dataset.
# The image count is taken from `train_x` (the images) rather than from the
# labels, so the two counts are reported from their own collections.
print("Number of images found: ", len(train_x))
print("Number of labels found: ", len(train_y))
print("Number of unique labels found: ", len(set(train_y)))
print("Number of unique characters: ", len(characters))
print("Characters present: ", characters)
print("Max. label length: ", max_length)

In [None]:
# from: https://keras.io/examples/vision/captcha_ocr/
import tensorflow as tf
from tensorflow import keras
from keras import layers

# Character -> integer index lookup built from the label alphabet.
char_to_num = layers.StringLookup(vocabulary=list(characters), mask_token=None)

# Inverse lookup (integer index -> character) sharing the same vocabulary.
num_to_char = layers.StringLookup(
    invert=True,
    mask_token=None,
    vocabulary=char_to_num.get_vocabulary(),
)

class CTCLayer(layers.Layer):
    """Pass-through layer that attaches the CTC loss to the model.

    The layer receives the ground-truth label indices and the per-timestep
    class probabilities, registers the batch CTC cost through ``add_loss`` and
    returns the predictions unchanged.
    """

    def __init__(self, name=None, **kwargs):
        """Create the layer.

        Parameters
        ----------
        name : str, optional
            Layer name.
        **kwargs
            Forwarded to ``layers.Layer`` (for example ``trainable`` or
            ``dtype``). Accepting them lets Keras rebuild the layer from its
            config; without this, re-creation fails with an unexpected keyword
            argument error.
        """
        super().__init__(name=name, **kwargs)
        self.loss_fn = keras.backend.ctc_batch_cost

    def call(self, y_true, y_pred):
        """Register the CTC loss for a batch and return ``y_pred``.

        Parameters
        ----------
        y_true : tensor
            Label indices; axis 0 is read as the batch size and axis 1 as the
            label length.
        y_pred : tensor
            Predictions; axis 1 is read as the number of time steps.
        """
        # Compute the training-time loss value and add it
        # to the layer using `self.add_loss()`.
        batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
        input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

        # Every sample in the batch is given the same lengths: the full number
        # of time steps and the full (padded) label width.
        # NOTE(review): padded positions therefore count as label tokens —
        # confirm this is intended for variable-length labels.
        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")

        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)

        # At test time, just return the computed predictions
        return y_pred


def build_model():
    """Assemble and compile the CNN + bidirectional-LSTM OCR network.

    The network takes an ``"image"`` input of shape
    ``(img_width, img_height, 1)`` and a ``"label"`` input of variable length.
    Two Conv2D/MaxPooling2D stages feed a reshape into a sequence, a dense
    projection with dropout, two bidirectional LSTMs and a softmax over the
    vocabulary plus one extra class. A ``CTCLayer`` adds the loss, so the model
    is compiled with an optimizer only.

    Returns
    -------
    keras.models.Model
        The compiled model named ``"ocr_model_v1"``.
    """
    # Model inputs
    input_img = layers.Input(
        shape=(img_width, img_height, 1), name="image", dtype="float32"
    )
    labels = layers.Input(name="label", shape=(None,), dtype="float32")

    # Two identical conv + max-pool stages that differ only in filter count.
    features = input_img
    for stage, num_filters in enumerate((32, 64), start=1):
        features = layers.Conv2D(
            num_filters,
            (3, 3),
            activation="relu",
            kernel_initializer="he_normal",
            padding="same",
            name=f"Conv{stage}",
        )(features)
        features = layers.MaxPooling2D((2, 2), name=f"pool{stage}")(features)

    # Each of the two 2x2 poolings halves both spatial axes, so the feature
    # map is 4x smaller and carries 64 channels. Fold the second axis and the
    # channels together so the first axis becomes the sequence axis.
    sequence_shape = ((img_width // 4), (img_height // 4) * 64)
    features = layers.Reshape(target_shape=sequence_shape, name="reshape")(features)
    features = layers.Dense(64, activation="relu", name="dense1")(features)
    features = layers.Dropout(0.2)(features)

    # Recurrent stack
    for units in (128, 64):
        features = layers.Bidirectional(
            layers.LSTM(units, return_sequences=True, dropout=0.25)
        )(features)

    # Per-timestep class probabilities: vocabulary size plus one extra class.
    num_classes = len(char_to_num.get_vocabulary()) + 1
    probabilities = layers.Dense(
        num_classes, activation="softmax", name="dense2"
    )(features)

    # The CTC layer registers the loss and passes the probabilities through.
    output = CTCLayer(name="ctc_loss")(labels, probabilities)

    model = keras.models.Model(
        inputs=[input_img, labels], outputs=output, name="ocr_model_v1"
    )
    model.compile(optimizer=keras.optimizers.Adam())
    return model


# Build the compiled OCR model and print its layer-by-layer summary.
model = build_model()
model.summary()


In [None]:
def encode_text(text: str):
    """Encode a label as a fixed-length vector of vocabulary indices.

    The label is split into Unicode characters, each character is mapped
    through ``char_to_num``, and the result is right-padded with zeros up to
    the notebook-level ``max_length``.

    Parameters
    ----------
    text : str or scalar string tensor
        The label to encode. It must not be longer than ``max_length``
        characters, otherwise the padding size is negative and TensorFlow
        raises an error.

    Returns
    -------
    tf.Tensor
        1-D ``int64`` tensor of length ``max_length``.

    Notes
    -----
    NOTE(review): ``char_to_num`` is built with ``mask_token=None``, so index 0
    is presumably its out-of-vocabulary slot; zero padding is then
    indistinguishable from unknown characters — confirm this is acceptable.
    """
    encoded = char_to_num(tf.strings.unicode_split(text, input_encoding="UTF-8"))
    # Count characters, not bytes. `tf.strings.length` defaults to unit="BYTE",
    # which over-counts non-ASCII labels, makes the padding too short and
    # yields vectors of differing lengths that cannot be batched.
    num_chars = tf.strings.length(text, unit="UTF8_CHAR")
    padding = tf.zeros(max_length - num_chars, tf.int64)
    return tf.concat([encoded, padding], 0)

# Quick manual check of the encoder on a sample string.
encode_text("bye")


In [None]:
def encode_single_sample(image, label):
    """Turn one (image, label) pair into the dict the model expects.

    Parameters
    ----------
    image : tensor
        Single-channel image holding ``img_width * img_height`` values.
    label : scalar string tensor
        Ground-truth text of the crop.

    Returns
    -------
    dict
        ``{"image": ..., "label": ...}``: the image reshaped to
        ``(img_width, img_height, 1)`` with its first two axes swapped, and
        the label encoded by ``encode_text``.
    """
    # Add the trailing channel axis required by the "image" model input.
    image = tf.reshape(image, (img_width, img_height, 1))
    # Swap the first two axes so the image columns lie along the first axis,
    # which the model's reshape layer keeps as the sequence axis.
    image = tf.transpose(image, perm=[1, 0, 2])
    # Map the characters of the label to padded integer indices.
    label = encode_text(label)
    # The model has two named inputs, so return them as a dict.
    return {"image": image, "label": label}

batch_size = 8

# Input pipeline: slice the in-memory examples, encode each sample, then
# batch and prefetch.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((list(train_x), list(train_y)))
    .map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

In [None]:
# Pull a single batch through the pipeline as a smoke test; `images` keeps
# that batch's "image" tensor. If the dataset is empty the loop body never
# runs and `images` stays undefined.
for batch in train_dataset.take(1):
    images = batch["image"]
    # labels = batch["label"]
    #print(labels)

In [None]:
epochs = 100
early_stopping_patience = 10
# Early stopping.
# `model.fit` below is called without validation data, so "val_loss" is never
# logged and a callback monitoring it can neither stop training nor restore
# the best weights. Monitor the training "loss" instead; switch back to
# "val_loss" once a validation set is passed to `fit`.
early_stopping = keras.callbacks.EarlyStopping(
    monitor="loss", patience=early_stopping_patience, restore_best_weights=True
)

def generator():
    """Yield the training examples one at a time, unbatched.

    Each item is a ``(inputs, 0)`` tuple where ``inputs`` is a dict with the
    raw image under ``"image"`` and the label encoded by ``encode_text`` under
    ``"label"``.

    NOTE(review): nothing in the visible cells calls this generator — confirm
    it is still needed.
    """
    for image, text in zip(train_x, train_y):
        sample = {"image": image, "label": encode_text(text)}
        yield sample, 0

# Train the model on the batched training pipeline; `history` keeps the
# object returned by `fit`.
# NOTE(review): no `validation_data` is passed, so only training metrics are
# logged each epoch.
history = model.fit(
    train_dataset,
    epochs=epochs,
    callbacks=[early_stopping],
)