# Architecture

### CNN to extract the features of the image

### RNN to interpret the sequential features extracted by the CNN. LSTM (Long Short-Term Memory) units are often used because they handle sequences efficiently.

### CTC loss to handle the alignment between the input sequences (the image features) and the target sequences (the transcribed text).

In [None]:
import pandas as pd
import re
import cv2 
import os
import numpy as np
import tensorflow as tf
from keras import backend as K
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, Reshape, Bidirectional, LSTM, Dense, Lambda, Activation, BatchNormalization, Dropout
from keras.optimizers import Adam
import matplotlib.pyplot as plt

: 

In [None]:
# Folder with the split lists. Built with os.path.join instead of a
# hard-coded backslash path so the notebook also runs outside Windows
# (on Windows the resulting strings are unchanged).
_sets_dir = os.path.join('bressay-competition', 'bressay', 'sets')

# Each *_content list holds the raw lines of one split file; later cells
# strip every line and use it as a sub-directory name of the lines folder.
with open(os.path.join(_sets_dir, 'test.txt'), 'r') as file:
    test_content = file.readlines()

with open(os.path.join(_sets_dir, 'training.txt'), 'r') as file:
    training_content = file.readlines()

with open(os.path.join(_sets_dir, 'validation.txt'), 'r') as file:
    validation_content = file.readlines()



In [None]:
# counter_rasura = 0
# if "--xxx---" in data_text:
#     counter_rasura += 1

# Extracting line dataset

In [None]:
# Root folder of the line-level dataset. os.path.join keeps the value
# identical on Windows while making the path valid on other systems too.
lines_directory = os.path.join('bressay-competition', 'bressay', 'data', 'lines')

# Placeholders for the three splits. `test` and `train` are rebound to
# DataFrames once the split folders have been scanned; `val` is not
# reassigned anywhere in the visible code.
test = {}
train = {}
val = {}

def count_unique_characters(*data_structures):
    """Return all distinct characters of the 'txt' entries as one sorted string.

    Every positional argument is a mapping whose values are records. Records
    without a 'txt' key are skipped; for the others each item of
    ``record['txt']`` is iterated character by character.
    """
    distinct = {
        character
        for mapping in data_structures
        for record in mapping.values()
        if 'txt' in record
        for text_entry in record['txt']
        for character in text_entry
    }
    return ''.join(sorted(distinct))


def find_max_text_length(*data_structures):
    """Return the length of the longest 'txt' line across all given mappings.

    Records lacking a 'txt' key are ignored. The result is 0 when there is
    nothing to measure.
    """
    line_lengths = (
        len(line)
        for mapping in data_structures
        for record in mapping.values()
        if 'txt' in record
        for line in record['txt']
    )
    return max(line_lengths, default=0)

def preprocess(img):
    """Fit a 2-D image onto a white 64x256 canvas and rotate it clockwise.

    The top-left 64x256 region of ``img`` is kept (anything beyond is cut
    off), smaller images are padded with the value 255 on the right/bottom,
    and the canvas is finally rotated by 90 degrees clockwise.
    """
    # Unpacking into exactly two names means only 2-D arrays are accepted
    (rows, cols) = img.shape

    # Crop: slicing is a no-op for images that already fit
    cropped = img[:64, :256]

    # Blank white canvas, then paste the (possibly cropped) image top-left
    canvas = np.full((64, 256), 255.0)
    canvas[:min(rows, 64), :min(cols, 256)] = cropped

    return cv2.rotate(canvas, cv2.ROTATE_90_CLOCKWISE)

def collect_file_data_to_dataframe(dir_name, base_path):
    """Pair every ``.png`` in ``base_path/dir_name`` with its ``.txt`` transcription.

    Args:
        dir_name: Name of the sub-directory to scan.
        base_path: Directory that contains ``dir_name``.

    Returns:
        A DataFrame with the columns ``FILENAME`` (path of the image) and
        ``IDENTITY`` (full content of the matching text file, read as UTF-8).
        Images without a text file are skipped. A missing directory yields
        an empty frame with the same columns.
    """
    png_suffix = '.png'
    data = []
    full_path = os.path.join(base_path, dir_name)

    if os.path.isdir(full_path):
        # os.listdir returns entries in arbitrary order; sorting makes the
        # row order (and everything indexed by it later) reproducible.
        for file in sorted(os.listdir(full_path)):
            if not file.endswith(png_suffix):
                continue

            # Swap only the trailing extension; str.replace would also
            # rewrite a '.png' that appears earlier in the file name.
            txt_file_name = file[:-len(png_suffix)] + '.txt'
            txt_file_path = os.path.join(full_path, txt_file_name)
            if not os.path.exists(txt_file_path):
                continue

            with open(txt_file_path, 'r', encoding='utf-8') as txt_file:
                identity = txt_file.read()
            data.append([os.path.join(full_path, file), identity])

    return pd.DataFrame(data, columns=['FILENAME', 'IDENTITY'])



    

