In [None]:
# Root folder of the downloaded Flickr30k dataset. This is an absolute,
# machine-specific location; later cells build `image_path` and `csv_path`
# by plain string concatenation, so the value must keep its trailing slash.
path = "C:/Users/Yakina/.cache/kagglehub/datasets/hsankesara/flickr-image-dataset/versions/1/flickr30k_images/"

In [None]:
import numpy as np
import pandas as pd
import os
import csv 
import datetime
from tqdm import tqdm
import h5py

import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import Sequence
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D, Activation, Dropout, Flatten, Dense, Input, Layer
from tensorflow.keras.layers import Embedding, LSTM, add, Concatenate, Reshape, concatenate, Bidirectional
from tensorflow.keras.applications import VGG16, ResNet50, DenseNet201
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

import warnings

warnings.filterwarnings('ignore')

In [None]:
# Locations inside the dataset root. Both are built with `+`, not a path join,
# so they rely on `path` ending with a slash.
image_path = path + 'flickr30k_images/'  # folder the image files are loaded from
csv_path = path + 'results.csv'          # pipe-separated captions table

In [None]:
# Epoch number of the checkpoint to resume from. It selects the file loaded
# below and is also passed to `fit(initial_epoch=...)`.
saved_model_number = 2 # 0 = start from scratch (extract features, build a new model)
# HDF5 file the extracted image features are written to and read back from.
features_file = 'image_features.h5'

# Checkpoint that is loaded when resuming.
model_file = f'models/model_epoch_{saved_model_number}.h5'

In [None]:
# Fields are separated by '|' with optional surrounding whitespace. A regex
# separator is parsed by pandas' python engine, so request it explicitly.
data = pd.read_csv(csv_path, sep=r'\s*\|\s*', engine='python')
data.head()

In [None]:
# Normalise the dtypes of the three columns in a single cast.
data = data.astype({
    'image_name': str,
    'comment_number': int,
    'comment': str,
})

In [None]:
def text_preprocessing(data):
    """Clean the `comment` column and wrap every caption in sequence markers.

    Steps, applied in order to ``data['comment']``:
      1. lower-case the text;
      2. drop every character that is neither a letter nor whitespace;
      3. collapse runs of whitespace into a single space;
      4. drop one-character words;
      5. prepend ``"startseq "`` and append ``" endseq"``.

    Steps 2 and 3 previously used ``str.replace`` with a regex-looking pattern,
    which on a Python ``str`` is a literal replacement and therefore did nothing.
    They now use regex replacement. Whitespace is preserved in step 2 so that
    words are not glued together.

    NOTE(review): this changes the cleaned captions (and so any vocabulary
    fitted on them) compared with the former no-op behaviour; checkpoints
    trained on the old text may need to be retrained.

    Args:
        data: DataFrame with a string column named ``comment``.

    Returns:
        The same DataFrame object; its ``comment`` column is modified in place.
    """
    cleaned = data['comment'].str.lower()
    # Raw strings: "\s" is not a valid escape in a normal string literal.
    cleaned = cleaned.str.replace(r"[^a-z\s]", "", regex=True)
    cleaned = cleaned.str.replace(r"\s+", " ", regex=True)
    cleaned = cleaned.apply(lambda text: " ".join(word for word in text.split() if len(word) > 1))
    data['comment'] = "startseq " + cleaned + " endseq"
    return data

In [None]:
# `data` is overwritten with its processed version, so running this cell a
# second time feeds already-processed captions through the function again.
data = text_preprocessing(data)
captions = list(data['comment'])
captions[:10]

In [None]:
# Fit the vocabulary on every caption.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(captions)
vocab_size = 1 + len(tokenizer.word_index)
# Longest caption, measured in whitespace-separated words.
max_length = max(map(len, map(str.split, captions)))

# Split by image (first 85% of the unique names go to training) so that all
# captions of one image end up on the same side.
images = data['image_name'].unique().tolist()
nimages = len(images)
split_index = round(0.85 * nimages)
train_images, val_images = images[:split_index], images[split_index:]

train = data[data['image_name'].isin(train_images)].reset_index(drop=True)
test = data[data['image_name'].isin(val_images)].reset_index(drop=True)

# Sanity check: token ids of the second caption.
tokenizer.texts_to_sequences([captions[1]])[0]

