#BERT Text Classification

In [None]:
!pip install sentencepiece

In [None]:
#Bert tokenization class
!wget --quiet https://raw.githubusercontent.com/tensorflow/models/master/official/nlp/bert/tokenization.py

In [None]:
#importing libraries
import numpy as np
import pandas as pd
import tensorflow as tf
import keras
import tokenization
import tensorflow_hub as hub
import logging
logging.basicConfig(level=logging.INFO)

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

In [None]:
#Optimizer instances
# `learning_rate` is used throughout (the `lr` alias is deprecated), and the
# `epsilon=None` / `decay=0.0` arguments, which only restated the defaults,
# are no longer passed.
adam = keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, amsgrad=False)
sgd = keras.optimizers.SGD(learning_rate=0.001, decay=1e-6, momentum=0.9, nesterov=True)
adadelta = keras.optimizers.Adadelta(learning_rate=1.0, rho=0.9)

In [None]:
#Building the BERT layer from TF Hub; trainable=True so its weights are fine-tuned
module_url = 'https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/2'
bert_layer = hub.KerasLayer(module_url, trainable=True)

In [None]:
#Reading train.jsonl (JSON Lines: one record per line, hence lines=True)
train_df = pd.read_json('/content/drive/MyDrive/data/train.jsonl', lines=True)

In [None]:
#Reading dev_seen.jsonl
dev_seen_df = pd.read_json('/content/drive/MyDrive/data/dev_seen.jsonl', lines=True)

#Concatenating train_df and dev_seen_df into one training pool
# NOTE(review): each frame keeps its own row index, so index labels repeat in
# the result — confirm nothing downstream relies on unique labels.
training_data = pd.concat([train_df, dev_seen_df])

In [None]:
#Validation data: dev_unseen.jsonl
dev_df = pd.read_json('/content/drive/MyDrive/data/dev_unseen.jsonl', lines=True)

In [None]:
#Splitting the pooled data into training and testing:
# 5% is held out for testing; random_state=0 makes the split reproducible.
df_train, df_test = train_test_split(
    training_data,
    test_size=0.05,
    random_state=0
)

In [None]:
# Build the WordPiece tokenizer from the vocabulary file and lower-casing flag
# exposed by the loaded hub module, so tokenization matches the BERT weights.
vocab_file = bert_layer.resolved_object.vocab_file.asset_path.numpy()
do_lower_case = bert_layer.resolved_object.do_lower_case.numpy()
tokenizer = tokenization.FullTokenizer(vocab_file, do_lower_case)

In [None]:
#Encoding the text(preprocessing)
def bert_encode(texts, tokenizer, max_len=512):
    """Turn raw texts into the three fixed-length integer arrays fed to BERT.

    Each text is tokenized, cut to ``max_len - 2`` tokens so that ``[CLS]``
    and ``[SEP]`` still fit, converted to ids and right-padded with zeros.

    Args:
        texts: iterable of strings to encode.
        tokenizer: object providing ``tokenize(text)`` and
            ``convert_tokens_to_ids(tokens)``.
        max_len: length of every encoded row.

    Returns:
        Tuple ``(token_ids, padding_masks, segment_ids)`` of NumPy arrays with
        one row per text. Masks are 1 on real tokens and 0 on padding;
        segment ids are all zero.
    """
    budget = max_len - 2  # room left once [CLS] and [SEP] are accounted for
    id_rows, mask_rows, segment_rows = [], [], []

    for raw_text in texts:
        pieces = tokenizer.tokenize(raw_text)[:budget]
        sequence = ["[CLS]", *pieces, "[SEP]"]
        n_real = len(sequence)
        n_pad = max_len - n_real

        id_rows.append(tokenizer.convert_tokens_to_ids(sequence) + [0] * n_pad)
        mask_rows.append([1] * n_real + [0] * n_pad)
        segment_rows.append([0] * max_len)

    return np.array(id_rows), np.array(mask_rows), np.array(segment_rows)

