In [None]:
!pip install tensorflow==2.3
!pip install keras==2.4.3



In [None]:
!pip install tensorflow-gpu



In [None]:
import tensorflow as tf
tf.__version__

'2.3.0'

In [None]:
import keras
keras.__version__

'2.4.3'

In [None]:
from numpy import array
from pickle import load
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import Dropout
from keras.layers.merge import add
from tensorflow.keras.callbacks import ModelCheckpoint

In [None]:

# load doc into memory
def load_doc(filename):
	# open the file as read only
	file = open(filename, 'r')
	# read all text
	text = file.read()
	# close the file
	file.close()
	return text

# load a pre-defined list of photo identifiers
def load_set(filename):
	doc = load_doc(filename)
	dataset = list()
	# process line by line
	for line in doc.split('\n'):
		# skip empty lines
		if len(line) < 1:
			continue
		# get the image identifier
		identifier = line.split('.')[0]
		dataset.append(identifier)
	return set(dataset)

# load clean descriptions into memory
def load_clean_descriptions(filename, dataset):
	# load document
	doc = load_doc(filename)
	descriptions = dict()
	for line in doc.split('\n'):
		# split line by white space
		tokens = line.split()
		# split id from description
		image_id, image_desc = tokens[0], tokens[1:]
		# skip images not in the set
		if image_id in dataset:
			# create list
			if image_id not in descriptions:
				descriptions[image_id] = list()
			# wrap description in tokens
			desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
			# store
			descriptions[image_id].append(desc)
	return descriptions

# load photo features
def load_photo_features(filename, dataset):
	# load all features
	all_features = load(open(filename, 'rb'))
	# filter features
	features = {k: all_features[k] for k in dataset}
	return features

In [None]:
# covert a dictionary of clean descriptions to a list of descriptions
def to_lines(descriptions):
	all_desc = list()
	for key in descriptions.keys():
		[all_desc.append(d) for d in descriptions[key]]
	return all_desc

# fit a tokenizer given caption descriptions
def create_tokenizer(descriptions):
	lines = to_lines(descriptions)
	tokenizer = Tokenizer()
	tokenizer.fit_on_texts(lines)
	return tokenizer

# calculate the length of the description with the most words
def find_max_length(descriptions):
	lines = to_lines(descriptions)
	return max(len(d.split()) for d in lines)

In [None]:
def define_model(vocab_size, max_length):
    inputs1 = Input(shape=(1000,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)

    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)
    decoder1 = add([fe2, se3])
    drop_decoder1 = Dropout(0.5)(decoder1)
    decoder2 = Dense(256, activation='relu')(drop_decoder1)
    drop_decoder2 = Dropout(0.5)(decoder2)
    outputs = Dense(vocab_size, activation='softmax')(drop_decoder2)
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    opt = keras.optimizers.Adam(learning_rate=0.01)
    model.compile(loss='categorical_crossentropy', optimizer=opt)
    print(model.summary())
    plot_model(model, to_file='model.png', show_shapes=True)
    return model

In [None]:
def create_sequences(tokenizer,max_length,desc_list,photo,vocab_size):
    X1,X2,y = list(),list(),list()
    for desc in desc_list:
        seq = tokenizer.texts_to_sequences([desc])
        for i in range(1, len(seq[0])):
            in_seq, out_seq = seq[0][:i], seq[0][i]
            # print(in_seq,out_seq)
            in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
            out_seq = to_categorical([out_seq], num_classes=vocab_size)
            X1.append(photo)
            X2.append(in_seq)
            y.append(out_seq.flatten())
    return array(X1), array(X2), array(y)

In [None]:
# data generator, intended to be used in a call to model.fit_generator()
def data_generator(descriptions, photos, tokenizer, max_length, vocab_size):
	# loop for ever over images
	while 1:
		for key, desc_list in descriptions.items():
			# retrieve the photo feature
			photo = photos[key][0]
			in_img, in_seq, out_word = create_sequences(tokenizer, max_length, desc_list, photo, vocab_size)
			yield [in_img, in_seq],out_word

