In [3]:
import numpy as np
from PIL import Image
import os
import string
from pickle import dump, load
from tensorflow.keras.applications.xception import Xception
from tensorflow.keras.applications.xception import preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout
from tensorflow.keras.layers import add
from tensorflow.keras.models import Model, load_model
from tqdm.notebook import tqdm
tqdm().pandas()

0it [00:00, ?it/s]

In [4]:
# Load the document file into memory
def load_doc(filename):
  # Open file to read
   file = open(filename, 'r')
   text = file.read()
   file.close()
   return text

In [5]:
# get all images with their captions
def img_capt(filename):
    file = load_doc(filename)
    captions = file.split('\n')
    descriptions = {}

    for caption in captions[:-1]:
        parts = caption.split('\t')
        if len(parts) == 2:
            img, caption_text = parts
            img_key = img[:-2]
            if img_key not in descriptions:
                descriptions[img_key] = [caption_text]
            else:
                descriptions[img_key].append(caption_text)
        else:
            print(f"Skipping line: {caption} - does not contain delimiter '\\t'")
    
    return descriptions

In [6]:
def txt_clean(captions):
    table = str.maketrans('', '', string.punctuation)

    for img, caps in captions.items():
        for i, img_caption in enumerate(caps):
            img_caption = img_caption.replace("-", " ")  # Replace '-' with space
            descp = img_caption.split()
            # Uppercase to lowercase
            descp = [wrd.lower() for wrd in descp]
            # Remove punctuation from each token
            descp = [wrd.translate(table) for wrd in descp]
            # Remove hanging 's and 'a'
            descp = [wrd for wrd in descp if len(wrd) > 1]
            # Remove words containing numbers
            descp = [wrd for wrd in descp if wrd.isalpha()]
            # Convert back to string
            img_caption = ' '.join(descp)
            captions[img][i] = img_caption
    
    return captions


In [7]:
def txt_vocab(descriptions):
    # To build vocab of all unique words
    vocab = set()
    for key in descriptions.keys():
        [vocab.update(d.split()) for d in descriptions[key]]
    return vocab


In [8]:
def save_descriptions(descriptions, filename):
    lines = list()
    for key, desc_list in descriptions.items():
        for desc in desc_list:
            lines.append(key + '\t' + desc)
    data = "\n".join(lines)
    with open(filename, "w") as file:
        file.write(data)


In [9]:
# Set these path according to project folder in you system, 
dataset_text = "C:\\Users\\PRATYUSH\\Desktop\\caption\\Flickr8k_text"
dataset_images = "C:\\Users\\PRATYUSH\\Desktop\\caption\\Flickr8k_Dataset"
#to prepare our text data
filename = dataset_text + "/" + "Flickr8k.token.txt"
#loading the file that contains all data
#map them into descriptions dictionary 
descriptions = img_capt(filename)
print("Length of descriptions =" ,len(descriptions))
#cleaning the descriptions
clean_descriptions = txt_clean(descriptions)
#to build vocabulary
vocabulary = txt_vocab(clean_descriptions)
print("Length of vocabulary = ", len(vocabulary))
#saving all descriptions in one file
save_descriptions(clean_descriptions, "descriptions.txt")

Length of descriptions = 8092
Length of vocabulary =  8422


In [10]:
model = Xception( include_top=False, pooling='avg' )

In [None]:
def extract_features(directory):
    model = Xception(include_top=False, pooling='avg')
    features = {}

    for pic in tqdm(os.listdir(directory)):
        file = os.path.join(directory, pic)  # Use os.path.join for cross-platform compatibility
        image = Image.open(file)
        image = image.resize((299, 299))
        image = np.expand_dims(image, axis=0)
        # image = preprocess_input(image) 
        image = image / 127.5
        image = image - 1.0
        feature = model.predict(image)
        features[pic] = feature

    return features

# Specify the directory containing your dataset images
dataset_images = 'C:\\Users\\PRATYUSH\\Desktop\\caption\\Flickr8k_Dataset\\Flicker8k_Dataset'

