In [1]:
import os

import cv2
import numpy as np
import string
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.keras.backend as K

from tensorflow import keras
from tensorflow.keras import layers

from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Dense, Reshape, BatchNormalization, Input, Conv2D, MaxPool2D, Lambda, Bidirectional, LSTM
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import *
from tensorflow.keras.utils import to_categorical, Sequence
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tqdm import tqdm
from collections import Counter
from PIL import Image
from itertools import groupby

In [2]:
# Collect image file paths and their ground-truth words from the data folder.
# The label is taken from the second "_"-separated field of each file name.
image_paths = []
image_texts = []

data_folder = "mjsynth_sample"

# os.listdir() returns entries in arbitrary order; sorting makes the later
# positional train/validation split reproducible across runs and machines.
for file_name in sorted(os.listdir(data_folder)):
    full_path = os.path.join(data_folder, file_name)
    name_parts = file_name.split("_")

    # Skip sub-directories and stray files whose name carries no label field
    # instead of failing with an IndexError or producing an unreadable sample.
    if len(name_parts) < 2 or not os.path.isfile(full_path):
        continue

    image_paths.append(full_path)
    image_texts.append(name_parts[1])

In [3]:
# Build the vocabulary: every distinct character that occurs in any label,
# sorted so that a character's position in `char_list` is its class index.
vocab = {char for text in image_texts for char in str(text)}
char_list = sorted(vocab)

In [4]:
# Length of the longest label; every encoded label is padded to this width.
max_label_len = max(len(str(text)) for text in image_texts)

In [50]:
# Show the padded label width used by `encode_to_labels`.
print(max_label_len)

23


In [5]:
def encode_to_labels(txt):
    """Encode a word as a fixed-width sequence of class indices.

    Each character is mapped to its position in the module-level `char_list`.
    The result is post-padded to `max_label_len` entries with the value
    `len(char_list)`, i.e. the index one past the last real character.

    Args:
        txt: the word to encode (an iterable of single characters).

    Returns:
        A 1-D integer array of length `max_label_len`.
    """
    dig_lst = []

    for char in txt:
        try:
            dig_lst.append(char_list.index(char))
        except ValueError:
            # Best-effort: a character outside the vocabulary is reported and
            # dropped. Only the "not in list" error is handled so that real
            # failures (and KeyboardInterrupt) are no longer swallowed.
            print(char)

    return pad_sequences([dig_lst], maxlen=max_label_len, padding='post', value=len(char_list))[0]
    

In [6]:
# Encode every label into its padded index sequence (same order as image_texts).
padded_image_texts = [encode_to_labels(text) for text in image_texts]

In [7]:
# Positional 90/10 split: the first 90% of samples train, the rest validate.
# Paths and encoded labels are built in lockstep, so one index cuts both.
split_index = int(len(image_paths) * 0.90)

train_image_paths, val_image_paths = image_paths[:split_index], image_paths[split_index:]
train_image_texts, val_image_texts = padded_image_texts[:split_index], padded_image_texts[split_index:]

In [8]:
def process_single_sample(img_path, label):
    """Load one image from disk and pair it with its label for the model.

    Args:
        img_path: path of the image file to read.
        label: the encoded label, passed through unchanged.

    Returns:
        A dict with keys "image" and "label", matching the names of the two
        model inputs. "image" is a float32 tensor resized to 32x128 with a
        single channel.
    """
    # Read the raw file and decode it into a single-channel (grayscale) image.
    # (Rough OpenCV equivalent of these two steps: cv2.imread.)
    raw_bytes = tf.io.read_file(img_path)
    gray = tf.io.decode_png(raw_bytes, channels=1)

    # Convert to float32 in the [0, 1] range, then resize to height 32, width 128.
    gray = tf.image.convert_image_dtype(gray, tf.float32)
    resized = tf.image.resize(gray, [32, 128])

    return {"image": resized, "label": label}

In [9]:
batch_size = 256


