In [1]:
import os
import tarfile
from tqdm import tqdm
from io import BytesIO
from zipfile import ZipFile
from urllib.request import urlopen

import torch
import torch.optim as optim
from torchsummaryX import summary

from mltu.torch.model import Model
from mltu.torch.losses import CTCLoss #to recognize words different lengths of words
from mltu.torch.dataProvider import DataProvider #multi process and threading, not to have a bottleneck
from mltu.torch.metrics import CERMetric, WERMetric 
from mltu.torch.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard, Model2onnx, ReduceLROnPlateau

from mltu.preprocessors import ImageReader #to read image
from mltu.transformers import ImageResizer, LabelIndexer, LabelPadding, ImageShowCV2 #Read and display
from mltu.augmentors import RandomBrightness, RandomRotate, RandomErodeDilate, RandomSharpen #augmentation for robustness
from mltu.annotations.images import CVImage

from model import Network
from configs import ModelConfigs

In [3]:
def download_and_unzip(url, extract_to="Datasets", chunk_size=1024*1024):
    """Download a zip archive from ``url`` and extract it into ``extract_to``.

    The whole archive is buffered in memory before it is extracted.

    Args:
        url: Address handed to ``urlopen``.
        extract_to: Directory the archive members are extracted into.
        chunk_size: Maximum number of bytes requested per ``read`` call.
    """
    with urlopen(url) as http_response:
        # ``length`` is None when the server sends no Content-Length and is
        # missing entirely on non-HTTP responses, so it is only used as an
        # optional total for the progress bar, never as a loop bound.
        total_bytes = getattr(http_response, "length", None)

        # Collect chunks in a list and join once; repeated ``bytes +=`` copies
        # the whole buffer on every iteration.
        chunks = []
        with tqdm(total=total_bytes, unit="B", unit_scale=True) as progress:
            while True:
                chunk = http_response.read(chunk_size)
                if not chunk:  # empty read means end of stream
                    break
                chunks.append(chunk)
                progress.update(len(chunk))

    with ZipFile(BytesIO(b"".join(chunks))) as archive:
        archive.extractall(path=extract_to)

# Fetch and unpack the dataset only when its folder is not already on disk
dataset_path = os.path.join("Datasets", "IAM_Words")
if not os.path.exists(dataset_path):
    download_and_unzip("https://git.io/J0fjL", extract_to="Datasets")

    # The word images are expected in a tarball inside the unzipped folder;
    # the context manager closes the archive handle once extraction finishes.
    with tarfile.open(os.path.join(dataset_path, "words.tgz")) as words_archive:
        words_archive.extractall(os.path.join(dataset_path, "words"))

100%|██████████| 784/784 [03:10<00:00,  4.12it/s]


In [4]:
dataset, vocab, max_len = [], set(), 0

# Preprocess the dataset by the specific IAM_Words dataset file structure
# (read inside a context manager so the file handle is closed right away)
with open(os.path.join(dataset_path, "words.txt"), "r") as words_file:
    words = words_file.readlines()

for line in tqdm(words):
    # Skip "#" header/comment lines and blank lines
    if line.startswith("#") or not line.strip():
        continue

    line_split = line.split(" ")
    # Skip malformed one-token lines and entries whose second field is "err"
    if len(line_split) < 2 or line_split[1] == "err":
        continue

    # The word id (first field) determines the nested folder layout:
    # <first 3 chars>/<first two dash-separated parts>/<id>.png
    folder1 = line_split[0][:3]
    folder2 = "-".join(line_split[0].split("-")[:2])
    file_name = line_split[0] + ".png"
    # The label is the last space-separated field, without the newline
    label = line_split[-1].rstrip("\n")

    rel_path = os.path.join(dataset_path, "words", folder1, folder2, file_name)
    if not os.path.exists(rel_path):
        print(f"File not found: {rel_path}")
        continue

    dataset.append([rel_path, label])
    vocab.update(list(label))
    max_len = max(max_len, len(label))

configs = ModelConfigs()

# Save vocab and maximum text length to configs
configs.vocab = "".join(sorted(vocab))
configs.max_text_length = max_len
configs.save()

100%|██████████| 115338/115338 [00:10<00:00, 11195.16it/s]


In [5]:
# Per-sample transformer chain: resize to the configured input size, index the
# label against configs.vocab, then pad it to max_text_length using
# len(configs.vocab) as the padding value.
sample_transformers = [
    # ImageShowCV2(), # uncomment to show images when iterating over the data provider
    ImageResizer(configs.width, configs.height, keep_aspect_ratio=False),
    LabelIndexer(configs.vocab),
    LabelPadding(max_word_length=configs.max_text_length, padding_value=len(configs.vocab)),
]