In [None]:
class CustomDataGenerator(Sequence):
    """Batch generator that turns (image, caption) rows into next-word samples.

    Every row of `df` names an image (`X_col`) and one caption (`y_col`). A
    caption whose token sequence is ``[t0, t1, ..., tn]`` is expanded into the
    samples ``([t0] -> t1), ([t0, t1] -> t2), ...``; each sample is paired with
    the feature vector stored for its image.

    Args:
        df: DataFrame holding at least the columns `X_col` and `y_col`.
        X_col: name of the column with the image key used to index `features`.
        y_col: name of the column with the caption text.
        batch_size: number of DataFrame rows (captions) per batch.
        directory: stored on the instance; not read by this class.
        tokenizer: object providing ``texts_to_sequences``.
        vocab_size: number of classes of the one-hot encoded target.
        max_length: length the input sequences are padded to.
        features: mapping from image key to an array whose first element is
            the feature vector of that image.
        shuffle: when True, rows are reshuffled at the end of every epoch.
        **kwargs: forwarded to the base class constructor.
    """

    def __init__(self, df, X_col, y_col, batch_size, directory, tokenizer, vocab_size, max_length, features, shuffle=True, **kwargs):
        # Let the Keras base class initialise its own state.
        super().__init__(**kwargs)

        self.df = df.copy()
        self.X_col = X_col
        self.y_col = y_col
        self.directory = directory
        self.batch_size = batch_size
        self.tokenizer = tokenizer
        self.vocab_size = vocab_size
        self.max_length = max_length
        self.features = features
        self.shuffle = shuffle
        self.n = len(self.df)

    def on_epoch_end(self):
        """Reshuffle the rows between epochs when shuffling is enabled."""
        if self.shuffle:
            self.df = self.df.sample(frac=1).reset_index(drop=True)

    def __len__(self):
        """Number of batches per epoch, counting a trailing partial batch."""
        # Rounding up keeps the last rows in the epoch and avoids reporting
        # zero batches when there are fewer rows than `batch_size`.
        return int(np.ceil(self.n / self.batch_size))

    def __getitem__(self, index):
        """Return batch `index` as ``((image_features, padded_prefixes), targets)``."""
        batch = self.df.iloc[index * self.batch_size:(index + 1) * self.batch_size, :]
        X1, X2, y = self.__get_data(batch)
        return (X1, X2), y

    def __get_data(self, batch):
        """Expand the rows of `batch` into prefix/next-word training samples."""
        X1, X2, y = [], [], []

        # Walk the rows once. Looking the captions up again per image would
        # repeat every caption as many times as its image occurs in the batch.
        for image, caption in zip(batch[self.X_col].tolist(), batch[self.y_col].tolist()):
            feature = self.features[image][0]
            seq = self.tokenizer.texts_to_sequences([caption])[0]

            for i in range(1, len(seq)):
                in_seq, out_seq = seq[:i], seq[i]
                in_seq = pad_sequences([in_seq], maxlen=self.max_length)[0]
                out_seq = to_categorical([out_seq], num_classes=self.vocab_size)[0]
                X1.append(feature)
                X2.append(in_seq)
                y.append(out_seq)

        return np.array(X1), np.array(X2), np.array(y)

In [None]:
def create_caption_model_architecture():
    """Build and compile the captioning network.

    Reads the notebook-level `max_length` and `vocab_size`. The model takes a
    1920-dimensional image vector and a token sequence of length `max_length`,
    and outputs a softmax over `vocab_size` classes.

    Returns:
        The compiled Keras `Model` (categorical cross-entropy, Adam, accuracy).
    """
    image_input = Input(shape=(1920,))
    caption_input = Input(shape=(max_length,))

    # Project the image vector to 256 units and give it a length-1 time axis
    # so it can be placed in front of the word embeddings.
    image_vector = Dense(256, activation='relu')(image_input)
    image_step = Reshape((1, 256), input_shape=(256,))(image_vector)

    word_vectors = Embedding(vocab_size, 256, mask_zero=False)(caption_input)
    sequence = concatenate([image_step, word_vectors], axis=1)
    encoded = LSTM(256)(sequence)

    hidden = Dropout(0.5)(encoded)
    # Add the projected image vector to the LSTM output (after dropout).
    hidden = add([hidden, image_vector])
    hidden = Dense(128, activation='relu')(hidden)
    hidden = Dropout(0.5)(hidden)
    next_word = Dense(vocab_size, activation='softmax')(hidden)

    caption_model = Model(inputs=[image_input, caption_input], outputs=next_word)
    caption_model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )

    return caption_model

