# Baseline CRNN Model + CTC loss function to crack Captcha OCR problem

This is the baseline to crack/solve the CAPTCHA image OCR problem without worrying about character segmentation.

In [23]:
from __future__ import absolute_import, division, print_function, unicode_literals
import pathlib
import os
import fnmatch
import cv2
import numpy as np
import string
import time
import json

# make sure we use tensorflow 2.0
import tensorflow as tf
print(tf.__version__)

# import padding library
from tensorflow.keras.preprocessing.sequence import pad_sequences

# import our model, different layers and activation function 
from tensorflow.keras.layers import Dense, LSTM, Reshape, BatchNormalization, Input, Conv2D, MaxPool2D, Lambda, Bidirectional
from tensorflow.keras.models import Model
from tensorflow.keras.activations import relu, sigmoid, softmax
import tensorflow.keras.backend as K
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import CSVLogger, TensorBoard, ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

2.18.0


In [24]:
DATASET_ZIP_PATH="data/captcha_images.zip"
import zipfile
with zipfile.ZipFile(DATASET_ZIP_PATH, 'r') as zip_ref:
    zip_ref.extractall("captcha_project")

In [25]:
!ls

'ls' is not recognized as an internal or external command,
operable program or batch file.


In [26]:
# finding where we are so we dont need to type absolute path everytime
# current_directory_path = pathlib.Path(".").absolute()
current_directory_path = pathlib.Path("./captcha_project")
current_directory_path

WindowsPath('captcha_project')

In [27]:
# path to our directory images

# our raw folders
RAW_FOLDER = os.path.join(current_directory_path, "raw")

In [28]:
print(RAW_FOLDER)

captcha_project\raw


In [29]:
# we working on localhost or VM Cloud for now so no need this code section
# from google.colab.patches import cv2_imshow #only in Colab because using cv2.imshow is not allowed in Colab
# from google.colab import drive
# drive.mount('/gdrive')

In [30]:
#!ls ./captcha_project/raw

## Reading images and organise folders

In [31]:
# checking the total alphabet characters of the whole dataset
raw_data_path = pathlib.Path(RAW_FOLDER)
char_list= set()
dict_file_label={}
for item in raw_data_path.glob('*'):
    label = os.path.basename(os.path.splitext(item)[0])
    dict_file_label[str(item)]=label
    char_list.update(set((label)))
char_list=sorted(char_list)

print("Total number of characters : {}".format(len(char_list)))

Total number of characters : 32


In [32]:
#dict_file_label

In [33]:
# show all possible labels characters
"".join(char_list)

'23456789ABCDEFGHJKLMNPQRSTUVWXYZ'

In [34]:
# find the maximum label length
label_lens= []
for label in dict_file_label.values():
    label_lens.append(len(label))
max_label_len = max(label_lens)

In [35]:
# Only because this dataset is CAPTCHA so the length of all labels are 4.
max_label_len

4

In [36]:
# convert the words to array of indexs based on the char_list
def encode_to_labels(txt):
    # encoding each output word into digits of indexes
    dig_lst = []
    for index, char in enumerate(txt):
        try:
            dig_lst.append(char_list.index(char))
        except:
            print("No found in char_list :", char)
        
    return dig_lst

In [37]:
# testing our encode function (text to number)
encode_to_labels("2345ABCDE") 

[0, 1, 2, 3, 8, 9, 10, 11, 12]

In [38]:
# all possible image paths for training
all_image_paths = list(dict_file_label.keys())

In [39]:
all_image_paths[:5]

['captcha_project\\raw\\222X.png',
 'captcha_project\\raw\\226U.png',
 'captcha_project\\raw\\2274.png',
 'captcha_project\\raw\\22A6.png',
 'captcha_project\\raw\\22BJ.png']

In [40]:
# find all widths and heights of images (this is useful if our dataset images got different sizes)
widths = []
heights = []
for image_path in all_image_paths:
    img = cv2.imread(image_path)
    (height, width, _) = img.shape
    heights.append(height)
    widths.append(width)

In [41]:
min_height = min(heights)
max_height = max(heights)
min_width = min(widths)
max_width = max(widths)

In [42]:
# this information is useful for making decision for padding and resizing
(min_height, max_height, min_width, max_width)

