In [None]:
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, Dropout, MaxPooling2D, Flatten, Input, Conv2D
from tensorflow.keras.models import load_model, Sequential
from tensorflow.keras import losses
from tensorflow.keras import backend as K
import matplotlib.pyplot as plt
from tensorflow_addons.layers import SpatialPyramidPooling2D
import numpy as np
from imageio import imread
import pandas as pd
import math
import os
import tensorflow
from tensorflow.keras.utils import Sequence
from tensorflow.keras.preprocessing.image import img_to_array, ImageDataGenerator, load_img
import random
import sys

In [None]:
# Remember the current stdout so the last cell of the notebook can restore it,
# then send everything printed from here on to the training log file.
# NOTE(review): if any later cell raises, stdout stays redirected and the log
# file stays open until the final cell is run by hand.
old_stdout = sys.stdout
log_file = open("phosc-training.log","w")
sys.stdout = log_file

In [None]:
# DataSequence class to pass data(images/vector) in batches
class DataSequence(Sequence):
    """Keras ``Sequence`` that yields image batches with PHOS and PHOC targets.

    Every item is a pair ``(batch_x, batch_y)`` where ``batch_x`` is the array
    of loaded images and ``batch_y`` is a dict with the keys ``'phosnet'`` and
    ``'phocnet'``.

    Args:
        imagefiles: sequence of image file paths.
        phoslabels: PHOS label vectors, aligned one-to-one with ``imagefiles``.
        phoclabels: PHOC label vectors, aligned one-to-one with ``imagefiles``.
        batch_size: number of samples per batch.
        shuffle: when True, the sample order is re-drawn at construction and
            after every epoch. Defaults to False, which keeps the order of
            ``imagefiles`` (the previous behaviour).
        target_size: size handed to ``load_img`` for every image; Keras
            documents it as ``(img_height, img_width)``. Defaults to
            ``(110, 110)``.

    Raises:
        ValueError: if the number of label pairs differs from the number of
            image files.
    """

    def __init__(self, imagefiles, phoslabels, phoclabels, batch_size,
                 shuffle=False, target_size=(110, 110)):
        super().__init__()
        self.bsz = batch_size  # batch size
        self.shuffle = shuffle
        self.target_size = target_size

        # Keep the labels in memory as float32 arrays, one dict per sample.
        self.labels = [
            {"phosnet": np.asarray(phos).astype(np.float32),
             "phocnet": np.asarray(phoc).astype(np.float32)}
            for phos, phoc in zip(phoslabels, phoclabels)
        ]
        self.im_list = imagefiles

        if len(self.labels) != len(self.im_list):
            raise ValueError(
                "Got %d label pairs for %d image files"
                % (len(self.labels), len(self.im_list)))

        # Define the sample order up front so it exists before the first epoch.
        self.on_epoch_end()

    def __len__(self):
        # compute number of batches to yield
        return int(math.ceil(len(self.im_list) / float(self.bsz)))

    def on_epoch_end(self):
        # Rebuild the sample order; shuffle it only when requested.
        self.indexes = list(range(len(self.im_list)))
        if self.shuffle:
            random.shuffle(self.indexes)

    def _batch_positions(self, idx):
        # Sample positions (into im_list / labels) that make up batch `idx`.
        return self.indexes[idx * self.bsz: (idx + 1) * self.bsz]

    def _select_labels(self, positions):
        # Label dicts for the given sample positions.
        return np.array([self.labels[pos] for pos in positions])

    def _load_images(self, positions):
        # Images for the given sample positions, loaded from disk.
        return np.array([
            img_to_array(load_img(self.im_list[pos], target_size=self.target_size))
            for pos in positions
        ])

    def get_batch_labels(self, idx):
        # Fetch a batch of labels
        return self._select_labels(self._batch_positions(idx))

    def get_batch_features(self, idx):
        # Fetch a batch of inputs
        return self._load_images(self._batch_positions(idx))

    def __getitem__(self, idx):
        # Resolve the positions once so images and labels stay aligned.
        positions = self._batch_positions(idx)
        batch_x = self._load_images(positions)
        batch_y = self._select_labels(positions)
        return batch_x, {
            'phosnet': np.asarray([entry['phosnet'] for entry in batch_y]),
            'phocnet': np.asarray([entry['phocnet'] for entry in batch_y]),
        }

