In [1]:
DATA_PATH = '/home/liauzhanyi/cs4243/data'
TEST_DATASET_PATH = DATA_PATH + '/test'
TRAIN_DATASET_PATH = DATA_PATH + '/train'
OUTPUT_PATH = f'{DATA_PATH}/segmented_train'
SEGMENTED_TRAIN_DATASET_PATH = f'{DATA_PATH}/segmented_train'
AUGMENTED_SEGMENTED_TRAIN_DATASET_PATH = f'{DATA_PATH}/augmented_segmented_train'

IMG_HEIGHT, IMG_WIDTH = 80, 400

In [2]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

from tensorflow.keras.preprocessing.image import ImageDataGenerator

2024-10-26 09:55:18.844444: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1729907718.943670    6907 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1729907718.977884    6907 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-10-26 09:55:19.241953: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
def dilate(image, kernel_size=(3, 3)):
    kernel = np.ones(kernel_size, np.uint8)
    dilation = cv2.dilate(image, kernel, iterations=1)
    return dilation

def erode(image, kernel_size=(3, 3)):
    kernel = np.ones(kernel_size, np.uint8)
    erosion = cv2.erode(image, kernel, iterations=1)
    return erosion

def gaussian_thresholding(image, kernel=(5, 5), block_size=11, C=2):
    blur = cv2.GaussianBlur(image, kernel, 0)
    thresholded_image = cv2.adaptiveThreshold(
        blur, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY, blockSize=block_size, C=C
    )
    return thresholded_image

def segment_characters(image):
    inverted_image = cv2.bitwise_not(image)
    contours, _ = cv2.findContours(inverted_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    char_images = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        if h > 10:  # Filter small noise
            char_img = image[y:y+h, x:x+w]
            char_images.append((x, char_img))
    char_images = sorted(char_images, key=lambda c: c[0])  # Sort by x-coordinate

    return [char[1] for char in char_images]

In [9]:
def process_and_save_images(input_dir, output_dir):
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Keep track of how many captchas were skipped
    skipped_count = 0

    # Process each image in the input directory
    for filename in os.listdir(input_dir):
        if filename.endswith('.png'):
            # Extract the true label from the filename
            base_name = filename.split('-')[0]

            # Load and preprocess the image
            image_path = os.path.join(input_dir, filename)
            image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)

            # Apply preprocessing steps
            image = cv2.bitwise_not(image)
            image = erode(image, kernel_size=(2, 2))
            image = cv2.bitwise_not(image)
            image = gaussian_thresholding(image, kernel=(5, 5), block_size=11, C=2)
            image = erode(image, kernel_size=(2, 2))
            image = gaussian_thresholding(image, kernel=(5, 5), block_size=11, C=2)
            image = erode(image, kernel_size=(2, 2))

            # Segment characters
            char_images = segment_characters(image)

            # Validate the number of segmented parts
            if len(char_images) != len(base_name):
                print(f"Skipping {filename}: Expected {len(base_name)} parts, got {len(char_images)}.")
                skipped_count += 1
                continue  # Skip saving this image

            # Save each segmented character follow the naming convention <label_str>-<char>.png
            for idx, char_img in enumerate(char_images):
                char_label = base_name[idx]
                save_name = f"{base_name}-{char_label}.png"
                save_path = os.path.join(output_dir, save_name)

                # Save the segmented character
                cv2.imwrite(save_path, char_img)

    print(f"Processing complete! Segmented characters are saved in {output_dir}")
    print(f"Total skipped: {skipped_count}")

process_and_save_images(TRAIN_DATASET_PATH, SEGMENTED_TRAIN_DATASET_PATH)


Skipping f9305osy-0.png: Expected 8 parts, got 13.
Skipping shfkvjtx-0.png: Expected 8 parts, got 7.
Skipping mhpky4-0.png: Expected 6 parts, got 7.
Skipping 2at5l1k9-0.png: Expected 8 parts, got 2.
Skipping ifg6ly-0.png: Expected 6 parts, got 7.
Skipping md79ncz4-0.png: Expected 8 parts, got 9.
Skipping ot728-0.png: Expected 5 parts, got 6.
Skipping ji9zjl3b-0.png: Expected 8 parts, got 7.
Skipping hiczvfiu-0.png: Expected 8 parts, got 7.
Skipping 6rw7x-0.png: Expected 5 parts, got 4.
Skipping 5uzppk-0.png: Expected 6 parts, got 7.
Skipping wjss-0.png: Expected 4 parts, got 3.
Skipping 916e-0.png: Expected 4 parts, got 3.
Skipping j5e1ag-0.png: Expected 6 parts, got 5.
Skipping 4d3wu-0.png: Expected 5 parts, got 6.
Skipping pw6m9n-0.png: Expected 6 parts, got 8.
Skipping gdrbjx-0.png: Expected 6 parts, got 5.
Skipping u5zba0-0.png: Expected 6 parts, got 5.
Skipping 69n150-0.png: Expected 6 parts, got 5.
Skipping bsqrf4-0.png: Expected 6 parts, got 4.
Skipping k0x12-0.png: Expected 5 p