(24, 24, 72, 72)

The result is (24, 24, 72, 72) for this dataset so it is definitely telling us 
that we don't need to resize the image at all. 
But imagine if the result is (a,b,c,d). Then we should resize all images to the height of a or b
Then padding horizontally following the new max width of resized images

In this case, our images got the same size so no need to do anything at all

In [43]:
# being a good Data Scientist, we need to have train set and test set
from sklearn.model_selection import train_test_split
test_size = 0.2
train_image_paths, test_image_paths = train_test_split(all_image_paths, test_size=test_size, random_state=42)

## Preprocessing

In [44]:
# Constants (we choose this number as we later discover that the output of our CNN is (1,9,512) from out input)
TIME_STEPS = 9

In [45]:
# TO DO LIST: BUILD THE PIPELINE FOR THE IMAGES, definitely this is super basic pipeline and can be improved

# lists for training dataset
training_img = []
training_txt = []
train_input_length = []
train_label_length = []
orig_txt = []

i=0
for train_img_path in train_image_paths:
    # print(f_name)
    # read input image and convert into gray scale image
    img = cv2.cvtColor(cv2.imread(train_img_path), cv2.COLOR_BGR2GRAY)

    # in this dataset, we don't need to do any resize at all here.
    
    # add channel dimension
    img = np.expand_dims(img , axis = 2)
    
    # Normalize each image
    img = img/255.
    
    label = dict_file_label[train_img_path]

    # split data into validation and training dataset as 10% and 90% respectively
    orig_txt.append(label)   
    train_label_length.append(len(label))

    # our time steps for valid input
    train_input_length.append(TIME_STEPS)
    training_img.append(img)

    # convert words to digits based on charlist
    training_txt.append(encode_to_labels(label)) 
    i+=1
    if (i%500 == 0):
        print ("has processed trained {} files".format(i))

has processed trained 500 files
has processed trained 1000 files
has processed trained 1500 files
has processed trained 2000 files
has processed trained 2500 files
has processed trained 3000 files
has processed trained 3500 files
has processed trained 4000 files
has processed trained 4500 files
has processed trained 5000 files
has processed trained 5500 files
has processed trained 6000 files
has processed trained 6500 files
has processed trained 7000 files
has processed trained 7500 files


In [46]:
#lists for validation dataset
valid_img = []
valid_txt = []
valid_input_length = []
valid_label_length = []
valid_orig_txt = []

i=0

for test_img_path in test_image_paths:
    # print(f_name)
    # read input image and convert into gray scale image
    img = cv2.cvtColor(cv2.imread(test_img_path), cv2.COLOR_BGR2GRAY)
    
    # in this dataset, we don't need to do any resize at all here.
    
    # add channel dimension
    img = np.expand_dims(img , axis = 2)
    
    # Normalize each image
    img = img/255.

    label = dict_file_label[test_img_path]

    valid_orig_txt.append(label)   
    valid_label_length.append(len(label))

    # our time steps for valid input
    valid_input_length.append(TIME_STEPS)
    valid_img.append(img)

    # convert words to digits based on charlist
    valid_txt.append(encode_to_labels(label))
    i+=1
    if (i%500 == 0):
        print ("has processed test {} files".format(i))

has processed test 500 files
has processed test 1000 files
has processed test 1500 files


In [47]:
# this is the most controversial part when our max_label_len should be set different or slightly smaller than TIME_STEPS
# but let stick to the conventional/normal way: we should to be the same with our TIME_STEPS
max_label_len = TIME_STEPS

In [48]:
# pad each output label to maximum text length, remember we did that so that we keep training with rnn consistent?
train_padded_txt = pad_sequences(training_txt, maxlen=max_label_len, padding='post', value = 0)
valid_padded_txt = pad_sequences(valid_txt, maxlen=max_label_len, padding='post', value = 0)

## Model Building

In [49]:
# OUR FULL MODEL OF CRNN AND LSTM

# input with shape of height=32 and width=128 
inputs = Input(shape=(24,72,1))
 
# convolution layer with kernel size (3,3)  #(24,72,64)
conv_1 = Conv2D(64, (3,3), activation = 'relu', padding='same')(inputs)

