In [None]:
# Text classification model based on BERT and LSTM using UPMC-food-101 dataset

import os
import sys
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras import layers
from tensorflow.keras import callbacks
from tensorflow.keras import optimizers
from tensorflow.keras import utils

In [None]:
# Column names are supplied explicitly via `names=`; the first one becomes the index.
colums = ['image_path', 'text', 'food']
# NOTE: the read_csv keyword is `index_col`; `index_column` is rejected with a
# TypeError (unexpected keyword argument), so the original call could not run.
training_data = pd.read_csv('../data/upmc-food-101/train.csv', names=colums, sep=',', index_col=['image_path'])
testing_data = pd.read_csv('../data/upmc-food-101/test.csv', names=colums, sep=',', index_col=['image_path'])

# sort values by image_path (sort_values accepts an index level name in `by`)
training_data = training_data.sort_values(by=['image_path'])
testing_data = testing_data.sort_values(by=['image_path'])

In [None]:
# report the (rows, columns) shape of both splits
print(f'Training data shape: {training_data.shape}')
print(f'Testing data shape: {testing_data.shape}')

In [None]:
import re

# clean data function
def clean_data(data):
    """Normalise a single raw text sample.

    The steps run in this order: HTML tags are stripped with ``remove_tags``,
    every character that is neither a word character nor whitespace is
    dropped, digit runs are dropped, whitespace runs are collapsed to a single
    space, and the result is lower-cased.
    """
    cleaned = remove_tags(data)
    substitutions = (
        (r'[^\w\s]', ''),  # punctuation
        (r'\d+', ''),      # numbers
        (r'\s+', ' '),     # multiple spaces
    )
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.lower()

# pre-compiled pattern matching anything of the form <...>
tags = re.compile(r'<[^>]+>')
def remove_tags(data):
    """Return ``data`` with every ``<...>`` span deleted."""
    return re.sub(tags, '', data)

# vectorize data function
# Wraps clean_data with np.vectorize so it can be called on a whole array of
# texts; np.vectorize is a convenience loop, so clean_data still runs once per element.
vectorize_data = np.vectorize(clean_data)

In [None]:
# count the distinct labels in the `food` column of the training split
no_classes = training_data['food'].nunique()
print(f'Number of classes: {no_classes}')

In [None]:
# import LabelEncoder
from sklearn.preprocessing import LabelEncoder

encoder = LabelEncoder()
# Clean the raw text of both splits with the vectorized clean_data wrapper.
processed_training_data = vectorize_data(training_data.text.values)
processed_testing_data = vectorize_data(testing_data.text.values)

# Fit the label -> integer mapping on the training labels only, then reuse the
# same mapping for the test labels. Re-fitting on the test split (fit_transform)
# would rebuild the mapping from whatever labels appear there, so the indices
# could disagree with training. `transform` raises ValueError on unseen labels.
encoded_training_labels = encoder.fit_transform(training_data.food.values)
encoded_testing_labels = encoder.transform(testing_data.food.values)

# One-hot encode the integer labels for categorical cross-entropy.
training_labels = utils.to_categorical(encoded_training_labels, no_classes)
testing_labels = utils.to_categorical(encoded_testing_labels, no_classes)

print("Processed text sample:", processed_training_data[0])
print("Shape of train labels:", training_labels.shape)


In [None]:
import bert

# Load the BERT BASE encoder from TensorFlow Hub as a trainable Keras layer,
# then build the tokenizer from the vocabulary file and casing flag that the
# loaded module itself exposes.
bert_layer = hub.KerasLayer(
    "https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/1",
    trainable=True,
)
vocab_file = bert_layer.resolved_object.vocab_file.asset_path.numpy()
do_lower_case = bert_layer.resolved_object.do_lower_case.numpy()
tokenizer = bert.bert_tokenization.FullTokenizer(
    vocab_file,
    do_lower_case,
)



In [None]:
# Preprocessing of texts according to BERT
def get_masks(text, max_seq_length):
    """Mask for padding.

    Tokenizes ``text`` with the module-level ``tokenizer``, wraps it in
    ``[CLS]`` / ``[SEP]`` and truncates to ``max_seq_length`` tokens.

    Returns:
        int32 array of length ``max_seq_length``: 1 for each kept token,
        0 for each padding position.
    """
    max_seq_length = int(max_seq_length)
    tokens = tokenizer.tokenize(text)
    tokens = ["[CLS]"] + tokens + ["[SEP]"]
    if len(tokens) > max_seq_length:
        tokens = tokens[:max_seq_length]
    return np.asarray([1] * len(tokens) + [0] * (max_seq_length - len(tokens)), dtype=np.int32)