In [None]:
def base_model(img_width, img_height, weight_path=None, phos_size=270, phoc_size=830):
    """Build and compile the two-headed PHOSC network.

    A shared stack of 3x3 convolutions feeds a spatial pyramid pooling layer;
    the flattened result goes into two dense heads named ``"phosnet"`` and
    ``"phocnet"``. The model is compiled with MSE on ``phosnet`` and binary
    cross-entropy on ``phocnet``, its summary is printed, and a diagram is
    written to ``phoscnet.png`` when the plotting dependencies are available.

    Args:
        img_width: first spatial dimension of the input tensor.
        img_height: second spatial dimension of the input tensor.
        weight_path: optional path to a pickled DataFrame of weights; the
            first column of each row is used as one weight array.
        phos_size: width of the ``phosnet`` output layer (default 270).
        phoc_size: width of the ``phocnet`` output layer (default 830).

    Returns:
        The compiled ``Model``.
    """
    # NOTE(review): the spatial axes are ordered (img_width, img_height); Keras
    # image tensors are conventionally (height, width). Harmless for the square
    # inputs used in this notebook -- confirm before using non-square sizes.
    if K.image_data_format() == 'channels_first':
        input_shapes = (3, img_width, img_height)
    else:
        input_shapes = (img_width, img_height, 3)
    inp = Input(shape=input_shapes)

    # (filters, number of conv layers, max-pool afterwards?)
    conv_plan = (
        (64, 2, True),
        (128, 2, True),
        (256, 6, False),
        (512, 3, False),
    )
    features = inp
    for filters, repeats, pool_after in conv_plan:
        for _ in range(repeats):
            features = Conv2D(filters, (3, 3), padding='same', activation='relu')(features)
        if pool_after:
            features = MaxPooling2D(pool_size=(2, 2), strides=2)(features)
    features = SpatialPyramidPooling2D([1, 2, 4])(features)
    features = Flatten()(features)

    def _dense_head(units, activation, name):
        # Two 4096-unit dense layers with dropout, then the named output layer.
        head = Dense(4096, activation='relu')(features)
        head = Dropout(0.5)(head)
        head = Dense(4096, activation='relu')(head)
        head = Dropout(0.5)(head)
        return Dense(units, activation=activation, name=name)(head)

    # Keep this creation order: the saved weight list is positional.
    phosnet_op = _dense_head(phos_size, 'relu', "phosnet")
    phocnet = _dense_head(phoc_size, 'sigmoid', "phocnet")

    model = Model(inputs=inp, outputs=[phosnet_op, phocnet])
    # Named head_losses so the module-level `losses` import is not shadowed.
    head_losses = {
        "phosnet": tensorflow.keras.losses.MSE,
        "phocnet": 'binary_crossentropy',
    }
    lossWeights = {"phosnet": 1.5, "phocnet": 4.5}

    # initialize the optimizer and compile the model
    # NOTE(review): recent Keras optimizers may reject the `decay` argument --
    # confirm against the installed TensorFlow version.
    opt = tensorflow.keras.optimizers.Adam(learning_rate=1e-4, decay=5e-5)
    model.compile(optimizer=opt, loss=head_losses, loss_weights=lossWeights,
                  metrics=[tensorflow.keras.metrics.CosineSimilarity(axis=1)])

    if weight_path:
        # SECURITY: unpickling can execute arbitrary code; only load weight
        # files produced by this notebook or another trusted source.
        stored = pd.read_pickle(weight_path)
        model.set_weights([row[0] for row in stored.values])

    model.summary()

    # The diagram is a convenience only: use the plot_model shipped with
    # tensorflow.keras and do not let missing plotting dependencies abort
    # model construction.
    try:
        tensorflow.keras.utils.plot_model(model, to_file="phoscnet.png", show_shapes=True)
    except ImportError as err:
        print("Skipping model diagram:", err)
    return model