In [None]:
def extract_img_features():
    """Compute a feature array for every image and store them in `features_file`.

    Reads the notebook-level `data`, `image_path` and `features_file`. Each
    unique name in ``data['image_name']`` is loaded at 224x224, scaled by 1/255
    and passed through DenseNet201 cut at its second-to-last layer.

    Returns:
        dict mapping image name to the array returned by ``predict`` for it.
    """
    backbone = DenseNet201()
    fe = Model(inputs=backbone.input, outputs=backbone.layers[-2].output)

    img_size = 224
    features = {}
    for image in tqdm(data['image_name'].unique().tolist()):
        pixels = img_to_array(
            load_img(os.path.join(image_path, image), target_size=(img_size, img_size))
        )
        # Scale to [0, 1] and add the batch axis expected by `predict`.
        single_batch = np.expand_dims(pixels / 255., axis=0)
        features[image] = fe.predict(single_batch, verbose=0)

    # One HDF5 dataset per image, keyed by the image name.
    with h5py.File(features_file, 'w') as f:
        for img_name, feature_array in features.items():
            f.create_dataset(img_name, data=feature_array)

    print(f"Features saved to {features_file}")

    return features

In [None]:
# Image features: reuse the cached HDF5 file when it exists and only run the
# (slow) extraction when it does not. This is independent of whether a model
# checkpoint is being resumed.
if os.path.exists(features_file):
    features = {}
    with h5py.File(features_file, 'r') as f:
        for img_name in f.keys():
            features[img_name] = f[img_name][:]
    print(f"Features loaded from {features_file}")
else:
    features = extract_img_features()

# Model: a falsy `saved_model_number` (0 or None) means "start from scratch";
# any other value resumes from the checkpoint named by `model_file`.
if not saved_model_number:
    caption_model = create_caption_model_architecture()
else:
    caption_model = load_model(model_file)
    print(f"Model loaded from {model_file}")

In [None]:
# Both generators share every option except the captions frame they read.
_generator_options = dict(
    X_col = 'image_name',
    y_col = 'comment',
    batch_size = 64,
    directory = image_path,
    tokenizer = tokenizer,
    vocab_size = vocab_size,
    max_length = max_length,
    features = features,
)

train_generator = CustomDataGenerator(df = train, **_generator_options)

validation_generator = CustomDataGenerator(df = test, **_generator_options)

In [None]:
# Write checkpoints where the resume logic reads them: `model_file` is built as
# "models/model_epoch_<n>.h5" with an unpadded epoch number, so the checkpoint
# name uses the same folder and the same unpadded format.
os.makedirs("models", exist_ok=True)
model_name = "models/model_epoch_{epoch}.h5"
checkpoint = ModelCheckpoint(model_name,
                            monitor = "val_loss",
                            mode = "min",
                            save_best_only = True,  # a file is written only for epochs that improve val_loss
                            verbose = 1)

# Stop after 5 epochs without val_loss improvement and restore the best weights.
earlystopping = EarlyStopping(monitor = 'val_loss', min_delta = 0, patience = 5, verbose = 1, restore_best_weights = True)

# Multiply the learning rate by 0.2 after 3 epochs without val_loss improvement.
learning_rate_reduction = ReduceLROnPlateau(monitor = 'val_loss', 
                                            patience = 3, 
                                            verbose = 1, 
                                            factor = 0.2, 
                                            min_lr = 0.00000001)

In [None]:
class MetricsLogger(tf.keras.callbacks.Callback):
    def __init__(self, log_file="training_log.csv"):
        super().__init__()
        self.log_file = log_file
        
        if not os.path.exists(log_file):
            with open(log_file, 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(['timestamp', 'epoch', 'loss', 'accuracy', 'val_loss', 'val_accuracy', 'lr'])
    
    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        
        lr = float(self.model.optimizer.learning_rate.numpy())
        
        with open(self.log_file, 'a', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([
                datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                epoch + 1,  # Epoch number (1-indexed)
                logs.get('loss', 'N/A'),
                logs.get('accuracy', 'N/A'),
                logs.get('val_loss', 'N/A'),
                logs.get('val_accuracy', 'N/A'),
                lr
            ])
        
        print(f"\nEpoch {epoch + 1} Summary:")
        print(f"  Loss: {logs.get('loss', 'N/A'):.4f}")
        print(f"  Accuracy: {logs.get('accuracy', 'N/A'):.4f}")
        print(f"  Val Loss: {logs.get('val_loss', 'N/A'):.4f}")
        print(f"  Val Accuracy: {logs.get('val_accuracy', 'N/A'):.4f}")
        print(f"  Learning Rate: {lr:.8f}")
        print("-" * 50)

In [None]:
metrics_logger = MetricsLogger("training_log.csv")

In [None]:
history = caption_model.fit(
        train_generator,
        epochs = 50,
        validation_data = validation_generator,
        callbacks=[checkpoint, earlystopping, learning_rate_reduction]
        initial_epoch = saved_model_number,
)