In [None]:
# Install Hugging Face Transformers (unpinned) and pin Keras to 2.12.0 for this runtime.
!pip install transformers keras==2.12.0

In [None]:
# General Python and file-related libraries
import math
import os
import re
import h5py
import pprint
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import imageio
import PIL
from PIL import ImageFile
import cv2
from tqdm import tqdm
from tqdm import tqdm_notebook
from sklearn.utils import class_weight
from sklearn.metrics import f1_score, accuracy_score
from sklearn.metrics import classification_report, confusion_matrix

# TensorFlow and Keras libraries
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (
    Input, Dense, Embedding, LSTM, Dropout, Bidirectional, GRU,
    Conv2D, MaxPooling2D, Flatten, concatenate, GlobalAveragePooling2D, BatchNormalization,
    Lambda, Add, Multiply
)
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Hugging Face Transformers library
from transformers import *

# Suppressing PIL warnings
ImageFile.LOAD_TRUNCATED_IMAGES = True


# Loading and Processing Data

In [None]:
# Mount Google Drive at /content/drive; the CSV splits, the Kaggle token and the
# saved model weights used by later cells all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

## Text

In [None]:
# Read the three SemEval-2020 Task 8 (Memotion) CSV splits from Google Drive.
train = pd.read_csv('/content/drive/My Drive/Memotion/SemEval-2020-Task-8/train.csv')
val = pd.read_csv('/content/drive/My Drive/Memotion/SemEval-2020-Task-8/val.csv')
test = pd.read_csv('/content/drive/My Drive/Memotion/SemEval-2020-Task-8/test.csv')

# Quick sanity check: print (rows, columns) of every split, in train/val/test order.
for split_frame in (train, val, test):
    print(split_frame.shape)

# output:
# (5943, 8)
# (1049, 8)
# (1878, 4)


In [None]:
# Working frames: three columns per split.
# Train/val are selected by position (2, 7, 0) and relabelled 0, 1, 2 so later
# cells can address them uniformly; the test split is selected by column name.
kept_positions = [2, 7, 0]

data_train = train.iloc[:, kept_positions]
data_train.columns = [0, 1, 2]

data_val = val.iloc[:, kept_positions]
data_val.columns = [0, 1, 2]

data_test = test.loc[:, ['corrected_text', 'Image_URL', 'Image_name']]

In [None]:
import re

def process(data):
    """Normalise the text held in the first column of ``data``, in place.

    Every entry is reduced to lowercase ASCII letters separated by single
    spaces: any character outside ``a-zA-Z`` becomes a space, the text is
    lowercased, and runs of whitespace are collapsed.

    Args:
        data: DataFrame whose first column (by position) contains the text.
            Missing values (NaN/None) are treated as empty text instead of
            raising inside ``re.sub``.

    Returns:
        The same DataFrame object, with its first column rewritten.
    """
    def _clean(text):
        # Non-string cells: missing values become '', anything else is stringified.
        if not isinstance(text, str):
            text = '' if pd.isna(text) else str(text)
        letters_only = re.sub('[^a-zA-Z]', ' ', text)
        return " ".join(letters_only.lower().split())

    # One pass over the column instead of a per-row .iloc read/write.
    data.iloc[:, 0] = data.iloc[:, 0].map(_clean)
    return data


# Clean the text of every split on a copy, so the frames the slices were taken
# from are not modified through them.
data_train, data_val, data_test = (
    process(frame.copy()) for frame in (data_train, data_val, data_test)
)

In [None]:
# Change the labels to 0/1 for binary classification, for simplicity.
# Within each task every "positive" intensity level collapses to 1 and the
# explicit "not_*" level to 0. Columns are addressed by position (3 to 6).
BINARY_LABELS_BY_POSITION = {
    3: {'hilarious': 1, 'very_funny': 1, 'funny': 1, 'not_funny': 0},
    4: {'very_twisted': 1, 'twisted_meaning': 1, 'general': 1, 'not_sarcastic': 0},
    5: {'hateful_offensive': 1, 'very_offensive': 1, 'slight': 1, 'not_offensive': 0},
    6: {'motivational': 1, 'not_motivational': 0},
}


def binarize_labels(frame):
    """Replace the textual labels in columns 3-6 of ``frame`` with 0/1, in place.

    Values that are not listed in the mapping (including labels that are
    already 0/1, so the cell can safely be re-run) are left untouched.

    Args:
        frame: DataFrame whose columns at positions 3, 4, 5 and 6 hold the
            labels listed in ``BINARY_LABELS_BY_POSITION``.

    Returns:
        The same DataFrame object.
    """
    for position, mapping in BINARY_LABELS_BY_POSITION.items():
        column = frame.columns[position]
        # Bind `mapping` as a default so each lambda keeps its own table.
        frame[column] = frame[column].map(lambda value, table=mapping: table.get(value, value))
    return frame


binarize_labels(train)
binarize_labels(val)

In [None]:
# Class frequencies of each label column in the training split, plus the
# 'offensive' column of the validation split.
_train_counts = {
    name: train[name].value_counts()
    for name in ('humour', 'sarcasm', 'offensive', 'motivational')
}
humour_distribution = _train_counts['humour']
sarcasm_distribution = _train_counts['sarcasm']
offensive_distribution = _train_counts['offensive']
motivational_distribution = _train_counts['motivational']
offensive_distribution_val = val['offensive'].value_counts()

# Only humour and offensive are printed; sarcasm and motivational are computed
# above but their output is currently switched off.
print("Distribution of 'humour' column:")
print(humour_distribution)

