In [None]:
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [None]:
# Mount Google Drive (Colab only) so the dataset paths under
# /content/drive/MyDrive used by the following cells are readable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Captions table of the dataset: a '|'-separated file with one row per
# (image, caption) pair.
captions_csv = '/content/drive/MyDrive/flickr30k_images/results.csv'
df = pd.read_csv(captions_csv, sep='|')
df

Unnamed: 0,image_name,comment_number,comment
0,1000092795.jpg,0,Two young guys with shaggy hair look at their...
1,1000092795.jpg,1,"Two young , White males are outside near many..."
2,1000092795.jpg,2,Two men in green shirts are standing in a yard .
3,1000092795.jpg,3,A man in a blue shirt standing in a garden .
4,1000092795.jpg,4,Two friends enjoy time spent together .
...,...,...,...
158910,998845445.jpg,0,A man in shorts and a Hawaiian shirt leans ov...
158911,998845445.jpg,1,"A young man hanging over the side of a boat ,..."
158912,998845445.jpg,2,A man is leaning off of the side of a blue an...
158913,998845445.jpg,3,"A man riding a small boat in a harbor , with ..."


In [None]:
# Overwrite the parsed header with three clean column names; the later cells
# access df.image_name and df.comment through exactly these names.
df.columns=['image_name', 'comment_number', 'comment']

In [None]:
# Show dtypes and non-null counts to confirm the table parsed without gaps.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 158915 entries, 0 to 158914
Data columns (total 3 columns):
 #   Column          Non-Null Count   Dtype 
---  ------          --------------   ----- 
 0   image_name      158915 non-null  object
 1   comment_number  158915 non-null  int64 
 2   comment         158915 non-null  object
dtypes: int64(1), object(2)
memory usage: 3.6+ MB


In [None]:
# Number of distinct image files referenced by the captions table
# (dropna=False keeps a missing name counted as its own value).
image_count = df['image_name'].nunique(dropna=False)
image_count

31783

In [None]:
# Split at image level: the first 80% of the images go to training.
train_set_size = 0.8
train_image_count = int(train_set_size * image_count)

# Convert the image count into a row offset of the captions table.
# Assumes every image owns exactly 5 consecutive caption rows
# (158915 rows / 31783 images in the outputs above).
train_split_index = 5 * train_image_count

(train_image_count, train_split_index)

(25426, 127130)

