In [1]:
from __future__ import absolute_import, division, print_function, unicode_literals
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import re
import numpy as np
import os
import time
import json
from glob import glob
from PIL import Image
import pickle
from pathlib import Path
import random

%matplotlib inline

In [2]:
import os
print(os.getcwd())
os.chdir('D:\python')
print(os.getcwd())

C:\Users\user\Documents\python
D:\python


## Preprocess and tokenize

In [3]:
data_path = './words_captcha/spec_train_val.txt'
input_file = open(data_path, 'r')
image_names = [] 
record_list = []
for line in input_file:
    line = line.strip()
    sp = line.split(' ')
    image_names.append('./words_captcha/' + sp[0] + '.png')
    record_list.append(['<start>']+list(sp[1])+['<end>'])

In [4]:
train_image_name = image_names[:100000]
train_record_list = record_list[:100000]
val_image_name = image_names[100000:]
val_record_list = record_list[100000:]

In [6]:
tokenizer = tf.keras.preprocessing.text.Tokenizer()
tokenizer.fit_on_texts(train_record_list)
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'
train_ = tokenizer.texts_to_sequences(train_record_list)
val_ = tokenizer.texts_to_sequences(val_record_list)

In [5]:
def calc_max_length(tensor):
    return max(len(t) for t in tensor)

max_length = calc_max_length(train)

In [10]:
train = tf.keras.preprocessing.sequence.pad_sequences(train_, padding='post',maxlen=max_length)
val = tf.keras.preprocessing.sequence.pad_sequences(val_, padding='post',maxlen=max_length)

## Create a tf.data dataset

In [12]:
BATCH_SIZE = 100
BUFFER_SIZE = 5000
embedding_dim = 256
units = 512
vocab_size = len(tokenizer.word_index) + 1
num_train_steps = len(train) // BATCH_SIZE
num_val_steps = len(val) // BATCH_SIZE

In [11]:
def load_image(image_path,record_list):
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels=3)
    image = tf.image.resize(image, (224,224))
    image = tf.image.rgb_to_grayscale(image)
    image = (image/255) * 2 - 1
    return image, record_list

In [13]:
dataset_train = tf.data.Dataset.from_tensor_slices((train_image_name, train))
dataset_train = dataset_train.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset_train = dataset_train.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
dataset_train = dataset_train.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

dataset_val = tf.data.Dataset.from_tensor_slices((val_image_name, val))
dataset_val = dataset_val.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset_val = dataset_val.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
dataset_val = dataset_val.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

## Model

In [15]:
class CNN_Encoder(tf.keras.Model):
    # Since you have already extracted the features and dumped it using pickle
    # This encoder passes those features through a Fully connected layer
    def __init__(self, embedding_dim,**kwargs):
        super(CNN_Encoder, self).__init__(**kwargs)
        # shape after fc == (batch_size, 64, embedding_dim)
        self.conv1 = tf.keras.layers.Conv2D(32, 3, strides=1,activation="relu")
        self.conv2 = tf.keras.layers.Conv2D(32, 3, strides=1,activation="relu")
        self.conv3 = tf.keras.layers.Conv2D(64, 3, strides=1,activation="relu")
        self.conv4 = tf.keras.layers.Conv2D(64, 3, strides=1,activation="relu")
        self.conv5 = tf.keras.layers.Conv2D(64, 3, strides=1,activation="relu")
        self.conv6 = tf.keras.layers.Conv2D(128, 3, strides=1,activation="relu")
        self.conv7 = layers.Conv2D(embedding_dim, 3, strides=1,activation="relu")

        self.pool1 = tf.keras.layers.MaxPool2D(pool_size=(2, 2))
        self.pool2 = tf.keras.layers.MaxPool2D(pool_size=(2, 2))
        self.pool3 = tf.keras.layers.MaxPool2D(pool_size=(2, 2))
        self.pool4 = tf.keras.layers.MaxPool2D(pool_size=(2, 2))
        
    def call(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.pool1(x)
        
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = self.pool2(x)
        
        x = self.conv6(x)
        x = self.pool3(x)
        
        x = self.conv7(x)
        x = self.pool4(x)
        
        x = tf.reshape(x, (x.shape[0], -1, x.shape[3]))
        
        return x

In [16]:
class BahdanauAttention(tf.keras.Model):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, features, hidden):
        # features(CNN_encoder output) shape == (batch_size, 64, embedding_dim)

        # hidden shape == (batch_size, hidden_size)
        # hidden_with_time_axis shape == (batch_size, 1, hidden_size)
        hidden_with_time_axis = tf.expand_dims(hidden, 1)

        # score shape == (batch_size, 64, hidden_size)
        score = tf.nn.tanh(self.W1(features) + self.W2(hidden_with_time_axis))

        # attention_weights shape == (batch_size, 64, 1)
        # you get 1 at the last axis because you are applying score to self.V
        attention_weights = tf.nn.softmax(self.V(score), axis=1)

        # context_vector shape after sum == (batch_size, hidden_size)
        context_vector = attention_weights * features
        context_vector = tf.reduce_sum(context_vector, axis=1)

        return context_vector, attention_weights