# PHOC Label generator

In [None]:
'''This code takes an input word (a string) and outputs the PHOC label of
the word. The PHOC label is a binary vector whose length is
((2 + 3 + 4 + 5) * languageCharactersAndNumbersCount) + (2 * commonBigram)
For a 256-character alphabet: ((2+3+4+5) * 256) + (2*100) = 3784
For the 45-character alphabet used in this notebook: ((2+3+4+5) * 45) + (2*100) = 830
Reference: https://ieeexplore.ieee.org/document/6857995/?part=1
'''

def generate_45(word):
  '''Return the 45-slot binary character-occurrence vector of `word`.

  A slot is set to 1 when `word` contains the character whose code point
  lies that many positions after 'ا'. An offset below zero is resolved by
  Python's negative list indexing (counted from the end of the vector);
  an offset outside [-45, 44] raises IndexError.
  '''
  base_code = ord('ا')
  occurrence = [0] * 45
  for offset in (ord(symbol) - base_code for symbol in word):
    occurrence[offset] = 1
  return occurrence

# Frequent bigrams; a bigram's position in this tuple is its slot in the
# vector returned by generate_100.
_COMMON_BIGRAMS = (
    'لم', 'لل', 'ين', 'لت', 'لي', 'يت', 'لع', 'هم', 'لن', 'تم', 'في', 'عل',
    'لب', 'ست', 'بي', 'يم', 'مت', 'ته', 'لح', 'لق', 'ما', 'لف', 'من', 'ها',
    'له', 'كم', 'يس', 'مل', 'بت', 'لك', 'نا', 'لس', 'يب', 'بع', 'مس', 'سب',
    'يع', 'تح', 'يل', 'فت', 'فل', 'مع', 'تع', 'لا', 'تن', 'تب', 'يح', 'يه',
    'لج', 'فع', 'سم', 'تق', 'لش', 'ير', 'ني', 'يك', 'لو', 'مي', 'بم', 'نف',
    'مح', 'تف', 'عن', 'لخ', 'سي', 'يق', 'قت', 'يف', 'حي', 'نص', 'عم', 'جم',
    'به', 'بل', 'كت', 'نه', 'صي', 'نت', 'نق', 'تل', 'خل', 'لغ', 'لص', 'تك',
    'با', 'تس', 'يا', 'نب', 'قب', 'مو', 'حم', 'عت', 'قل', 'يخ', 'عي', 'قي',
    'مه', 'نس', 'تا', 'سن',
)

# Bigram -> slot. setdefault keeps the first position if a bigram repeats.
_BIGRAM_SLOT = {}
for _slot, _bigram in enumerate(_COMMON_BIGRAMS):
  _BIGRAM_SLOT.setdefault(_bigram, _slot)


def generate_100(word):
  '''Return a 100-slot binary vector marking the frequent bigrams in `word`.

  Every pair of adjacent characters in `word` is looked up in the bigram
  table; the slot of each pair that is found is set to 1. Words shorter
  than two characters yield an all-zero vector.

  The previous version looked up single characters in the table of
  two-character strings, so no slot was ever set.
  '''
  vector_100 = [0] * 100
  for first, second in zip(word, word[1:]):
    slot = _BIGRAM_SLOT.get(first + second)
    if slot is not None:
      vector_100[slot] = 1
  return vector_100

