# Captioner

## Prepare photo data

In [1]:
import os
import pickle
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.preprocessing.image import load_img, img_to_array
from keras.models import Model

Using TensorFlow backend.


In [None]:
# extract features from each photo in the directory
def extract_features(directory):
    """Extract a VGG16 feature vector for every photo in ``directory``.

    The final (classification) layer of VGG16 is left out so the network
    outputs the activations of its penultimate layer instead.

    Args:
        directory: Path to a folder; every entry in it is loaded as an image.

    Returns:
        dict mapping image ID (the file name up to its first '.') to the
        array returned by ``model.predict`` for that single-image batch.
    """
    base_model = VGG16()
    # Read the penultimate layer's output directly instead of calling
    # `model.layers.pop()`: in Keras versions where `Model.layers` hands back
    # a copy, pop() changes nothing and the re-wrapped model would silently
    # keep producing the class predictions of the last layer.
    model = Model(inputs=base_model.inputs, outputs=base_model.layers[-2].output)
    model.summary()

    # extract features from each photo
    features = dict()

    for name in os.listdir(directory):
        filename = os.path.join(directory, name)
        # VGG16 is fed 224x224 images
        image = load_img(filename, target_size=(224, 224))
        image = img_to_array(image)
        # add a leading batch axis of size 1 for predict()
        image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
        image = preprocess_input(image)

        # get features
        feature = model.predict(image, verbose=0)
        # same ID convention as the caption files: text before the first '.'
        image_id = name.split('.')[0]
        features[image_id] = feature

        print('> %s' % name)

    return features

# extract features from all images and cache them on disk
directory = './Flicker8k_dataset'
features = extract_features(directory)
print('Extracted features: %d' % len(features))
# a context manager guarantees the pickle file is flushed and closed
with open('features.pkl', 'wb') as features_file:
    pickle.dump(features, features_file)

## Prepare text data

In [8]:
import string
import re

In [9]:
# load the document into memory
def load_doc(filename):
    """Read a text file and return its whole contents as one string.

    Args:
        filename: Path of the file to read.

    Returns:
        The text of the file.
    """
    # the context manager closes the handle even when read() raises
    with open(filename, 'r') as file:
        return file.read()

# extract descriptions for images
def load_descriptions(doc):
    """Group caption text by image ID.

    Every usable line of ``doc`` is split on whitespace: the first token
    names the image and the remaining tokens form one description.

    Args:
        doc: Full text of the caption file.

    Returns:
        dict mapping image ID (the first token up to its first '.') to the
        list of description strings found for it, in file order.
    """
    mapping = dict()

    for line in doc.split('\n'):
        tokens = line.split()
        # Skip too-short lines, and also whitespace-only lines: those pass
        # the length check but yield no tokens, so tokens[0] would raise.
        if len(line) < 2 or not tokens:
            continue

        # take the first token as the image ID; the rest as the description
        image_id, image_desc = tokens[0], tokens[1:]

        # remove filename extension (and anything after it) from image ID
        image_id = image_id.split('.')[0]

        # convert description tokens back to a string and file it under the ID
        mapping.setdefault(image_id, []).append(' '.join(image_desc))

    return mapping

def clean_descriptions(descriptions):
    """Normalise every description string in place.

    Each description is split on whitespace, lower-cased and stripped of
    punctuation; words that are then one character or shorter, or that are
    not purely alphabetic, are dropped. The cleaned words are re-joined with
    single spaces and written back into the same list slot.

    Args:
        descriptions: dict mapping image ID to a list of description strings;
            the lists are modified in place.
    """
    # one pattern matching any single punctuation character
    punctuation = re.compile('[%s]' % re.escape(string.punctuation))

    for captions in descriptions.values():
        for index, caption in enumerate(captions):
            stripped = (punctuation.sub('', token.lower()) for token in caption.split())
            kept = [word for word in stripped if len(word) > 1 and word.isalpha()]
            captions[index] = ' '.join(kept)
            
# convert the loaded descriptions into a vocabulary of words
def to_vocabulary(descriptions):
    """Collect the distinct words used across all descriptions.

    Args:
        descriptions: dict mapping image ID to a list of description strings.

    Returns:
        set of every whitespace-separated word that appears in any description.
    """
    words = set()
    for captions in descriptions.values():
        for caption in captions:
            words.update(caption.split())

    return words