In [17]:
class RNN_Decoder(tf.keras.Model):
    def __init__(self, embedding_dim, units, vocab_size):
        super(RNN_Decoder, self).__init__()
        self.units = units

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc1 = tf.keras.layers.Dense(self.units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)

        self.attention = BahdanauAttention(self.units)

    def call(self, x, features, hidden):
        # defining attention as a separate model
        context_vector, attention_weights = self.attention(features, hidden)

        # x shape after passing through embedding == (batch_size, 1, embedding_dim)
        x = self.embedding(x)

        # x shape after concatenation == (batch_size, 1, embedding_dim + hidden_size)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

        # passing the concatenated vector to the GRU
        output, state = self.gru(x)

        # shape == (batch_size, max_length, hidden_size)
        x = self.fc1(output)

        # x shape == (batch_size * max_length, hidden_size)
        x = tf.reshape(x, (-1, x.shape[2]))

        # output shape == (batch_size * max_length, vocab)
        x = self.fc2(x)

        return x, state, attention_weights

    def reset_state(self, batch_size):
        return tf.zeros((batch_size, self.units))

In [18]:
encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size)

In [19]:
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    return tf.reduce_mean(loss_)

In [20]:
checkpoint_path = "./checkpoints/train_13-02-try5"
ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder,
                           optimizer = optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)
start_epoch = 0
if ckpt_manager.latest_checkpoint:
    start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1])

## Training

In [21]:
loss_plot = []
val_acc_plot = []

In [22]:
@tf.function
def train_step(img_tensor, target):
    loss = 0

    # initializing the hidden state for each batch
    # because the captions are not related from image to image
    hidden = decoder.reset_state(batch_size=target.shape[0])

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * BATCH_SIZE, 1)

    with tf.GradientTape() as tape:
        features = encoder(img_tensor)

        for i in range(1, target.shape[1]):
            # passing the features through the decoder
            predictions, hidden, _ = decoder(dec_input, features, hidden)

            loss += loss_function(target[:, i], predictions)

            # using teacher forcing
            dec_input = tf.expand_dims(target[:, i], 1)

    total_loss = (loss / int(target.shape[1]))

    trainable_variables = encoder.trainable_variables + decoder.trainable_variables

    gradients = tape.gradient(loss, trainable_variables)

    optimizer.apply_gradients(zip(gradients, trainable_variables))

    return loss, total_loss

In [23]:
@tf.function
def validation_step(img_tensor, target):
    hidden = decoder.reset_state(batch_size=target.shape[0])
    dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * target.shape[0], 1)
    features = encoder(img_tensor)
    
    pred_result = tf.zeros((target.shape[0], 1),dtype=tf.float32)

    for i in range(1, target.shape[1]):
        # passing the features through the decoder
        predictions, hidden, _ = decoder(dec_input, features, hidden)
        dec_input = tf.expand_dims(tf.cast(tf.argmax(predictions, axis=1),tf.float32), 1)
        
        pred_result = tf.concat((pred_result,dec_input),axis=1)
    
    Target = target[:,1:]
    Pred = pred_result[:,1:]
    
    mask = tf.math.logical_not(tf.math.equal(Target, 0))
    
    mask = tf.cast(mask, dtype = Pred.dtype)
    Target = tf.cast(Target, dtype = Pred.dtype)
    
    Pred *= mask
    
    is_the_same = tf.reduce_all(tf.math.equal(Pred, Target), axis=1)
    acc = tf.math.reduce_mean(tf.cast(is_the_same, tf.float32))

    return acc