def generate_label(word):
  '''Return the PHOC label of `word` as a flat list.

  For each level 2..5 the lower-cased word is cut into that many regions of
  len(word) // level characters, the last region taking whatever remains,
  and the generate_45 vector of every region is appended. The generate_100
  vectors of the two halves of the word are appended at the end.
  '''
  word = word.lower()
  length = len(word)
  label = []
  for levels in (2, 3, 4, 5):
    width = length // levels
    # Every region but the last spans exactly `width` characters ...
    bounds = [(k * width, k * width + width) for k in range(levels - 1)]
    # ... and the last one runs to the end of the word.
    bounds.append(((levels - 1) * width, length))
    for start, stop in bounds:
      label.extend(generate_45(word[start:stop]))

  # Append the most common 100 bigram text using L2 split
  half = length // 2
  label.extend(generate_100(word[:half]))
  label.extend(generate_100(word[half:]))
  return label

# PHOS Label generator

In [None]:
import csv

def get_number_of_columns(csv_file):
    """Return the number of fields in the first row of `csv_file`, minus one.

    The file is read as UTF-8 with an optional BOM. The subtraction excludes
    the leading field of the row (the alphabet character in the shape-count
    table). Raises StopIteration when the file has no rows.
    """
    with open(csv_file, encoding='UTF-8-sig') as handle:
        first_row = next(csv.reader(handle, delimiter=',', skipinitialspace=True))
    return len(first_row) - 1


def create_alphabet_dictionary(csv_file):
    """Map the first field of every CSV row to that row's 0-based position.

    The file is read as UTF-8 with an optional BOM. The returned dict is keyed
    by the first field of each row (the alphabet character) and holds the row
    index, not the shape-count vector itself. When a key repeats, the last
    row wins.
    """
    with open(csv_file, encoding='UTF-8-sig') as handle:
        rows = csv.reader(handle, delimiter=',', skipinitialspace=True)
        return {row[0]: position for position, row in enumerate(rows)}

# Shape-count table: one row per alphabet character, character in column 0.
alphabet_csv = "Arabic_alphabet.csv"

# Number of shape columns, and character -> row position in the table.
csv_num_cols = get_number_of_columns(alphabet_csv)
alphabet_dict = create_alphabet_dictionary(alphabet_csv)

# Integer matrix of the table with the leading character column removed.
numpy_csv = np.delete(
    np.genfromtxt(alphabet_csv, dtype=int, delimiter=","),
    0,
    axis=1,
)

def word_vector(word):
    """Return the summed shape-count vector of a word segment.

    Each letter of `word` is resolved to its row in `numpy_csv` through
    `alphabet_dict`, and the rows are added into a float vector of
    `csv_num_cols` zeros. An empty segment yields the zero vector; a letter
    missing from `alphabet_dict` raises KeyError.
    """
    row_ids = [alphabet_dict[letter] for letter in word]
    totals = np.zeros(csv_num_cols)
    for row_id in row_ids:
        totals += numpy_csv[row_id]
    return totals

def phos_generate_label(word):
    """Return the PHOS vector of `word` as a single 1-D array.

    The vector is the concatenation of the word_vector of the whole word
    followed, for each level 2..5, by the word_vector of every region of
    that level. Regions are len(word) // level characters long, except the
    last one, which runs to the end of the word.
    """
    length = len(word)
    pieces = [word_vector(word)]
    for levels in range(2, 6):
        width = length // levels
        for k in range(levels - 1):
            pieces.append(word_vector(word[k * width:(k + 1) * width]))
        pieces.append(word_vector(word[(levels - 1) * width:length]))
    return np.concatenate(pieces, axis=0)

def gen_label(word_list):
    """Return a dict mapping every word in `word_list` to its PHOS vector."""
    return {word: phos_generate_label(word) for word in word_list}


def label_maker(word_txt):
    """Return a dict of PHOS vectors keyed by the words listed in a text file.

    The first whitespace-separated token of every non-blank line is taken as
    the word. The file is decoded as UTF-8 with an optional BOM, matching the
    other file reads in this notebook, so a leading BOM does not end up
    inside the first word. Blank lines are skipped.

    Args:
        word_txt: path of the text file holding one word per line.

    Returns:
        dict mapping each word to ``phos_generate_label(word)``.
    """
    label = {}
    with open(word_txt, "r", encoding='UTF-8-sig') as file:
        for line in file:
            fields = line.split()
            if not fields:
                # Blank or whitespace-only line: nothing to label.
                continue
            word = fields[0]
            label[word] = phos_generate_label(word)
    return label