# save descriptions to file, one per line
def save_descriptions(descriptions, filename):
    """Write all descriptions to ``filename``, one per line.

    Each line has the form ``<image_id> <description>``; lines are separated
    by '\\n' with no trailing newline.

    Args:
        descriptions: dict mapping image ID to a list of description strings.
        filename: Path of the file to (over)write.
    """
    lines = [
        key + ' ' + desc
        for key, desc_list in descriptions.items()
        for desc in desc_list
    ]

    # the context manager closes the handle even when write() raises
    with open(filename, 'w') as file:
        file.write('\n'.join(lines))
    
    
''' EXCECUTION CODE '''
# raw caption file; load_descriptions() parses it line by line as
# "<image file token> <caption words...>"
filename = 'Flickr8k_text/Flickr8k.token.txt'

# load descriptions
doc = load_doc(filename)

# parse descriptions into {image_id: [caption, ...]}
descriptions = load_descriptions(doc)
print('LOADED: %d' % len(descriptions))

# clean descriptions (rewrites the caption lists in place, returns nothing)
clean_descriptions(descriptions)

# summarize vocabulary: the set of distinct words left after cleaning
vocabulary = to_vocabulary(descriptions)
print('VOCAB SIZE: %d' % len(vocabulary))

# save to file as "<image_id> <caption>" lines; later cells read this file back
save_descriptions(descriptions, 'descriptions.txt')

LOADED: 8092
VOCAB SIZE: 8763


## Develop deep learning model

In [12]:
def load_doc(filename):
    """Return the complete text of the file at ``filename``.

    Args:
        filename: Path of the text file to read.

    Returns:
        The file contents as a single string.
    """
    # `with` releases the file handle even if reading fails part-way
    with open(filename, 'r') as file:
        return file.read()

def load_set(filename):
    """Load the image IDs listed in a dataset split file.

    Args:
        filename: Path to a text file with one image file name per line.

    Returns:
        set of image IDs, each being a non-empty line truncated at its
        first '.'.
    """
    entries = load_doc(filename).split('\n')

    # empty lines are skipped; everything before the first '.' is the ID
    return {entry.split('.')[0] for entry in entries if entry}

# load clean descriptions into memory
def load_clean_descriptions(filename, dataset):
    """Load the cleaned descriptions that belong to ``dataset``.

    Args:
        filename: Path to a file of ``<image_id> <description>`` lines.
        dataset: Collection of image IDs to keep.

    Returns:
        dict mapping each kept image ID to its list of descriptions, every
        one wrapped as ``'STARTSEQ <description> ENDSEQ'``.
    """
    doc = load_doc(filename)
    descriptions = dict()

    for line in doc.split('\n'):
        tokens = line.split()
        # blank lines (e.g. after a trailing newline) have no tokens;
        # indexing tokens[0] on them would raise IndexError
        if not tokens:
            continue

        image_id, image_desc = tokens[0], tokens[1:]
        if image_id not in dataset:
            continue

        # wrap descriptions in start/end marker tokens
        desc = 'STARTSEQ ' + ' '.join(image_desc) + ' ENDSEQ'
        descriptions.setdefault(image_id, []).append(desc)

    return descriptions

# load photo features
def load_photo_features(filename, dataset):
    """Load pickled photo features and keep only those in ``dataset``.

    Args:
        filename: Path to a pickle file holding a dict keyed by image ID.
        dataset: Iterable of image IDs to select.

    Returns:
        dict mapping each ID in ``dataset`` to its stored feature.

    Raises:
        KeyError: If an ID in ``dataset`` is missing from the pickled dict.
    """
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    # The context manager closes the handle that was previously left open.
    with open(filename, 'rb') as file:
        all_features = pickle.load(file)

    # filter features
    return {k: all_features[k] for k in dataset}

''' EXECUTION CODE '''
# training dataset: the set of image IDs listed in the train split file
filename = 'Flickr8k_text/Flickr_8k.trainImages.txt'
train = load_set(filename)
print('DATASET: %d' % len(train))

# descriptions: only those whose image ID is in `train`, wrapped in STARTSEQ/ENDSEQ
train_descriptions = load_clean_descriptions('descriptions.txt', train)
print('DESCRIPTIONS: train=%d' % len(train_descriptions))

# photo features: the cached features restricted to the IDs in `train`
train_features = load_photo_features('features.pkl', train)
print('PHOTOS: train=%d' % len(train_features))