In [24]:
EPOCHS = 16
best_acc = 0
best_step = 0

for epoch in range(start_epoch, EPOCHS):
    print('Epoch {} '.format(epoch + 1))
    start = time.time()
    total_loss = 0
    total_acc_val = 0

    for (batch, (img_tensor, target)) in enumerate(dataset_train):
        batch_loss, t_loss = train_step(img_tensor, target)
        total_loss += t_loss

    for (batch_val, (img_tensor_val, target_val)) in enumerate(dataset_val):
        acc_val = validation_step(img_tensor_val, target_val)
        total_acc_val += acc_val
        
    curr_val_acc = total_acc_val/num_val_steps
    
    loss_plot.append(total_loss / num_train_steps)
    val_acc_plot.append(curr_val_acc)
        
    ckpt_manager.save(checkpoint_number=epoch+1)

    print ('Train Loss: {} \ Validation Accuracy: {}'.format(total_loss/num_train_steps, curr_val_acc))
    print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

Epoch 1 
Train Loss: 1.7574057579040527 \ Validation Accuracy: 0.00019999999494757503
Time taken for 1 epoch 1639.8656494617462 sec

Epoch 2 
Train Loss: 1.5401090383529663 \ Validation Accuracy: 0.007799995131790638
Time taken for 1 epoch 321.63067531585693 sec

Epoch 3 
Train Loss: 0.7254821062088013 \ Validation Accuracy: 0.4276999235153198
Time taken for 1 epoch 321.8474566936493 sec

Epoch 4 
Train Loss: 0.30197519063949585 \ Validation Accuracy: 0.633699893951416
Time taken for 1 epoch 322.04762148857117 sec

Epoch 5 
Train Loss: 0.16273635625839233 \ Validation Accuracy: 0.7433498501777649
Time taken for 1 epoch 322.08063197135925 sec

Epoch 6 
Train Loss: 0.1146981343626976 \ Validation Accuracy: 0.7929000854492188
Time taken for 1 epoch 322.9812400341034 sec

Epoch 7 
Train Loss: 0.10251742601394653 \ Validation Accuracy: 0.8279998302459717
Time taken for 1 epoch 321.9898319244385 sec

Epoch 8 
Train Loss: 0.07154557853937149 \ Validation Accuracy: 0.8303501605987549
Time take

## Testing

In [25]:
test_image_name = []
test = []
for i in range(120000,140000):
    test_image_name.append('./words_captcha/' + 'a' + str(i) + '.png')
    test.append('<start>')

In [26]:
dataset_test = tf.data.Dataset.from_tensor_slices((test_image_name, test))
dataset_test = dataset_test.map(load_image).batch(BATCH_SIZE)

In [27]:
output_file = open('./Lab13-2_108024522.txt', 'w')
for (batch_test, (img_tensor_test, target_test)) in enumerate(dataset_test):
    hidden = decoder.reset_state(batch_size=BATCH_SIZE)
    dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * BATCH_SIZE, 1)
    features = encoder(img_tensor_test)

    pred_result = tf.zeros((target_test.shape[0], 1),dtype=tf.float32)
    for i in range(1, max_length):
        # passing the features through the decoder
        predictions, hidden, _ = decoder(dec_input, features, hidden)
        dec_input = tf.expand_dims(tf.cast(tf.argmax(predictions, axis=1),tf.float32), 1)

        pred_result = tf.concat((pred_result,dec_input),axis=1)
        
    pred = pred_result[:,1:]
    mask = tf.math.logical_not(tf.math.equal(pred, 0))

    for i in range(BATCH_SIZE):
        output = 'a'+str(batch_test*BATCH_SIZE+i+120000)
        pred_index = tf.cast(pred[i],dtype = tf.int32).numpy()
        pred_index_clear = []
        
        for j in (pred_index):
            if(j == 2):
                break
                
            pred_index_clear.append(j)
            
        pred_str = [tokenizer.index_word[j] for j in pred_index_clear]
        output =output+' '+(''.join(pred_str))
        output_file.write(output+'\n')
output_file.close()