print("\nDistribution of 'offensive' column:")
print(offensive_distribution)

print("\nDistribution of 'offensive val' column:")
print(offensive_distribution_val)

In [None]:
# Target task for this run. Possible values: humour, sarcasm, offensive, motivational.
cls = 'motivational'

# Overwrite column 1 of both working frames with the selected label column.
data_train[1] = train[cls]
data_val[1] = val[cls]

# int64 label tensors, built from column 1 of each working frame.
class_labels, class_labels_val = (
    tf.convert_to_tensor(frame.iloc[:, 1], dtype=tf.int64)
    for frame in (data_train, data_val)
)


Upsampling Data to handle class imbalance

In [None]:
from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import RandomOverSampler

# When True the training frame is oversampled; the validation frame is
# oversampled in both modes.
UpSample = True


def _oversample(frame, labels):
    """Randomly oversample columns 0 and 2 of ``frame`` against ``labels`` (seed 42).

    Returns a new frame holding the resampled feature columns followed by the
    resampled label column.
    """
    features, resampled_labels = RandomOverSampler(random_state=42).fit_resample(
        frame.iloc[:, [0, 2]], labels
    )
    return pd.concat((pd.DataFrame(features), pd.DataFrame(resampled_labels)), axis=1)


if UpSample:
    data_train = _oversample(data_train, class_labels)
    data_train.columns = ['text', 'image', 'class']
else:
    # Not resampled: the frame keeps its original column order.
    data_train.columns = ['text', 'class', 'image']

data_val = _oversample(data_val, class_labels_val)
data_val.columns = ['text', 'image', 'class']
data_test.columns = ['text', 'waste', 'image']


In [None]:

# Class balance of the working frames. The 'class' column holds whichever
# label was selected for this run, so the captions say 'class' rather than
# naming one specific task. The variable names are kept for compatibility with
# any cell that still reads them.
offensive_distribution = data_train['class'].value_counts()
offensive_distribution_val = data_val['class'].value_counts()

print("\nDistribution of 'class' column (train):")
print(offensive_distribution)

print("\nDistribution of 'class' column (val):")
print(offensive_distribution_val)


## Image

In [None]:
# Install/upgrade the Kaggle CLI quietly.
!pip install -U -q kaggle
# The CLI reads its API token from ~/.kaggle/kaggle.json; copy it from Drive.
!mkdir -p ~/.kaggle
!cp '/content/drive/My Drive/Memotion/kaggle.json' ~/.kaggle/

# Download and unzip the Memotion 7k dataset into the current working directory.
!kaggle datasets download --unzip williamscott701/memotion-dataset-7k
# Alternative source (disabled): copy a zip from Drive and unpack it locally.
# !rsync --info=progress2 '/content/drive/My Drive/Memotion/2000_data.zip' '/content/'
# !unzip '/content/2000_data.zip'

# UniModal

## BiLSTM

In [None]:
from keras.preprocessing.text import Tokenizer

# Word-level vocabulary fitted on the training text only; val/test reuse it.
tk = Tokenizer(lower=True)
tk.fit_on_texts(data_train.text.values)

# Texts -> lists of word indices.
X_train_seq, X_val_seq, X_test_seq = (
    tk.texts_to_sequences(frame.text.values)
    for frame in (data_train, data_val, data_test)
)

# Every sequence is brought to length 64, with padding appended at the end.
X_train, X_val, X_test = (
    pad_sequences(sequences, maxlen=64, padding='post')
    for sequences in (X_train_seq, X_val_seq, X_test_seq)
)

In [None]:
# Float copies of the label column, used as Keras targets.
# (`astype` returns a new Series, so the frames themselves are not modified.)
y_train = data_train['class'].astype(float)
y_val = data_val['class'].astype(float)

In [None]:

# Compute class weights for the specified class
class_weights = class_weight.compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)

class_weights= {0:class_weights[0],1:class_weights[1]}

Batch_size = 64
vocabulary_size = len(tk.word_counts.keys())+1
max_words = 64
embedding_size = 64

model = Sequential()
model.add(Embedding(vocabulary_size, embedding_size, input_length=max_words,))
model.add(Dropout(0))

model.add(Bidirectional(LSTM(32)))
model.add(Dropout(0))
model.add(Dense(1, activation='softmax'))

model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['acc'])

early=EarlyStopping(patience=3, restore_best_weights=True, verbose=1)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=3, verbose=1)


history=model.fit(X_train, y_train, validation_data=(X_val, y_val), batch_size=Batch_size,
          epochs=50,
          class_weight=class_weights,
          callbacks=[early,reduce_lr]
          )

In [None]:
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix,auc

# Predicted probabilities for both splits.
y_train_prob = model.predict(X_train)
y_val_prob = model.predict(X_val)

# Hard 0/1 decisions at a 0.5 threshold, as plain Python lists.
y_train_pred = (y_train_prob > 0.5).astype(int).ravel().tolist()
y_val_pred = (y_val_prob > 0.5).astype(int).ravel().tolist()

train_accuracy_pct = int(100 * accuracy_score(y_train, y_train_pred))
val_accuracy_pct = int(100 * accuracy_score(y_val, y_val_pred))
print("Training Accuracy : {0}%".format(train_accuracy_pct))
# Note: the figure captioned "test" is computed on the validation split.
print("test Accuracy : {0}%\n".format(val_accuracy_pct))
print(classification_report(y_val, y_val_pred))

confusion = confusion_matrix(y_val, y_val_pred)
print(confusion)

## RoBERTa

Setting maximum sequence length for the inputs to RoBERTa

