In [1]:
import os
# Set before TensorFlow is imported in the next cell, presumably to quieten its C++ logging.
# NOTE(review): the commonly documented values are '0'-'3'; confirm '20' is intentional.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '20'

In [2]:
import tensorflow as tf

In [3]:
tf.__version__

'2.15.1'

In [4]:
tf.config.experimental.list_physical_devices()

[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'),
 PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [5]:
print(tf.config.list_physical_devices('GPU'))

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [6]:
# Enable on-demand GPU memory allocation for every visible GPU.
# Iterating (instead of indexing [0]) keeps the notebook runnable on CPU-only
# machines and applies the same setting to all GPUs when several are present.
for device in tf.config.list_physical_devices('GPU'):
    tf.config.experimental.set_memory_growth(device, True)

In [7]:
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
from keras.layers import Lambda, Concatenate, Reshape, Conv2D, MaxPooling2D, Dropout, Dense, Flatten, LSTM, Input, BatchNormalization, GlobalAveragePooling2D
from keras.layers import Embedding
from keras.applications import ResNet50
from keras.applications.resnet50 import preprocess_input
import os
from keras import layers

In [8]:
# Directory holding the training images; the text before the first '_' in each
# file name is used as that image's label (see the label extraction below).
train_dir = "mergedData/"

In [9]:
# Directory entries of the training folder, and for each entry the label,
# i.e. everything before the first underscore of the file name.
path = os.listdir(train_dir)
lbl = [entry.split('_')[0] for entry in path]

In [10]:
# Vocabulary = every character that occurs in a label, plus 'G' and 'E', which
# are prepended/appended to each tokenised label as start and end markers.
# NOTE(review): if 'G' or 'E' can also occur inside real labels they share an id
# with the markers - confirm this is acceptable.
characters = sorted({char for label in lbl for char in label} | {'G', 'E'})

# Character -> integer id (no mask token).
char_to_num = layers.StringLookup(vocabulary=list(characters), mask_token=None)

# Integer id -> character, built from the forward layer's vocabulary so both agree.
num_to_char = layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    mask_token=None,
    invert=True,
)

In [11]:
# Vocabulary list as reported by the lookup layer; its length is later used as VOCAB_SIZE.
dict_ = char_to_num.get_vocabulary()

In [12]:
# Build the parallel lists of image paths and tokenised labels.
# Each label becomes [id('G')] + ids of its characters + [id('E')].
# The start/end ids are loop-invariant, so they are looked up once, and each
# label is converted with a single layer call instead of one call per character.
_start_id = int(char_to_num('G'))
_end_id = int(char_to_num('E'))

image_paths = []
tokenized_lbls = []
for file_name in path:
    image_paths.append(os.path.join(train_dir, file_name))
    # Explicit string dtype so an empty label still yields a valid (empty) tensor.
    label_chars = tf.constant(list(file_name.split('_')[0]), dtype=tf.string)
    char_ids = char_to_num(label_chars).numpy().tolist()
    tokenized_lbls.append([_start_id] + char_ids + [_end_id])

In [13]:
# Turn on XLA JIT compilation through the TensorFlow optimizer configuration.
tf.config.optimizer.set_jit(True)
# Value passed as num_parallel_calls / prefetch buffer size in the tf.data pipelines below.
AUTOTUNE = tf.data.experimental.AUTOTUNE
# Target image size (in pixels) that load_image resizes every image to.
desired_height = 65
desired_width = 256

def normalization(image):
    """Cast `image` to float32 and divide every value by 255.0."""
    return tf.cast(image, tf.float32) / 255.0

def load_image(path):
    """Read a JPEG file and return it as a normalised, resized RGB tensor.

    Args:
        path: File path (string or scalar string tensor) of the JPEG image.

    Returns:
        float32 tensor of shape [desired_height, desired_width, 3], produced by
        `normalization` followed by `tf.image.resize`.
    """
    image = tf.io.read_file(path)
    # Force three output channels: without `channels`, the channel count follows
    # the file (e.g. 1 for grayscale JPEGs) and is unknown at graph-build time,
    # while the encoder input is declared with exactly 3 channels.
    image = tf.image.decode_jpeg(image, channels=3)
    image = normalization(image)
    image = tf.image.resize(image, [desired_height, desired_width])
    return image