def _build_dataset(paths, labels, shuffle=False):
    """Build a batched, prefetched tf.data pipeline of {"image", "label"} dicts.

    Args:
        paths: list of image file paths.
        labels: list of encoded labels, aligned with `paths`.
        shuffle: when True, reshuffle the (path, label) pairs on every epoch.

    Returns:
        A `tf.data.Dataset` yielding batches of `batch_size` samples.
    """
    dataset = tf.data.Dataset.from_tensor_slices((paths, labels))

    if shuffle:
        # Shuffle the cheap (path, label) pairs before decoding. A buffer as
        # large as the dataset gives a full shuffle; max(..., 1) keeps the
        # buffer size valid for an empty list.
        dataset = dataset.shuffle(buffer_size=max(len(paths), 1))

    return (
        dataset.map(
            process_single_sample, num_parallel_calls=tf.data.experimental.AUTOTUNE
        )
        .batch(batch_size)
        .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    )


# Keras ignores `fit(shuffle=...)` for tf.data inputs, so the training set has
# to be shuffled here; validation order is left untouched.
train_dataset = _build_dataset(train_image_paths, train_image_texts, shuffle=True)

validation_dataset = _build_dataset(val_image_paths, val_image_texts)

In [10]:
# Mapping characters to integers
char_to_num = layers.experimental.preprocessing.StringLookup(
    vocabulary=char_list, num_oov_indices=0, mask_token=None
)

# Mapping integers back to original character
num_to_char = layers.experimental.preprocessing.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), mask_token=None, invert=True
)

In [11]:
## Ref: https://keras.io/examples/vision/captcha_ocr/

class CTCLayer(layers.Layer):
    """Pass-through layer that attaches the CTC loss to the model.

    The layer receives the labels and the per-time-step class probabilities,
    registers the CTC batch cost through `add_loss`, and returns the
    predictions unchanged.
    """

    def __init__(self, name=None, **kwargs):
        # Forward the standard base-layer keyword arguments (`trainable`,
        # `dtype`, ...). Keras passes them when it rebuilds the layer from a
        # saved config; without **kwargs that reconstruction raises TypeError.
        super().__init__(name=name, **kwargs)
        self.loss_fn = keras.backend.ctc_batch_cost

    def call(self, y_true, y_pred):
        """Add the CTC loss for this batch and return `y_pred` as-is.

        Args:
            y_true: batch of padded label sequences, indexed as (batch, label_len).
            y_pred: batch of predictions, indexed as (batch, time_steps, ...).

        Returns:
            `y_pred`, unchanged.
        """
        batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
        input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

        # Every sample is given the same lengths: the full number of time
        # steps and the full padded label width, each as a (batch, 1) column.
        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")

        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)

        # At test time, just return the computed predictions
        return y_pred

In [12]:
def ctc_decoder(predictions, charset=None):
    '''
    Greedy (best-path) CTC decoding of a batch of predictions.

    input:
        predictions: array indexed as (batch, time_steps, classes); the class
            index equal to len(charset) is treated as the CTC blank.
        charset: optional sequence mapping class index -> character. Defaults
            to the module-level `char_list`, so existing callers are unchanged.
    output: list with one decoded string per batch item

    '''
    if charset is None:
        charset = char_list

    blank_index = len(charset)

    # Most likely class at every time step, shape (batch, time_steps).
    best_path = np.argmax(predictions, axis=2)

    text_list = []
    for step_indices in best_path:
        # Merge consecutive repeats first, then drop blanks. The order matters:
        # a blank between two identical characters keeps both of them.
        merged = (idx for idx, _ in groupby(step_indices))
        text_list.append(
            "".join(charset[int(idx)] for idx in merged if idx != blank_index)
        )

    return text_list