In [None]:
# train dataset
# load training dataset (6K)
filename = '/content/drive/My Drive/College/IRS Innovative/flicker8k-dataset/Flickr8k_text/Flickr_8k.trainImages.txt'
train = load_set(filename)
train_steps = len(train)
print('Dataset: %d' % len(train))
# descriptions
train_descriptions = load_clean_descriptions('/content/drive/My Drive/College/IRS Innovative/flicker8k-dataset/Flickr8k_text/descriptions.txt', train)
print('Descriptions: train=%d' % len(train_descriptions))
# photo features
train_features = load_photo_features('/content/drive/My Drive/College/IRS Innovative/flicker8k-dataset/Flickr8k_Dataset/2048_features_resnet.pkl', train)
print('Photos: train=%d' % len(train_features))

Dataset: 6000
Descriptions: train=6000
Photos: train=6000


In [None]:
# dev dataset

# load test set
filename = '/content/drive/My Drive/College/IRS Innovative/flicker8k-dataset/Flickr8k_text/Flickr_8k.devImages.txt'
test = load_set(filename)
test_steps = len(test)
print('Dataset: %d' % len(test))
# descriptions
test_descriptions = load_clean_descriptions('/content/drive/My Drive/College/IRS Innovative/flicker8k-dataset/Flickr8k_text/descriptions.txt', test)
print('Descriptions: test=%d' % len(test_descriptions))
# photo features
test_features = load_photo_features('/content/drive/My Drive/College/IRS Innovative/flicker8k-dataset/Flickr8k_Dataset/2048_features_resnet.pkl', test)
print('Photos: test=%d' % len(test_features))

Dataset: 1000
Descriptions: test=1000
Photos: test=1000


In [None]:
# prepare tokenizer
tokenizer = create_tokenizer(train_descriptions)
vocab_size = len(tokenizer.word_index) + 1
print('Vocabulary Size: %d' % vocab_size)
# determine the maximum sequence length
max_length = find_max_length(train_descriptions)
print('Description Length: %d' % max_length)

Vocabulary Size: 7579
Description Length: 34


In [None]:
train_generator = data_generator(train_descriptions,train_features,tokenizer,max_length,vocab_size)
test_generator = data_generator(test_descriptions,test_features,tokenizer,max_length,vocab_size)

In [None]:
# define the model
model = define_model(vocab_size, max_length)
filepath = '/content/drive/My Drive/College/IRS Innovative/2048_ResNet50/model_extra-ep{epoch:03d}-loss{loss:.3f}-val_loss{val_loss:.3f}.h5'
my_callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=10),
    tf.keras.callbacks.ModelCheckpoint(filepath=filepath,monitor='val_loss', verbose=1, save_best_only=True, mode='min'),
    tf.keras.callbacks.TensorBoard(log_dir='./logs'),
    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.3,patience=3, min_lr=0.00001,mode='min')
]

Model: "functional_3"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_4 (InputLayer)            [(None, 34)]         0                                            
__________________________________________________________________________________________________
input_3 (InputLayer)            [(None, 2048)]       0                                            
__________________________________________________________________________________________________
embedding_1 (Embedding)         (None, 34, 256)      1940224     input_4[0][0]                    
__________________________________________________________________________________________________
dropout_6 (Dropout)             (None, 2048)         0           input_3[0][0]                    
_______________________________________________________________________________________

In [None]:
# fit model
history=model.fit(train_generator, epochs=50,steps_per_epoch=train_steps, verbose=1, callbacks=my_callbacks, validation_data=test_generator,validation_steps=test_steps)

In [None]:
import matplotlib.pyplot as plt

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
plt.savefig('/content/drive/My Drive/College/IRS Innovative/2048_ResNet50/result_graph.png')