In [None]:
#defining the model
def build_model(bert_layer, max_len=512):
    """Build and compile the BERT-based two-class text classifier.

    Args:
        bert_layer: callable Keras layer that takes
            ``[input_word_ids, input_mask, segment_ids]`` and returns
            ``(pooled_output, sequence_output)``.
        max_len: fixed token length of every input row.

    Returns:
        A compiled ``tf.keras`` model with three int32 inputs of length
        ``max_len`` and a 2-unit sigmoid output.
    """
    input_word_ids = tf.keras.Input(shape=(max_len,), dtype=tf.int32, name="input_word_ids")
    input_mask = tf.keras.Input(shape=(max_len,), dtype=tf.int32, name="input_mask")
    segment_ids = tf.keras.Input(shape=(max_len,), dtype=tf.int32, name="segment_ids")

    pooled_output, sequence_output = bert_layer([input_word_ids, input_mask, segment_ids])
    # Hidden state at position 0 (where bert_encode places [CLS]) represents the text.
    clf_output = sequence_output[:, 0, :]
    net = tf.keras.layers.Dropout(0.2)(clf_output)
    net = tf.keras.layers.Dense(32, activation='relu')(net)
    out = tf.keras.layers.Dense(2, activation='sigmoid')(net)

    model = tf.keras.models.Model(inputs=[input_word_ids, input_mask, segment_ids], outputs=out)
    model.compile(
        # `learning_rate` is the supported argument name; `lr` is a deprecated alias.
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )

    return model

In [None]:
# Encode each split into fixed-length (max_len=100) BERT inputs and one-hot
# encode the labels over the two classes.
train_input = bert_encode(df_train.text.values, tokenizer, max_len=100)
test_input = bert_encode(df_test.text.values, tokenizer, max_len=100)
# NOTE: `traiin_labels` (sic) is the name the training cell passes to fit(),
# so the spelling is kept here.
traiin_labels = tf.keras.utils.to_categorical(df_train.label.values, num_classes=2)
test_labels =  tf.keras.utils.to_categorical(df_test.label.values, num_classes=2)


# dev_unseen split, encoded the same way
dev_input = bert_encode(dev_df.text.values, tokenizer, max_len=100)
dev_labels = tf.keras.utils.to_categorical(dev_df.label.values, num_classes=2)

In [None]:
text_model = build_model(bert_layer, max_len=100)
text_model.summary()

In [None]:
#run model
# Write model.h5 only when validation accuracy improves, and stop training
# once it has failed to improve for 5 consecutive epochs.
checkpoint = tf.keras.callbacks.ModelCheckpoint('model.h5', monitor='val_accuracy', save_best_only=True, verbose=1)
earlystopping = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=5, verbose=1)

# validation_split=0.2 holds back 20% of the training arrays for validation.
train_history = text_model.fit(
    train_input, traiin_labels, 
    validation_split=0.2,
    epochs=50,
    callbacks=[checkpoint, earlystopping],
    batch_size=32,
    verbose=1
    )

In [None]:
#Predict
# Restore the best weights saved to model.h5 during training.
text_model.load_weights('model.h5')
# test_input is a tuple of in-memory arrays, not a batch generator, so it is
# scored with predict(); predict_generator(..., steps=450) is deprecated and
# expects a generator.
test_pred = text_model.predict(list(test_input))

# Index of the highest-scoring class for every sample
test_pred = np.argmax(test_pred, axis=1)

In [None]:
import matplotlib.pyplot as plt

In [None]:
#training validation accuracy graph
plt.plot(train_history.history['accuracy'], label='training acc')
plt.plot(train_history.history['val_accuracy'], label='validation acc')

plt.title('Training and validation accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epochs')
plt.legend()
plt.show()

In [None]:
print(classification_report(df_test.label, test_pred, target_names=['Non-Offensive(0)','Offensive(1)']))

In [None]:
#Confusion Matrix

def show_confusion_matrix(confusion_matrix):
  """Draw ``confusion_matrix`` as an annotated blue heatmap on the current figure.

  Cells are annotated with integer formatting; y tick labels stay horizontal,
  x tick labels are rotated by 30 degrees, and both axes receive a title.
  """
  heat_axes = sns.heatmap(confusion_matrix, annot=True, fmt="d", cmap="Blues")
  for axis, angle in ((heat_axes.yaxis, 0), (heat_axes.xaxis, 30)):
    axis.set_ticklabels(axis.get_ticklabels(), rotation=angle, ha='right')
  plt.ylabel('True sentiment')
  plt.xlabel('Predicted sentiment')