# get_masks returns a vector per text, so np.vectorize needs a gufunc-style
# signature; without it the per-element arrays cannot be packed into the
# result and the call fails with "setting an array element with a sequence".
# max_seq_length is excluded so it is passed through unchanged, not broadcast.
get_masks_vector = np.vectorize(get_masks, signature='()->(n)', excluded={1, 'max_seq_length'})

def get_segments(text, max_seq_length):
    """Segments: 0 for the first sequence, 1 for the second.

    Tokenizes ``text`` with the module-level ``tokenizer``, wraps it in
    ``[CLS]`` / ``[SEP]`` and truncates to ``max_seq_length`` tokens. The
    segment id only switches to 1 after a ``[SEP]`` token, so padding
    positions are filled with 0.

    Returns:
        int32 array of length ``max_seq_length`` with the segment ids.
    """
    max_seq_length = int(max_seq_length)
    tokens = tokenizer.tokenize(text)
    tokens = ["[CLS]"] + tokens + ["[SEP]"]
    if len(tokens) > max_seq_length:
        tokens = tokens[:max_seq_length]

    segments_ids = []
    current_segment_id = 0
    for token in tokens:
        segments_ids.append(current_segment_id)
        if token == "[SEP]":
            current_segment_id = 1
    return np.asarray(segments_ids + [0] * (max_seq_length - len(tokens)), dtype=np.int32)
# A vector is returned per text, so np.vectorize needs an explicit signature;
# without one the call raises "setting an array element with a sequence" on
# array input. max_seq_length is excluded so it is passed through unchanged.
get_segments_vector = np.vectorize(get_segments, signature='()->(n)', excluded={1, 'max_seq_length'})

def get_ids(text, tokenizer, max_seq_length):
    """Token ids from 0 to vocab_size.

    Args:
        text: a single text sample.
        tokenizer: object exposing ``tokenize(text)`` and
            ``convert_tokens_to_ids(tokens)``.
        max_seq_length: length of the returned vector.

    Returns:
        int32 array of length ``max_seq_length``: the ids of
        ``[CLS] + tokens + [SEP]``, right-padded with 0.
    """
    max_seq_length = int(max_seq_length)
    # Truncate the word pieces *before* adding the special tokens so that a
    # long text still ends with [SEP]; slicing afterwards cut [SEP] off.
    tokens = tokenizer.tokenize(text)[:max(max_seq_length - 2, 0)]
    # The outer slice only matters for degenerate max_seq_length < 2.
    tokens = (["[CLS]"] + tokens + ["[SEP]"])[:max_seq_length]

    token_ids = tokenizer.convert_tokens_to_ids(tokens)
    return np.asarray(token_ids + [0] * (max_seq_length - len(token_ids)), dtype=np.int32)
# A vector is returned per text, so np.vectorize needs an explicit signature;
# without one the call raises "setting an array element with a sequence" on
# array input. The tokenizer and max_seq_length are excluded so they are
# passed through as-is instead of being broadcast as array arguments.
get_ids_vector = np.vectorize(
    get_ids,
    signature='()->(n)',
    excluded={1, 2, 'tokenizer', 'max_seq_length'},
)

def prepare(text_array, tokenizer, max_seq_length=128):
    """Prepare the text samples for BERT.

    Args:
        text_array: array of text samples.
        tokenizer: tokenizer forwarded to ``get_ids_vector``.
        max_seq_length: length every sample is padded or truncated to.

    Returns:
        ``[input_ids, input_masks, input_segments]``, each reshaped to
        ``(n_samples, max_seq_length)``.
    """
    # reshape(-1, max_seq_length) instead of squeeze(): squeeze drops the batch
    # axis when there is exactly one sample (or max_seq_length == 1).
    input_ids = get_ids_vector(text_array, tokenizer, max_seq_length).reshape(-1, max_seq_length)
    input_masks = get_masks_vector(text_array, max_seq_length).reshape(-1, max_seq_length)
    input_segments = get_segments_vector(text_array, max_seq_length).reshape(-1, max_seq_length)
    return [input_ids, input_masks, input_segments]

In [None]:
# set max sequence length according to data, capped at what the encoder supports.
# NOTE(review): BERT-base checkpoints ship 512 position embeddings, so a longer
# input (the original value here was 1313) cannot be embedded by `bert_layer`.
# Confirm the limit against the TF Hub model page if the checkpoint changes.
BERT_MAX_POSITIONS = 512
max_seq_length = min(1313, BERT_MAX_POSITIONS)

# Tokenize both splits into the three BERT input arrays (ids, masks, segments).
input_ids_train, input_masks_train, input_segments_train = prepare(processed_training_data, tokenizer, max_seq_length)
input_ids_test, input_masks_test, input_segments_test = prepare(processed_testing_data, tokenizer, max_seq_length)