In [None]:
# Longest training text, counted in whitespace-separated words, plus 2
# (presumably headroom for the two special tokens added around each sequence).
_words_per_text = [
    len(data_train.iloc[row, 0].split())
    for row in tqdm(range(data_train.shape[0]))
]
max_seq_length = max(_words_per_text, default=0) + 2
print(max_seq_length)

# output :
# 201

Text-processing function that prepares the meme overlay text as input for the RoBERTa model

In [None]:
def convert_examples_to_features(sentences, label_list, max_seq_length, tokenizer):
    """Tokenise sentences into fixed-length model inputs.

    Args:
        sentences: iterable of strings (list, array or pandas Series). Items
            are read by position, so a Series with a non-default index is fine.
        label_list: iterable of labels aligned with ``sentences``.
        max_seq_length: every sequence is padded or truncated to this length.
        tokenizer: Hugging Face tokenizer exposing ``encode_plus``.

    Returns:
        Tuple ``(input_ids, attention_masks, token_type_ids, labels)`` of
        NumPy arrays, one row/entry per sentence.
    """
    # Materialise both inputs so that `[index]` below is positional. Indexing a
    # sliced Series with 0..n-1 would otherwise look rows up by label and fail.
    sentences = list(sentences)
    label_list = list(label_list)

    input_ids, input_masks, segment_ids, labels = [], [], [], []
    for index in tqdm_notebook(range(len(sentences)), desc="Converting examples to features"):
        sentence = sentences[index]
        if sentence == '':
            # Feed a single space instead of an empty string.
            sentence = " "
        inputs = tokenizer.encode_plus(
            sentence,
            add_special_tokens=True,      # wrap the text in the model's special tokens
            max_length=max_seq_length,    # target length for padding and truncation
            padding='max_length',         # replaces the deprecated pad_to_max_length=True
            truncation=True,              # explicit: longer inputs are cut to max_length
            return_token_type_ids=True,   # segment ids
            return_attention_mask=True,   # 1 for real tokens, 0 for padding
        )
        input_ids.append(inputs['input_ids'])
        input_masks.append(inputs['attention_mask'])
        segment_ids.append(inputs['token_type_ids'])
        labels.append(label_list[index])
    return (
        np.array(input_ids),
        np.array(input_masks),
        np.array(segment_ids),
        np.array(labels)
    )

Processing textual data using above function

In [None]:
from transformers import *

# Pretrained RoBERTa-base tokenizer.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
#tokenizer=AlbertTokenizer.from_pretrained("albert-base-v2")

# Give both frames a fresh 0..n-1 index before they are read row by row.
data_train = data_train.reset_index(drop=True)
data_val = data_val.reset_index(drop=True)

_train_features = convert_examples_to_features(
    data_train['text'], data_train['class'], max_seq_length, tokenizer)
train_input_ids, train_input_masks, train_segment_ids, train_labels = _train_features

_val_features = convert_examples_to_features(
    data_val['text'], data_val['class'], max_seq_length, tokenizer)
val_input_ids, val_input_masks, val_segment_ids, val_labels = _val_features

# No labels are passed for the test split; an all-ones placeholder fills the
# label argument.
_test_features = convert_examples_to_features(
    data_test['text'], np.ones(data_test.shape[0]), max_seq_length, tokenizer)
test_input_ids, test_input_masks, test_segment_ids, test_labels = _test_features


Defining F1 metric

In [None]:
from tensorflow.keras import backend as K

def f1(y_true, y_pred):
    """F1 score computed on rounded predictions, for use as a Keras metric.

    True positives, false positives and false negatives are summed along
    axis 0, precision and recall are derived from them (with ``K.epsilon()``
    in each denominator), and the resulting F1 values are averaged. Any NaN
    in the F1 tensor is replaced by 0 before averaging.
    """
    hard_pred = K.round(y_pred)

    true_pos = K.sum(K.cast(y_true * hard_pred, 'float'), axis=0)
    false_pos = K.sum(K.cast((1 - y_true) * hard_pred, 'float'), axis=0)
    false_neg = K.sum(K.cast(y_true * (1 - hard_pred), 'float'), axis=0)

    precision = true_pos / (true_pos + false_pos + K.epsilon())
    recall = true_pos / (true_pos + false_neg + K.epsilon())

    score = 2 * precision * recall / (precision + recall + K.epsilon())
    score = tf.where(tf.math.is_nan(score), tf.zeros_like(score), score)
    return K.mean(score)

Keras Callbacks to simplify model saving

In [None]:
class Saver(tf.keras.callbacks.Callback):
    """Keep the weights of the epoch with the best validation macro-F1.

    After every epoch the model is run on the module-level validation inputs
    (``val_input_ids``, ``val_input_masks``, ``val_segment_ids``); whenever the
    macro-F1 against ``val_labels`` improves, the weights are written to Drive.
    """

    def on_train_begin(self, logs=None):
        # Best macro-F1 seen so far in this training run.
        self.score = 0

    def on_epoch_end(self, epoch, logs=None):
        # Signature follows the Keras callback contract: (epoch, logs).
        predictions = self.model.predict([val_input_ids, val_input_masks, val_segment_ids])
        # The text model in this section ends in one sigmoid unit, so class ids
        # come from thresholding the probability. Taking argmax over that single
        # column would return 0 for every sample.
        predicted_classes = (np.asarray(predictions).reshape(-1) > 0.5).astype(int)
        res = f1_score(val_labels, predicted_classes, average='macro')
        print("f1_score=", res)
        if res > self.score:
            self.score = res
            self.model.save_weights('/content/drive/My Drive/Memotion/model_save_memo/test-2.h5')