In [None]:
# One DataFrame per folder listed in the split files; every entry is
# stripped of surrounding whitespace before it is used as a folder name.
test_dfs = [
    collect_file_data_to_dataframe(entry.strip(), lines_directory)
    for entry in test_content
]
training_dfs = [
    collect_file_data_to_dataframe(entry.strip(), lines_directory)
    for entry in training_content
]
val_dfs = []

# Validation split is currently not loaded.
# for dir_name in val_content:
#     dir_name = dir_name.strip()
#     val_dfs.append(collect_file_data_to_dataframe(dir_name, lines_directory))

# Merge the per-folder frames into one frame per split (fresh 0..n-1 index)
test = pd.concat(test_dfs, ignore_index=True)
train = pd.concat(training_dfs, ignore_index=True)
# val_df = pd.concat(val_dfs, ignore_index=True)


In [None]:
# How many rows of the train / test frames the following cells process
train_size, test_size = 19628, 56

In [None]:
# Load, pad/rotate and normalise the first `train_size` training images.
train_x = []
loaded_rows = []  # index labels of `train` whose image was read successfully
for i in range(train_size):

    # Collapse doubled backslashes that may be present in the stored path
    img_dir = train.loc[i, 'FILENAME'].replace('\\\\', '\\')

    # cv2.imread signals failure by returning None instead of raising
    image = cv2.imread(img_dir, cv2.IMREAD_GRAYSCALE)

    if image is None:
        print(f"Image not found or unable to load: {img_dir}")
        continue

    image = preprocess(image)
    image = image / 255.0  # Normalize
    train_x.append(image)
    loaded_rows.append(i)

if len(loaded_rows) != train_size:
    # Some images were skipped: drop their rows as well, otherwise the labels
    # built later from train.loc[i] would no longer line up with train_x.
    train = train.loc[loaded_rows].reset_index(drop=True)
    train_size = len(train)

train_x = np.array(train_x)

In [None]:
# Load, pad/rotate and normalise the first `test_size` test images.
test_x = []
loaded_rows = []  # index labels of `test` whose image was read successfully
for i in range(test_size):

    # Collapse doubled backslashes that may be present in the stored path
    img_dir = test.loc[i, 'FILENAME'].replace('\\\\', '\\')

    # cv2.imread signals failure by returning None instead of raising
    image = cv2.imread(img_dir, cv2.IMREAD_GRAYSCALE)

    if image is None:
        print(f"Image not found or unable to load: {img_dir}")
        continue

    image = preprocess(image)
    image = image / 255.0  # Normalize
    test_x.append(image)
    loaded_rows.append(i)

if len(loaded_rows) != test_size:
    # Some images were skipped: drop their rows as well, otherwise the labels
    # built later from test.loc[i] would no longer line up with test_x.
    test = test.loc[loaded_rows].reset_index(drop=True)
    test_size = len(test)

test_x = np.array(test_x)

In [None]:
# Append the single-channel axis expected by the model input (256, 64, 1).
# np.asarray reuses an existing ndarray, whereas np.array would first make a
# full copy of the image stack just to reshape it.
train_x = np.asarray(train_x).reshape(-1, 256, 64, 1)
test_x = np.asarray(test_x).reshape(-1, 256, 64, 1)

### Vectorizing txt (preparing for CTC loss)

In [None]:
def preprocess_text(text):
    """Replace the multi-character transcription markers by single characters.

    Args:
        text: Transcription of one line.

    Returns:
        ``text`` with every special token swapped for its one-character
        stand-in. Text without tokens is returned unchanged.
    """
    # Ordered longest token first. The wrapped variants contain the short
    # tokens ('--xxx--', '@@???@@'); replacing the short ones first would
    # turn e.g. '##--xxx--##' into '##+##' and the wrapped token could never
    # match.
    special_token_mappings = (
        ('##--xxx--##', '%'),
        ('$$--xxx--$$', '`'),
        ('##@@???@@##', '{'),
        ('$$@@???@@$$', '}'),
        ('--xxx--', '+'),
        ('@@???@@', '&'),
    )
    for token, unique_char in special_token_mappings:
        text = text.replace(token, unique_char)
    return text