# poolig layer with kernel size (2,2) to make the height/2 and width/2  #(12,36,64)
pool_1 = MaxPool2D(pool_size=(2, 2), strides=2)(conv_1)

#(12,36,128)
conv_2 = Conv2D(128, (3,3), activation = 'relu', padding='same')(pool_1)

# poolig layer with kernel size (2,2) to make the height/2 and width/2  #(6,18,128)
pool_2 = MaxPool2D(pool_size=(2, 2), strides=2)(conv_2)

#(6,18,256)
conv_3 = Conv2D(256, (3,3), activation = 'relu', padding='same')(pool_2)

# poolig layer with kernel size (2,2) to make the height/2  #(3,9,256)
pool_3 = MaxPool2D(pool_size=(2, 2))(conv_3)

batch_norm_3 = BatchNormalization()(pool_3)

#(3,9,256)
conv_4 = Conv2D(256, (3,3), activation = 'relu', padding='same')(batch_norm_3)

# Batch normalization layer #(3,9,256)
batch_norm_5 = BatchNormalization()(conv_4)

#(3,9,512)
conv_6 = Conv2D(512, (3,3), activation = 'relu', padding='same')(batch_norm_5)
batch_norm_6 = BatchNormalization()(conv_6)

# poolig layer with kernel size (2,2) to make the height/2 #(1,9,512)
pool_6 = MaxPool2D(pool_size=(3, 1))(batch_norm_6)
 
# # to remove the first dimension of one: (1,9,512) -> (9,512)
squeezed = Lambda(lambda x: K.squeeze(x, 1))(pool_6)
 
# # # bidirectional LSTM layers with units=128
blstm_1 = Bidirectional(LSTM(256, return_sequences=True, dropout = 0.2))(squeezed)
blstm_2 = Bidirectional(LSTM(256, return_sequences=True, dropout = 0.2))(blstm_1)

# # this is our softmax character proprobility with timesteps 
outputs = Dense(len(char_list)+1, activation = 'softmax')(blstm_2)

# model to be used at test time

act_model = Model(inputs, outputs)




In [50]:
act_model.summary()

In [51]:
### ctc definition part 
"""
The reason why we use ctc_batch_cost instead of ctc_cost because
In batch cost, the program managed to mask out padded sequence using label_length, 
So the cost won't include the padding 0 section.
"""

"""
The way CTC is modelled currently in Keras is that you need to implement the loss function as a layer, you did that already (loss_out). 
Your problem is that the inputs you give that layer are not tensors from Theano/TensorFlow but numpy arrays.
To change that one option is to model these values as inputs to your model.
That is why we need to manually create Input for labels, input length and label_length.
"""

# define the label input shape for ctc
labels = Input(name='the_labels', shape=[max_label_len], dtype='float32')

"""
where input_length and label_length are constants you created previously
the easiest way here is to have a fixed batch size in training 
the lengths should have the same batch size (see shapes in the link for ctc_cost)   
"""

# define the length of input and label for ctc
input_length = Input(name='input_length', shape=[1], dtype='int64')
label_length = Input(name='label_length', shape=[1], dtype='int64')
 
# define a ctc lambda function to take arguments and return ctc_bach_cost
def ctc_lambda_func(args):
    y_pred, labels, input_length, label_length = args
    """
    labels: tensor (number of samples, max_string_length) containing the truth labels.
    y_pred: tensor (number of samples, time_steps, num_character_labels) containing the prediction, or output of the softmax.
    input_length: tensor (number of samples, 1) containing the sequence length for each batch item in y_pred.
    label_length: tensor (number of samples, 1) containing the sequence length for each batch item in y_true.
    """
    return K.ctc_batch_cost(labels, y_pred, input_length, label_length)
 
# out loss function (just take the inputs and put it in our ctc_batch_cost)
loss_out = Lambda(ctc_lambda_func, output_shape=(1,), name='ctc')([outputs, labels, input_length, label_length])

#model to be used at training time
model = Model(inputs=[inputs, labels, input_length, label_length], outputs=loss_out)

In [53]:
# ready ctc loss function and optimizers
model.compile(loss={'ctc': lambda y_true, y_pred: y_pred}, optimizer = 'adam')