# def data_augmentation(image):
#     image = tf.image.adjust_contrast(image, 0.4)
#     image = tf.image.adjust_brightness(image, 0.3)
#     image = tf.image.adjust_saturation(image, 0.3)
#     return image




def load_image_label(path, label):
    """Map one (path, token ids) pair to ((image, label), target).

    The image is loaded with `load_image` and its first two axes are swapped.
    The label is cast to float32; the target is the label without its first token.
    """
    # Swap the first two axes so the image is laid out width-first.
    image = tf.transpose(load_image(path), perm=[1, 0, 2])
    # image = data_augmentation(image)
    label = tf.cast(label, tf.float32)
    target = label[1:]
    return (image, label), target
    
# Buffer size used when shuffling the training dataset.
SHUFFLE_BUFFER_SIZE = 256
# Number of examples per batch for the training and validation datasets.
BATCH_SIZE = 32

In [14]:
# Shuffle the (path, label) pairs once with a fixed seed BEFORE splitting, so the
# validation set is a random sample rather than the tail of the directory
# listing, and so train/validation membership is stable across epochs.
SPLIT_SEED = 42
# Same count the dataset would yield, without iterating over all of it.
DATASET_SIZE = len(image_paths)
shuffled_order = np.random.default_rng(SPLIT_SEED).permutation(DATASET_SIZE)
dataset = tf.data.Dataset.from_tensor_slices((
    [image_paths[k] for k in shuffled_order],
    [tokenized_lbls[k] for k in shuffled_order],
))

validation_split = 0.1
print("Dataset size: ", DATASET_SIZE)
train_size = int((1-validation_split) * DATASET_SIZE)
print("train size: ", train_size)
# First `train_size` shuffled examples train the model; the remainder validates it.
train_dataset = dataset.take(train_size)
validation_dataset = dataset.skip(train_size)

# Training pipeline: decode -> shuffle (re-drawn each epoch) -> batch -> prefetch.
train_dataset = train_dataset.map(load_image_label, num_parallel_calls=AUTOTUNE)
train_dataset = train_dataset.shuffle(SHUFFLE_BUFFER_SIZE)
train_dataset = train_dataset.batch(BATCH_SIZE, num_parallel_calls=AUTOTUNE)
train_dataset = train_dataset.prefetch(AUTOTUNE)

# Validation pipeline: same steps without shuffling.
validation_dataset = validation_dataset.map(load_image_label, num_parallel_calls=AUTOTUNE)
validation_dataset = validation_dataset.batch(BATCH_SIZE, num_parallel_calls=AUTOTUNE)
validation_dataset = validation_dataset.prefetch(AUTOTUNE)


Dataset size:  8519
train size:  7667


In [15]:
from keras.layers import Dense, Activation, Conv2D, MaxPool2D, GlobalAveragePooling1D, Input, Dropout, BatchNormalization

In [16]:
# Number of LSTM units; used for the encoder LSTM and also for the decoder LSTM
# and the decoder's state inputs.
ENCODER_UNITS = 256
# Output dimension of the decoder's token Embedding layer.
EMBEDDING_DIM = 128
# Width of the two Dense projections inside the Attention module.
ATTENTION_UNITS = 8
# Size of the lookup vocabulary; used as Embedding input_dim and as the number of output classes.
VOCAB_SIZE = len(dict_)
# Ids of the 'G' and 'E' marker characters that wrap every tokenised label.
START_TOKEN = char_to_num('G')
END_TOKEN = char_to_num('E')

In [17]:
# ResNet50 backbone without its classification head, with training disabled.
# NOTE(review): `base` is not referenced by the encoder/decoder cells visible in
# this notebook, and its input_shape is (height, width, 3) whereas the encoder
# input is (width, height, 3) - confirm whether this cell is still needed.
base = ResNet50(include_top=False, input_shape=(desired_height, desired_width, 3))
base.trainable = False

In [18]:
# Rebind `base` to a model that stops at the 'conv4_block6_out' layer of the ResNet50 above.
# NOTE(review): no later visible cell uses `base` - confirm this is still required.
base = keras.models.Model(inputs=base.inputs, outputs=base.get_layer('conv4_block6_out').output)