In [13]:
def create_crnn():
    """Build the training-time CRNN: conv stack -> 2 x BiLSTM -> softmax -> CTC.

    The model takes two inputs, "image" (32x128x1) and "label", and its output
    is the softmax tensor passed through `CTCLayer`, which registers the loss.
    Layers are created in a fixed order so that auto-generated layer names
    (conv2d, conv2d_1, ...) stay stable.

    Returns:
        An uncompiled `Model` with inputs [image, label].
    """

    def selu_conv(filters, kernel_size, **conv_kwargs):
        # All convolutions in this network share the SELU activation.
        return Conv2D(filters, kernel_size, activation="selu", **conv_kwargs)

    # input with shape of height=32 and width=128
    image_in = Input(shape=(32, 128, 1), name="image")
    label_in = layers.Input(name="label", shape=(None,), dtype="float32")

    # Two conv + 2x2 pool stages halve height and width twice.
    features = selu_conv(32, (3, 3), padding="same")(image_in)
    features = MaxPool2D(pool_size=(2, 2))(features)

    features = selu_conv(64, (3, 3), padding="same")(features)
    features = MaxPool2D(pool_size=(2, 2))(features)

    # From here on only the height is pooled (pool_size=(2, 1)).
    features = selu_conv(128, (3, 3), padding="same")(features)
    features = selu_conv(128, (3, 3), padding="same")(features)
    features = MaxPool2D(pool_size=(2, 1))(features)

    features = selu_conv(256, (3, 3), padding="same")(features)
    features = BatchNormalization()(features)

    features = selu_conv(256, (3, 3), padding="same")(features)
    features = BatchNormalization()(features)
    features = MaxPool2D(pool_size=(2, 1))(features)

    # Unpadded 2x2 convolution, then drop axis 1 (height) to get a sequence.
    features = selu_conv(64, (2, 2))(features)
    sequence = Lambda(lambda x: K.squeeze(x, 1))(features)

    # bidirectional LSTM layers with units=128
    sequence = Bidirectional(LSTM(128, return_sequences=True))(sequence)
    sequence = Bidirectional(LSTM(128, return_sequences=True))(sequence)

    # One class per vocabulary character plus one extra class (the CTC blank).
    char_probs = Dense(len(char_list) + 1, activation="softmax", name="dense")(sequence)

    # y_true = label_in, y_pred = char_probs
    ctc_out = CTCLayer(name="ctc_loss")(label_in, char_probs)

    # model to be used at training time
    return Model(inputs=[image_in, label_in], outputs=ctc_out)

In [14]:
# Build the training-time model (image + label inputs, CTC loss attached).
model = create_crnn()

# Print the layer-by-layer summary.
model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
image (InputLayer)              [(None, 32, 128, 1)] 0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 32, 128, 32)  320         image[0][0]                      
__________________________________________________________________________________________________
max_pooling2d (MaxPooling2D)    (None, 16, 64, 32)   0           conv2d[0][0]                     
__________________________________________________________________________________________________
conv2d_1 (Conv2D)               (None, 16, 64, 64)   18496       max_pooling2d[0][0]              
______________________________________________________________________________________________

In [15]:
# `learning_rate` replaces the deprecated `lr` alias. The optimizer is taken
# from `keras.optimizers` explicitly rather than through the wildcard import.
optimizer = keras.optimizers.Adam(
    learning_rate=0.001, beta_1=0.9, beta_2=0.999, clipnorm=1.0
)

# No `loss=` argument: the loss is registered inside the model by CTCLayer.
model.compile(optimizer=optimizer)

file_path = "C_LSTM_best1.hdf5"

# Keep only the checkpoint with the lowest validation loss seen so far.
checkpoint = ModelCheckpoint(filepath=file_path,
                             monitor='val_loss',
                             verbose=1,
                             save_best_only=True,
                             mode='min')

# Stop training after 3 epochs without improvement.
callbacks_list = [checkpoint,
                  EarlyStopping(patience=3, verbose=1)]

In [None]:
# Train for up to 30 epochs; `callbacks_list` handles checkpointing and early stopping.
# NOTE(review): per the Keras `Model.fit` documentation, `shuffle` is ignored
# when the input is a tf.data.Dataset, so sample order is determined by how
# `train_dataset` itself is built — confirm the dataset pipeline shuffles.
history = model.fit(train_dataset, 
                        epochs = 30,
                        validation_data=validation_dataset,
                        verbose = 1,
                        callbacks = callbacks_list,
                        shuffle=True)

In [16]:
# Restore previously trained weights into the training-time model.
# NOTE(review): this loads 'C_LSTM_best.hdf5', while the ModelCheckpoint
# callback in this notebook writes to 'C_LSTM_best1.hdf5'. On a fresh
# "Run All" the weights just trained are therefore NOT the ones loaded here
# — confirm this is intentional.
model.load_weights('C_LSTM_best.hdf5')

In [17]:
# Inspect the symbolic output of the softmax layer, indexed as
# (batch, time_steps, classes).
model.get_layer(name="dense").output

<KerasTensor: shape=(None, 31, 63) dtype=float32 (created by layer 'dense')>

In [18]:
# Get the prediction model by extracting layers till the output layer
prediction_model = keras.models.Model(
    model.input[0], model.get_layer(name="dense").output #model.input[0] corresponses model.get_layer(name="inage").input
)
prediction_model.summary()