# Swap the special transcription tokens in both label columns. One pass is
# enough: after it no token is left, so the duplicated second pass that used
# to follow changed nothing.
train['IDENTITY'] = train['IDENTITY'].apply(preprocess_text)
test['IDENTITY'] = test['IDENTITY'].apply(preprocess_text)


In [None]:
# with open("train", 'w', encoding='utf-8') as file:
#     # Iterate over each row in the dataframe
#     for index, row in train.iterrows():
#         # Write the formatted string to the file
#         file.write(f"{row['FILENAME']}\t{row['IDENTITY']}\n")


In [None]:
# Character inventory; a character's class id is the position of its first
# occurrence in this string (see label_to_num).
# NOTE(review): this is a raw string, so the backslash before the quote and
# the doubled backslash are kept literally; the backslash and the backtick
# therefore appear more than once. Confirm whether that is intended.
alphabets = r' +&!"%`\'(),}-.{/0123456789$:;=#?ABCDEFGHIJKLMNOPQRSTUVWXYZ\\]`abcdefghijklmnopqrstuvwxyz|ª°´ºÀÁÈÉÍÓÚàáâãçèéêíóôõúü'

# One output class per alphabet entry plus one extra (presumably the CTC
# blank - confirm against the loss implementation)
num_of_characters = len(alphabets) + 1

# Length of the output sequence produced by the model
# NOTE(review): CTC needs at least as many usable time steps as label
# characters; labels may be padded up to max_str_len - TODO confirm that no
# label is longer than the time steps that reach the loss.
num_of_timestamps = 64

# Width of the padded label rows
max_str_len = 250


def label_to_num(label):
    """Encode ``label`` as an array of positions in ``alphabets``.

    Each character maps to the index of its first occurrence in
    ``alphabets``; a character that is not part of it maps to -1
    (the result of ``str.find``).
    """
    return np.array([alphabets.find(character) for character in label])

def num_to_label(num):
    """Decode a sequence of ``alphabets`` positions back into text.

    Decoding stops at the first -1 (treated as the end-of-label marker);
    everything after it is ignored.
    """
    decoded = []
    for index in num:
        if index == -1:  # CTC Blank
            break
        decoded.append(alphabets[index])
    return "".join(decoded)

In [None]:
# Sanity check: show a sample transcription next to its numeric encoding
name = 'ra crítica por meio dos & metragens em exibição'
print(name, label_to_num(name), sep=' \n ')

In [None]:
# Notebook echo: display the number of training rows currently in use
train_size

In [None]:
# Label rows padded with -1 up to max_str_len
train_y = np.full((train_size, max_str_len), -1.0)
# True (unpadded) length of every label
train_label_len = np.zeros((train_size, 1))
# Sequence length handed to the CTC loss: the model's time steps minus 2
train_input_len = np.full((train_size, 1), num_of_timestamps - 2, dtype=np.float64)
# All-zero target array passed as `y` during training
train_output = np.zeros((train_size,))

for i in range(train_size):
    label_text = train.loc[i, 'IDENTITY']
    train_label_len[i] = len(label_text)
    train_y[i, :len(label_text)] = label_to_num(label_text)

In [None]:
# Label rows padded with -1 up to max_str_len
test_y = np.full((test_size, max_str_len), -1.0)
# True (unpadded) length of every label
test_label_len = np.zeros((test_size, 1))
# Sequence length handed to the CTC loss: the model's time steps minus 2
test_input_len = np.full((test_size, 1), num_of_timestamps - 2, dtype=np.float64)
# All-zero target array passed as `y` for validation
test_output = np.zeros((test_size,))

for i in range(test_size):
    label_text = test.loc[i, 'IDENTITY']
    test_label_len[i] = len(label_text)
    test_y[i, :len(label_text)] = label_to_num(label_text)

In [None]:
# Inspect one encoded training sample (row 100)
print(
    'True label : ', train.loc[100, 'IDENTITY'],
    '\ntrain_y : ', train_y[100],
    '\ntrain_label_len : ', train_label_len[100],
    '\ntrain_input_len : ', train_input_len[100],
)

In [None]:
# Inspect one encoded test sample (row 10). The captions now name the test
# arrays that are actually printed instead of the train ones.
print('True label : ',test.loc[10, 'IDENTITY'] , '\ntest_y : ',test_y[10],'\ntest_label_len : ',test_label_len[10], 
      '\ntest_input_len : ', test_input_len[10])

# Model