# Extract features from images in the dataset
features = extract_features(dataset_images)
dump(features, open("features.p", "wb"))

# To directly load the features from the pickle file
features = load(open("features.p", "rb"))

  0%|          | 0/8091 [00:00<?, ?it/s]

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 144ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 140ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 141ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 162ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 139ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 160ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 145ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 154ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 153ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 128ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 127ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 138ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1

In [16]:
def load_doc(filename):
    with open(filename, 'r') as file:
        text = file.read()
    return text

def load_photos(filename):
    file = load_doc(filename)
    photos = file.split("\n")[:-1]  # Use \n for newline splitting
    return photos

def load_clean_descriptions(filename, photos):
    file = load_doc(filename)
    descriptions = {}
    
    for line in file.split("\n"):  # Use \n for newline splitting
        words = line.split()
        if len(words) < 1:
            continue
        image, image_caption = words[0], words[1:]
        if image in photos:
            if image not in descriptions:
                descriptions[image] = []
            desc = ' ' + " ".join(image_caption) + ' '
            descriptions[image].append(desc)
    
    return descriptions

def load_features(photos):
    all_features = load(open("features.p", "rb"))
    features = {k: all_features[k] for k in photos}
    return features


dataset_text = "C:\\Users\\PRATYUSH\\Desktop\\caption\\Flickr8k_text"# Define this path correctly
filename = os.path.join(dataset_text, "Flickr_8k.trainImages.txt")

# Load the data
train_imgs = load_photos(filename)
train_descriptions = load_clean_descriptions("descriptions.txt", train_imgs)
train_features = load_features(train_imgs)


In [36]:
# Convert dictionary to a clear list of descriptions
def dict_to_list(descriptions):
    all_desc = []
    for key in descriptions.keys():
        all_desc.extend(descriptions[key])
    return all_desc

# Create tokenizer class
# This will vectorize the text corpus
# Each integer will represent a token in the dictionary

def create_tokenizer(descriptions):
    desc_list = dict_to_list(descriptions)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(desc_list)
    return tokenizer

# Give each word an index and store it in tokenizer.p pickle file
tokenizer = create_tokenizer(train_descriptions)
dump(tokenizer, open('tokenizer.p', 'wb'))

# Determine the size of the vocabulary
vocab_size = len(tokenizer.word_index) + 1
print(f"Vocabulary size: {vocab_size}")

# Calculate the maximum length of descriptions to decide the model structure parameters
def max_length(descriptions):
    desc_list = dict_to_list(descriptions)
    return max(len(d.split()) for d in desc_list)

max_len = max_length(train_descriptions)
print(f"Maximum length of description: {max_len}")


Vocabulary size: 7318
Maximum length of description: 33


In [70]:
from tensorflow.keras.utils import plot_model
def define_model(vocab_size, max_length):
    print(f"vocab_size: {vocab_size}, max_length: {max_length} (type: {type(max_length)})")  # Debug line

    # Features from the CNN model compressed from 2048 to 256 nodes
    inputs1 = Input(shape=(2048,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)
    
    # LSTM sequence model
    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)
    
    # Merging both models
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)
    
    # Merge it [image, seq] [word]
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam')
    
    # Summarize model
    #print(model.summary())
    #plot_model(model, to_file='model.png', show_shapes=True)
    
    return model

# Ensure max_length is correctly calculated and is an integer
max_length = 33  # Assuming this is correctly defined elsewhere
print(f"max_length: {max_length} (type: {type(max_length)})")  # Debug line

# Define and compile the model
model = define_model(vocab_size, max_length)


max_length: 33 (type: <class 'int'>)
vocab_size: 7318, max_length: 33 (type: <class 'int'>)


In [67]:
# Creating a directory named 'models' to save our models
os.makedirs("models", exist_ok=True)

In [71]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical

# Define vocab_size and max_length
vocab_size = 7318  # Example value, change this to your actual vocabulary size
max_length = 33   # Example value, change this to your actual max sequence length

