In [None]:
import os
import glob
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, callbacks, optimizers, mixed_precision
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, Callback
from tensorflow.keras.optimizers.schedules import CosineDecayRestarts
from tensorflow.keras.optimizers import AdamW

# --- Configuration and reproducible train/validation split -------------------
SEED             = 42            # single seed for the data split and framework RNGs
img_w, img_h     = 300, 80       # CAPTCHA size
batch_size       = 16
max_label_len    = 10
chars            = '0123456789abcdefghijklmnopqrstuvwxyz'
num_classes      = len(chars) + 1  # +1 for CTC blank
blank_label      = num_classes - 1

# Seed Python's `random`, NumPy and TensorFlow in one call so the shuffle below
# yields the same train/val split after every kernel restart. Without this, a
# checkpoint reloaded in a fresh kernel would be "validated" on images it was
# trained on.
tf.keras.utils.set_random_seed(SEED)

data_dir         = 'preprocessing/preprocessed_images'
# glob returns files in a filesystem-dependent order; sort first so the seeded
# shuffle is deterministic across machines as well as across runs.
all_paths        = sorted(glob.glob(os.path.join(data_dir, '*.png')))
if not all_paths:
    # Fail here with a clear message instead of deep inside tf.data / Keras.
    raise FileNotFoundError(f"No .png files found in {data_dir!r}")
np.random.shuffle(all_paths)
n = len(all_paths)
train_paths = all_paths[:int(0.7 * n)]   # first 70% -> training
val_paths   = all_paths[int(0.7 * n):]   # remaining 30% -> validation

AUTOTUNE = tf.data.AUTOTUNE

# Global Keras dtype policy: layers created after this line use mixed float16.
mixed_precision.set_global_policy('mixed_float16')

# Character <-> class-index lookup tables (index len(chars) is the CTC blank).
char_to_num = {c: i for i, c in enumerate(chars)}
num_to_char = {i: c for c, i in char_to_num.items()}

def make_label_arrays(paths):
    seqs, lens = [], []
    for p in paths:
        name  = os.path.basename(p)
        label = name.split('-', 1)[0]                  
        arr   = [char_to_num[c] for c in label if c in char_to_num]
        arr   = arr[:max_label_len]
        lens.append(len(arr))
        arr  += [blank_label] * (max_label_len - len(arr))  # pad
        seqs.append(arr)
    return np.array(seqs, dtype=np.int32), np.array(lens, dtype=np.int32)

# Encode the ground-truth strings (parsed from the file names) for both splits.
(train_labels, train_lens), (val_labels, val_lens) = (
    make_label_arrays(split_paths) for split_paths in (train_paths, val_paths)
)

def _augment_sample(features, target):
    """Apply random brightness/contrast jitter to one decoded sample."""
    img = features['input_image']
    img = tf.image.random_brightness(img, 0.2)
    img = tf.image.random_contrast(img, 0.5, 1.5)
    return {**features, 'input_image': img}, target


def load_and_preprocess(path, lbl_seq, lbl_len, augment=True):
    """Read one PNG and package it with its label in the model's input format.

    Args:
        path: path of the PNG file (string tensor).
        lbl_seq: padded integer label sequence for this image.
        lbl_len: unpadded length of ``lbl_seq``.
        augment: when True (default, the historical behaviour) random
            brightness/contrast jitter is applied to the image.

    Returns:
        ``(features, target)`` where ``features`` is a dict with keys
        ``'input_image'`` (float32, resized to ``(img_h, img_w)``, scaled by
        1/255), ``'labels'`` and ``'label_length'``; ``target`` is a dummy
        ``0.0`` because the loss is computed inside the model.
    """
    img = tf.io.read_file(path)
    img = tf.io.decode_png(img, channels=1)
    img = tf.image.resize(img, [img_h, img_w])
    img = tf.cast(img, tf.float32) / 255.0

    sample = ({'input_image': img, 'labels': lbl_seq, 'label_length': lbl_len}, 0.0)
    return _augment_sample(*sample) if augment else sample