# Confusion matrix of the BERT predictions on the held-out split, wrapped in a
# DataFrame so the heatmap shows class names on both axes.
cm = confusion_matrix(df_test.label, test_pred)
df_cm = pd.DataFrame(cm, index=['Non-Offensive(0)','Offensive(1)'], columns=['Non-Offensive(0)','Offensive(1)'])
show_confusion_matrix(df_cm)

#VGG16 image model

In [None]:
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.preprocessing.image import ImageDataGenerator, array_to_img
from keras.models import Model, Sequential
from keras.layers import Conv2D, MaxPooling2D, Activation, Dropout, Flatten, Dense
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.optimizers import SGD
from keras import backend as K
import pandas as pd
import numpy as np

from keras.utils.vis_utils import plot_model
from IPython.display import display
import matplotlib.pyplot as plt

In [None]:
#image dimensions
img_width, img_height = 224,224

In [None]:
train_data_dir = "/content/drive/MyDrive/data/train_data"
validation_data_dir = "/content/drive/MyDrive/data/dev_seen_data"
# nb_train_samples = 2000
# nb_validation_samples = 100
# epochs = 50
batch_size = 16

In [None]:
#Image tensors are 3-D; the backend setting decides whether the colour
#channels come first ([channels][rows][columns]) or last.
input_shape = (
    (3, img_width, img_height)
    if K.image_data_format() == 'channels_first'
    else (img_width, img_height, 3)
)

In [None]:
#Wrapper for preprocess_input() to make it compatible to use with ImageDataGenerator's preprocessing_function
def preprocess_vgg(x):
  """Apply VGG16's ``preprocess_input`` to one image.

  The image is given a leading batch axis, passed through ``preprocess_input``
  and the single processed image is returned, so the function can be used as
  ``ImageDataGenerator``'s ``preprocessing_function``.
  """
  return preprocess_input(np.expand_dims(x, axis=0))[0]

In [None]:
#Initializing VGG16 with ImageNet weights (default architecture otherwise)
vgg16 = VGG16(weights='imagenet')

In [None]:
# Attach a new 2-class softmax head to the output of VGG16's 'fc2' layer.
x = vgg16.get_layer('fc2').output
prediction = Dense(2, activation='softmax', name='predictions')(x)

# Image classifier: VGG16's input through to the new head
img_model = Model(inputs=vgg16.input, outputs=prediction)

In [None]:
#Freeze every layer except the new 'predictions' head, so only the head is fine-tuned
for layer in img_model.layers:
  if layer.name != 'predictions':
    layer.trainable = False

In [None]:
df = pd.DataFrame(([layer.name, layer.trainable] for layer in img_model.layers), columns=['layer','trainable'])

In [None]:
# Training images: VGG preprocessing plus random augmentation (rotation,
# shifts, shear, zoom, horizontal flip); gaps are filled with nearest pixels.
train_datagen = ImageDataGenerator(preprocessing_function=preprocess_vgg,
                                   rotation_range=40,
                                   width_shift_range=0.2,
                                   height_shift_range=0.2,
                                   shear_range=0.2,
                                   zoom_range=0.2,
                                   horizontal_flip=True,
                                   fill_mode='nearest')
# One sub-directory per class; labels are produced one-hot ('categorical').
train_generator = train_datagen.flow_from_directory(directory=train_data_dir,
                                                    target_size=[img_width, img_height],
                                                    batch_size=batch_size,
                                                    class_mode='categorical')

In [None]:
# Validation images get the VGG preprocessing only — no augmentation.
validation_datagen = ImageDataGenerator(preprocessing_function=preprocess_vgg)
validation_generator = validation_datagen.flow_from_directory(directory=validation_data_dir,
                                                              target_size=[img_width, img_height],
                                                              batch_size=batch_size,
                                                              class_mode='categorical')