DATASET: 6000
DESCRIPTIONS: train=6000
PHOTOS: train=6000


### Define the neural network model
- Photo feature extractor
- Sequence processor
- Decoder

In [27]:
import numpy as np
import pickle
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.models import Model
from keras.layers import Input, Dense, LSTM, Embedding, Dropout
from keras.layers.merge import add
from keras.callbacks import ModelCheckpoint

In [28]:
def load_doc(filename):
    """Read ``filename`` and return everything in it as one string.

    Args:
        filename: Path of the text file to read.

    Returns:
        The text read from the file.
    """
    # closing is handled by `with`, including when read() raises
    with open(filename, 'r') as file:
        return file.read()

def load_set(filename):
    """Return the set of image IDs named in a split file.

    Args:
        filename: Path to a text file with one image file name per line.

    Returns:
        set holding, for every non-empty line, the text before its first '.'.
    """
    identifiers = set()

    for entry in load_doc(filename).split('\n'):
        if entry:
            # partition('.')[0] is the part before the first '.', or the
            # whole line when there is no '.'
            identifiers.add(entry.partition('.')[0])

    return identifiers

# load clean descriptions into memory
def load_clean_descriptions(filename, dataset):
    """Read cleaned descriptions from disk, keeping those in ``dataset``.

    Args:
        filename: Path to a file of ``<image_id> <description>`` lines.
        dataset: Collection of image IDs to keep.

    Returns:
        dict mapping each kept image ID to its list of descriptions, each
        wrapped as ``'STARTSEQ <description> ENDSEQ'``.
    """
    doc = load_doc(filename)
    descriptions = dict()

    for line in doc.split('\n'):
        tokens = line.split()
        # a blank line produces no tokens and would crash on tokens[0]
        if not tokens:
            continue

        image_id, image_desc = tokens[0], tokens[1:]
        if image_id not in dataset:
            continue

        # mark where each caption begins and ends
        desc = 'STARTSEQ ' + ' '.join(image_desc) + ' ENDSEQ'
        descriptions.setdefault(image_id, []).append(desc)

    return descriptions

# load photo features 
def load_photo_features(filename, dataset):
    """Unpickle the photo-feature dict and select the entries in ``dataset``.

    Args:
        filename: Path to a pickle file holding a dict keyed by image ID.
        dataset: Iterable of image IDs to select.

    Returns:
        dict mapping each ID in ``dataset`` to its stored feature.

    Raises:
        KeyError: If an ID in ``dataset`` is missing from the pickled dict.
    """
    # NOTE: pickle.load can run arbitrary code; only open trusted files.
    # `with` closes the handle, which the bare open() call never did.
    with open(filename, 'rb') as file:
        all_features = pickle.load(file)

    return {k: all_features[k] for k in dataset}

# convert a dictionary of clean descriptions to a list of descriptions
def to_lines(descriptions):
    """Flatten the per-image description lists into one list.

    Args:
        descriptions: dict mapping image ID to a list of description strings.

    Returns:
        list of all description strings, in dict iteration order.
    """
    flattened = []

    for captions in descriptions.values():
        flattened.extend(captions)

    return flattened

# fit a tokenizer on all caption descriptions
def create_tokenizer(descriptions):
    """Build a ``Tokenizer`` fitted on every description string.

    Args:
        descriptions: dict mapping image ID to a list of description strings.

    Returns:
        The fitted ``Tokenizer``.
    """
    corpus = to_lines(descriptions)
    fitted = Tokenizer()
    fitted.fit_on_texts(corpus)

    return fitted

# compute the length of the description with the most words
def max_length(descriptions):
    """Return the word count of the longest description.

    Args:
        descriptions: dict mapping image ID to a list of description strings.

    Returns:
        The largest number of whitespace-separated words in any description.
    """
    word_counts = [len(caption.split()) for caption in to_lines(descriptions)]

    return max(word_counts)