Model: "model_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
image (InputLayer)           [(None, 32, 128, 1)]      0         
_________________________________________________________________
conv2d (Conv2D)              (None, 32, 128, 32)       320       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 16, 64, 32)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 16, 64, 64)        18496     
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 8, 32, 64)         0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 8, 32, 128)        73856     
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 8, 32, 128)        1475

In [19]:
# Load a single test image and bring it into the layout the model expects:
# grayscale, 32x128, scaled to [0, 1], with batch and channel axes added.
test_image_path = "credit_card.png"

test_image = cv2.imread(test_image_path)
if test_image is None:
    # cv2.imread returns None instead of raising when the file is missing or
    # unreadable; fail here with a clear message rather than inside cvtColor.
    raise FileNotFoundError(f"Could not read image: {test_image_path}")

test_image = cv2.cvtColor(test_image, cv2.COLOR_BGR2GRAY)

# cv2.resize takes (width, height).
test_image = cv2.resize(test_image, (128, 32))

# Show the image inline. cv2.imshow + cv2.waitKey(0) opens a native window and
# blocks the kernel until a key is pressed, and fails on headless servers.
plt.imshow(test_image, cmap="gray")
plt.axis("off")
plt.show()

test_image = test_image.astype(np.float32) / 255.0

# (32, 128) -> (1, 32, 128, 1): add the batch axis and the channel axis.
test_image = test_image[np.newaxis, ..., np.newaxis]

In [20]:
# Run the inference model on the prepared image and greedy-decode the result.
preds = prediction_model.predict(test_image)
pred_texts = ctc_decoder(preds)
print(pred_texts)

['CREDITCARD']


In [21]:
# Step-by-step walk-through of what `ctc_decoder` does, on the same `preds`.
text_list = []

# Raw per-time-step class probabilities.
print(preds)

# Most likely class index at every time step.
pred_indcies = np.argmax(preds, axis=2)

print(pred_indcies)

[[[9.0820930e-07 5.6513776e-09 1.2383899e-07 ... 8.7395343e-08
   1.7204249e-07 6.1065897e-05]
  [1.1961998e-07 5.4290901e-09 1.6112102e-08 ... 2.8037553e-08
   2.9852409e-09 9.9166155e-01]
  [4.1305523e-09 4.3449941e-09 3.3888328e-09 ... 1.4585847e-08
   3.0949904e-10 9.9997580e-01]
  ...
  [3.9084491e-09 3.1678280e-09 3.8347156e-10 ... 4.2657183e-11
   1.6613957e-10 9.9998033e-01]
  [2.2875629e-07 5.9499676e-08 1.1205553e-08 ... 1.5436293e-09
   5.1021911e-09 9.9690896e-01]
  [1.8435275e-06 2.7236644e-09 3.9523425e-08 ... 3.2539202e-07
   1.1442454e-08 1.2383174e-08]]]
[[12 62 62 62 27 62 62 14 62 62 13 62 18 18 62 29 62 62 62 12 62 62 10 62
  62 62 27 62 62 62 13]]


In [37]:
print(len(preds[0])) # number of time steps predicted for the image (axis 1 of the "dense" output); one class is chosen per step, so this bounds the decoded length

31


In [47]:
# Number of classes per time step: len(char_list) + 1 (the extra one is the blank).
print(len(preds[0][0]))

63


In [49]:
# Most likely class index at the first time step of the first image.
print(np.argmax(preds[0][0]))

12


In [35]:
# Most likely class index at every time step (same computation as `pred_indcies`).
print(np.argmax(preds, axis=2))

[[12 62 62 62 27 62 62 14 62 62 13 62 18 18 62 29 62 62 62 12 62 62 10 62
  62 62 27 62 62 62 13]]


In [39]:
# Collapse runs of consecutive identical indices into a single entry each.
print([k for k,_ in groupby(pred_indcies[0])])

[12, 62, 27, 62, 14, 62, 13, 62, 18, 62, 29, 62, 12, 62, 10, 62, 27, 62, 13]


In [None]:
# Manual greedy CTC decode of the first prediction, mirroring `ctc_decoder`.

## merge repeats
merged_list = [k for k, _ in groupby(pred_indcies[0])]

## remove blanks (the blank class is the index one past the last character)
blank_index = len(char_list)
ans = "".join(char_list[int(p)] for p in merged_list if p != blank_index)

text_list.append(ans)