# Function to create sequences
def create_sequences(tokenizer, max_length, desc_list, feature):
    x_1, x_2, y = [], [], []
    for desc in desc_list:
        # Encode the sequence
        seq = tokenizer.texts_to_sequences([desc])[0]
        # Create X, y pairs
        for i in range(1, len(seq)):
            in_seq, out_seq = seq[:i], seq[i]
            in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
            out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
            x_1.append(feature)
            x_2.append(in_seq)
            y.append(out_seq)
    return np.array(x_1), np.array(x_2), np.array(y)

# Data generator function used by model.fit()
def data_generator(descriptions, features, tokenizer, max_length):
    while True:
        for key, description_list in descriptions.items():
            feature = features[key][0]
            inp_image, inp_seq, op_word = create_sequences(tokenizer, max_length, description_list, feature)
            for i in range(len(inp_image)):
                yield (tf.convert_to_tensor(inp_image[i:i+1], dtype=tf.float32),  # Add batch dimension
                        tf.convert_to_tensor(inp_seq[i:i+1], dtype=tf.float32)), \
                      tf.convert_to_tensor(op_word[i:i+1], dtype=tf.float32)  # Add batch dimension

# Define the output signature for the dataset
output_signature = (
    (tf.TensorSpec(shape=(1, 2048), dtype=tf.float32),  # Batch dimension added
     tf.TensorSpec(shape=(1, max_length), dtype=tf.float32)),  # Batch dimension added
    tf.TensorSpec(shape=(1, vocab_size), dtype=tf.float32)  # Batch dimension added
)

# Create the dataset from the generator
dataset = tf.data.Dataset.from_generator(
    lambda: data_generator(train_descriptions, train_features, tokenizer, max_length),
    output_signature=output_signature
)

# Example usage to check the shape of the input and output for your model
try:
    for data in dataset.take(1):
        ([a, b], c) = data
        print("inp_image shape:", a.shape)  # Expected shape: (1, 2048)
        print("inp_seq shape:", b.shape)    # Expected shape: (1, 32)
        print("op_word shape:", c.shape)    # Expected shape: (1, 7577)
except Exception as e:
    print("Error:", e)

# Training the model
epochs = 10  # Example value, change this to the actual number of epochs you want
steps = len(train_descriptions)  # Example value, this should be the number of steps per epoch

try:
    history = model.fit(dataset, epochs=epochs, steps_per_epoch=steps, verbose=1)
    print("Training history:", history.history)  # Debug output of training history
except Exception as e:
    print("Error during training:", e)


inp_image shape: (1, 2048)
inp_seq shape: (1, 33)
op_word shape: (1, 7318)
Epoch 1/10
[1m6000/6000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m478s[0m 79ms/step - loss: 6.5728
Epoch 2/10
[1m6000/6000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m483s[0m 81ms/step - loss: 6.1072
Epoch 3/10
[1m6000/6000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m480s[0m 80ms/step - loss: 6.1315
Epoch 4/10
[1m6000/6000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m482s[0m 80ms/step - loss: 6.0289
Epoch 5/10
[1m6000/6000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m483s[0m 81ms/step - loss: 6.1361
Epoch 6/10
[1m6000/6000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m488s[0m 81ms/step - loss: 6.1931
Epoch 7/10
[1m6000/6000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m490s[0m 82ms/step - loss: 6.0643
Epoch 8/10
[1m6000/6000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m485s[0m 81ms/step - loss: 5.7558
Epoch 9/10
[1m6000/6000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m483s

In [72]:
model.save("models/my_model.h5")  # Save the model in HDF5 format




In [11]:
import tensorflow as tf

def custom_not_equal(x):
    return tf.cast(tf.not_equal(x[0], x[1]), tf.float32)

# Define the model with the custom object
inputs = tf.keras.Input(shape=(2048,))
x = tf.keras.layers.Dense(256, activation='relu')(inputs)
x = tf.keras.layers.Lambda(custom_not_equal)(x)  # Using the custom function
outputs = tf.keras.layers.Dense(7318, activation='softmax')(x)
model = tf.keras.Model(inputs, outputs)

# Save the model in Keras format
model.save(r'C:\Users\PRATYUSH\Desktop\caption\models\my_model.keras')