In [19]:
# Target shape for flattening the CNN feature map into a sequence for the LSTM:
# the four 2x2 max-poolings below shrink each spatial axis by 16, and 256 is the
# channel count of the last convolution, so each of the (width // 16) steps
# carries (height // 16) * 256 features.
new_shape = (desired_width // 16, (desired_height // 16) * 256)
# NOTE(review): `nclasses` is not used by any other visible cell.
nclasses = VOCAB_SIZE

# Encoder: four Conv2D + MaxPooling2D stages, reshape to a sequence, dropout, then an LSTM.
# The input is width-first (desired_width, desired_height, 3), matching the
# transpose applied in load_image_label.
input_layer = tf.keras.Input(shape=(desired_width, desired_height, 3))
conv2d1 = layers.Conv2D(32, (3, 3), activation="relu", padding="same")(input_layer)
pool1 = layers.MaxPooling2D((2, 2))(conv2d1)

conv2d2 = layers.Conv2D(64, (3, 3), activation="relu", padding="same")(pool1)
pool2 = layers.MaxPooling2D((2, 2))(conv2d2)

conv2d3 = layers.Conv2D(128, (3, 3), activation="relu", padding="same")(pool2)
pool3 = layers.MaxPooling2D((2, 2))(conv2d3)

conv2d4 = layers.Conv2D(256, (3, 3), activation="relu", padding="same")(pool3)
pool4 = layers.MaxPooling2D((2, 2))(conv2d4)

# Collapse the last two feature-map axes so the first spatial axis becomes the time axis.
reshape = layers.Reshape(target_shape=new_shape)(pool4)
reshape = Dropout(0.4)(reshape)
# The LSTM returns the full output sequence plus its final hidden (h) and cell (c) states.
encoder_outputs, h, c = layers.LSTM(units=ENCODER_UNITS, return_sequences=True, return_state=True, dropout=0.2)(reshape)

# Encoder model: image -> (per-step outputs, final h, final c).
encoder = keras.models.Model(inputs=[input_layer], outputs=[encoder_outputs, h, c])

In [20]:
encoder.summary()

Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 256, 65, 3)]      0         
                                                                 
 conv2d (Conv2D)             (None, 256, 65, 32)       896       
                                                                 
 max_pooling2d (MaxPooling2  (None, 128, 32, 32)       0         
 D)                                                              
                                                                 
 conv2d_1 (Conv2D)           (None, 128, 32, 64)       18496     
                                                                 
 max_pooling2d_1 (MaxPoolin  (None, 64, 16, 64)        0         
 g2D)                                                            
                                                                 
 conv2d_2 (Conv2D)           (None, 64, 16, 128)       7385

In [21]:
class Attention(keras.models.Model):
    """Additive attention over the encoder output sequence.

    Scores each encoder step with V(tanh(W1(encoder_outputs) + W2(decoder_state))),
    normalises the scores with a softmax over the sequence axis and returns the
    score-weighted sum of the encoder outputs.
    """

    def __init__(self, attention_units):
        """Create the three Dense projections; W1 and W2 have `attention_units` units."""
        super().__init__()
        self.attention_units = attention_units

        self.W1 = Dense(units=attention_units, use_bias=True)
        self.W2 = Dense(units=attention_units, use_bias=True)
        self.V = Dense(units=1, use_bias=True)

    def call(self, encoder_outputs, decoder_state, training=False):
        """Return the context vector for one decoder state.

        Args:
            encoder_outputs: [batch, num_steps, encoder_units] encoder sequence.
            decoder_state: [batch, state_dim] decoder state vector.
            training: accepted for Keras compatibility; not used here.

        Returns:
            [batch, encoder_units] weighted sum of the encoder outputs.
        """
        # Insert a length-1 sequence axis so the state broadcasts over every encoder step.
        query = tf.expand_dims(decoder_state, axis=1)
        projected = self.W1(encoder_outputs) + self.W2(query)
        # scores / weights: [batch, num_steps, 1]
        scores = self.V(tf.nn.tanh(projected))
        weights = tf.nn.softmax(scores, axis=1)

        context = tf.reduce_sum(weights * encoder_outputs, axis=1)
        return context