In [10]:
def augment_and_save_images(input_dir, output_dir, augment_count=3):
    """Augment images from `input_dir` and save them with proper naming to `output_dir`."""
    os.makedirs(output_dir, exist_ok=True)  # Ensure the output directory exists

    # Define a more controlled data augmentation pipeline
    datagen = ImageDataGenerator(
        rotation_range=10,          # Small rotation for better quality
        width_shift_range=0.05,     # Slight horizontal shift
        height_shift_range=0.05,    # Slight vertical shift
        shear_range=0.1,            # Mild shearing
        zoom_range=0.1,             # Mild zoom
        fill_mode='nearest'         # Avoid introducing artifacts
    )

    # Loop through all images in the input directory
    for filename in os.listdir(input_dir):
        if filename.endswith('.png'):
            # Load the original image
            image_path = os.path.join(input_dir, filename)
            img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)

            # Expand dimensions for augmentation
            img = np.expand_dims(img, axis=-1)  # Add channel dimension
            img = np.expand_dims(img, axis=0)   # Add batch dimension

            # Extract the base filename without extension (e.g., 'abc-o')
            base_name = filename.split('.')[0]

            # Generate and save augmented images
            for i, batch in enumerate(datagen.flow(img, batch_size=1)):
                # Save each augmented image with proper naming
                augmented_filename = f"{base_name}-{i + 1}.png"
                augmented_path = os.path.join(output_dir, augmented_filename)

                # Save the augmented image
                cv2.imwrite(augmented_path, batch[0].squeeze().astype(np.uint8))

                # Stop after generating `augment_count` images per original image
                if i + 1 >= augment_count:
                    break

    print(f"Augmentation complete! Augmented images saved in {output_dir}")


# Example usage
input_dir = SEGMENTED_TRAIN_DATASET_PATH
output_dir = AUGMENTED_SEGMENTED_TRAIN_DATASET_PATH
augment_and_save_images(input_dir, output_dir)


Augmentation complete! Augmented images saved in /home/liauzhanyi/cs4243/data/augmented_segmented_train


In [3]:
from sklearn.model_selection import train_test_split

output_dir = AUGMENTED_SEGMENTED_TRAIN_DATASET_PATH

# Define character set (e.g., 0-9, a-z)
CHAR_SET = "0123456789abcdefghijklmnopqrstuvwxyz"
CHAR_TO_IDX = {char: idx for idx, char in enumerate(CHAR_SET)}

def load_dataset(data_dir):
    images = []
    labels = []

    for filename in os.listdir(data_dir):
        if filename.endswith('.png'):
            # Extract the character from the filename (e.g., 'o' from 'oq1j-o.png')
            char_label = filename.split('-')[1][0]

            if char_label in CHAR_TO_IDX:
                # Load the image in grayscale
                image_path = os.path.join(data_dir, filename)
                img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)

                # Resize to 28x28 (if needed)
                img = cv2.resize(img, (28, 28))
                img = img / 255.0  # Normalize pixel values to [0, 1]

                # Store the image and label
                images.append(img)
                labels.append(CHAR_TO_IDX[char_label])

    # Convert to numpy arrays
    images = np.array(images).reshape(-1, 28, 28, 1)  # Add channel dimension
    labels = np.array(labels)

    return images, labels

# Load the dataset
images, labels = load_dataset(output_dir)
print(f"Loaded {len(images)} images with {len(set(labels))} unique labels.")

Loaded 112638 images with 36 unique labels.


In [4]:
# Split the data into training and validation sets (80% train, 20% validation)
X_train, X_val, y_train, y_val = train_test_split(images, labels, test_size=0.2, random_state=42)

print(f"Training set size: {X_train.shape[0]}, Validation set size: {X_val.shape[0]}")

Training set size: 90110, Validation set size: 22528


In [None]:
#MODEL 1 ~8%

# import tensorflow as tf
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout

# def build_model(num_classes):
#     model = Sequential([
#         Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
#         MaxPooling2D((2, 2)),
#         Dropout(0.2),

#         Conv2D(64, (3, 3), activation='relu'),
#         MaxPooling2D((2, 2)),
#         Dropout(0.2),

#         Flatten(),
#         Dense(128, activation='relu'),
#         Dropout(0.5),
#         Dense(num_classes, activation='softmax')
#     ])

#     model.compile(optimizer='adam',
#                   loss='sparse_categorical_crossentropy',
#                   metrics=['accuracy'])
#     return model