In [None]:
# The three BERT inputs share one spec: int32 vectors of length max_seq_length.
def _bert_input(name):
    """Create one int32 Keras input of length ``max_seq_length`` named ``name``."""
    return layers.Input(shape=(max_seq_length,), dtype=tf.int32, name=name)

input_word_ids = _bert_input('input_word_ids')
input_mask = _bert_input('input_mask')
segment_ids = _bert_input('segment_ids')

# bert_layer yields two tensors, unpacked here as den_output and seq_output
bert_inputs = [input_word_ids, input_mask, segment_ids]
den_output, seq_output = bert_layer(bert_inputs)

In [None]:
# classification model
# The LSTM must emit only its final state (return_sequences=False): the labels
# are one-hot vectors of shape (batch, no_classes), so the network has to
# produce one prediction per sample. With return_sequences=True the head
# produced (batch, max_seq_length, no_classes), which does not match the
# labels under categorical cross-entropy.
X = layers.LSTM(units=128, return_sequences=False)(seq_output)
X = layers.Dropout(0.5)(X)
X = layers.Dense(256, activation='relu')(X)
X = layers.Dropout(0.5)(X)
output = layers.Dense(no_classes, activation='softmax')(X)

model = tf.keras.models.Model(inputs=[input_word_ids, input_mask, segment_ids], outputs=output)

In [None]:
# Compile with Adam (learning rate 0.001), categorical cross-entropy on the
# one-hot labels, and accuracy as the tracked metric.
adam = optimizers.Adam(learning_rate=0.001)
model.compile(
    optimizer=adam,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Print the layer-by-layer overview of the assembled model.
model.summary()

In [None]:
# early stopping: watch validation accuracy, allow 3 epochs without
# improvement, and restore the best weights when training stops
early_stopping = callbacks.EarlyStopping(
    monitor='val_accuracy',
    patience=3,
    restore_best_weights=True,
)

In [None]:
# set callbacks for saving the model weights and logging the training history
checkpoint_path = "training_1/cp.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
# The CSV log is written into the same directory, so the directory has to
# exist before training starts; checkpoint_dir was computed but never created.
os.makedirs(checkpoint_dir, exist_ok=True)
checkpoint = callbacks.ModelCheckpoint(checkpoint_path, save_weights_only=True, verbose=1)
logger = callbacks.CSVLogger('training_1/log.csv')
# `early_stopping` is already created in the preceding cell with these exact
# settings, so the duplicate definition that used to live here is dropped.

In [None]:
# Halve the learning rate after 2 epochs without val_accuracy improvement,
# never going below 1e-5.
reduce_lr = callbacks.ReduceLROnPlateau(
    monitor='val_accuracy',
    factor=0.5,
    patience=2,
    min_lr=0.00001,
)

In [None]:
# Train for up to 10 epochs, validating on the test split after each epoch.
train_inputs = [input_ids_train, input_masks_train, input_segments_train]
validation_set = ([input_ids_test, input_masks_test, input_segments_test], testing_labels)
history = model.fit(
    train_inputs,
    training_labels,
    epochs=10,
    validation_data=validation_set,
    callbacks=[early_stopping, checkpoint, logger, reduce_lr],
)
                    

In [None]:
# loading the log file
# Reads back the per-epoch CSV that the CSVLogger callback writes to training_1/log.csv.
log_df = pd.read_csv('training_1/log.csv')


In [None]:
# training and Testing accuracy
# The model is compiled with metrics=['accuracy'] and the callbacks monitor
# 'val_accuracy', so the log columns are 'accuracy' / 'val_accuracy'. The short
# 'acc' / 'val_acc' names are kept as a fallback for logs written that way.
accuracy_column = 'accuracy' if 'accuracy' in log_df.columns else 'acc'
training_accuracy = log_df[accuracy_column].values
testing_accuracy = log_df['val_' + accuracy_column].values

# plotting the accuracy
plt.plot(training_accuracy, label='Training Accuracy')
plt.plot(testing_accuracy, label='Testing Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()


In [None]:
# Per-epoch loss for the training and validation (test) data, read from the log
training_loss = log_df['loss'].to_numpy()
testing_loss = log_df['val_loss'].to_numpy()

# draw both curves on the same axes
for curve, curve_label in ((training_loss, 'Training Loss'), (testing_loss, 'Testing Loss')):
    plt.plot(curve, label=curve_label)
plt.legend()
plt.show()

In [None]:
# Evaluate on the test split in batches of 512
test_inputs = [input_ids_test, input_masks_test, input_segments_test]
model.evaluate(test_inputs, testing_labels, batch_size=512)