In [22]:
# Decoder inputs: token ids, the encoder output sequence, and the LSTM state (h, c).
# Note: `encoder_outputs` is rebound here to a fresh Input placeholder; the
# encoder model was already built from the earlier tensor of the same name.
decoder_inputs = keras.Input(shape=(None,))
encoder_outputs = keras.Input(shape=(None, ENCODER_UNITS))
h_inp = keras.Input(shape=(ENCODER_UNITS,))
c_inp = keras.Input(shape=(ENCODER_UNITS,))

# The attention query is the concatenation of the incoming hidden and cell states.
attention = Attention(ATTENTION_UNITS)
decoder_state = tf.concat([h_inp, c_inp], axis=-1)
context = attention(encoder_outputs, decoder_state)
# Add a length-1 time axis so the context can be concatenated with the embedding.
context = tf.expand_dims(context, axis=1, name='concat_attention')

# Embed: [Batch_size, 1, num_units]
embed = Embedding(input_dim=VOCAB_SIZE, output_dim=EMBEDDING_DIM)(decoder_inputs)
# Concatenating on the feature axis requires the embedding's time length to match
# the context's length of 1, i.e. the decoder is meant to be called one token at
# a time (as MyModel.call and the inference loop do).
context = tf.concat([embed, context], axis=-1)

# One LSTM step starting from the supplied state; the new h and c are returned
# so the caller can feed them back in for the next token.
lstm_layer, h, c = LSTM(ENCODER_UNITS, return_sequences=True, return_state=True, dropout=0.2)(inputs=context, initial_state=[h_inp, c_inp])
lstm_layer = Dropout(0.4)(lstm_layer)
# Softmax distribution over the vocabulary for the step.
output = Dense(VOCAB_SIZE, activation='softmax')(lstm_layer)
decoder = keras.models.Model(inputs=[decoder_inputs, encoder_outputs, h_inp, c_inp], outputs=[output, h, c])
decoder.summary()


Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_5 (InputLayer)        [(None, 256)]                0         []                            
                                                                                                  
 input_6 (InputLayer)        [(None, 256)]                0         []                            
                                                                                                  
 input_4 (InputLayer)        [(None, None, 256)]          0         []                            
                                                                                                  
 tf.concat (TFOpLambda)      (None, 512)                  0         ['input_5[0][0]',             
                                                                     'input_6[0][0]']       

In [23]:
class MyModel(keras.models.Model):
    """Encoder/decoder wrapper trained with teacher forcing.

    The encoder runs once per batch; the decoder is then called once per target
    position, always receiving the ground-truth previous token and the state
    returned by the previous step.
    """

    def __init__(self, encoder, decoder):
        """Store the already-built `encoder` and `decoder` Keras models."""
        super(MyModel, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    @tf.function
    def call(self, inputs, training=False):
        """Run teacher-forced decoding.

        Args:
            inputs: tuple (source, target); `source` is the image batch and
                `target` the [batch, T] token sequence including the start token.
            training: forwarded to the encoder and decoder.

        Returns:
            [batch, T - 1, vocab] tensor of per-step decoder outputs.
        """
        source, target = inputs
        encoder_outputs, h, c = self.encoder(source, training=training)
        # Dynamic-size array because the number of steps is only known at run time.
        output_array = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
        for i in range(1, tf.shape(target)[1]):
            # Feed the ground-truth token at position i - 1 to predict position i.
            decoder_output, h, c = self.decoder([target[:, i - 1, tf.newaxis], encoder_outputs, h, c], training=training)
            decoder_output = tf.squeeze(decoder_output, axis=1)
            output_array = output_array.write(i - 1, decoder_output)

        # stack() yields [steps, batch, vocab]; transpose to batch-major.
        outputs = output_array.stack()
        outputs = tf.transpose(outputs, perm=[1, 0, 2])
        return outputs

In [24]:
# Wrap the encoder and decoder built above into the teacher-forced training model.
model = MyModel(encoder, decoder)

In [25]:
# Optimizer and loss are created once and passed to compile(), instead of being
# defined here and then shadowed by fresh, identically configured instances.
opt = tf.keras.optimizers.Adam(learning_rate=1e-3)
loss_obj = keras.losses.SparseCategoricalCrossentropy()
# NOTE(review): `accuracy` is not used by compile() or any other visible cell.
accuracy = keras.metrics.Accuracy()

model.compile(
    optimizer=opt,
    loss=loss_obj,
    metrics=['accuracy']
)

# Stop after 15 epochs without val_loss improvement and restore the best weights.
early_stopping_patience = 15
early_stopping = keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=early_stopping_patience, restore_best_weights=True
)
# Multiply the learning rate by 0.7 after 3 stagnant epochs, never going below 1e-4.
reduce_lr = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.7,
                              patience=3, min_lr=0.0001)