Defining model architecture and training

In [None]:
from tensorflow.keras.layers import Input,Dense,Bidirectional,LSTM
from tensorflow.keras.metrics import Precision

precision_metric = Precision()

# Best hyperparameters
lr = 1e-5
epochs = 15
layer = 2      # number of dense layers stacked on the encoder output (0 to 3)
dropout = 0    # dropout rate after each dense layer; 0 disables dropout

# Three variable-length integer inputs: token ids, attention mask, segment ids.
token_inputs = tf.keras.layers.Input(shape=(None,), name='word_inputs', dtype=tf.int32)
mask_inputs = tf.keras.layers.Input(shape=(None,), name='mask_inputs', dtype=tf.int32)
seg_inputs = tf.keras.layers.Input(shape=(None,), name='seg_inputs', dtype=tf.int32)

inputs = [token_inputs, mask_inputs, seg_inputs]

# Sentence representation: the encoder's hidden state at sequence position 0.
transformer_outputs = TFRobertaModel.from_pretrained('roberta-base')(inputs)[0][:, 0, :]

# Classification head: `layer` selects how many of the 512/256/64 dense layers
# are used, always keeping the narrowest ones.
step = transformer_outputs
for required_layers, units in ((3, 512), (2, 256), (1, 64)):
    if layer >= required_layers:
        step = tf.keras.layers.Dense(units, activation='relu')(step)
        if dropout != 0:
            step = tf.keras.layers.Dropout(rate=dropout)(step)

# Output layer for binary classification
pred = Dense(1, activation='sigmoid')(step)

model = tf.keras.Model(inputs=inputs, outputs=pred)

# Define early stopping
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
    verbose=1
)

# Define learning rate reduction.
# `min_lr` must be below the starting rate: with min_lr == lr the callback can
# never lower anything. Its patience is kept shorter than early stopping's so a
# reduction gets a chance to help before training is stopped.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.1,
    patience=2,
    verbose=1,
    min_lr=lr * 1e-2
)

# Compile the model.
# `learning_rate=` is the supported keyword. The deprecated `lr=` alias is not
# reliably honoured by current Keras optimizers (it is ignored with a warning or
# rejected, depending on version), which would silently train at the default rate.
model.compile(
    loss='binary_crossentropy',
    optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
    metrics=[precision_metric]
)
model.summary()

# Fit the model with early stopping and learning rate reduction
history = model.fit(
    [train_input_ids, train_input_masks, train_segment_ids],
    train_labels,
    epochs=epochs,
    batch_size=32,
    validation_data=([val_input_ids, val_input_masks, val_segment_ids], val_labels),
    callbacks=[early_stopping, reduce_lr]
)



In [None]:
# Predict probabilities for training and validation data
y_train_prob = model.predict([train_input_ids, train_input_masks, train_segment_ids])
y_val_prob = model.predict([val_input_ids, val_input_masks, val_segment_ids])

# Convert probabilities to binary predictions using a threshold (0.5)
y_train_pred = [1 if x > 0.5 else 0 for x in y_train_prob]
y_val_pred = [1 if x > 0.5 else 0 for x in y_val_prob]

# Calculate and print accuracy
train_accuracy = int(100 * accuracy_score(train_labels, y_train_pred))
val_accuracy = int(100 * accuracy_score(val_labels, y_val_pred))

print("Training Accuracy : {0}%".format(train_accuracy))
print("Test Accuracy : {0}%\n".format(val_accuracy))


from sklearn.metrics import accuracy_score, classification_report, confusion_matrix,auc

# Generate and print classification report
classification_rep = classification_report(val_labels, y_val_pred)
print(classification_rep)

# Generate and print confusion matrix
confusion = confusion_matrix(val_labels, y_val_pred)
print(confusion)

## ResNet

Custom Data Loader for keras models

In [None]:
train_labels = tf.convert_to_tensor(data_train['class'], dtype=tf.int64)

# Image locations: the Kaggle download is tried first, then a fallback folder
# that differs between the train/validation generators and the test generator.
_PRIMARY_IMAGE_DIR = '/content/memotion_dataset_7k/images/'
_DRIVE_FALLBACK_DIR = '/content/drive/My Drive/2000_data/'
_LOCAL_FALLBACK_DIR = '/content/2000_data/'


def _load_rgb_image(image_name, fallback_dir, size=(256, 256)):
    """Load one image as a float array of shape ``size + (3,)`` scaled to [0, 1].

    Args:
        image_name: file name, looked up in the primary image folder first.
        fallback_dir: folder tried when the primary file cannot be opened.
        size: target (width, height) passed to ``resize``.

    Raises:
        OSError: if the image can be opened from neither location.
        ValueError: if the decoded array is not 3-channel.
    """
    image_name = str(image_name)
    try:
        img = PIL.Image.open(_PRIMARY_IMAGE_DIR + image_name)
    except OSError:
        # Missing or unreadable in the primary folder; try the fallback copy.
        img = PIL.Image.open(fallback_dir + image_name)

    # Let PIL normalise every mode (greyscale, greyscale+alpha, palette, RGBA,
    # ...) to three RGB channels before resizing.
    img = img.convert('RGB').resize(size)
    pixels = np.asarray(img, dtype='uint8')

    if pixels.ndim != 3 or pixels.shape[2] != 3:
        raise ValueError(image_name + " " + str(pixels.shape) + "has a problem")

    return pixels / 255


