In [None]:
# Load the word-level dataset. Later cells read its 'SegmentationResult',
# 'ImageData' and 'Transcription' columns.
# NOTE(review): unpickling can execute arbitrary code, so only load this file
# when it comes from a trusted source.
df = pd.read_pickle('data/words_df_all.pkl.gz')

In [None]:
from tensorflow.keras.layers import StringLookup
# --- Configuration -----------------------------------------------------------
batch_size = 64
padding_token = 99  # value used to right-pad encoded labels up to max_len
image_width = 128
image_height = 32

# --- Row filtering -----------------------------------------------------------
# Consider only rows with SegmentationResult == 'ok'.
df = df[df['SegmentationResult'] == 'ok']

# Keep only rows whose image really is a numpy array (this also removes
# missing values). A boolean mask is used on purpose: after the filter above
# the index labels no longer run 0..n-1, so dropping rows by an enumerate()
# counter would remove the wrong rows or raise KeyError.
has_image = df['ImageData'].map(lambda img: isinstance(img, np.ndarray)).astype(bool)
df = df[has_image]

# Consider only words that contain neither spaces nor punctuation.
punctuation = [' ', '.', ',', '!', '?', "'", '"', '(', ')', '[', ']', '{', '}', '/', '\\', '|', '*', '+', '=', '_', '#', '@', '%', '&', '^', '~', '`', '<', '>', ':', ';']
punctuation_chars = frozenset(punctuation)
is_plain_word = df['Transcription'].apply(punctuation_chars.isdisjoint).astype(bool)
df = df[is_plain_word]

# Shuffle the remaining rows and give them a clean 0..n-1 index.
df = df.sample(frac=1).reset_index(drop=True)

# Build the vocabulary from the words that are actually kept, so it holds no
# characters that can never occur in a label.
vocabulary = sorted(set(''.join(df['Transcription'].values)))

# StringLookup (one OOV slot, no mask token) maps characters to ids
# 0..len(vocabulary); the padding value must lie outside that range or padded
# positions would be indistinguishable from a real character.
assert padding_token > len(vocabulary), (
    f"padding_token={padding_token} collides with a character id "
    f"(vocabulary has {len(vocabulary)} entries)"
)

# --- Model inputs ------------------------------------------------------------
X = np.stack(df['ImageData'].values)  # stack the per-row images into one array
X = tf.convert_to_tensor(X)
y = np.array(df['Transcription'].values)  # raw transcription strings


# Forward table: character -> integer id (no mask token reserved).
char_to_num = StringLookup(mask_token=None, vocabulary=vocabulary)

# Inverse table: integer id -> character, built from the forward table's own
# vocabulary so both directions agree.
num_to_char = StringLookup(
    invert=True,
    mask_token=None,
    vocabulary=char_to_num.get_vocabulary(),
)

def vectorize_label(label):
    """Encode one transcription as a fixed-length vector of character ids.

    The string is split into UTF-8 characters, each character is mapped
    through ``char_to_num``, and the result is right-padded with
    ``padding_token`` until it is ``max_len`` entries long.

    Args:
        label: the transcription string to encode.

    Returns:
        A 1-D tensor of ``max_len`` ids.
    """
    characters = tf.strings.unicode_split(label, input_encoding="UTF-8")
    encoded = char_to_num(characters)
    missing = max_len - tf.shape(encoded)[0]
    return tf.pad(encoded, paddings=[[0, missing]], constant_values=padding_token)

# Longest transcription, in characters; every label is padded to this length.
max_len = max(map(len, y))

# Encode every transcription and stack the results into a single tensor.
y = tf.convert_to_tensor([vectorize_label(label) for label in y])

In [None]:
# Shorthand for tf.data's AUTOTUNE constant; passed to `num_parallel_calls`
# and `prefetch` when the input pipeline is built.
AUTOTUNE = tf.data.AUTOTUNE

def process_images_labels(image, label):
    """Pack an (image, label) pair into a dict keyed by "image" and "labels"."""
    return dict(image=image, labels=label)


def prepare_dataset(X, y):
    """Build the batched tf.data pipeline from images ``X`` and labels ``y``.

    Each (image, label) pair is turned into a dict by
    ``process_images_labels``; the elements are then grouped into batches of
    ``batch_size``, cached and prefetched.
    """
    pairs = tf.data.Dataset.from_tensor_slices((X, y))
    named = pairs.map(process_images_labels, num_parallel_calls=AUTOTUNE)
    batched = named.batch(batch_size)
    return batched.cache().prefetch(AUTOTUNE)


dataset = prepare_dataset(X, y)

In [None]:
# Split sizes must be counted in dataset *elements*. `prepare_dataset` already
# batches the data, so one element is one batch; sizing the split with len(X)
# (number of samples) made take(train_size) consume every batch and left the
# validation and test sets empty.
total_size = int(dataset.cardinality().numpy())
assert total_size > 0, "dataset is empty or its cardinality is unknown"