# create sequences of images, input sequences, and output words for an image
def create_sequences(tokenizer, max_length, descriptions, photos, vocab_size):
    """Expand every description into (photo, partial caption) -> next-word samples.

    For an encoded caption of k word indices, k - 1 samples are produced: the
    i-th sample pairs the first i indices (padded to ``max_length``) with the
    one-hot encoding of index i.

    Args:
        tokenizer: Fitted tokenizer used to encode each description.
        max_length: Length every input sequence is padded to.
        descriptions: dict mapping image ID to a list of description strings.
        photos: dict mapping image ID to its feature; element ``[0]`` is used.
        vocab_size: Number of classes for the one-hot output.

    Returns:
        Three numpy arrays: photo features, padded input sequences and
        one-hot output words, aligned sample by sample.
    """
    photo_inputs, text_inputs, targets = [], [], []

    # walk through each image identifier and each of its descriptions
    for image_id, captions in descriptions.items():
        for caption in captions:
            # encode the description as a list of word indices
            encoded = tokenizer.texts_to_sequences([caption])[0]

            # every prefix of the caption predicts the word that follows it
            for cut in range(1, len(encoded)):
                prefix = pad_sequences([encoded[:cut]], maxlen=max_length)[0]
                next_word = to_categorical([encoded[cut]], num_classes=vocab_size)[0]

                photo_inputs.append(photos[image_id][0])
                text_inputs.append(prefix)
                targets.append(next_word)

    return np.array(photo_inputs), np.array(text_inputs), np.array(targets)

# define the captioning neural network model
def define_model(vocab_size, max_length):
    """Build and compile the captioning network.

    The model takes two inputs, a 4096-value photo feature vector and a
    ``max_length``-long sequence of word indices. Each is encoded to 256
    units, the two encodings are added, and the sum is decoded into a
    softmax over ``vocab_size`` words.

    Args:
        vocab_size: Size of the embedding input and of the softmax output.
        max_length: Length of the input word-index sequences.

    Returns:
        The compiled ``Model`` (categorical cross-entropy loss, Adam optimizer).
    """
    # feature extraction model (image)
    inputs1 = Input(shape=(4096, ))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)

    # sequence model (RNN); mask_zero=True makes index 0 act as padding
    inputs2 = Input(shape=(max_length, ))
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)

    # decoder model (caption): merge both encodings and predict the next word
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)

    # tie everything together
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam')
    model.summary()

    return model

In [None]:
# ---- TRAINING SET ----

filename = 'Flickr8k_text/Flickr_8k.trainImages.txt'
train = load_set(filename)
print('DATASET: %d' % len(train))

# descriptions
train_descriptions = load_clean_descriptions('descriptions.txt', train)
print('DESCRIPTIONS: train=%d' % len(train_descriptions))

# photo features
train_features = load_photo_features('features.pkl', train)
print('PHOTOS: train=%d' % len(train_features))

# prepare tokenizer (fitted on the training captions only)
tokenizer = create_tokenizer(train_descriptions)
# +1 because the model is sized for one index more than the tokenizer's words
vocab_size = len(tokenizer.word_index) + 1
print('VOCAB SIZE: %d' % vocab_size)

# determine the maximum sequence length
# NOTE: kept under a name different from the max_length() helper. Assigning
# the result to `max_length` replaced the function with an int, so running
# this cell a second time failed with "'int' object is not callable".
max_len = max_length(train_descriptions)
print('DESCRIPTION LENGTH: %d' % max_len)

# prepare sequences
X1_train, X2_train, y_train = create_sequences(tokenizer, max_len, train_descriptions, train_features, vocab_size)

# ---- TESTING SET (read from the dev split; passed to fit() as validation data) ----
filename = 'Flickr8k_text/Flickr_8k.devImages.txt'
test = load_set(filename)
print('DATASET: %d' % len(test))

# descriptions
test_descriptions = load_clean_descriptions('descriptions.txt', test)
print('DESCRIPTIONS: test=%d' % len(test_descriptions))

# photo features
test_features = load_photo_features('features.pkl', test)
print('PHOTOS: test=%d' % len(test_features))

# prepare sequences (same tokenizer and padding length as the training data)
X1_test, X2_test, y_test = create_sequences(tokenizer, max_len, test_descriptions, test_features, vocab_size)

# make neural network
model = define_model(vocab_size, max_len)

# define checkpoint callback: save only when the validation loss improves
checkpoint = ModelCheckpoint('captioner_model.h5', monitor='val_loss', verbose=1, save_best_only=True, mode='min')

# fit the model
model.fit(x=[X1_train, X2_train], y=y_train, epochs=20, verbose=1, callbacks=[checkpoint], validation_data=([X1_test, X2_test], y_test))