def make_dataset(paths, labels, lens, shuffle=True, augment=None, drop_remainder=None):
    """Build the batched ``tf.data`` pipeline for a set of images.

    Stage order matters:

    1. decode/resize (deterministic) and ``cache`` the result;
    2. ``shuffle`` *after* the cache -- a cache placed after a shuffle replays
       the first epoch's order forever, so ``reshuffle_each_iteration`` would
       have no effect;
    3. random augmentation *after* the cache so fresh jitter is drawn every
       epoch instead of being frozen into the cached tensors.

    Args:
        paths: sequence of image file paths.
        labels: int array of padded label sequences, one row per path.
        lens: int array of unpadded label lengths, one per path.
        shuffle: reshuffle the samples on every iteration (training mode).
        augment: apply random brightness/contrast jitter. ``None`` (default)
            follows ``shuffle`` so evaluation data is left un-augmented.
        drop_remainder: drop the last partial batch. ``None`` (default)
            follows ``shuffle`` so evaluation covers every sample.

    Returns:
        A ``tf.data.Dataset`` yielding ``(features_dict, target)`` batches of
        up to ``batch_size * 2`` samples.
    """
    if augment is None:
        augment = shuffle
    if drop_remainder is None:
        drop_remainder = shuffle

    def _load(path, lbl_seq, lbl_len):
        # Augmentation is applied later, after the cache.
        return load_and_preprocess(path, lbl_seq, lbl_len, augment=False)

    # Explicit string dtype so an empty path list does not become a float tensor.
    path_tensor = tf.constant(list(paths), dtype=tf.string)
    ds = tf.data.Dataset.from_tensor_slices((path_tensor, labels, lens))
    ds = ds.map(_load, num_parallel_calls=AUTOTUNE)
    ds = ds.cache()
    if shuffle:
        # Buffer covers the whole dataset; max(..., 1) keeps shuffle() valid when empty.
        ds = ds.shuffle(max(len(paths), 1), reshuffle_each_iteration=True)
    if augment:
        ds = ds.map(_augment_sample, num_parallel_calls=AUTOTUNE)
    ds = ds.batch(batch_size * 2, drop_remainder=drop_remainder)
    ds = ds.prefetch(AUTOTUNE)
    return ds

# Input pipelines: the training split is shuffled, the validation split is not.
train_ds = make_dataset(paths=train_paths, labels=train_labels,
                        lens=train_lens, shuffle=True)
val_ds   = make_dataset(paths=val_paths, labels=val_labels,
                        lens=val_lens, shuffle=False)

def build_crnn():
    """Build the CRNN and return ``(training_model, prediction_model)``.

    Architecture: four Conv-BN-LeakyReLU-MaxPool stages, a per-column dense
    projection, two bidirectional LSTMs and a per-timestep softmax over
    ``num_classes`` (characters + CTC blank).

    Returns:
        model: takes ``[input_image, labels, label_length]`` and outputs the
            per-sample CTC loss (shape ``(batch, 1)``) from the ``'ctc'`` layer.
        pred_md: shares the same layers but maps ``input_image`` directly to
            the ``'y_pred'`` softmax output, for decoding.
    """
    inp     = layers.Input((img_h, img_w, 1), name='input_image')
    labels  = layers.Input((max_label_len,), dtype='int32', name='labels')
    lbl_len = layers.Input((), dtype='int32', name='label_length')

    x = inp
    # Pool sizes are (height, width): height is halved in all four stages,
    # width only in the first two, preserving horizontal resolution for CTC.
    pools = [(2,2),(2,2),(2,1),(2,1)]
    for filters, pool in zip([64,128,256,512], pools):
        x = layers.Conv2D(filters, 3, padding='same', use_bias=False)(x)
        x = layers.BatchNormalization()(x)
        x = layers.LeakyReLU()(x)
        x = layers.MaxPooling2D(pool)(x)

    time_steps  = img_w // 4     # width halved twice
    height_pool = img_h // 16    # height halved four times
    flat_dim    = height_pool * 512

    # (H, W, C) -> (W, H, C) -> (W, H*C): each image column becomes one timestep.
    x = layers.Permute((2,1,3))(x)
    x = layers.Reshape((time_steps, flat_dim))(x)

    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.3)(x)
    x = layers.Bidirectional(layers.LSTM(256, return_sequences=True, dropout=0.3))(x)
    x = layers.Bidirectional(layers.LSTM(256, return_sequences=True, dropout=0.3))(x)

    # The notebook enables the global 'mixed_float16' policy. Keep the softmax
    # output in float32: float16 probabilities are too coarse for the
    # log-probabilities used by the CTC loss and decoder.
    y_pred = layers.Dense(num_classes, activation='softmax',
                          dtype='float32', name='y_pred')(x)

    def ctc_loss_fn(args):
        """Per-sample CTC loss; every timestep of y_p is treated as valid."""
        y_p, lbls, ll = args
        y_p = tf.cast(y_p, tf.float32)
        b   = tf.shape(y_p)[0]
        t   = tf.shape(y_p)[1]
        il  = tf.fill([b,1], t)          # input length = full sequence length
        ll2 = tf.reshape(ll, [b,1])
        return tf.keras.backend.ctc_batch_cost(lbls, y_p, il, ll2)

    def ctc_out_shape(shapes):
        return (shapes[0][0], 1)

    # dtype='float32' stops the Lambda layer from auto-casting y_pred back to
    # float16 under the mixed-precision policy.
    loss_out = layers.Lambda(
        ctc_loss_fn,
        output_shape=ctc_out_shape,
        dtype='float32',
        name='ctc'
    )([y_pred, labels, lbl_len])

    model   = tf.keras.Model([inp, labels, lbl_len], loss_out)
    pred_md = tf.keras.Model(inp, y_pred)
    return model, pred_md