In [None]:
def get_generator_value(class_indicates, index):
    """Return the first key of `class_indicates` whose value equals `index`.

    Raises ValueError when no value matches.
    """
    for key, value in class_indicates.items():
        if value is index or value == index:
            return key
    raise ValueError(f"{index!r} is not in list")

In [None]:
# Root directories of the dataset splits, consumed by flow_from_directory.
train_path, test_path, val_path = (
    'asar-dataset/train',
    'asar-dataset/test',
    'asar-dataset/val',
)

In [None]:
# Image generator for the training split, constructed with default arguments.
train_datagen = ImageDataGenerator()

In [None]:
# Image generator for the validation split, constructed with default arguments.
# Disabled alternative that would rescale pixel values by 1/255:
#val_datagen = ImageDataGenerator(rescale=1. / 255.)
val_datagen = ImageDataGenerator()

In [None]:
# Image generator for the test split, constructed with default arguments.
test_datagen = ImageDataGenerator()

In [None]:
# Directory iterator over the training split. The rest of this notebook reads
# only its `labels`, `filenames` and `class_indices`; the images themselves
# are loaded by DataSequence.
train_generator = train_datagen.flow_from_directory(
        # This is the target directory
        train_path,
        shuffle= False,
        # Images are resized to 16x16 here (presumably kept small because the
        # pixel data from this iterator is not used -- confirm)
        target_size=(16, 16),
        batch_size=1,
        # binary: use binary_crossentropy loss, we need binary labels
        # categorical : use categorical_crossentropy loss, then need categorical labels
        class_mode='binary')

In [None]:
# Directory iterator over the validation split. The rest of this notebook
# reads only its `labels`, `filenames` and `class_indices`.
val_generator = val_datagen.flow_from_directory(
        # This is the target directory
        val_path,
        shuffle= False,
        target_size=(16, 16),
        batch_size=1,
        class_mode='binary')

In [None]:
# Directory iterator over the test split. The rest of this notebook reads
# only its `labels`, `filenames` and `class_indices`.
test_generator = test_datagen.flow_from_directory(
        # This is the target directory
        test_path,
        shuffle= False,
        target_size=(16, 16),
        batch_size=1,
        class_mode='binary')

In [None]:
# For every split: rewind the iterator, take the integer class label of each
# image, and prefix each relative filename with the split's root directory.
train_generator.reset()
y_train = train_generator.labels
X_train_files = [f'{train_path}/{filename}' for filename in train_generator.filenames]

val_generator.reset()
y_val = val_generator.labels
X_val_files = [f'{val_path}/{filename}' for filename in val_generator.filenames]

test_generator.reset()
y_test = test_generator.labels
X_test_files = [f'{test_path}/{filename}' for filename in test_generator.filenames]

In [None]:
# num_of_classes = len(train_generator.class_indices)
# test_transcripts = [get_generator_value(test_generator.class_indices, int(i)) for i in y_test]
# test_transcripts = np.array(test_transcripts)

def _phosc_labels(generator, class_ids):
    """Return ``(phoc_labels, phos_labels)`` for the given class ids.

    `generator.class_indices` maps each class name (the word) to its integer
    id. The mapping is inverted once here instead of being searched linearly
    for every sample.
    """
    word_of = {class_id: word for word, class_id in generator.class_indices.items()}
    words = [word_of[int(class_id)] for class_id in class_ids]
    phoc_labels = [generate_label(word) for word in words]
    phos_labels = [phos_generate_label(word) for word in words]
    return phoc_labels, phos_labels


y_train_phoc_labels, y_train_phos_labels = _phosc_labels(train_generator, y_train)
y_val_phoc_labels, y_val_phos_labels = _phosc_labels(val_generator, y_val)
y_test_phoc_labels, y_test_phos_labels = _phosc_labels(test_generator, y_test)