def _iter_image_batches(dataset, bs, fallback_dir):
    """Yield ``(start_row, image_batch)`` over ``dataset`` in order, forever.

    The batch boundaries come from ``dataset`` itself, and nothing is yielded
    for an empty dataset.
    """
    image_names = dataset['image'].tolist()
    total = len(image_names)
    while total > 0:
        for start in range(0, total, bs):
            batch = [_load_rgb_image(name, fallback_dir)
                     for name in image_names[start:start + bs]]
            yield start, np.array(batch)


def generateTrainingData(dataset, bs):
    """Endless generator of ``(images, labels)`` batches for training.

    Labels are read from ``dataset['class']``, so the generator works for any
    labelled frame and not only for the frame the global ``train_labels`` was
    built from.
    """
    labels = np.asarray(dataset['class'], dtype='int64')
    for start, images in _iter_image_batches(dataset, bs, _DRIVE_FALLBACK_DIR):
        yield images, labels[start:start + bs]


def generatePredictionData(dataset, bs):
    """Endless generator of image batches (no labels), with the Drive fallback folder."""
    for _, images in _iter_image_batches(dataset, bs, _DRIVE_FALLBACK_DIR):
        yield images


def generateTestPredictionData(dataset, bs):
    """Endless generator of image batches (no labels), with the local fallback folder."""
    for _, images in _iter_image_batches(dataset, bs, _LOCAL_FALLBACK_DIR):
        yield images

Building Model

In [None]:
image_inputs = Input(shape=(256, 256, 3), name="meme_images")

# ImageNet-pretrained ResNet50 used as a convolutional feature extractor.
# With include_top=False there is no classifier head, so no `classes` argument
# is passed; pooling is applied explicitly just below.
image_step = tf.keras.applications.ResNet50(
    include_top=False,
    weights='imagenet',
    input_tensor=None,
    input_shape=(256, 256, 3),
    pooling=None,
)(image_inputs)

image_step = GlobalAveragePooling2D()(image_step)
image_step = Dense(768, activation='relu')(image_step)
image_step = BatchNormalization()(image_step)

h = Dense(256, activation='relu')(image_step)
# Single sigmoid unit: probability of the positive class.
pred = Dense(1, activation='sigmoid')(h)

model = Model(inputs=image_inputs, outputs=pred)

# `learning_rate=` is the supported keyword. The deprecated `lr=` alias is not
# reliably honoured by current Keras optimizers (ignored with a warning or
# rejected, depending on version).
model.compile(loss='binary_crossentropy',
              optimizer=Adam(learning_rate=2e-5, clipnorm=1.),
              metrics=[f1, 'accuracy'])

model.summary()

# Define Early Stopping and Reduce LR callbacks.
# Both watch the validation loss, so they only act when validation data is
# passed to `fit`.
early_stopping = EarlyStopping(patience=5, restore_best_weights=True, verbose=1)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=3, verbose=1)


Keras Callbacks to simplify model saving

In [None]:
class Saver(tf.keras.callbacks.Callback):
    """Report validation accuracy and save the weights after every epoch.

    Validation images are read through ``generatePredictionData(data_val, 32)``
    and one weight file per epoch is written to Drive.
    """

    def on_train_begin(self, logs=None):
        self.score = 0
        # Counter used to name the saved weight files.
        self.epoch_number = 0

    def on_epoch_end(self, epoch, logs=None):
        # Signature follows the Keras callback contract: (epoch, logs).
        batch_size = 32
        expected = data_val['class'].astype('int').to_numpy()
        # Ceiling division: exactly one pass over the validation frame.
        steps = (len(expected) + batch_size - 1) // batch_size

        test_gen = generatePredictionData(data_val, batch_size)
        probabilities = self.model.predict(test_gen, steps=steps, verbose=1)
        # The image model ends in one sigmoid unit; argmax over that single
        # column would be 0 for every sample, so threshold at 0.5 instead.
        predict = (np.asarray(probabilities).reshape(-1) > 0.5).astype(int)[:len(expected)]

        accuracy = accuracy_score(expected, predict)
        print(f"Validation Accuracy: {accuracy}")

        self.model.save_weights('/content/drive/My Drive/Memotion/model_save_memo/imageC-2-{}.h5'.format(self.epoch_number))
        self.epoch_number = self.epoch_number + 1

Begin Training

In [None]:
import math


def _resnet_validation_batches(dataset, batch_size):
    """Pair the image batches of ``generatePredictionData`` with their labels.

    ``generatePredictionData`` walks ``dataset`` in order, in batches of
    ``batch_size``, and starts over after the last batch; the labels are sliced
    with the same boundaries. Nothing is yielded for an empty dataset.
    """
    labels = dataset['class'].to_numpy().astype('float32')
    batches_per_pass = math.ceil(len(labels) / batch_size)
    image_batches = generatePredictionData(dataset, batch_size)
    while batches_per_pass > 0:
        for index in range(batches_per_pass):
            yield next(image_batches), labels[index * batch_size:(index + 1) * batch_size]


gen = generateTrainingData(data_train, 32)
# `Model.fit` accepts generators directly; `fit_generator` is deprecated.
# Validation data is supplied so the callbacks that monitor `val_loss` can act.
# Step counts use ceil() so one epoch is exactly one pass over the data, also
# when the number of rows is a multiple of the batch size.
model.fit(
    gen,
    steps_per_epoch=math.ceil(data_train.shape[0] / 32),
    epochs=10,
    validation_data=_resnet_validation_batches(data_val, 32),
    validation_steps=math.ceil(data_val.shape[0] / 32),
    max_queue_size=10,
    workers=1,
    shuffle=False,
    callbacks=[Saver(), early_stopping, reduce_lr]
)

Test Validation Score