# Shuffle once, with a fixed permutation. With the default
# reshuffle_each_iteration=True every pass draws a new order, so the
# take()/skip() splits below would overlap and change between epochs.
dataset = dataset.shuffle(
    buffer_size=total_size, seed=42, reshuffle_each_iteration=False
)

train_size = int(total_size * 0.8)
val_size = int(total_size * 0.1)
test_size = total_size - train_size - val_size  # remaining elements for testing
assert val_size > 0, "dataset too small for an 80/10/10 split"

# First 80 % for training; reshuffled each epoch, within the training part only.
train_dataset = dataset.take(train_size).shuffle(train_size)

# Skip over the training data and take the next chunk for validation.
val_dataset = dataset.skip(train_size).take(val_size)

# Skip over the training and validation data for the test set.
test_dataset = dataset.skip(train_size + val_size)

In [None]:
# Materialise the validation batches once so the edit-distance callback can
# reuse them after every epoch.
validation_images = []
validation_labels = []

for batch in val_dataset:
    # Keys are the ones produced by process_images_labels: "image" / "labels".
    validation_images.append(batch["image"])
    validation_labels.append(batch["labels"])

In [None]:
import keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Dense, Dropout, Reshape, LSTM, Layer

In [None]:
class CTCLayer(keras.layers.Layer):
    """Pass-through layer that registers the CTC loss on the model.

    Called with ``(y_true, y_pred)``; it adds the CTC batch cost via
    ``add_loss`` and returns ``y_pred`` unchanged.
    """

    def __init__(self, name=None, **kwargs):
        # Accept the standard Layer kwargs (trainable, dtype, ...) so the layer
        # can be rebuilt from its saved config when a checkpoint is reloaded.
        super().__init__(name=name, **kwargs)
        self.loss_fn = keras.backend.ctc_batch_cost

    def call(self, y_true, y_pred):
        """Add the CTC loss for this batch and return ``y_pred``.

        Args:
            y_true: label ids; indexed here as (batch, label_width).
            y_pred: predictions; indexed here as (batch, time_steps, ...).

        Returns:
            ``y_pred`` unchanged.
        """
        batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
        input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

        # Every sample gets the same lengths: the full time axis of y_pred and
        # the full (padded) width of y_true.
        # NOTE(review): padded positions are therefore handed to the loss as
        # part of the label; presumably this relies on the padding id lying
        # outside the real class range -- confirm.
        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")
        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)

        # At test time, just return the computed predictions.
        return y_pred

In [None]:
def create_cnn_rnn_model(input_shape=(32, 128, 1), num_classes=len(vocabulary)+2, sequence_length=max_len):
    """Build and compile the CNN + LSTM recogniser trained with a CTC loss.

    Args:
        input_shape: shape of one input image.
        num_classes: size of the class axis of the output.
        sequence_length: number of output time steps.

    The defaults for ``num_classes`` and ``sequence_length`` are read from
    ``vocabulary`` and ``max_len`` once, when this cell is executed.

    Returns:
        A compiled model with inputs "image" and "labels". Its output is the
        per-time-step class distribution of shape
        (batch, sequence_length, num_classes); the loss is added by the
        ``ctc_loss`` layer.

    NOTE(review): CTC can only align a label when the number of time steps is
    at least the label length plus one blank per pair of adjacent repeated
    characters. With ``sequence_length == max_len`` the longest labels that
    contain a doubled character cannot be aligned; consider a larger value.
    """
    # Input layers: the image, and the label ids consumed only by the CTC layer.
    inputs = keras.Input(shape=input_shape, name="image")
    labels = keras.layers.Input(name="labels", shape=(None,))

    # Three Conv(96, 3x3) + max-pool stages; the last pool only halves the width.
    x = inputs
    for pool_size in ((2, 2), (2, 2), (1, 2)):
        x = Conv2D(96, (3, 3), activation='relu')(x)
        x = MaxPooling2D(pool_size=pool_size)(x)

    # Flatten the feature maps and project them with a dense layer.
    x = tf.keras.layers.Flatten()(x)
    x = Dense(sequence_length * num_classes, activation='relu')(x)
    x = Dropout(0.1)(x)

    # Feed the projection to the LSTMs as a sequence of scalar features:
    # (batch, sequence_length * num_classes, 1).
    x = Reshape((sequence_length * num_classes, 1))(x)

    # First LSTM returns the full sequence, the second only its last state.
    x = LSTM(128, return_sequences=True)(x)
    x = LSTM(128)(x)

    # Output head: one score per (time step, class), normalised per time step.
    # The softmax is applied AFTER the reshape so that every time step is a
    # probability distribution; a softmax over the flattened vector would
    # normalise across all time steps jointly.
    x = Dense(num_classes * sequence_length, name="dense2_logits")(x)
    x = Reshape((sequence_length, num_classes))(x)
    # This layer keeps the name "dense2" because the training cell looks the
    # prediction output up by that name.
    x = tf.keras.layers.Softmax(axis=-1, name="dense2")(x)

    # The CTC layer registers the loss and passes the predictions through.
    ctc_output = CTCLayer(name='ctc_loss')(labels, x)

    model = Model(inputs=[inputs, labels], outputs=ctc_output)

    # No `loss=` argument: the loss comes from the CTC layer via add_loss.
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001))

    return model