In [None]:
# Build the network; resume from the pickled weights when the file exists.
weight_path = 'phosc_weights.pkl'
model = base_model(
    110,
    110,
    weight_path=weight_path if os.path.exists(weight_path) else None,
)

In [None]:
# Batch feeders for the training and validation splits.
batch_size = 32
train_sequence = DataSequence(
    imagefiles=X_train_files,
    phoslabels=y_train_phos_labels,
    phoclabels=y_train_phoc_labels,
    batch_size=batch_size,
)
valid_sequence = DataSequence(
    imagefiles=X_val_files,
    phoslabels=y_val_phos_labels,
    phoclabels=y_val_phoc_labels,
    batch_size=batch_size,
)

In [None]:
# Early stopping and ReduceLROnPlateau callbacks
early_stop = tensorflow.keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=0.0001, patience=10, verbose=2,mode='auto', baseline=None, restore_best_weights=False)
rlp=tensorflow.keras.callbacks.ReduceLROnPlateau(monitor='val_phocnet_loss', factor=0.25, patience=5, verbose=1,mode='auto', min_delta=0.0001, cooldown=2, min_lr=1e-7)

# Training used to run as 100 separate fit() calls of 5 epochs each. Keras
# resets the state of both callbacks at the start of every fit() call, so a
# patience of 10 (or 5) epochs could never be reached. One fit() call over
# the same total number of epochs lets the callbacks work as configured.
TOTAL_EPOCHS = 500
SAVE_EVERY = 5  # epochs between two saves, as before


def save_checkpoint():
    """Write the current model to disk, as an .h5 file and as pickled weights."""
    weights = model.get_weights()
    df = pd.DataFrame(weights)
    # These are the latest weights, not the best ones (restore_best_weights=False).
    print("Saving model checkpoint.......")
    model.save('phosc-model.h5')
    df.to_pickle('phosc_weights.pkl')


periodic_save = tensorflow.keras.callbacks.LambdaCallback(
    on_epoch_end=lambda epoch, logs=None: (
        save_checkpoint() if (epoch + 1) % SAVE_EVERY == 0 else None))
callbacks_list = [early_stop, rlp, periodic_save]

history = model.fit(
    train_sequence,
    epochs=TOTAL_EPOCHS,
    shuffle= True,
    validation_data=valid_sequence,
    verbose=1,
    callbacks=callbacks_list)

# Final save: early stopping may end training between two periodic saves.
save_checkpoint()
#
# # Create directory to store training history
#
# if not os.path.exists("Train_History"):
#     os.makedirs("Train_History")
#
# # Store train history as CSV file
# model_name="phoc-model"
# hist_df = pd.DataFrame(history.history)
# hist_csv_file = 'Train_History/history_'+model_name+'.csv'
# with open(hist_csv_file, mode='w') as f:
#     hist_df.to_csv(f)
#
# # Plot train and validation accuracy(avg cosine similarity)
#
# acc = history.history['cosine_similarity']
# val_acc = history.history['val_cosine_similarity']
# loss = history.history['loss']
# val_loss = history.history['val_loss']
# epochs = range(1, len(acc) + 1)
# plt.plot(epochs, acc,label='Training Similarity')
# plt.plot(epochs, val_acc,label='Validation Similarity')
# plt.title(model_name+'_Cosine Similarity')
# plt.legend()
# plt.savefig('Train_History/'+model_name+'_Pretrain_CS.png')
# plt.show()
#
# # Plot train and validation loss
# plt.plot(epochs, loss,label='Training Loss')
# plt.plot(epochs, val_loss,label='Validation Loss')
# plt.title(model_name+' MSE Loss')
# plt.legend()
# plt.savefig('Train_History/'+model_name+'_Pretrain_Loss.png')
# plt.show()

In [None]:
# Undo the redirection set up at the top of the notebook: point stdout back
# at the saved stream, then close the training log file.
sys.stdout = old_stdout
log_file.close()