callbacks = [keras.callbacks.TensorBoard(), early_stopping, reduce_lr]

In [26]:
# Train for up to 50 epochs, validating on the held-out split; the callbacks
# defined above may end training early or lower the learning rate.
model.fit(train_dataset, epochs=50, validation_data=validation_dataset, callbacks=callbacks)

Epoch 1/50


I0000 00:00:1719527348.010845   59301 device_compiler.h:186] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.src.callbacks.History at 0x7ff7f45416d0>

In [27]:
import cv2
# Inference preprocessing for a single image: read, resize to the training
# size, swap to width-first layout, add a batch axis and scale to [0, 1].
# NOTE(review): the image is read as grayscale and replicated to 3 channels,
# whereas load_image decodes the JPEG directly - confirm both paths produce
# comparable inputs.
test_image_path = '4.JPG'
test_image = cv2.imread(test_image_path, 0)
if test_image is None:
    # cv2.imread reports a missing/unreadable file by returning None, not by raising.
    raise FileNotFoundError(f"Could not read test image: {test_image_path}")
test_image = cv2.cvtColor(test_image, cv2.COLOR_GRAY2RGB)
# cv2.resize expects (width, height); reuse the training constants instead of literals.
resized_image = cv2.resize(test_image, (desired_width, desired_height))
resized_image = tf.transpose(resized_image, perm=[1, 0, 2])
resized_image = resized_image[np.newaxis, ...]
resized_image = tf.cast(resized_image, tf.float32)
resized_image = resized_image / 255.0

In [28]:
# Greedy decoding of a single image: encode once, then repeatedly feed the last
# predicted token and the returned LSTM state back into the decoder.
encoder_outputs, h, c = encoder(resized_image, training=False)
# Number of steps mirrors training, where one token is predicted for every
# target position after the start token (label length - 1).
max_decode_steps = len(tokenized_lbls[0]) - 1
# outputs: [1, t] running sequence of token ids, seeded with the start token.
outputs = tf.expand_dims([START_TOKEN], axis=1)
for step in range(max_decode_steps):
    # Last generated token, kept as a [1, 1] tensor.
    dec_inp = outputs[:, -1:]
    output, h, c = decoder([dec_inp, encoder_outputs, h, c], training=False)
    # Most probable token id for this step.
    output = tf.argmax(output[:, 0], axis=-1)
    outputs = tf.concat([outputs, output[tf.newaxis, ...]], axis=-1)
    print(output)

tf.Tensor([2], shape=(1,), dtype=int64)
tf.Tensor([2], shape=(1,), dtype=int64)
tf.Tensor([11], shape=(1,), dtype=int64)
tf.Tensor([10], shape=(1,), dtype=int64)
tf.Tensor([3], shape=(1,), dtype=int64)
tf.Tensor([5], shape=(1,), dtype=int64)
tf.Tensor([3], shape=(1,), dtype=int64)
tf.Tensor([4], shape=(1,), dtype=int64)
tf.Tensor([15], shape=(1,), dtype=int64)


In [29]:
# Convert the predicted ids back to characters, leaving out the first and last positions.
''.join(num_to_char(token_id).numpy().decode('utf-8') for token_id in outputs.numpy()[0][1:-1])

'11A92423'

In [30]:
# Persist the trained model; this writes a 'TForcingAttentionModel.tf' directory (including assets).
model.save('TForcingAttentionModel.tf')

INFO:tensorflow:Assets written to: TForcingAttentionModel.tf/assets


INFO:tensorflow:Assets written to: TForcingAttentionModel.tf/assets










In [31]:
# best_model = tf.keras.models.load_model('./attention_model_verygood.tf/')