In [None]:
# Validation-set evaluation of the image model.
_eval_batch_size = 32
# Ceiling division: exactly one pass over the validation frame.
_eval_steps = (data_val.shape[0] + _eval_batch_size - 1) // _eval_batch_size

test_gen = generatePredictionData(data_val, _eval_batch_size)
predict = model.predict(test_gen, steps=_eval_steps, verbose=1)

y_val_true = data_val['class'].astype('int').to_numpy()[:, np.newaxis]
# The model ends in one sigmoid unit, so class ids come from a 0.5 threshold;
# argmax over that single column would be 0 for every sample.
y_val_pred = (np.asarray(predict).reshape(-1) > 0.5).astype(int)[:len(y_val_true)]

print("Validation Classification Report:")
print(classification_report(y_val_true, y_val_pred))

# MultiModal

Setting maximum sequence length for the inputs to RoBERTa

In [None]:

# Sequence length for the tokenizer: the largest whitespace word count among
# the training texts, plus 2 (presumably headroom for the two special tokens).
_longest_text = max(
    [len(data_train.iloc[row, 0].split()) for row in tqdm(range(data_train.shape[0]))],
    default=0,
)
max_seq_length = _longest_text + 2
print(max_seq_length)


# output :
# 201

Text-processing function that prepares the meme overlay text as input for the RoBERTa model

In [None]:
def convert_examples_to_features(sentences, label_list, max_seq_length, tokenizer):
    """Tokenise sentences into fixed-length model inputs (labels are not returned).

    Args:
        sentences: iterable of strings (list, array or pandas Series). Items
            are read by position, so a sliced Series is fine.
        label_list: iterable aligned with ``sentences``; read for signature
            compatibility but not part of the return value.
        max_seq_length: every sequence is padded or truncated to this length.
        tokenizer: Hugging Face tokenizer exposing ``encode_plus``.

    Returns:
        Tuple ``(input_ids, attention_masks, token_type_ids)`` of NumPy arrays,
        one row per sentence.
    """
    # Materialise both inputs so that `[index]` below is positional. Indexing a
    # sliced Series with 0..n-1 would otherwise look rows up by label and fail.
    sentences = list(sentences)
    label_list = list(label_list)

    input_ids, input_masks, segment_ids, labels = [], [], [], []
    for index in tqdm_notebook(range(len(sentences)), desc="Converting examples to features"):
        sentence = sentences[index]
        if sentence == '':
            # Feed a single space instead of an empty string.
            sentence = " "
        inputs = tokenizer.encode_plus(
            sentence,
            add_special_tokens=True,      # wrap the text in the model's special tokens
            max_length=max_seq_length,    # target length for padding and truncation
            padding='max_length',         # replaces the deprecated pad_to_max_length=True
            truncation=True,              # explicit: longer inputs are cut to max_length
            return_token_type_ids=True,   # segment ids
            return_attention_mask=True,   # 1 for real tokens, 0 for padding
        )
        input_ids.append(inputs['input_ids'])
        input_masks.append(inputs['attention_mask'])
        segment_ids.append(inputs['token_type_ids'])
        labels.append(label_list[index])
    return (
        np.array(input_ids),
        np.array(input_masks),
        np.array(segment_ids)
        # np.array(labels)
    )

In [None]:
def load_and_process_image(image_path, target_size=(256, 256)):
    """Load an image as a float RGB array scaled to [0, 1].

    Args:
        image_path: path of the image file. If it cannot be opened, a file with
            the same base name is tried under the Drive ``2000_data`` folder.
        target_size: (width, height) passed to ``resize``.

    Returns:
        NumPy array of shape ``target_size + (3,)`` with values in [0, 1].

    Raises:
        OSError: if the image can be opened from neither location.
        ValueError: if the decoded array is not 3-channel.
    """
    try:
        img = PIL.Image.open(image_path)
    except OSError:
        # Use only the file name for the fallback. Appending the full original
        # path to the fallback folder would build a path that cannot exist.
        img = PIL.Image.open('/content/drive/My Drive/2000_data/' + os.path.basename(image_path))

    # Let PIL normalise every mode (greyscale, greyscale+alpha, palette, RGBA,
    # ...) to three RGB channels before resizing.
    img = img.convert('RGB').resize(target_size)
    img = np.asarray(img, dtype='uint8')

    if img.ndim != 3 or img.shape[2] != 3:
        raise ValueError(image_path + " " + str(img.shape) + " has a problem")

    return img / 255


In [None]:
# int64 label tensor built from the training frame's 'class' column.
train_labels = tf.convert_to_tensor(data_train['class'], dtype=tf.int64)

# Pretrained RoBERTa-base tokenizer.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# Fresh 0..n-1 index on both frames before they are read row by row.
data_train, data_val = (
    frame.reset_index(drop=True) for frame in (data_train, data_val)
)

Custom data loaders for the Keras models used later in the notebook

In [None]:
def _tokenize_texts(dataset):
    """Tokenise ``dataset['text']`` once and return (token ids, masks, segment ids).

    Only the first three arrays returned by ``convert_examples_to_features`` are
    used, so both the 3-tuple and the 4-tuple version of that function in this
    notebook are supported. Placeholder labels are passed because that function
    requires a label argument.
    """
    texts = dataset['text'].tolist()
    features = convert_examples_to_features(texts, [0] * len(texts), max_seq_length, tokenizer)
    return features[0], features[1], features[2]


def _load_image_batch(dataset, start, stop):
    """Load the images of rows ``start`` to ``stop`` (exclusive) as one array."""
    return np.array([
        load_and_process_image('/content/memotion_dataset_7k/images/' + str(name))
        for name in dataset['image'].iloc[start:stop]
    ])