# `model` is the training graph (image + labels + label length -> CTC loss);
# `pred_model` shares its layers but maps an image to the 'y_pred' softmax output.
model, pred_model = build_crnn()
model.summary()

class CTCMonitor(Callback):
    """Print string- and character-level accuracy on a dataset after each epoch.

    Predictions come from the module-level ``pred_model`` and are decoded with
    CTC beam search. When ``name == 'Val'`` the string accuracy is also written
    to ``logs['val_string_acc']`` so that callbacks listed after this one can
    monitor it.

    Args:
        dataset: iterable of ``(features_dict, target)`` batches whose features
            contain ``'input_image'`` and ``'labels'``.
        name: tag used in the printed report (``'Val'`` additionally enables
            the ``val_string_acc`` log entry).
    """
    def __init__(self, dataset, name):
        super().__init__()  # let the Keras base class initialise its own state
        self.ds, self.name = dataset, name

    def on_epoch_end(self, epoch, logs=None):
        # Only replace a missing dict: `logs or {}` would also discard an
        # *empty* dict supplied by Keras and lose the metric written below.
        if logs is None:
            logs = {}
        tot_s = cor_s = tot_c = cor_c = 0
        for batch in self.ds:
            imgs = batch[0]['input_image']
            lbls = batch[0]['labels'].numpy()
            # Decode in float32 regardless of the model's compute dtype.
            preds = np.asarray(pred_model.predict(imgs, verbose=0), dtype=np.float32)
            L = np.ones(preds.shape[0]) * preds.shape[1]
            decs, _ = tf.keras.backend.ctc_decode(
                preds, input_length=L,
                greedy=False, beam_width=5, top_paths=1
            )
            for p, t in zip(decs[0].numpy(), lbls):
                # ctc_decode pads its dense output with -1, which is not a key
                # of num_to_char; .get() skips it (and any other unknown index)
                # instead of raising KeyError.
                s   = ''.join(num_to_char.get(i, '') for i in p if i != blank_label)
                t_s = ''.join(num_to_char.get(i, '') for i in t if i != blank_label)
                tot_s += 1
                cor_s += int(s == t_s)
                tot_c += len(t_s)
                # Position-wise matches over the common prefix length.
                cor_c += sum(a == b for a, b in zip(s, t_s))
        # Guard against an empty dataset / empty labels.
        sa = cor_s / tot_s if tot_s else 0.0
        ca = cor_c / tot_c if tot_c else 0.0
        print(f"\n[{self.name}] string_acc={sa:.2%}, char_acc={ca:.2%}")
        if self.name=='Val': logs['val_string_acc'] = sa

# Number of batches per training epoch; used to express the LR schedule in epochs.
steps   = tf.data.experimental.cardinality(train_ds).numpy()
# Cosine decay with warm restarts: the first cycle lasts 5 epochs' worth of steps.
lr_sched = CosineDecayRestarts(
    initial_learning_rate=1e-3,
    first_decay_steps=steps*5,
    t_mul=2.0, m_mul=1.0, alpha=1e-6
)
opt = AdamW(
    learning_rate=lr_sched,
    weight_decay=1e-5,
    clipnorm=5.0
)