In [None]:
# Split the captions table by row position into a training and a held-out part,
# dropping rows with missing values.
#
# dropna() is chained on the slice and returns a new frame. The previous
# `dropna(inplace=True)` mutated a slice of `df`, which triggered pandas'
# SettingWithCopyWarning and left it unclear whether `df` itself was touched.
train_df = df.iloc[:train_split_index].dropna()
test_df = df.iloc[train_split_index:].dropna()

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_df.dropna(inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_df.dropna(inplace=True)


In [None]:
# Directory that holds the image files; prefixing it to every image name gives
# one absolute path per caption row (so each path repeats once per caption).
img_loc = '/content/drive/MyDrive/flickr30k_images/images/'

train_image_paths = (img_loc + train_df['image_name']).values
val_image_paths = (img_loc + test_df['image_name']).values

# Peek at the first two paths of each split.
train_image_paths[:2], val_image_paths[:2]

(array(['/content/drive/MyDrive/flickr30k_images/images/1000092795.jpg',
        '/content/drive/MyDrive/flickr30k_images/images/1000092795.jpg'],
       dtype=object),
 array(['/content/drive/MyDrive/flickr30k_images/images/4944548179.jpg',
        '/content/drive/MyDrive/flickr30k_images/images/4944548179.jpg'],
       dtype=object))

In [None]:
import re
def preprocess(sentence):
    """Normalise a caption for tokenisation.

    The text is lower-cased and every character that is not a letter, a digit,
    a space or an apostrophe is replaced by a single space (characters are
    replaced one-for-one, so runs of spaces are not collapsed).

    Args:
        sentence: The caption text (must provide ``.lower()``).

    Returns:
        The cleaned, lower-cased string.

    Raises:
        Exception: Any error raised while cleaning is printed together with
            the offending input and then re-raised.
    """
    try:
        sentence = sentence.lower()
        cleaned = re.sub(r"[^a-zA-Z0-9 ']", ' ', sentence)
    except Exception as err:
        # Surface the problematic caption before propagating the error.
        print(sentence, err)
        raise err
    return cleaned

In [None]:
# train_texts = 'startseq'+ train_df.comment + ' endsq'
# val_texts = 'startseq'+ test_df.comment + ' endsq'

def _clean_caption(raw_caption):
    """Coerce a caption cell to ``str`` and run it through ``preprocess``."""
    return preprocess(str(raw_caption))


# Cleaned caption text for both splits, index-aligned with train_df / test_df.
train_texts = train_df['comment'].apply(_clean_caption)
val_texts = test_df['comment'].apply(_clean_caption)
train_texts[:5], val_texts[:5]


(0     two young guys with shaggy hair look at their...
 1     two young   white males are outside near many...
 2     two men in green shirts are standing in a yard  
 3         a man in a blue shirt standing in a garden  
 4              two friends enjoy time spent together  
 Name: comment, dtype: object,
 127130     a person painting a mural on the side of a bu...
 127131     pedestrians stop to watch a man create a mura...
 127132     a painting of a man drinking from a bottle on...
 127133     an artist is painting a mural on the side of ...
 127134        a crowd gathers to watch the artist at work  
 Name: comment, dtype: object)

In [None]:
# Build the word -> integer-id vocabulary from the training captions only.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(train_texts)
# One more than the number of distinct words: id 0 is the value used for
# padding further down (see the zero-padded sequences and mask_zero=True).
vocab_size = len(tokenizer.word_index)+1
vocab_size

16345

In [None]:
# Encode every caption as a list of integer word ids using the fitted tokenizer.
train_labels = tokenizer.texts_to_sequences(train_texts)
val_labels = tokenizer.texts_to_sequences(val_texts)

# Peek at the first two encoded captions of each split.
train_labels[:2], val_labels[:2]

([[11, 20, 361, 9, 1914, 91, 191, 15, 61, 157, 23, 325, 72, 2, 3, 458],
  [11, 20, 19, 711, 12, 49, 75, 177, 1384]],
 [[1, 55, 334, 1, 893, 4, 3, 144, 8, 1, 66, 23, 13, 12, 29, 76, 168],
  [773, 573, 14, 227, 1, 6, 3836, 1, 893, 4, 1, 100, 15, 3, 144, 8, 1, 32]])

In [None]:
# Fixed caption length fed to the text branch of the model.
max_length = 100
# padding='post' appends zeros after the real tokens, turning the ragged lists
# into (num_captions, max_length) integer arrays.
train_labels = pad_sequences(train_labels, maxlen=max_length, padding='post')
val_labels = pad_sequences(val_labels, maxlen=max_length, padding='post')
train_labels[:2]

array([[  11,   20,  361,    9, 1914,   91,  191,   15,   61,  157,   23,
         325,   72,    2,    3,  458,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0],
       [  11,   20,   19,  711,   12,   49,   75,  177, 1384,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    

In [None]:
# Sanity check: one padded row of max_length ids per caption in each split.
train_labels.shape, val_labels.shape

((127130, 100), (31785, 100))

In [None]:
# Shape of the padded training captions: (number of captions, max_length).
train_labels.shape

(127130, 100)

In [None]:
def preprocess_image(file_path):
    """Load an image file as a 299x299 RGB float tensor scaled to [0, 1].

    Args:
        file_path: Path of an encoded image file (string or string tensor).

    Returns:
        A float32 tensor of shape (299, 299, 3) with values in [0, 1].
    """
    raw_bytes = tf.io.read_file(file_path)

    # expand_animations=False guarantees a rank-3 (H, W, C) result even if the
    # file is a GIF; for JPEG/PNG/BMP input the decoded image is unchanged.
    img = tf.image.decode_image(raw_bytes, channels=3, expand_animations=False)

    # tf.image.resize already returns float32 while keeping the 0-255 value
    # range, so the only scaling needed is the division below (a float32 ->
    # float32 convert_image_dtype call would be a no-op).
    img = tf.image.resize(img, (299, 299))
    return img / 255

In [None]:
import random

def data_generator(train_image_paths, train_labels, batch_size, start_from_batch=0):
    """Yield batches of (image, caption) pairs for the image/text matching task.

    Rows are visited once, in order. Rows with an odd index are paired with
    their own caption (target 1); rows with an even index are paired with a
    caption taken from another row, chosen at a random offset between 5 and
    ``n - 5`` rows away, wrapping around (target 0).

    Args:
        train_image_paths: Indexable of image file paths, one per caption row.
        train_labels: Array of encoded captions; its first dimension gives the
            number of rows.
        batch_size: Number of samples per yielded batch.
        start_from_batch: Number of leading batches to skip, i.e. iteration
            starts at row ``start_from_batch * batch_size``.

    Yields:
        ``([images, tokens], targets)`` where each element is a tensor built
        from ``batch_size`` samples. Samples left over at the end that do not
        fill a whole batch are not yielded.
    """
    total_rows = train_labels.shape[0]
    first_row = start_from_batch * batch_size

    images, tokens, targets = [], [], []

    for row in range(first_row, total_rows):
        image = preprocess_image(train_image_paths[row])

        # 1 -> keep the matching caption, 0 -> substitute a mismatching one.
        is_match = row % 2
        if is_match:
            caption_row = row
        else:
            caption_row = (row + random.randint(5, total_rows - 5)) % total_rows

        images.append(image)
        tokens.append(train_labels[caption_row])
        targets.append([is_match])

        if len(images) != batch_size:
            continue

        yield (
            [tf.convert_to_tensor(images), tf.convert_to_tensor(tokens)],
            tf.convert_to_tensor(targets),
        )

        # Start collecting the next batch.
        images, tokens, targets = [], [], []



In [None]:
from tensorflow.keras import layers
from tensorflow.keras import regularizers as reg

In [None]:
def _conv_block(filters, padding, dropout_rate):
    """Return fresh layers for Conv(3x3) -> BatchNorm -> ReLU -> MaxPool(2x2) -> Dropout."""
    return [
        layers.Conv2D(filters, (3, 3), padding=padding, kernel_regularizer=reg.l2(1e-3)),
        layers.BatchNormalization(),
        layers.ReLU(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(dropout_rate),
    ]


# One (filters, padding, dropout rate) entry per convolutional block, in order.
_CONV_BLOCK_SPECS = [
    (64, "same", 0.1),
    (64, "valid", 0.1),
    (128, "same", 0.2),
    (128, "valid", 0.1),
    (256, "same", 0.2),
    (256, "valid", 0.2),
]

# Image encoder: six conv blocks, global average pooling, then a 512-unit
# dense projection used as the image feature vector.
image_model = tf.keras.Sequential(
    [layer for spec in _CONV_BLOCK_SPECS for layer in _conv_block(*spec)]
    + [
        # Collapse the spatial dimensions into one vector per image.
        layers.GlobalAveragePooling2D(),

        # Dense projection head.
        layers.Dense(512, kernel_regularizer=reg.l2(1e-3)),
        layers.BatchNormalization(),
        layers.ReLU(),
    ]
)


In [None]:
# Symbolic input for 299x299 RGB images (the size preprocess_image produces).
image_input = layers.Input(shape=(299,299,3), name="image_input")

# Run the CNN encoder on the input to obtain the image feature vector.
image_features = image_model(image_input)

In [None]:
# Symbolic input for one padded caption of max_length token ids.
text_input = layers.Input(shape=(max_length, ), name="text_input")

# Trainable 300-dimensional word embeddings; mask_zero=True marks token id 0
# (the padding value) as masked for the recurrent layers that follow.
embedding_layer = layers.Embedding(input_dim=vocab_size,
                                   output_dim=300,
                                   mask_zero=True,
                                   input_length=max_length)

embedding_output = embedding_layer(text_input)

# Three stacked bidirectional LSTMs of growing width, each followed by dropout.
# The first two return the full sequence so the next LSTM can consume it.
lstm_layer_1 = layers.Bidirectional(layers.LSTM(units=64, return_sequences=True))(embedding_output)
lstm_layer_1 = layers.Dropout(0.1)(lstm_layer_1)

lstm_layer_2 = layers.Bidirectional(layers.LSTM(units=128, return_sequences=True))(lstm_layer_1)
lstm_layer_2 = layers.Dropout(0.1)(lstm_layer_2)

# The last LSTM returns a single vector per caption (no return_sequences).
lstm_layer_3 = layers.Bidirectional(layers.LSTM(units=256))(lstm_layer_2)
lstm_layer_3 = layers.Dropout(0.1)(lstm_layer_3)

# Caption feature vector used by the fusion head.
text_features = lstm_layer_3
text_features

<KerasTensor: shape=(None, 512) dtype=float32 (created by layer 'dropout_8')>

In [None]:
# Fuse both modalities by joining the image and caption vectors along the
# feature axis.
concatted_features = layers.Concatenate(axis=1)([image_features, text_features])
concatted_features

<KerasTensor: shape=(None, 1024) dtype=float32 (created by layer 'concatenate')>

In [None]:
# Fusion head: a regularised dense layer with batch norm and ReLU on top of the
# concatenated features.
dense_layer_final = layers.Dense(512, kernel_regularizer=reg.l2(1e-3))(concatted_features)
dense_layer_final = layers.BatchNormalization()(dense_layer_final)
dense_layer_final = layers.ReLU()(dense_layer_final)

# Single sigmoid unit: the model's score for "this caption matches this image".
decision_layer = layers.Dense(1, activation="sigmoid")
model_output = decision_layer(dense_layer_final)
model_output

<KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'dense_2')>

In [None]:
# Assemble the two-input (image, caption) -> match-score model and print its
# layer summary.
its_model = tf.keras.Model(inputs=[image_input, text_input], outputs=model_output)
its_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 text_input (InputLayer)     [(None, 100)]                0         []                            
                                                                                                  
 embedding (Embedding)       (None, 100, 300)             4903500   ['text_input[0][0]']          
                                                                                                  
 bidirectional (Bidirection  (None, 100, 128)             186880    ['embedding[0][0]']           
 al)                                                                                              
                                                                                                  
 dropout_6 (Dropout)         (None, 100, 128)             0         ['bidirectional[0][0]']   

In [None]:
# Compile the model: Adam with learning rate 0.001, binary cross-entropy on
# the sigmoid output, and accuracy reported as the training metric.
its_model.compile(optimizer=tf.keras.optimizers.legacy.Adam(0.001),
                  loss="binary_crossentropy", metrics=["accuracy"])


In [None]:
# Training batches. start_from_batch=2000 makes the generator skip the first
# 2000 * 32 caption rows, i.e. this resumes a partially completed pass.
# NOTE(review): the weights loaded further down come from
# model_checkpoint_1000 - confirm the skip count matches the checkpoint that is
# actually being resumed.
train_data_generator = data_generator(train_image_paths,
                                      train_labels,
                                      batch_size=32,
                                      start_from_batch=2000)

# Validation batches over the held-out rows, starting from the first row.
# Both objects are single-pass generators: once iterated they are exhausted
# and must be re-created by re-running this cell.
val_data_generator = data_generator(val_image_paths,
                                    val_labels,
                                    batch_size=32)


In [None]:
class SaveModelCallback(tf.keras.callbacks.Callback):
    """Keras callback that saves the whole model every ``save_steps`` batches.

    Args:
        checkpoint_path: Target path template containing a ``{batch}``
            placeholder, filled with the 1-based number of the batch that just
            finished.
        save_steps: Save whenever that batch number is a multiple of this value.
    """

    def __init__(self, checkpoint_path, save_steps=250):
        super().__init__()
        self.save_steps = save_steps
        self.checkpoint_path = checkpoint_path

    def on_batch_end(self, batch, logs=None):
        """Save the model if the batch that just ended is a checkpoint batch."""
        finished_batches = batch + 1
        if finished_batches % self.save_steps:
            return  # not a multiple of save_steps, nothing to do

        self.model.save(self.checkpoint_path.format(batch=finished_batches))

In [None]:
# Create the checkpoint callback. '{batch}' in the path is replaced with the
# number of the batch that just finished, so each checkpoint gets its own file;
# the default save interval of the callback (every 250 batches) is used.
checkpoint_path = '/content/drive/MyDrive/flickr30k_images/model_checkpoint_{batch}.keras'
save_callback = SaveModelCallback(checkpoint_path)

In [None]:
# Resume from a previously saved checkpoint: restore its weights into the
# freshly built model before continuing training.
its_model.load_weights('/content/drive/MyDrive/flickr30k_images/model_checkpoint_1000.keras')


In [None]:
# Train for one pass over the remaining batches of the training generator,
# checkpointing through save_callback. Validation during fit is disabled here;
# the held-out data is evaluated in a separate cell.
its_model.fit(train_data_generator,
              # validation_data=val_data_generator,
              callbacks = [save_callback],
              epochs=1)


In [None]:
import tensorflow as tf

# Measure classification accuracy on the held-out data by iterating over
# val_data_generator.
#
# NOTE: val_data_generator is a single-pass generator. Re-create it before
# re-running this cell, otherwise the loop receives no batches.

# Running totals: correctly classified samples and samples seen so far.
correct_predictions = tf.constant(0.0)
num_samples = 0

for x_val, y_val in val_data_generator:
    # Score one batch; predict_on_batch avoids the per-call progress output
    # and setup overhead of predict() inside a Python loop.
    y_pred = its_model.predict_on_batch(x_val)

    y_true = tf.cast(y_val, dtype=tf.float32)

    # Turn the sigmoid probabilities into hard 0/1 decisions. The result is
    # already exactly 0 or 1, so no clipping is needed afterwards.
    binary_predictions = tf.round(y_pred)

    batch_correct = tf.reduce_sum(tf.cast(binary_predictions == y_true, dtype=tf.float32))

    # x_val is the two-element input list [images, tokens], so len(x_val) is
    # always 2; the number of samples is the number of target rows.
    batch_samples = int(y_val.shape[0])

    correct_predictions += batch_correct
    num_samples += batch_samples

    # Per-batch accuracy, relative to the real size of this batch.
    print((batch_correct / batch_samples).numpy())

if num_samples == 0:
    raise RuntimeError(
        "val_data_generator produced no batches; re-create the generator before evaluating."
    )

# Overall accuracy across all evaluated validation samples.
accuracy = correct_predictions / num_samples

print("Accuracy on Validation Data:", accuracy.numpy())


In [None]:
# Persist the model in its current state to Drive under a fixed file name.
its_model.save('/content/drive/MyDrive/flickr30k_images/model_checkpoint_3ep.keras')