# # Build the model for 36 classes (0-9, a-z)
# num_classes = len(CHAR_SET)
# model = build_model(num_classes)
# model.summary()

In [None]:
#MODEL 2 ~24%

# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Conv2D, BatchNormalization, MaxPooling2D, Flatten, Dense, Dropout, ReLU

# def build_custom_cnn(num_classes):
#     model = Sequential()

#     # First Conv Block
#     model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1), padding='same'))
#     model.add(BatchNormalization())
#     model.add(MaxPooling2D(pool_size=(2, 2)))

#     # Second Conv Block
#     model.add(Conv2D(64, (3, 3), activation='relu', padding='same'))
#     model.add(BatchNormalization())
#     model.add(MaxPooling2D(pool_size=(2, 2)))

#     # Third Conv Block
#     model.add(Conv2D(128, (3, 3), activation='relu', padding='same'))
#     model.add(BatchNormalization())
#     model.add(MaxPooling2D(pool_size=(2, 2)))

#     # Fully Connected Layers
#     model.add(Flatten())
#     model.add(Dropout(0.5))
#     model.add(Dense(128, activation='relu'))
#     model.add(Dense(36, activation='softmax'))  # 36 classes: 0-9 + a-z

#     # Compile the model
#     model.compile(optimizer='adam',
#                   loss='sparse_categorical_crossentropy',
#                   metrics=['accuracy'])

#     return model


# num_classes = len(CHAR_SET)
# model = build_model(num_classes)
# model.summary()


In [None]:
# #MODEL 3 ~33% (AlexNet)

# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, ReLU

# def build_alexnet():
#     model = Sequential()

#     # 1st Convolutional Layer
#     model.add(Conv2D(64, (3, 3), strides=1, padding='same', input_shape=(28, 28, 1)))
#     model.add(BatchNormalization())
#     model.add(ReLU())
#     model.add(MaxPooling2D(pool_size=(2, 2)))

#     # 2nd Convolutional Layer
#     model.add(Conv2D(128, (3, 3), strides=1, padding='same'))
#     model.add(BatchNormalization())
#     model.add(ReLU())
#     model.add(MaxPooling2D(pool_size=(2, 2)))

#     # 3rd Convolutional Layer
#     model.add(Conv2D(256, (3, 3), strides=1, padding='same'))
#     model.add(BatchNormalization())
#     model.add(ReLU())
#     model.add(MaxPooling2D(pool_size=(2, 2)))

#     # Flatten the output for Fully Connected Layers
#     model.add(Flatten())

#     # 1st Fully Connected Layer
#     model.add(Dense(512, activation='relu'))
#     model.add(Dropout(0.5))

#     # 2nd Fully Connected Layer
#     model.add(Dense(512, activation='relu'))
#     model.add(Dropout(0.5))

#     # Output Layer
#     model.add(Dense(36, activation='softmax'))

#     model.compile(optimizer='adam',
#                   loss='sparse_categorical_crossentropy',
#                   metrics=['accuracy'])

#     return model

# model = build_alexnet()
# model.summary()

In [5]:
import tensorflow as tf
from tensorflow.keras import layers, Model

def build_crnn_model(input_shape=(28, 28, 1), num_classes=36, dropout_rate=0.5):
    inputs = layers.Input(shape=input_shape)

    # Convolutional layers
    x = layers.Conv2D(32, (3, 3), padding='same', activation='relu')(inputs)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)

    x = layers.Conv2D(64, (3, 3), padding='same', activation='relu')(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)

    x = layers.Conv2D(128, (3, 3), padding='same', activation='relu')(x)

    # Adding a Dropout layer after convolutional layers
    x = layers.Dropout(dropout_rate)(x)

    # Calculate the new shape
    height = x.shape[1]  # height after convolution and pooling
    width = x.shape[2]   # width after convolution and pooling
    channels = x.shape[3]  # channels after convolution
    new_shape = (height, width * channels)  # Reshape to (time_steps, features)

    # Reshape for LSTM input
    x = layers.Reshape(target_shape=new_shape)(x)

    # Recurrent layers
    x = layers.Bidirectional(layers.LSTM(128, return_sequences=True))(x)
    x = layers.Dropout(dropout_rate)(x)  # Adding dropout after the first LSTM

    x = layers.Bidirectional(layers.LSTM(128, return_sequences=False))(x)
    x = layers.Dropout(dropout_rate)(x)  # Adding dropout after the second LSTM

    # Output layer
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = Model(inputs, outputs)
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    return model

# Build the CRNN model
model = build_crnn_model()
model.summary()


I0000 00:00:1729907801.861060    6907 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 2259 MB memory:  -> device: 0, name: NVIDIA GeForce GTX 1650, pci bus id: 0000:01:00.0, compute capability: 7.5