# Data provider over the full [image path, label] list built above
data_provider = DataProvider(
    dataset=dataset,
    data_preprocessors=[ImageReader(CVImage)],
    transformers=sample_transformers,
    batch_size=configs.batch_size,
    skip_validation=True,
    use_cache=True,
)

In [1]:
# for _ in data_provider:
#     pass

In [7]:
# 90/10 split of the provider into a training part and a held-out part
train_dataProvider, test_dataProvider = data_provider.split(split=0.9)

# Only the training part is augmented: random brightness, erode/dilate,
# sharpening and rotation (angle=10)
train_augmentors = [RandomBrightness(), RandomErodeDilate(), RandomSharpen(), RandomRotate(angle=10)]
train_dataProvider.augmentors = train_augmentors

In [8]:
# Network sized by the vocabulary length; the CTC blank index is
# len(configs.vocab), the same value used as the label padding value.
vocab_size = len(configs.vocab)
network = Network(vocab_size, activation="leaky_relu", dropout=0.3)
loss = CTCLoss(blank=vocab_size)
optimizer = optim.Adam(network.parameters(), lr=configs.learning_rate)

# Print a per-layer summary (torchsummaryX package is required) by tracing a
# zero-filled batch holding a single (height, width, 3) input.
dummy_batch = torch.zeros((1, configs.height, configs.width, 3))
summary(network, dummy_batch)

                                Kernel Shape      Output Shape    Params  \
Layer                                                                      
0_rb1.convb1.Conv2d_conv       [3, 16, 3, 3]  [1, 16, 32, 128]     448.0   
1_rb1.convb1.BatchNorm2d_bn             [16]  [1, 16, 32, 128]      32.0   
2_rb1.LeakyReLU_act1                       -  [1, 16, 32, 128]         -   
3_rb1.convb2.Conv2d_conv      [16, 16, 3, 3]  [1, 16, 32, 128]     2.32k   
4_rb1.convb2.BatchNorm2d_bn             [16]  [1, 16, 32, 128]      32.0   
5_rb1.Conv2d_shortcut          [3, 16, 1, 1]  [1, 16, 32, 128]      64.0   
6_rb1.LeakyReLU_act2                       -  [1, 16, 32, 128]         -   
7_rb1.Dropout_dropout                      -  [1, 16, 32, 128]         -   
8_rb2.convb1.Conv2d_conv      [16, 16, 3, 3]   [1, 16, 16, 64]     2.32k   
9_rb2.convb1.BatchNorm2d_bn             [16]   [1, 16, 16, 64]      32.0   
10_rb2.LeakyReLU_act1                      -   [1, 16, 16, 64]         -   
11_rb2.convb

  df_sum = df.sum()


Unnamed: 0_level_0,Kernel Shape,Output Shape,Params,Mult-Adds
Layer,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0_rb1.convb1.Conv2d_conv,"[3, 16, 3, 3]","[1, 16, 32, 128]",448.0,1769472.0
1_rb1.convb1.BatchNorm2d_bn,[16],"[1, 16, 32, 128]",32.0,16.0
2_rb1.LeakyReLU_act1,-,"[1, 16, 32, 128]",,
3_rb1.convb2.Conv2d_conv,"[16, 16, 3, 3]","[1, 16, 32, 128]",2320.0,9437184.0
4_rb1.convb2.BatchNorm2d_bn,[16],"[1, 16, 32, 128]",32.0,16.0
...,...,...,...,...
65_rb9.LeakyReLU_act2,-,"[1, 64, 4, 16]",,
66_rb9.Dropout_dropout,-,"[1, 64, 4, 16]",,
67_lstm,-,"[1, 64, 256]",198656.0,196608.0
68_lstm_dropout,-,"[1, 64, 256]",,


In [11]:
# Shape of a single-image input batch: (1, height, width, 3).
# NOTE(review): the same tuple is written inline where Model2onnx is built, and
# this variable is not referenced by any other visible cell — confirm it is
# still needed.
input_shape=(1, configs.height, configs.width, 3)

In [9]:
# Use the GPU when one is available
if torch.cuda.is_available():
    network = network.cuda()

# Callbacks: all of the monitoring ones watch "val_CER" and treat lower as better
earlyStopping = EarlyStopping(
    monitor="val_CER",
    patience=20,
    mode="min",
    verbose=1,
)
modelCheckpoint = ModelCheckpoint(
    configs.model_path + "/model.pt",
    monitor="val_CER",
    mode="min",
    save_best_only=True,
    verbose=1,
)
tb_callback = TensorBoard(configs.model_path + "/logs")
reduce_lr = ReduceLROnPlateau(
    monitor="val_CER",
    factor=0.9,
    patience=10,
    verbose=1,
    mode="min",
    min_lr=1e-6,
)
# Receives the checkpoint path written above plus the vocab as ONNX metadata
model2onnx = Model2onnx(
    saved_model_path=configs.model_path + "/model.pt",
    input_shape=(1, configs.height, configs.width, 3),
    verbose=1,
    metadata={"vocab": configs.vocab},
)