In [None]:
# Recognition network: three convolution blocks, a per-step dense projection,
# two bidirectional LSTMs and a softmax over the character classes.
# Shape comments assume Keras' default pooling stride (= pool size); they are
# consistent with the Reshape target used below.
input_data = Input(shape=(256, 64, 1), name='input')

# Block 1: 32 filters, 2x2 pooling -> presumably (128, 32, 32)
inner = Conv2D(32, (3, 3), padding='same', name='conv1', kernel_initializer='he_normal')(input_data)  
inner = BatchNormalization()(inner)
inner = Activation('relu')(inner)
inner = MaxPooling2D(pool_size=(2, 2), name='max1')(inner)

# Block 2: 64 filters, 2x2 pooling -> presumably (64, 16, 64), then dropout
inner = Conv2D(64, (3, 3), padding='same', name='conv2', kernel_initializer='he_normal')(inner)
inner = BatchNormalization()(inner)
inner = Activation('relu')(inner)
inner = MaxPooling2D(pool_size=(2, 2), name='max2')(inner)
inner = Dropout(0.3)(inner)

# Block 3: 128 filters; the (1, 2) pool leaves the first axis untouched and
# only halves the second one -> presumably (64, 8, 128), then dropout
inner = Conv2D(128, (3, 3), padding='same', name='conv3', kernel_initializer='he_normal')(inner)
inner = BatchNormalization()(inner)
inner = Activation('relu')(inner)
inner = MaxPooling2D(pool_size=(1, 2), name='max3')(inner)
inner = Dropout(0.3)(inner)

# CNN to RNN
# Flatten the last two axes: 64 steps with 1024 (= 8 * 128) features each,
# then project every step down to 64 features
inner = Reshape(target_shape=((64, 1024)), name='reshape')(inner)
inner = Dense(64, activation='relu', kernel_initializer='he_normal', name='dense1')(inner)

## RNN
# Two stacked bidirectional LSTMs (256 units per direction) that keep the
# full sequence (return_sequences=True)
inner = Bidirectional(LSTM(256, return_sequences=True), name = 'lstm1')(inner)
inner = Bidirectional(LSTM(256, return_sequences=True), name = 'lstm2')(inner)

## OUTPUT
# Per-step scores over num_of_characters classes, normalised with softmax
inner = Dense(num_of_characters, kernel_initializer='he_normal',name='dense2')(inner)
y_pred = Activation('softmax', name='softmax')(inner)

# Inference model: image in, per-step class probabilities out
model = Model(inputs=input_data, outputs=y_pred)
model.summary()

In [None]:
def ctc_lambda_func(args):
    """Compute the batch CTC cost for use inside a ``Lambda`` layer.

    ``args`` is the sequence ``(y_pred, labels, input_length, label_length)``.
    The first two time steps of ``y_pred`` are discarded before the cost is
    computed with ``K.ctc_batch_cost``.
    """
    predictions, labels, input_length, label_length = args
    # the 2 is critical here since the first couple outputs of the RNN
    # tend to be garbage
    trimmed_predictions = predictions[:, 2:, :]
    return K.ctc_batch_cost(labels, trimmed_predictions, input_length, label_length)

In [None]:
# Extra inputs that are only needed to compute the CTC loss
labels = Input(shape=[max_str_len], dtype='float32', name='gtruth_labels')
input_length = Input(shape=[1], dtype='int64', name='input_length')
label_length = Input(shape=[1], dtype='int64', name='label_length')

# The loss itself is produced by a layer named 'ctc'
ctc_loss = Lambda(
    ctc_lambda_func,
    output_shape=(1,),
    name='ctc',
)([y_pred, labels, input_length, label_length])

# Training model: image plus the three CTC inputs in, loss value out
model_final = Model(
    inputs=[input_data, labels, input_length, label_length],
    outputs=ctc_loss,
)

# Training

In [None]:
# The 'ctc' layer already outputs the loss value, so the compiled loss simply
# passes the model output through and ignores y_true.
# `learning_rate` replaces the deprecated `lr` alias, which newer Keras
# releases ignore or reject.
model_final.compile(loss={'ctc': lambda y_true, y_pred: y_pred}, optimizer=Adam(learning_rate=0.0001))

# train_output / test_output are all-zero arrays passed as `y`; the
# pass-through loss above never reads them. The test split doubles as
# validation data here.
model_final.fit(x=[train_x, train_y, train_input_len, train_label_len], y=train_output, 
                validation_data=([test_x, test_y, test_input_len, test_label_len], test_output),
                epochs=60, batch_size=128)