In [6]:
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True
)
lr_scheduler = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.1,
    patience=3
)

# Train the model with EarlyStopping
history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=100,
    batch_size=64,
    callbacks=[early_stopping, lr_scheduler]
)

: 

In [None]:
# Convert to a DataFrame
import pandas as pd
history_df = pd.DataFrame(history.history)

# Visualize the History
import plotly.express as px
fig = px.line(history_df, y=["loss", "val_loss"], title="Learning Curve")
fig.update_layout(
    xaxis_title="Epochs",
    yaxis_title="Sparse Categorical Crossentropy",
)
fig.show()

In [None]:
val_loss, val_accuracy = model.evaluate(X_val, y_val)
print(f"Validation Accuracy: {val_accuracy * 100:.2f}%")


In [None]:
# Save the trained model
def save_model(model, dir_path, model_name="character_recognition_model.h5"):
    # Ensure the directory exists
    os.makedirs(dir_path, exist_ok=True)

    # Create the full path to the model file
    model_path = os.path.join(dir_path, model_name)

    # Save the model
    model.save(model_path)
    print(f"Model saved successfully at: {model_path}")

# Example usage
save_dir = DATA_PATH + '/model'
save_model(model, save_dir, 'crnn_85_37.keras')

In [14]:
import os
import cv2
import numpy as np
from tensorflow.keras.models import load_model

# Load the trained model
MODEL_PATH = DATA_PATH + '/model/alexnet_v2.keras'
model = load_model(MODEL_PATH)

# Define the character set used during training
CHAR_SET = "0123456789abcdefghijklmnopqrstuvwxyz"
CHAR_MAP = {idx: char for idx, char in enumerate(CHAR_SET)}

def preprocess_image(image_path):
    """Preprocess the image using the same steps as in training."""
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    image = cv2.bitwise_not(image)
    image = erode(image, kernel_size=(2, 2))
    image = cv2.bitwise_not(image)
    image = scipy.ndimage.median_filter(image, (3, 1))
    image = scipy.ndimage.median_filter(image, (1, 3))
    image = gaussian_thresholding(image, kernel=(5, 5), block_size=11, C=2)
    image = erode(image, kernel_size=(2, 2))
    image = gaussian_thresholding(image, kernel=(5, 5), block_size=11, C=2)
    image = erode(image, kernel_size=(2, 2))
    return image

def predict_character(image, model):
    """Predict a single character using the trained model."""
    img = cv2.resize(image, (28, 28)) / 255.0  # Resize and normalize
    img = np.expand_dims(img, axis=[0, -1])  # Add batch and channel dimensions

    prediction = model.predict(img)
    predicted_label = np.argmax(prediction)
    predicted_char = CHAR_MAP[predicted_label]
    return predicted_char

def predict_captcha_string(image_path, model):
    """Predict the entire CAPTCHA string from the input image."""
    # Preprocess the image
    processed_image = preprocess_image(image_path)

    # Segment the characters
    char_images = segment_characters(processed_image)

    # Predict each character and reconstruct the string
    predicted_string = ""
    for char_img in char_images:
        predicted_char = predict_character(char_img, model)
        predicted_string += predicted_char

    return predicted_string

# Example usage
captcha_image_path = TEST_DATASET_PATH + "/2d76-0.png"
predicted_string = predict_captcha_string(captcha_image_path, model)
print(f"Predicted CAPTCHA string: {predicted_string}")


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 311ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
Predicted CAPTCHA string: 2d76


In [18]:
import os

def evaluate_on_unseen_data(unseen_dir, model, num_images=20):
    """Evaluate the model on the first `num_images` from the unseen directory."""
    correct_predictions = 0
    total_images = 0

    # Get the first `num_images` files from the directory
    all_files = [f for f in os.listdir(unseen_dir) if f.endswith('.png')]
    all_files = sorted(all_files)[:num_images]  # Limit to first `num_images`

    for filename in all_files:
        # Extract the true label from the filename (e.g., '2d76' from '2d76-0.png')
        true_label = filename.split('-')[0]

        # Get the full path to the image
        image_path = os.path.join(unseen_dir, filename)

        # Predict the CAPTCHA string
        predicted_string = predict_captcha_string(image_path, model)
        print(f"True: {true_label}, Predicted: {predicted_string}")

        # Compare the predicted string with the true label
        if predicted_string == true_label:
            correct_predictions += 1

        total_images += 1

    # Calculate accuracy
    accuracy = correct_predictions / total_images * 100
    print(f"\nAccuracy on the first {num_images} images: {accuracy:.2f}%")

# Example usage
unseen_dir = TEST_DATASET_PATH
evaluate_on_unseen_data(unseen_dir, model, 1000)


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 26ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
True: 002e23, Predicted: 002e23
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step
True: 03yl9s, Predicted: 03yi9s
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