# The 'ctc' layer already outputs the per-sample CTC loss, so the compiled
# loss simply passes the model output through and ignores y_true.
model.compile(
    optimizer=opt,
    loss={'ctc': lambda y_true, y_pred: y_pred}
)

# Order matters: the 'Val' CTCMonitor writes logs['val_string_acc'], which the
# ModelCheckpoint and EarlyStopping entries below monitor, so the monitors are
# listed first.
# NOTE(review): this presumably relies on Keras invoking callbacks in list
# order with a shared logs dict -- confirm for the installed Keras version.
cbs = [
    CTCMonitor(train_ds, 'Train'),
    CTCMonitor(val_ds,   'Val'),
    ModelCheckpoint('best_crnn.h5', monitor='val_string_acc',
                    save_best_only=True, mode='max', verbose=1),
    EarlyStopping(monitor='val_string_acc', patience=10,
                  restore_best_weights=True, mode='max', verbose=1)
]




In [None]:
# Train for up to 100 epochs; the callbacks in `cbs` report accuracy, save the
# best checkpoint and stop early when 'val_string_acc' stops improving.
model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=100,
    callbacks=cbs
)

In [None]:
def evaluate_model(model, eval_paths, batch_size, name="Test"):
    """Decode every image in ``eval_paths`` and print string/character accuracy.

    Args:
        model: the training model; its ``'input_image'`` and ``'y_pred'``
            layers are used to build an image -> softmax prediction model.
        eval_paths: image file paths; labels are parsed from the file names
            by ``make_label_arrays``.
        batch_size: currently unused (batching is decided by ``make_dataset``);
            kept so existing calls keep working.
        name: tag printed in the report header.

    Prints the fraction of exactly matching strings and the fraction of
    position-wise matching characters (relative to the true label lengths).
    """
    # Make label arrays from paths
    eval_labels, eval_lens = make_label_arrays(eval_paths)

    eval_ds = make_dataset(eval_paths, eval_labels, eval_lens, shuffle=False)

    # Only `tf` is imported in this notebook; the bare name `keras` is
    # undefined and raised NameError here.
    pred_model = tf.keras.Model(
        inputs=model.get_layer('input_image').input,
        outputs=model.get_layer('y_pred').output
    )

    total_strings = 0
    correct_strings = 0
    total_chars = 0
    correct_chars = 0

    for batch in eval_ds:
        imgs = batch[0]['input_image']
        lbls = batch[0]['labels'].numpy()

        # Decode in float32 regardless of the model's compute dtype.
        preds = np.asarray(pred_model.predict(imgs, verbose=0), dtype=np.float32)
        input_lengths = np.ones(preds.shape[0]) * preds.shape[1]

        decoded, _ = tf.keras.backend.ctc_decode(
            preds, input_length=input_lengths,
            greedy=False, beam_width=5, top_paths=1
        )
        decoded = decoded[0].numpy()

        for pred_seq, true_seq in zip(decoded, lbls):
            # .get(..., '') drops indices without a character, such as the -1
            # padding in the dense ctc_decode output.
            pred_text = ''.join(num_to_char.get(ch, '') for ch in pred_seq if ch != blank_label)
            true_text = ''.join(num_to_char.get(ch, '') for ch in true_seq if ch != blank_label)

            if pred_text == true_text:
                correct_strings += 1
            total_strings += 1

            # zip stops at the shorter string, i.e. compares the common prefix length.
            correct_chars += sum(1 for a, b in zip(pred_text, true_text) if a == b)
            total_chars += len(true_text)

    string_acc = (correct_strings / total_strings) if total_strings else 0
    char_acc = (correct_chars / total_chars) if total_chars else 0
    print(f"\n[{name}] Final Evaluation:")
    print(f"String Accuracy:    {string_acc:.2%}")
    print(f"Character Accuracy: {char_acc:.2%}")


In [None]:
# Report final string/character accuracy on the validation paths.
evaluate_model(model, val_paths, batch_size, name="Validation")