def generateTrainingData(dataset, bs):
    """Endless generator of ``([tokens, masks, segments, images], labels)`` batches.

    Text is tokenised once up front; images are loaded batch by batch. Labels
    come from ``dataset['class']``. The generator restarts from the first row
    after the last batch so it can feed any number of epochs, and yields
    nothing for an empty dataset.
    """
    tokens, masks, segments = _tokenize_texts(dataset)
    labels = np.asarray(dataset['class'], dtype='int64')
    total = dataset.shape[0]
    while total > 0:
        for a in range(0, total, bs):
            b = min(a + bs, total)
            images = _load_image_batch(dataset, a, b)
            yield [tokens[a:b], masks[a:b], segments[a:b], images], labels[a:b]


def generateTestData(dataset, bs):
    """Endless generator of ``[tokens, masks, segments, images]`` batches (no labels).

    Same batching as ``generateTrainingData``, for prediction on any frame that
    has 'text' and 'image' columns.
    """
    tokens, masks, segments = _tokenize_texts(dataset)
    total = dataset.shape[0]
    while total > 0:
        for a in range(0, total, bs):
            b = min(a + bs, total)
            images = _load_image_batch(dataset, a, b)
            yield [tokens[a:b], masks[a:b], segments[a:b], images]

## Early Fusion

In [None]:
from tensorflow.keras.layers import Input,Dense,Bidirectional,Conv2D,MaxPooling2D,Flatten,concatenate,GlobalAveragePooling2D,BatchNormalization,Lambda,Add,Multiply
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam,SGD
import tensorflow.keras.backend as K
from sklearn.metrics import f1_score,accuracy_score

# Text inputs (variable length) and the image input.
token_inputs = tf.keras.layers.Input(shape=(None,), name='word_inputs', dtype=tf.int32)
mask_inputs = tf.keras.layers.Input(shape=(None,), name='mask_inputs', dtype=tf.int32)
seg_inputs = tf.keras.layers.Input(shape=(None,), name='seg_inputs', dtype=tf.int32)

image_inputs = tf.keras.layers.Input(shape=(256, 256, 3), name="meme_images")

inputs = [token_inputs, mask_inputs, seg_inputs, image_inputs]
text_inputs = [token_inputs, mask_inputs, seg_inputs]

# Text branch: the encoder's hidden state at sequence position 0, batch-normalised.
transformer_outputs = TFRobertaModel.from_pretrained('roberta-base')(text_inputs)[0][:, 0, :]
transformer_outputs = tf.keras.layers.BatchNormalization()(transformer_outputs)

# Image branch: ImageNet ResNet50 features -> global average pool -> 768-d.
# With include_top=False there is no classifier head, so no `classes` argument
# is passed; pooling is applied explicitly.
image_step = tf.keras.applications.ResNet50(
    include_top=False,
    weights='imagenet',
    input_tensor=None,
    input_shape=(256, 256, 3),
    pooling=None,
)(image_inputs)
image_step = GlobalAveragePooling2D()(image_step)
image_step = tf.keras.layers.Dense(768, activation='relu')(image_step)
image_step = tf.keras.layers.BatchNormalization()(image_step)

# Early fusion: concatenate both representations, then normalise jointly.
concat_output = concatenate([transformer_outputs, image_step], axis=1)
concat_output = tf.keras.layers.LayerNormalization()(concat_output)

h = Dense(256, activation='relu')(concat_output)
pred = Dense(1, activation='sigmoid')(h)

model = Model(inputs=inputs, outputs=pred)

# `learning_rate=` is the supported keyword. The deprecated `lr=` alias is not
# reliably honoured by current Keras optimizers (ignored with a warning or
# rejected, depending on version).
model.compile(loss='binary_crossentropy',
              optimizer=Adam(learning_rate=2e-5, clipnorm=1.),
              metrics=['accuracy'])

# Define Early Stopping and Reduce LR callbacks (both monitor the validation loss).
early_stopping = EarlyStopping(patience=5, restore_best_weights=True, verbose=1)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=3, verbose=1)

model.summary()


## Gated MultiModal Unit (GMU)

In [None]:
from tensorflow.keras.layers import Input,Dense,Bidirectional,Conv2D,MaxPooling2D,Flatten,concatenate,GlobalAveragePooling2D,BatchNormalization,Lambda,Add,Multiply
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam,SGD
import tensorflow.keras.backend as K
from sklearn.metrics import f1_score,accuracy_score

# Text inputs (variable length) and the image input.
token_inputs = tf.keras.layers.Input(shape=(None,), name='word_inputs', dtype=tf.int32)
mask_inputs = tf.keras.layers.Input(shape=(None,), name='mask_inputs', dtype=tf.int32)
seg_inputs = tf.keras.layers.Input(shape=(None,), name='seg_inputs', dtype=tf.int32)

image_inputs = tf.keras.layers.Input(shape=(256, 256, 3), name="meme_images")

inputs = [token_inputs, mask_inputs, seg_inputs, image_inputs]
text_inputs = [token_inputs, mask_inputs, seg_inputs]

# Text branch: the encoder's hidden state at sequence position 0, batch-normalised.
transformer_outputs = TFRobertaModel.from_pretrained('roberta-base')(text_inputs)[0][:, 0, :]
transformer_outputs = tf.keras.layers.BatchNormalization()(transformer_outputs)