# our callbacks hell to optimize our learning
callbacks = [
    TensorBoard(
        log_dir='./logs',
        histogram_freq=10,
        profile_batch=0,
        write_graph=True,
        write_images=False,
        update_freq="epoch"),
    ModelCheckpoint(
        filepath='checkpoint_weights.weights.h5',
        monitor='val_loss',
        save_best_only=True,
        save_weights_only=True,
        verbose=1),
    EarlyStopping(
        monitor='val_loss',
        min_delta=1e-8,
        patience=15,
        restore_best_weights=True,
        verbose=1),
    ReduceLROnPlateau(
        monitor='val_loss',
        min_delta=1e-8,
        factor=0.2,
        patience=10,
        verbose=1)
]
callbacks_list = callbacks

In [54]:
model.summary()

In [55]:
# notice I convert everything to numpy array here since model can fit either tensors and numpy array
# the reason why i stick with numpy array because it is simple to code and also because it is baseline solution
# I want to use openCV (can't run with tensors because it is CPU operations)
# And if I want to use tensors then i will make sure my image preprocessing functions uses tensor compatible operations

# ready our training data
training_img = np.array(training_img)
train_input_length = np.array(train_input_length)  # all must be equal length to T timesteps
train_label_length = np.array(train_label_length)  # different length (only the same in Captcha dataset)

# ready our test data
valid_img = np.array(valid_img)
valid_input_length = np.array(valid_input_length) # all must be equal length to T timesteps
valid_label_length = np.array(valid_label_length) # different length (only the same in Captcha dataset)

## Model Training

 **Yann LeCun** said:
"Training with large minibatches is bad for your health.
 More importantly, it's bad for your test error.
 Friends dont let friends use minibatches larger than 32."

In [56]:
# choose batchsize and epochs

batch_size = 32
epochs = 100

model.fit(x=[training_img, train_padded_txt, train_input_length, train_label_length], 
          y=np.zeros(len(training_img)),
          batch_size=batch_size, 
          epochs = epochs,
          validation_data = ([valid_img, valid_padded_txt, valid_input_length, valid_label_length], [np.zeros(len(valid_img))]),
          verbose = 1, callbacks = callbacks_list)

Epoch 1/100



  return {key: serialize_keras_object(value) for key, value in obj.items()}