# Model wrapper that drives training and evaluation, tracking CER and WER
model = Model(
    network,
    optimizer,
    loss,
    metrics=[CERMetric(configs.vocab), WERMetric(configs.vocab)],
)
model.fit(
    train_dataProvider,
    test_dataProvider,
    epochs=1000,
    callbacks=[earlyStopping, modelCheckpoint, tb_callback, reduce_lr, model2onnx],
)

# Persist both splits as csv files next to the model
train_dataProvider.to_csv(os.path.join(configs.model_path, "train.csv"))
test_dataProvider.to_csv(os.path.join(configs.model_path, "val.csv"))

 [1mIters[0m     [1mElapsed Time[0m      [1mSpeed[0m                                              
 [99m0/[93m1357[0m[0m  [99m        -        [0m  [99m   -    [0m                                            
  return F.conv2d(input, weight, bias, self.stride,
[K[F[K[F [1mIters[0m     [1mElapsed Time[0m      [1mSpeed[0m      [1mloss[0m      [1mCER[0m    [1mWER[0m   [1mlr[0m               
 [99m1/[93m1357[0m[0m  [99m00:00:10<[93m03:50:48[0m[0m  [99m0.10it/s[0m  [99m106.9154[0m  [99m13.8403[0m  [99m1.0[0m  [99m0.002[0m             
[1mEpoch 1: [0m   0.1% |                                                             |[K[F[K[F [1mIters[0m     [1mElapsed Time[0m      [1mSpeed[0m     [1mloss[0m     [1mCER[0m    [1mWER[0m   [1mlr[0m                 
 [99m2/[93m1357[0m[0m  [99m00:00:11<[93m02:15:05[0m[0m  [99m0.17it/s[0m  [99m96.9251[0m  [99m7.4397[0m  [99m1.0[0m  [99m0.002[0m               
[1mEpoch 1: [0

KeyboardInterrupt: 

In [4]:
import os
import cv2
import pytesseract
from PIL import Image

# Configure the path to Tesseract if necessary
# pytesseract.pytesseract.tesseract_cmd = r'C:/Program Files/Tesseract-OCR/tesseract.exe'

# Load the image
# NOTE(review): this is an absolute, machine-specific path — point it at your own image
image_path = "C:/Users/wangd/Ismail/NMIMS/Extra/Hackathon/Marsh_Mclennan/a01-000u copy.png"  # Replace with your image path
image = cv2.imread(image_path)
if image is None:
    # cv2.imread returns None instead of raising when the file cannot be read
    raise FileNotFoundError(f"Could not read image: {image_path}")

# Use Tesseract to get word bounding boxes
data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)

# Create a folder to store the word images
# NOTE(review): files left over from an earlier run are not removed here
output_folder = 'word_images'
os.makedirs(output_folder, exist_ok=True)

# Boxes are (left, top, width, height, text); `merged_word` is the box
# currently being grown, `word_images` collects the finished ones.
word_images = []
merged_word = None

# Loop through each detected word
for i in range(len(data['text'])):
    word = data['text'][i]
    if not word.strip():  # ignore entries that are empty or only whitespace
        continue

    x = data['left'][i]
    y = data['top'][i]
    w = data['width'][i]
    h = data['height'][i]

    if merged_word is None:
        merged_word = (x, y, w, h, word)
        continue

    prev_x, prev_y, prev_w, prev_h, prev_text = merged_word

    # Horizontal gap from the right edge of the current box to this word
    distance = x - (prev_x + prev_w)
    if (abs(y - prev_y) < h) and (distance < 15):  # adjust threshold as necessary
        # Merge into the union of both boxes. The width is measured between
        # the outermost edges so the gap between the words is included and
        # the right-hand word is not clipped.
        left = min(prev_x, x)
        top = min(prev_y, y)
        right = max(prev_x + prev_w, x + w)
        bottom = max(prev_y + prev_h, y + h)
        merged_word = (left, top, right - left, bottom - top, prev_text + ' ' + word)
    else:
        # Save the previous merged word and start a new one
        word_images.append(merged_word)
        merged_word = (x, y, w, h, word)

# Save the last merged word
if merged_word:
    word_images.append(merged_word)

# Now, save each merged word as an image
for i, (x, y, w, h, word) in enumerate(word_images):
    # Crop the merged word from the image
    word_image = image[y:y+h, x:x+w]

    # cv2.imread yields BGR channel order while PIL interprets arrays as RGB;
    # convert first so the saved file keeps the original colours.
    pil_image = Image.fromarray(cv2.cvtColor(word_image, cv2.COLOR_BGR2RGB))

    # Save the merged word image (1-based numbering in reading order)
    word_filename = os.path.join(output_folder, f'word_{i + 1}.png')
    pil_image.save(word_filename)

print(f"Extracted {len(word_images)} words into '{output_folder}' folder.")


Extracted 50 words into 'word_images' folder.


In [5]:
import cv2
import numpy as np
import os
from mltu.inferenceModel import OnnxInferenceModel
from mltu.utils.text_utils import ctc_decoder

class ImageToWordModel(OnnxInferenceModel):
    """Inference wrapper that decodes the text of a single word image."""

    def __init__(self, *args, **kwargs):
        # No extra state; every argument is forwarded to OnnxInferenceModel
        super().__init__(*args, **kwargs)

    def predict(self, image: np.ndarray):
        """Run the ONNX model on ``image`` and return the decoded text.

        Args:
            image: Image array to recognise; it is resized before inference.

        Returns:
            The first entry produced by ``ctc_decoder`` for the model output.
        """
        # Axes 1 and 2 of the first input shape, reversed for cv2.resize
        # (presumably (height, width) -> (width, height); TODO confirm)
        target_size = self.input_shapes[0][1:3][::-1]
        resized = cv2.resize(image, target_size)

        # Add a leading batch axis and cast to float32 before running the session
        batch = np.expand_dims(resized, axis=0).astype(np.float32)
        outputs = self.model.run(self.output_names, {self.input_names[0]: batch})

        decoded = ctc_decoder(outputs[0], self.metadata["vocab"])
        return decoded[0]

if __name__ == "__main__":
    # Initialize the model
    model = ImageToWordModel(model_path="Models/08_handwriting_recognition_torch/202410071035/model.onnx")

    # Specify the folder containing images
    folder_path = "./word_images"  # Replace with the path to your images
    output_file = "extracted_text.txt"  # Output file for extracted text

    # Initialize a list to hold the extracted text
    full_text = []

    def _numeric_index(filename):
        """Return the integer between the first '_' and the next '.', or None if absent."""
        try:
            return int(filename.split('_')[1].split('.')[0])
        except (IndexError, ValueError):
            return None

    # Candidate images, matched on extension regardless of letter case
    candidates = [
        f for f in os.listdir(folder_path)
        if f.lower().endswith(('.png', '.jpg', '.jpeg'))
    ]

    # Files without a numeric index cannot be ordered; report and skip them
    # instead of letting the sort key raise.
    for skipped in (f for f in candidates if _numeric_index(f) is None):
        print(f"Skipping file without a numeric index: {skipped}")

    # Sort by the number in the filename so words keep their reading order
    image_files = sorted(
        (f for f in candidates if _numeric_index(f) is not None),
        key=_numeric_index,
    )

    # Loop through each sorted image file
    for filename in image_files:
        image_path = os.path.join(folder_path, filename)
        image = cv2.imread(image_path)

        # cv2.imread returns None when the file cannot be decoded
        if image is None:
            print(f"Failed to load image: {image_path}")
            continue

        # Predict text for the image and keep it in order
        prediction_text = model.predict(image)
        full_text.append(prediction_text)
        print(f"Processed {filename}: {prediction_text}")

    # Write the extracted text as a single space-separated paragraph
    with open(output_file, 'w') as f:
        f.write(' '.join(full_text))

    print(f"Text extraction completed. Check the file: {output_file}")


Processed word_1.png: 1
Processed word_2.png: MOVE
Processed word_3.png: to
Processed word_4.png: stop
Processed word_5.png: Mr.
Processed word_6.png: Gait
Processed word_7.png: kell
Processed word_8.png: from
Processed word_9.png: non
Processed word_10.png: rat
Processed word_11.png: by
Processed word_12.png: any
Processed word_13.png: more
Processed word_14.png: Labour
Processed word_15.png: life
Processed word_16.png: Peers
Processed word_17.png: is
Processed word_18.png: to
Processed word_19.png: be
Processed word_20.png: made
Processed word_21.png: ata
Processed word_22.png: meeting
Processed word_23.png: of
Processed word_24.png: Labour
Processed word_25.png: Ps
Processed word_26.png: tomorrrow
Processed word_27.png: Mr.
Processed word_28.png: Michal
Processed word_29.png: Foot
Processed word_30.png: has
Processed word_31.png: put
Processed word_32.png: down
Processed word_33.png: a
Processed word_34.png: reslution
Processed word_35.png: on
Processed word_36.png: the
Processed wo