# Image branch: ImageNet ResNet50 features -> global average pool -> 768-d.
# With include_top=False there is no classifier head, so no `classes` argument
# is passed; pooling is applied explicitly.
image_step = tf.keras.applications.ResNet50(
    include_top=False,
    weights='imagenet',
    input_tensor=None,
    input_shape=(256, 256, 3),
    pooling=None,
)(image_inputs)
image_step = GlobalAveragePooling2D()(image_step)
image_step = tf.keras.layers.Dense(768, activation='relu')(image_step)
image_step = tf.keras.layers.BatchNormalization()(image_step)

# Joint representation that drives the gate.
concat_output = concatenate([transformer_outputs, image_step], axis=1)
concat_output = tf.keras.layers.LayerNormalization()(concat_output)

# Gated Multimodal Block - Begin
# hv / ht: tanh projections of the image and text features.
# z: sigmoid gate computed from the joint representation.
# h = z * hv + (1 - z) * ht, a per-dimension blend of the two modalities.
hv = Dense(768, activation='tanh')(image_step)
ht = Dense(768, activation='tanh')(transformer_outputs)
z = Dense(768, activation='sigmoid')(concat_output)
one_minus_z = Lambda(lambda x: 1. - x)(z)
h = Add()([Multiply()([z, hv]), Multiply()([one_minus_z, ht])])
# Gated Multimodal Block - End

h = Dense(256, activation='relu')(h)
pred = Dense(1, activation='sigmoid')(h)

model = Model(inputs=inputs, outputs=pred)

# `learning_rate=` is the supported keyword. The deprecated `lr=` alias is not
# reliably honoured by current Keras optimizers (ignored with a warning or
# rejected, depending on version).
model.compile(loss='binary_crossentropy',
              optimizer=Adam(learning_rate=2e-5, clipnorm=1.),
              metrics=['accuracy'])

# Define Early Stopping and Reduce LR callbacks (both monitor the validation loss).
early_stopping = EarlyStopping(patience=5, restore_best_weights=True, verbose=1)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=3, verbose=1)

model.summary()

## Eval

Keras Callbacks to simplify model saving

In [None]:
# Batch size shared by the multimodal callback, training and evaluation cells below.
bs = 32

In [None]:
class Saver(tf.keras.callbacks.Callback):
    """Report validation accuracy and save the weights after every epoch.

    The multimodal models take four inputs (token ids, mask, segment ids,
    image), so validation batches come from ``generateTestData``. An image-only
    generator cannot feed these models.
    """

    def on_train_begin(self, logs=None):
        self.score = 0
        # Counter used to name the saved weight files.
        self.epoch_number = 0

    def on_epoch_end(self, epoch, logs=None):
        # Signature follows the Keras callback contract: (epoch, logs).
        expected = data_val['class'].astype('int').to_numpy()
        # Ceiling division: exactly one pass over the validation frame.
        steps = (len(expected) + bs - 1) // bs

        test_gen = generateTestData(data_val, bs)
        probabilities = self.model.predict(test_gen, steps=steps, verbose=1)
        # The models end in one sigmoid unit; argmax over that single column
        # would be 0 for every sample, so threshold at 0.5 instead.
        predict = (np.asarray(probabilities).reshape(-1) > 0.5).astype(int)[:len(expected)]

        accuracy = accuracy_score(expected, predict)
        print(f"Validation Accuracy: {accuracy}")

        self.model.save_weights('/content/drive/My Drive/Memotion/model_save_memo/imageC-2-{}.h5'.format(self.epoch_number))
        self.epoch_number = self.epoch_number + 1

Begin Training

In [None]:
import math


def _multimodal_validation_batches(dataset, batch_size):
    """Pair the input batches of ``generateTestData`` with their labels.

    ``generateTestData`` walks ``dataset`` in order, in batches of
    ``batch_size``, and starts over after the last batch; the labels are sliced
    with the same boundaries. Nothing is yielded for an empty dataset.
    """
    labels = dataset['class'].to_numpy().astype('float32')
    batches_per_pass = math.ceil(len(labels) / batch_size)
    input_batches = generateTestData(dataset, batch_size)
    while batches_per_pass > 0:
        for index in range(batches_per_pass):
            yield next(input_batches), labels[index * batch_size:(index + 1) * batch_size]


gen = generateTrainingData(data_train, bs)
# `Model.fit` accepts generators directly; `fit_generator` is deprecated.
# Validation data is supplied so the callbacks that monitor `val_loss` can act.
# Step counts use ceil() so one epoch is exactly one pass over the data, also
# when the number of rows is a multiple of the batch size.
model.fit(
    gen,
    steps_per_epoch=math.ceil(data_train.shape[0] / bs),
    epochs=10,
    validation_data=_multimodal_validation_batches(data_val, bs),
    validation_steps=math.ceil(data_val.shape[0] / bs),
    max_queue_size=10,
    workers=1,
    shuffle=False,
    callbacks=[Saver(), early_stopping, reduce_lr]
)


Test Validation Score

In [None]:
# Validation-set evaluation of the multimodal model.
# Ceiling division: exactly one pass over the validation frame.
_eval_steps = (data_val.shape[0] + bs - 1) // bs

test_gen = generateTestData(data_val, bs)
predict = model.predict(test_gen, steps=_eval_steps, verbose=1)

y_val_true = data_val['class'].astype('int').to_numpy()[:, np.newaxis]
# The model ends in one sigmoid unit, so class ids come from a 0.5 threshold;
# argmax over that single column would be 0 for every sample.
y_val_pred = (np.asarray(predict).reshape(-1) > 0.5).astype(int)[:len(y_val_true)]

print("Validation Classification Report:")
print(classification_report(y_val_true, y_val_pred))