# Summarise the same configuration that is trained later (default arguments).
create_cnn_rnn_model().summary()



Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 image (InputLayer)          [(None, 32, 128, 1)]         0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 30, 126, 96)          960       ['image[0][0]']               
                                                                                                  
 max_pooling2d (MaxPooling2  (None, 15, 63, 96)           0         ['conv2d[0][0]']              
 D)                                                                                               
                                                                                                  
 conv2d_1 (Conv2D)           (None, 13, 61, 96)           83040     ['max_pooling2d[0][0]']   

In [None]:
def _dense_to_sparse_ignoring(dense, ignore_value):
    """Convert a dense id matrix to a SparseTensor, dropping ``ignore_value``."""
    dense = tf.cast(dense, tf.int64)
    kept = tf.where(tf.not_equal(dense, ignore_value))
    return tf.SparseTensor(
        indices=kept,
        values=tf.gather_nd(dense, kept),
        dense_shape=tf.shape(dense, out_type=tf.int64),
    )


def calculate_edit_distance(labels, predictions):
    """Mean unnormalised edit distance between labels and decoded predictions.

    Args:
        labels: label ids for one batch, right-padded with ``padding_token``.
        predictions: model output for the same batch; indexed here as
            (batch, time_steps, ...).

    Returns:
        Scalar tensor: the edit distance averaged over the batch.
    """
    # Drop the label padding explicitly. tf.sparse.from_dense only removes
    # zeros, so padded positions would otherwise be scored as characters.
    sparse_labels = _dense_to_sparse_ignoring(labels, padding_token)

    # Greedy CTC decoding over the full time axis of every sample.
    input_len = np.ones(predictions.shape[0]) * predictions.shape[1]
    predictions_decoded = keras.backend.ctc_decode(
        predictions, input_length=input_len, greedy=True
    )[0][0][:, :max_len]
    # ctc_decode fills the unused tail of each row with -1; drop that as well.
    sparse_predictions = _dense_to_sparse_ignoring(predictions_decoded, -1)

    # Compute individual edit distances and average them out.
    edit_distances = tf.edit_distance(
        sparse_predictions, sparse_labels, normalize=False
    )
    return tf.reduce_mean(edit_distances)


class EditDistanceCallback(keras.callbacks.Callback):
    """Print the mean validation edit distance at the end of every epoch."""

    def __init__(self, pred_model):
        """Store ``pred_model``, the model used to produce predictions."""
        super().__init__()
        self.prediction_model = pred_model

    def on_epoch_end(self, epoch, logs=None):
        """Score every cached validation batch and report the average."""
        edit_distances = []

        for idx, images in enumerate(validation_images):
            batch_labels = validation_labels[idx]
            batch_predictions = self.prediction_model.predict(images)
            distance = calculate_edit_distance(batch_labels, batch_predictions)
            edit_distances.append(distance.numpy())

        message = f"Mean edit distance for epoch {epoch + 1}: {np.mean(edit_distances):.4f}"
        print(message)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

epochs = 50  # To get good results this should be at least 50.

model = create_cnn_rnn_model()

# Inference model: image in, (time_steps, classes) scores out.
# ctc_decode needs a 3-D (batch, time, classes) tensor, so the "dense2" output
# is reshaped to the training model's own output shape. This is a no-op when
# "dense2" is already 3-D.
dense2_output = model.get_layer(name="dense2").output
prediction_output = Reshape(model.output_shape[1:], name="ctc_predictions")(dense2_output)
prediction_model = keras.models.Model(
    model.get_layer(name="image").input, prediction_output
)
edit_distance_callback = EditDistanceCallback(prediction_model)

# Stop when the validation loss has not improved for 3 consecutive epochs,
# and keep the best model seen so far on disk.
early_stopping = EarlyStopping(monitor='val_loss', patience=3)
best_model_path = 'crnn.h5'
model_checkpoint = ModelCheckpoint(best_model_path, 
                                   monitor='val_loss', 
                                   save_best_only=True, 
                                   verbose=1)
# Train the model.
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=epochs,
    callbacks=[edit_distance_callback, early_stopping, model_checkpoint],
)



Epoch 1/50

2023-12-30 04:47:29.736358: W ./tensorflow/core/util/ctc/ctc_loss_calculator.h:499] No valid path found.




2023-12-30 04:54:18.008367: W ./tensorflow/core/util/ctc/ctc_loss_calculator.h:499] No valid path found.




ValueError: Expected input data to be non-empty.