[1m249/249[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 143ms/step - loss: 10.7170
Epoch 1: val_loss improved from inf to 50.51460, saving model to checkpoint_weights.weights.h5
[1m249/249[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m46s[0m 157ms/step - loss: 10.6954 - val_loss: 50.5146 - learning_rate: 0.0010
Epoch 2/100
[1m 82/249[0m [32m━━━━━━[0m[37m━━━━━━━━━━━━━━[0m [1m24s[0m 149ms/step - loss: 0.1387

KeyboardInterrupt: 

In [None]:
# save our model
model.save('lastest_model.h5')

: 

## Decode and Prediction

In [None]:
# load the saved best model weights
act_model.load_weights('lastest_model.h5')

: 

In [None]:
# predict outputs on validation images
NO_PREDICTS = 200
OFFSET=0
prediction = act_model.predict(valid_img[OFFSET:OFFSET+NO_PREDICTS])
#prediction = act_model.predict(valid_img)

: 

In [None]:
valid_img.shape

: 

In [None]:
# NO_PREDICTS image results
# 9 timesteps
# 32 chars + 1 blank
prediction.shape

: 

In [None]:
# use CTC decoder
out = K.get_value(K.ctc_decode(prediction, input_length=np.ones(prediction.shape[0])*prediction.shape[1],
                         greedy=True)[0][0])
 
# see the results
all_predictions =[]
i = 0
for x in out:
    print("original_text  = ", valid_orig_txt[i+OFFSET])
    print("predicted text = ", end = '')
    pred = ""
    for p in x:  
        if int(p) != -1:
            pred += char_list[int(p)]
    print(pred)
    all_predictions.append(pred)
    i+=1

: 

In [None]:
# WRITE TO PRINT OUT THE IMAGES IN NICE 2D ARRAY PLOT WITH ORIGINAL TEXT AND PREDICTED TEXT 
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
plt.figure(figsize=(10,100))
col=0
row=1
gs1 = gridspec.GridSpec(NO_PREDICTS//5, 5)
gs1.update(wspace=0.025, hspace=0.025)
for n in range(NO_PREDICTS):
    plt.subplot(gs1[n])
    plt.title("True:"+valid_orig_txt[n+OFFSET], fontsize=15, color="green")
    plt.imshow(valid_img[n][:,:,0], cmap="gray")
    plt.xlabel("Pred:"+all_predictions[n+OFFSET], fontsize=15, color="red")


: 

## Evaluation using CER, WER and SER

Tool to metrics calculation through data and label (string and string).
 * Calculation from Optical Character Recognition (OCR) metrics with editdistance.
This is borrowed from https://github.com/arthurflor23/handwritten-text-recognition/blob/master/src/data/evaluation.py

It is not needed to understand this function deeply, it only helps to calculate three evaluation metris:
- CER (Character Error Rate)
- WER (Word Error Rate)
- SER (Sequence Error Rate)

In [None]:
import string
import unicodedata
import editdistance

def ocr_metrics(predicts, ground_truth, norm_accentuation=False, norm_punctuation=False):
    """Calculate Character Error Rate (CER), Word Error Rate (WER) and Sequence Error Rate (SER)"""

    if len(predicts) == 0 or len(ground_truth) == 0:
        return (1, 1, 1)

    cer, wer, ser = [], [], []

    for (pd, gt) in zip(predicts, ground_truth):

        if norm_accentuation:
            pd = unicodedata.normalize("NFKD", pd).encode("ASCII", "ignore").decode("ASCII")
            gt = unicodedata.normalize("NFKD", gt).encode("ASCII", "ignore").decode("ASCII")

        if norm_punctuation:
            pd = pd.translate(str.maketrans("", "", string.punctuation))
            gt = gt.translate(str.maketrans("", "", string.punctuation))

        pd_cer, gt_cer = list(pd.lower()), list(gt.lower())
        dist = editdistance.eval(pd_cer, gt_cer)
        cer.append(dist / (max(len(pd_cer), len(gt_cer))))

        pd_wer, gt_wer = pd.lower().split(), gt.lower().split()
        dist = editdistance.eval(pd_wer, gt_wer)
        wer.append(dist / (max(len(pd_wer), len(gt_wer))))

        pd_ser, gt_ser = [pd], [gt]
        dist = editdistance.eval(pd_ser, gt_ser)
        ser.append(dist / (max(len(pd_ser), len(gt_ser))))

    cer_f = sum(cer) / len(cer)
    wer_f = sum(wer) / len(wer)
    ser_f = sum(ser) / len(ser)

    return (cer_f, wer_f, ser_f)

: 

In [None]:
evaluate = ocr_metrics(predicts=all_predictions,
                                  ground_truth=valid_orig_txt,
                                  norm_accentuation=False,
                                  norm_punctuation=False)

e_corpus = "\n".join([
    "Metrics:",
    "Character Error Rate: {}".format(evaluate[0]),
    "Word Error Rate:      {}".format(evaluate[1]),
    "Sequence Error Rate:  {}".format(evaluate[2]),
])

: 

This is a easy dataset so I got perfect score for absolutely the test set! Not even a challenge for CRNN power!

In [None]:
print(e_corpus)

: 

**So what do we learn from here**:
- CRNN + CTC is not that challenging, just want sure we follow above process step by step
- Keep our height and width is a power of 2 or at least even number is making our time much 
easier to divide by half (it is not really important, since it is related to design your model and preprocessing)
- The number bi LSTM paramater is larger the number of timestep since our biLSTM /2 will be at least the size of hidden node for each single LSTM.
- The max label length should be the same to the number of time steps, but some people report if they set it to be slightly lower than time step, it helps. 
But you should stick with the basics!
- The data is super clean and same image dimension. So for other datasets, maybe a bit of noise cleaning and binarization may help!


**Things to improve for other datasets**:
- Resize image logics with multiple image sizes (maybe as following):
    + find min, max of height and width
    + resize to a fixed height you want
    + calculate the max width of all resized images
    + padding to all images to that max width
- Combine the logic of preprocessing of train set and test set together
- Convert them to tfdataset pipeline (note that it is challenging since OpenCV won't work with tensor)
- Rework/redesign new CRNN model to fit with your new processed images. 