In [None]:
#compile SGD optimizer with small learning rate
# `learning_rate` is the supported argument name; `lr` is a deprecated alias.
sgd = SGD(learning_rate=1e-4, momentum=0.9)
img_model.compile(optimizer=sgd, loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
plot_model(img_model, to_file='/content/VGG16_img_model.png', show_shapes=True, show_layer_names=True)

In [None]:
# fit() accepts generators directly; fit_generator() is its deprecated
# predecessor. Step counts are unchanged: 2000 training and 100 validation
# images' worth of batches per epoch.
history = img_model.fit(
    train_generator,
    steps_per_epoch=2000 // batch_size,
    epochs=2,
    validation_data=validation_generator,
    validation_steps=100 // batch_size
)

In [None]:
img_model.save_weights('/content/drive/MyDrive/data/vgg16_hateful_nonhateful_dense2.h5')

In [None]:
model_json_final = img_model.to_json()
with open("/content/drive/MyDrive/data/vgg16_hateful_nonhateful_dense2.json", "w") as json_file:
  json_file.write(model_json_final)

In [None]:
plt.plot(history.history['accuracy'], label='train accuracy')
plt.plot(history.history['val_accuracy'], label='validation accuracy')
plt.title('Training history')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend()
plt.ylim([0, 1]);

In [None]:
test_data_dir = "/content/drive/MyDrive/data/dev_unseen_data"

test_datagen = ImageDataGenerator(preprocessing_function=preprocess_vgg)
# shuffle=False keeps the files in directory order, so predictions made from
# this generator line up row-for-row with test_generator.classes
# (flow_from_directory shuffles by default).
test_generator = test_datagen.flow_from_directory(directory=test_data_dir,
                                                  target_size=[img_width, img_height],
                                                  batch_size=batch_size,
                                                  class_mode='categorical',
                                                  shuffle=False)

In [None]:
#Confusion Matrix and Classification Report
# One pass over the test set: len(test_generator) is the number of batches
# needed to see every image once. The former steps=540 requested 540 batches
# of batch_size images each, which is not one pass over the directory.
# NOTE: row order only matches test_generator.classes when the generator does
# not shuffle.
Y_pred = img_model.predict(test_generator, steps=len(test_generator))
y_pred = np.argmax(Y_pred, axis=1)
print('Confusion Matrix')
print(confusion_matrix(test_generator.classes, y_pred))
print('Classification Report')
target_names = ['Non-Offensive', 'Offensive']
print(classification_report(test_generator.classes, y_pred, target_names=target_names))

#Combined model

In [None]:

#Importing all the libraries needed
import keras
import h5py
from keras import optimizers, preprocessing, Input
from keras.models import load_model, Model
from keras.layers import Bidirectional
#from multimodel baseline functions
from keras.layers.core import Reshape, Dropout
from keras.utils.vis_utils import  plot_model
import os
import itertools
#import keras matrics
import matplotlib.pyplot as plt
from keras.layers import Conv1D, MaxPooling1D, Flatten, GlobalAveragePooling3D
from keras import regularizers
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
from keras.applications.inception_v3 import InceptionV3

import pandas as pd
from sklearn.preprocessing import LabelEncoder
import re
from nltk.corpus import stopwords
# from nltk import word_tokenize
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.preprocessing.text import Tokenizer
import numpy as np
from keras.layers import Dense, GlobalAveragePooling2D, Embedding, LSTM, multiply
from PIL import Image, ImageFile

In [None]:
#Training_path, Testing_path and Validation_path point to the .jsonl files that hold the text extracted from the meme images for the train, test and validation splits respectively
Training_path = '/content/drive/MyDrive/data/train.jsonl'
Testing_path = '/content/drive/MyDrive/data/dev_unseen.jsonl'
Validation_path = '/content/drive/MyDrive/data/dev_seen.jsonl'

In [None]:
img_dir = '/content/img' 

In [None]:
#directory for storing glove embeddings
GLOVE_DIR = "/content/drive/MyDrive/data/"

In [None]:
#assigning weight
class_weight = {1: 1.4,
                0: 1}

In [None]:
def encode_label(DataFrame, Label_col):
    """Replace column ``Label_col`` of ``DataFrame`` with integer class codes.

    A fresh ``LabelEncoder`` is fitted on the column's current values and the
    encoded result is written back into the same column. The frame is modified
    in place and nothing is returned.
    """
    raw_labels = DataFrame[Label_col].values
    DataFrame[Label_col] = LabelEncoder().fit_transform(raw_labels)

def preprocess_text(Training_path,Validation_path, Testing_path):
    """Load the three JSON-lines splits and label-encode train and validation.

    Args:
        Training_path: path of the training ``.jsonl`` file.
        Validation_path: path of the validation ``.jsonl`` file.
        Testing_path: path of the testing ``.jsonl`` file.

    Returns:
        ``(training_DF, testing_DF, validation_DF)``. Note the order: the test
        frame comes second in the result although its path is the third
        argument. The test frame's ``label`` column is returned as read.
    """
    training_DF, validation_DF, testing_DF = (
        pd.read_json(split_path, lines=True)
        for split_path in (Training_path, Validation_path, Testing_path)
    )

    # Only the training and validation labels are encoded.
    for frame in (training_DF, validation_DF):
        encode_label(frame, 'label')

    return training_DF, testing_DF, validation_DF

In [None]:
#loading the train, test and validation splits
# Arguments are passed by keyword because preprocess_text's parameter order is
# (training, validation, testing); the former positional call passed
# (training, testing, validation) and so swapped the two dev sets.
training_df, testing_df, validation_df = preprocess_text(
    Training_path=Training_path,
    Validation_path=Validation_path,
    Testing_path=Testing_path,
)

In [None]:
def create_img_array(img_dirct):
    """Recursively list every file found under ``img_dirct``.

    Returns:
        List of path strings built as ``<directory> + '/' + <file name>``, in
        the order ``os.walk`` visits them; empty if nothing is found.
    """
    return [
        folder + '/' + file_name
        for folder, _subdirs, file_names in os.walk(img_dirct)
        for file_name in file_names
    ]

def create_img_path(DF, Col_name, base_dir='/content'):
    """Build full image paths from the relative paths stored in a column.

    Args:
        DF: DataFrame holding the relative image paths.
        Col_name: name of the column with those relative paths.
        base_dir: directory the relative paths are resolved against. Defaults
            to ``'/content'``, the location that used to be hard-coded, so
            existing two-argument calls behave as before.

    Returns:
        List of ``base_dir + '/' + relative_path`` strings, one per row.
    """
    return [base_dir + '/' + name for name in DF[Col_name]]

In [None]:

# Processing image and text for each set
# Build the list of image file paths for the train, test and validation
# frames from their 'img' column
train_img_path = create_img_path(training_df,'img')
test_img_path = create_img_path(testing_df,'img')
val_img_path = create_img_path(validation_df,'img')


In [None]:
# Vectorising text
# Collect each split's 'text' column into a plain Python list
train_text_list=list(training_df['text'])
test_text_list = list(testing_df['text'])
val_text_list = list(validation_df['text'])

# Creating vectors for train, test, validation
# The vocabulary is fitted on the training text only; num_words=1000 is the
# vocabulary size limit given to the Keras Tokenizer.
# NOTE(review): this rebinds `tokenizer`, which earlier held the BERT
# FullTokenizer — confirm nothing after this cell still needs the BERT one.
tokenizer = Tokenizer(num_words=1000)
tokenizer.fit_on_texts(train_text_list)
sequences_train = tokenizer.texts_to_sequences(train_text_list)
sequences_test = tokenizer.texts_to_sequences(test_text_list)
sequences_val = tokenizer.texts_to_sequences(val_text_list)

# Pad/truncate every sequence to a fixed length of 100
x_train = preprocessing.sequence.pad_sequences(sequences_train, maxlen=100)
x_test = preprocessing.sequence.pad_sequences(sequences_test, maxlen=100)
x_val = preprocessing.sequence.pad_sequences(sequences_val, maxlen=100)

# Label column of each split (no further encoding happens here)
y_test = testing_df['label']
y_train = training_df['label']
y_val = validation_df['label']

In [None]:
def get_input(path):
    """Load the image stored at ``path``, resized to 224x224.

    Also sets ``ImageFile.LOAD_TRUNCATED_IMAGES`` to True before loading
    (presumably so that incomplete image files still load — confirm).
    """
    ImageFile.LOAD_TRUNCATED_IMAGES = True
    return image.load_img(path, target_size=(224,224))

def process_input(img):
    """Convert a loaded image into a VGG-preprocessed batch of one.

    The image is turned into an array, given a leading batch axis and passed
    through ``preprocess_input``; the batched result is returned as is.
    """
    batch_of_one = np.expand_dims(image.img_to_array(img), axis=0)
    return preprocess_input(batch_of_one)

In [None]:
def img_text_generator(files, padded_seq, y, batch_size=None, shuffle=True):
    """Yield ``([image_batch, text_batch], label_batch)`` tuples forever.

    Args:
        files: image paths, indexable by sample position.
        padded_seq: padded text sequences, parallel to ``files``.
        y: labels, parallel to ``files``; indexed by position.
        batch_size: number of samples per batch. Required — the ``None``
            default is kept only so the signature stays unchanged.
        shuffle: when True (default) every batch is drawn at random with
            replacement, as before. When False the samples are walked in
            order, wrapping around at the end; use this for evaluation, where
            predictions are compared position by position with the labels.

    Yields:
        ``([images, texts], labels)`` as NumPy arrays with ``batch_size`` rows.

    Raises:
        ValueError: on the first ``next()`` if ``batch_size`` is missing or
            not positive, or if there are no samples to draw from.
    """
    if batch_size is None or batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    n_samples = len(padded_seq)
    if n_samples == 0:
        raise ValueError("cannot generate batches from an empty dataset")

    # Positional indexing, even when y is a pandas Series with its own index.
    labels = np.asarray(y)
    cursor = 0

    while True:
        if shuffle:
            batch_idxs = np.random.choice(n_samples, size=batch_size)
        else:
            batch_idxs = np.arange(cursor, cursor + batch_size) % n_samples
            cursor = (cursor + batch_size) % n_samples

        # process_input returns a batch of one image; keep just that image.
        batch_input_img = [process_input(get_input(files[idx]))[0] for idx in batch_idxs]
        batch_input_txt = [padded_seq[idx] for idx in batch_idxs]
        batch_output = labels[batch_idxs]

        # Tuple of (inputs, output) in the order the combined model expects:
        # image first, text second.
        batch_x1 = np.array(batch_input_img)
        batch_x2 = np.array(batch_input_txt)
        batch_y = np.array(batch_output)
        yield ([batch_x1, batch_x2], batch_y)

In [None]:
# Creating train, val and test generators for the memes
img_txt_gen_train = img_text_generator(train_img_path, x_train, y_train, batch_size=32)
img_txt_gen_val = img_text_generator(val_img_path, x_val, y_val, batch_size=1)
# The test generator pairs the test images with the test text and labels
# (it previously received x_val / y_val).
img_txt_gen_test = img_text_generator(test_img_path, x_test, y_test, batch_size=1)

In [None]:
# Re-compile both sub-models with the shared `adam` optimizer and categorical
# cross-entropy. For text_model this replaces the Adam(1e-5) /
# binary_crossentropy configuration set in build_model.
text_model.compile(optimizer=adam, loss='categorical_crossentropy', metrics=['accuracy'])
img_model.compile(optimizer=adam, loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
class WeightedAverage(keras.layers.Layer):
    """Blend several same-shaped tensors using learned, softmax-normalised weights.

    One scalar weight is learned per input tensor. The weights pass through a
    softmax, so they are positive and sum to one, and the output is the
    resulting weighted sum of the inputs.
    """

    def __init__(self, n_output, **kwargs):
        """Create the layer.

        Args:
            n_output: number of tensors that will be blended (one weight each).
            **kwargs: standard ``Layer`` arguments such as ``name``.
        """
        super(WeightedAverage, self).__init__(**kwargs)
        self.n_output = n_output
        # add_weight registers the variable with the layer, so it is tracked
        # and saved with the model.
        self.W = self.add_weight(
            name='W',
            shape=(1, 1, n_output),
            initializer=keras.initializers.RandomUniform(minval=0.0, maxval=1.0),
            trainable=True,
        )  # (1,1,n_inputs)

    def call(self, inputs):
        """Return the weighted sum of ``inputs``.

        Args:
            inputs: list of tensors of identical shape (n_batch, n_feat).

        Returns:
            Tensor of shape (n_batch, n_feat).
        """
        # Stack the inputs along a new last axis: (n_batch, n_feat, n_inputs)
        stacked = tf.stack(inputs, axis=-1)
        # Weights sum to one over the last axis: (1,1,n_inputs)
        weights = tf.nn.softmax(self.W, axis=-1)

        return tf.reduce_sum(weights * stacked, axis=-1)  # (n_batch, n_feat)

    def get_config(self):
        """Return the constructor arguments so the layer can be serialised."""
        config = super(WeightedAverage, self).get_config()
        config.update({'n_output': self.n_output})
        return config

In [None]:
import keras
#Fusing the outputs of both classifiers (text and image) with a learned weighted average
con_layer = [text_model.output, img_model.output]
W_Avg = WeightedAverage(n_output=len(con_layer))(con_layer)
# A single sigmoid unit on top of the averaged class scores gives the final binary output
out = Dense(1, activation='sigmoid')(W_Avg)

In [None]:
#Common Model
from keras import optimizers

# Inputs are ordered image first, text second — the same order in which
# img_text_generator yields its two input arrays.
# NOTE(review): text_model was built by build_model with three inputs (word
# ids, mask, segment ids), whereas the generator supplies a single
# padded-sequence array for text — confirm the two actually line up.
com_model = Model(inputs = [img_model.input, text_model.input], outputs=out)

com_model.compile(loss='binary_crossentropy', optimizer=adam, metrics=["accuracy"])

In [None]:
com_model.summary()

In [None]:
plot_model(com_model, to_file='Common_model.png', show_shapes=True, show_layer_names=True)

In [None]:
#Training the combined model
# `combine_model` receives the History object returned by fit(), not a model.
# steps_per_epoch=2 means each epoch draws only two batches from the training
# generator; validation draws 149 batches from the validation generator.
# NOTE(review): shuffle=True presumably has no effect with generator input — confirm.
combine_model = com_model.fit(img_txt_gen_train, epochs=3, validation_steps = 149, steps_per_epoch=2, validation_data=img_txt_gen_val, shuffle=True)

In [None]:
com_model.load_weights('/content/drive/MyDrive/data/Combined_model.h5')

In [None]:
y_true = y_test.values
# predict() accepts generators directly (predict_generator is deprecated).
# The test generator yields one sample per batch, so one step per true label
# gives exactly one prediction per label instead of a hard-coded 540.
y_pred_com = com_model.predict(img_txt_gen_test, steps=len(y_true))
# (n, 1) sigmoid scores -> flat vector of 0/1 predictions
y_pred_com = np.round(y_pred_com).ravel()

In [None]:
labels = [1,0]
# `labels` has to be passed by keyword: it is keyword-only in current scikit-learn.
cm = confusion_matrix(y_true, y_pred_com, labels=labels)
ax= plt.subplot()
# annot=True writes the count in each cell; fmt='d' shows it as a plain integer
sns.heatmap(cm, annot=True, fmt='d', ax = ax);

# labels, title and ticks (row/column order follows `labels`: class 1 first)
ax.set_xlabel('Predicted labels');ax.set_ylabel('True labels'); 
ax.set_title('Confusion Matrix'); 
ax.xaxis.set_ticklabels(['offensive', 'non-offensive']); ax.yaxis.set_ticklabels(['offensive', 'non-offensive']);

In [None]:
# `labels` has to be passed by keyword: it is keyword-only in current scikit-learn.
print(classification_report(y_true, y_pred_com, labels=labels))

In [None]:
# Accuracy per epoch of the combined model (`combine_model` is the History from fit())
plt.plot(combine_model.history['accuracy'], label='Training accuracy')
plt.plot(combine_model.history['val_accuracy'], label='Validation accuracy')

plt.title('Training v/s Validation Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend()

plt.show()

In [None]:
nb_sample = 4
# The test generator is an iterator, not a sequence, so the first few batches
# are taken with islice instead of slicing.
for (batch_imgs, batch_txt), _ in itertools.islice(img_txt_gen_test, nb_sample):
  # Score this very batch, so the bar chart belongs to the image shown below it.
  prob = float(com_model.predict([batch_imgs, batch_txt])[0][0])
  s = pd.Series({'Non-Offensive':1-prob, 'Offensive':prob})
  axes = s.plot(kind='bar')
  axes.set_xlabel('Class')
  axes.set_ylabel('Probability')
  axes.set_ylim([0,1])
  plt.show()

  # First image of the batch (the test generator uses batch_size=1).
  # The array has been through VGG preprocessing, so colours are not the original ones.
  img = array_to_img(batch_imgs[0])
  display(img)