In [None]:
!pip install -q transformers
!pip install pydot
!pip install tensorflow==2.11.0

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from collections import Counter
import numpy as np
import tensorflow as tf
from tensorflow import keras
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
from transformers import BertTokenizer, TFBertModel
model_checkpoint = 'bert-base-cased'

In [None]:
bert_tokenizer = BertTokenizer.from_pretrained(model_checkpoint)
# No standalone TFBertModel is loaded here: `bert_model` is bound to the classifier returned by
# create_bert_multiclass_model() before it is first used, and that factory loads its own encoder.
# Loading a second copy of the weights at this point would only consume memory.

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

def cm(preds, true_label):
  """Plot a confusion-matrix heatmap of true vs. predicted emotion classes.

  Args:
    preds: predicted integer class ids (FIRST argument).
    true_label: ground-truth integer class ids (SECOND argument).

  Rows of the heatmap are the true classes, columns are the predicted classes.
  """
  # NOTE(review): assumes the integer label codes follow this order — confirm against the label encoding.
  t = {0:'anger', 1:'fear', 2:'joy', 3:'love', 4:'neutral', 5:'sadness', 6:'surprise'}
  class_ids = list(t.keys())
  class_names = list(t.values())
  # Pin `labels` so the matrix always has one row/column per entry of `t` and lines up
  # with the tick labels even when a class is absent from both inputs.
  matrix = confusion_matrix(true_label, preds, labels=class_ids)
  plt.figure(figsize=(10,7))
  # fmt="d" prints the counts as plain integers instead of the default '.2g' format.
  sns.heatmap(
      matrix, annot=True, fmt="d",
      xticklabels=class_names,
      yticklabels=class_names)
  plt.xlabel("Predicted")
  plt.ylabel("True")
  plt.show()

In [None]:
max_length = 200

#### Standardized BERT model (selected after tuning the hidden dimension size), used for all training-set sizes below

In [None]:
def create_bert_multiclass_model(checkpoint = model_checkpoint,
                                 num_classes = 7,
                                 hidden_size = 201,
                                 dropout=0.3,
                                 learning_rate=0.00005,
                                 max_length=200):
    """Build and compile a fine-tunable BERT multi-class classifier.

    The model feeds the first-position ([CLS]) vector of BERT's first output
    through a ReLU hidden layer, dropout, and a softmax output layer.

    Args:
        checkpoint: HuggingFace checkpoint name passed to TFBertModel.from_pretrained.
        num_classes: number of output classes (size of the softmax layer).
        hidden_size: width of the dense layer placed on top of the [CLS] vector.
        dropout: dropout rate applied after the hidden layer.
        learning_rate: Adam learning rate.
        max_length: token sequence length of the three input tensors; must match
            the `max_length` the inputs were padded/truncated to by the tokenizer.

    Returns:
        A compiled tf.keras.Model taking [input_ids, token_type_ids, attention_mask]
        and returning class probabilities of shape (batch, num_classes).
    """
    bert_encoder = TFBertModel.from_pretrained(checkpoint)
    # Fine-tune the whole encoder, not only the classification head.
    bert_encoder.trainable = True

    input_ids = tf.keras.layers.Input(shape=(max_length,), dtype=tf.int64, name='input_ids_layer')
    token_type_ids = tf.keras.layers.Input(shape=(max_length,), dtype=tf.int64, name='token_type_ids_layer')
    attention_mask = tf.keras.layers.Input(shape=(max_length,), dtype=tf.int64, name='attention_mask_layer')
    bert_inputs = {'input_ids': input_ids,
                   'token_type_ids': token_type_ids,
                   'attention_mask': attention_mask}

    bert_out = bert_encoder(bert_inputs)
    # bert_out[0] is the per-token output; position 0 is taken as the sequence summary.
    cls_token = bert_out[0][:, 0, :]
    hidden = tf.keras.layers.Dense(hidden_size, activation='relu', name='hidden_layer')(cls_token)
    hidden = tf.keras.layers.Dropout(dropout)(hidden)
    classification = tf.keras.layers.Dense(num_classes, activation='softmax', name='classification_layer')(hidden)

    classification_model = tf.keras.Model(inputs=[input_ids, token_type_ids, attention_mask],
                                          outputs=[classification])
    # Labels are integer class ids and the output layer already applies softmax,
    # hence sparse categorical cross-entropy with from_logits=False.
    classification_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                                 loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
                                 metrics=['accuracy'])
    return classification_model

In [None]:
bert_model = create_bert_multiclass_model()

In [None]:
bert_model.summary()

###  Phase 2: Test for optimal training data size for BERT

#### Size = 17 k rows

In [None]:
train = pd.read_csv('train_data_imbalanced_stratified_no_dupe_17k_5k.csv')
val = pd.read_csv("validation_data_imbalanced_stratified_no_dupe_17k_5k.csv")
test = pd.read_csv("test_data_imbalanced_stratified_no_dupe.csv")

In [None]:
train_text = train['text'].tolist()
val_text = val['text'].tolist()
test_text = test['text'].tolist()


In [None]:
train_encodings = bert_tokenizer(train_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
valid_encodings = bert_tokenizer(val_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
test_encodings = bert_tokenizer(test_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')

In [None]:
# Raw 'emotions' label columns converted to NumPy arrays.
# NOTE(review): npval_labels and nptest_labels are reassigned to integer category codes in the
# next cell, and nptrain_labels is not referenced again in the visible notebook, so this cell
# looks redundant.
nptrain_labels = np.asarray(train['emotions'])
npval_labels = np.asarray(val['emotions'])
nptest_labels = np.asarray(test['emotions'])

In [None]:
# Encode every split with the training split's category order so a given integer code
# always denotes the same emotion; a label not present in training is encoded as -1.
emotion_categories = pd.Categorical(train['emotions']).categories
train_labels = pd.Categorical(train['emotions'], categories=emotion_categories).codes
npval_labels = pd.Categorical(val['emotions'], categories=emotion_categories).codes
nptest_labels = pd.Categorical(test['emotions'], categories=emotion_categories).codes

In [None]:
bert_model_17 = bert_model.fit([train_encodings.input_ids, train_encodings.token_type_ids, train_encodings.attention_mask], 
                                                  train_labels,   
                                                  validation_data=([valid_encodings.input_ids, valid_encodings.token_type_ids, valid_encodings.attention_mask], 
                                                  npval_labels),    
                                                  batch_size=8, 
                                                  epochs=2)  

In [None]:
predictions = bert_model.predict([test_encodings.input_ids, test_encodings.token_type_ids, test_encodings.attention_mask])
predictions = tf.argmax(predictions, axis=-1)


In [None]:
print(classification_report(nptest_labels, predictions.numpy()))
# cm() is declared as cm(preds, true_label): predictions go first, otherwise the
# "True"/"Predicted" axes of the heatmap are transposed.
cm(predictions.numpy(), nptest_labels)

#### Size = 34 k training rows

In [None]:
#test with 34k
train = pd.read_csv('train_data_imbalanced_stratified_no_dupe_34k_10k.csv')
#lexicon = pd.read_csv("lexicon_220306.csv")
val = pd.read_csv("validation_data_imbalanced_stratified_no_dupe_34k_10k.csv")
test = pd.read_csv("test_data_imbalanced_stratified_no_dupe.csv")

In [None]:
train_text = train['text'].tolist()
val_text = val['text'].tolist()
test_text = test['text'].tolist()


train_encodings = bert_tokenizer(train_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
valid_encodings = bert_tokenizer(val_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
test_encodings = bert_tokenizer(test_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')

In [None]:
# Encode every split with the training split's category order so a given integer code
# always denotes the same emotion; a label not present in training is encoded as -1.
emotion_categories = pd.Categorical(train['emotions']).categories
train_labels = pd.Categorical(train['emotions'], categories=emotion_categories).codes
npval_labels = pd.Categorical(val['emotions'], categories=emotion_categories).codes
nptest_labels = pd.Categorical(test['emotions'], categories=emotion_categories).codes

In [None]:
# Re-create the classifier from the pretrained checkpoint so the 34k run is independent:
# calling fit() again on the existing `bert_model` would continue training weights that
# were already fine-tuned in the previous experiment and invalidate the size comparison.
tf.keras.backend.clear_session()
bert_model = create_bert_multiclass_model()
bert_model_34k_history = bert_model.fit(
    [train_encodings.input_ids, train_encodings.token_type_ids, train_encodings.attention_mask],
    train_labels,
    validation_data=(
        [valid_encodings.input_ids, valid_encodings.token_type_ids, valid_encodings.attention_mask],
        npval_labels,
    ),
    batch_size=8,
    epochs=2,
)

In [None]:
predictions = bert_model.predict([test_encodings.input_ids, test_encodings.token_type_ids, test_encodings.attention_mask])
predictions = tf.argmax(predictions, axis=-1)
print(classification_report(nptest_labels, predictions.numpy()))

#### Size = 68 k training rows

In [None]:
#try with 68 k
train = pd.read_csv('train_data_imbalanced_stratified_no_dupe_68k_20k.csv')
#lexicon = pd.read_csv("lexicon_220306.csv")
val = pd.read_csv("validation_data_imbalanced_stratified_no_dupe_68k_20k.csv")
test = pd.read_csv("test_data_imbalanced_stratified_no_dupe.csv")

In [None]:
train_text = train['text'].tolist()
val_text = val['text'].tolist()
test_text = test['text'].tolist()


train_encodings = bert_tokenizer(train_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
valid_encodings = bert_tokenizer(val_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
test_encodings = bert_tokenizer(test_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')

In [None]:
# Encode every split with the training split's category order so a given integer code
# always denotes the same emotion; a label not present in training is encoded as -1.
emotion_categories = pd.Categorical(train['emotions']).categories
train_labels = pd.Categorical(train['emotions'], categories=emotion_categories).codes
npval_labels = pd.Categorical(val['emotions'], categories=emotion_categories).codes
nptest_labels = pd.Categorical(test['emotions'], categories=emotion_categories).codes

In [None]:
# Re-create the classifier from the pretrained checkpoint so the 68k run is independent:
# calling fit() again on the existing `bert_model` would continue training weights that
# were already fine-tuned in the previous experiments and invalidate the size comparison.
tf.keras.backend.clear_session()
bert_model = create_bert_multiclass_model()
bert_model_68 = bert_model.fit(
    [train_encodings.input_ids, train_encodings.token_type_ids, train_encodings.attention_mask],
    train_labels,
    validation_data=(
        [valid_encodings.input_ids, valid_encodings.token_type_ids, valid_encodings.attention_mask],
        npval_labels,
    ),
    batch_size=8,
    epochs=2,
)

In [None]:
predictions = bert_model.predict([test_encodings.input_ids, test_encodings.token_type_ids, test_encodings.attention_mask])
predictions = tf.argmax(predictions, axis=-1)
print(classification_report(nptest_labels, predictions.numpy()))

We see that using the 17k training size is enough for our purposes. We will use this for all other experiments.

### Phase 2: Experimentation with Lexicon with BERT Model

#### Filter the training set to keep only words that appear in the lexicon

In [None]:
from nltk import WhitespaceTokenizer
from nltk import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()
w_tokenizer = WhitespaceTokenizer()
def stringify(text):
  """Join a sequence of string tokens into one space-separated string.

  Args:
    text: sequence of strings (e.g. a list of tokens).

  Returns:
    The tokens separated by single spaces, with no trailing space;
    an empty sequence yields ''.
  """
  # str.join is linear-time and handles the "no trailing separator" case itself.
  return ' '.join(text)

def lemmatize_text(text):
    """Tokenize `text` with the module-level whitespace tokenizer and lemmatize each token.

    Returns a list with one lemmatized string per token, in the original order.
    """
    lemmas = []
    for token in w_tokenizer.tokenize(text):
        lemmas.append(lemmatizer.lemmatize(token))
    return lemmas

def tokenizer(text):
  """Return the tokens produced by the module-level WhitespaceTokenizer for `text`."""
  tokens = w_tokenizer.tokenize(text)
  return tokens


In [None]:
#now doing a lexiconned version
train = pd.read_csv('train_data_imbalanced_stratified_no_dupe_17k_5k.csv')
val = pd.read_csv("validation_data_imbalanced_stratified_no_dupe_17k_5k.csv")
test = pd.read_csv("test_data_imbalanced_stratified_no_dupe.csv")
lexicon = pd.read_csv("lexicon_220306.csv")
lexicon_word_list  = lexicon['cleaned_stopwords'].tolist()


In [None]:
# A set gives constant-time membership tests; scanning the lexicon list for every
# token is linear in the lexicon size.
lexicon_words = set(lexicon_word_list)
train['tokenized'] = train['text'].apply(tokenizer)
# Keep only the tokens that appear in the lexicon.
train['lexiconned'] = train['tokenized'].apply(
    lambda tokens: [word for word in tokens if word in lexicon_words])

In [None]:
# A set gives constant-time membership tests; scanning the lexicon list for every
# token is linear in the lexicon size.
lexicon_words = set(lexicon_word_list)
# Apply the same lexicon-only filter to the test and validation splits.
test['tokenized'] = test['text'].apply(tokenizer)
test['lexiconned'] = test['tokenized'].apply(
    lambda tokens: [word for word in tokens if word in lexicon_words])
val['tokenized'] = val['text'].apply(tokenizer)
val['lexiconned'] = val['tokenized'].apply(
    lambda tokens: [word for word in tokens if word in lexicon_words])


In [None]:
train

In [None]:
train['stringed'] = train['lexiconned'].apply(stringify)
test['stringed']= test['lexiconned'].apply(stringify)
val['stringed'] = val['lexiconned'].apply(stringify)
train_x = train['stringed']
train_y = train['emotions']
val_x = val['stringed']
val_y = val['emotions']
test_x = test['stringed']
test_y = test['emotions']

In [None]:
# Encode every split with the training split's category order so a given integer code
# always denotes the same emotion; a label not present in training is encoded as -1.
emotion_categories = pd.Categorical(train_y).categories
train_labels = pd.Categorical(train_y, categories=emotion_categories).codes
npval_labels = pd.Categorical(val_y, categories=emotion_categories).codes
nptest_labels = pd.Categorical(test_y, categories=emotion_categories).codes

In [None]:
train_text = train_x.tolist()
val_text = val_x.tolist()
test_text = test_x.tolist()
max_length = 200

In [None]:
train_encodings = bert_tokenizer(train_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
valid_encodings = bert_tokenizer(val_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
test_encodings = bert_tokenizer(test_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')

In [None]:
# Re-create the classifier from the pretrained checkpoint so the lexicon-only run is
# independent: the existing `bert_model` has already been fine-tuned on the unfiltered
# text in earlier experiments, which would leak into this comparison.
tf.keras.backend.clear_session()
bert_model = create_bert_multiclass_model()
bert_model_lexicon = bert_model.fit(
    [train_encodings.input_ids, train_encodings.token_type_ids, train_encodings.attention_mask],
    train_labels,
    validation_data=(
        [valid_encodings.input_ids, valid_encodings.token_type_ids, valid_encodings.attention_mask],
        npval_labels,
    ),
    batch_size=8,
    epochs=2,
)

In [None]:
predictions = bert_model.predict([test_encodings.input_ids, test_encodings.token_type_ids, test_encodings.attention_mask])
# Most probable class id per test example.
predictions = tf.argmax(predictions, axis=-1)
print(classification_report(nptest_labels, predictions.numpy()))
# cm() is declared as cm(preds, true_label): predictions go first, otherwise the
# "True"/"Predicted" axes of the heatmap are transposed.
cm(predictions.numpy(), nptest_labels)

#### Filter out training set to only include words NOT in Lexicon

In [None]:
train = pd.read_csv('train_data_imbalanced_stratified_no_dupe_17k_5k.csv')
val = pd.read_csv("validation_data_imbalanced_stratified_no_dupe_17k_5k.csv")
test = pd.read_csv("test_data_imbalanced_stratified_no_dupe.csv")
lexicon = pd.read_csv("lexicon_220306.csv")
lexicon_word_list  = lexicon['cleaned_stopwords'].tolist()

In [None]:
# Keep only the tokens that are NOT in the lexicon.
# A set gives constant-time membership tests; scanning the lexicon list for every
# token is linear in the lexicon size.
lexicon_words = set(lexicon_word_list)
train['tokenized'] = train['text'].apply(tokenizer)
train['lexiconned'] = train['tokenized'].apply(
    lambda tokens: [word for word in tokens if word not in lexicon_words])

In [None]:
# A set gives constant-time membership tests; scanning the lexicon list for every
# token is linear in the lexicon size.
lexicon_words = set(lexicon_word_list)
# Apply the same "not in lexicon" filter to the test and validation splits.
test['tokenized'] = test['text'].apply(tokenizer)
test['lexiconned'] = test['tokenized'].apply(
    lambda tokens: [word for word in tokens if word not in lexicon_words])
val['tokenized'] = val['text'].apply(tokenizer)
val['lexiconned'] = val['tokenized'].apply(
    lambda tokens: [word for word in tokens if word not in lexicon_words])


In [None]:
train['stringed'] = train['lexiconned'].apply(stringify)
test['stringed']= test['lexiconned'].apply(stringify)
val['stringed'] = val['lexiconned'].apply(stringify)
train_x = train['stringed']
train_y = train['emotions']
val_x = val['stringed']
val_y = val['emotions']
test_x = test['stringed']
test_y = test['emotions']

In [None]:
# Encode every split with the training split's category order so a given integer code
# always denotes the same emotion; a label not present in training is encoded as -1.
emotion_categories = pd.Categorical(train_y).categories
train_labels = pd.Categorical(train_y, categories=emotion_categories).codes
npval_labels = pd.Categorical(val_y, categories=emotion_categories).codes
nptest_labels = pd.Categorical(test_y, categories=emotion_categories).codes

In [None]:
train_text = train_x.tolist()
val_text = val_x.tolist()
test_text = test_x.tolist()

In [None]:
train_encodings = bert_tokenizer(train_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
valid_encodings = bert_tokenizer(val_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
test_encodings = bert_tokenizer(test_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')

In [None]:
# Re-create the classifier from the pretrained checkpoint so the "not in lexicon" run is
# independent: the existing `bert_model` has already been fine-tuned in earlier
# experiments, which would leak into this comparison.
tf.keras.backend.clear_session()
bert_model = create_bert_multiclass_model()
bert_model_reverse_lexicon = bert_model.fit(
    [train_encodings.input_ids, train_encodings.token_type_ids, train_encodings.attention_mask],
    train_labels,
    validation_data=(
        [valid_encodings.input_ids, valid_encodings.token_type_ids, valid_encodings.attention_mask],
        npval_labels,
    ),
    batch_size=8,
    epochs=2,
)

In [None]:
predictions = bert_model.predict([test_encodings.input_ids, test_encodings.token_type_ids, test_encodings.attention_mask])
# Most probable class id per test example.
predictions = tf.argmax(predictions, axis=-1)
print(classification_report(nptest_labels, predictions.numpy()))
# cm() is declared as cm(preds, true_label): predictions go first, otherwise the
# "True"/"Predicted" axes of the heatmap are transposed.
cm(predictions.numpy(), nptest_labels)

#### Phase 2: Test best BERT model on Kaggle data set

In [None]:
 #test with new data set with no lexicon
train = pd.read_csv('train_data_imbalanced_stratified_no_dupe_17k_5k.csv')
val = pd.read_csv("validation_data_imbalanced_stratified_no_dupe_17k_5k.csv")
test = pd.read_csv("kaggle_data_no_dupe.csv")
train_text = train['text'].tolist()
val_text = val['text'].tolist()
test_text = test['text'].tolist()

train_encodings = bert_tokenizer(train_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
valid_encodings = bert_tokenizer(val_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
test_encodings = bert_tokenizer(test_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')


In [None]:
# Encode every split with the training split's category order so a given integer code
# always denotes the same emotion, even though the test set comes from a different
# source; a label not present in training is encoded as -1.
emotion_categories = pd.Categorical(train['emotions']).categories
train_labels = pd.Categorical(train['emotions'], categories=emotion_categories).codes
npval_labels = pd.Categorical(val['emotions'], categories=emotion_categories).codes
nptest_labels = pd.Categorical(test['emotions'], categories=emotion_categories).codes

In [None]:
# Re-create the classifier from the pretrained checkpoint so this model is trained only
# on the unfiltered 17k training set: the existing `bert_model` carries weights
# fine-tuned in the earlier (including lexicon-filtered) experiments.
tf.keras.backend.clear_session()
bert_model = create_bert_multiclass_model()
bert_model_kaggle = bert_model.fit(
    [train_encodings.input_ids, train_encodings.token_type_ids, train_encodings.attention_mask],
    train_labels,
    validation_data=(
        [valid_encodings.input_ids, valid_encodings.token_type_ids, valid_encodings.attention_mask],
        npval_labels,
    ),
    batch_size=8,
    epochs=2,
)

In [None]:
predictions = bert_model.predict([test_encodings.input_ids, test_encodings.token_type_ids, test_encodings.attention_mask])
# Most probable class id per test example.
predictions = tf.argmax(predictions, axis=-1)
print(classification_report(nptest_labels, predictions.numpy()))
# cm() is declared as cm(preds, true_label): predictions go first, otherwise the
# "True"/"Predicted" axes of the heatmap are transposed.
cm(predictions.numpy(), nptest_labels)

#### Test on balanced clean test set

In [None]:
test = pd.read_csv("test_data_balanced_no_dupe.csv")
test_text = test['text'].tolist()
test_encodings = bert_tokenizer(test_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
# Encode with the category order of the training split the model was fitted on, so the
# integer codes match the model's output indices; unseen labels are encoded as -1.
nptest_labels = pd.Categorical(
    test['emotions'],
    categories=pd.Categorical(train['emotions']).categories,
).codes

In [None]:
predictions = bert_model.predict([test_encodings.input_ids, test_encodings.token_type_ids, test_encodings.attention_mask])
# Most probable class id per test example.
predictions = tf.argmax(predictions, axis=-1)
print(classification_report(nptest_labels, predictions.numpy()))
# cm() is declared as cm(preds, true_label): predictions go first, otherwise the
# "True"/"Predicted" axes of the heatmap are transposed.
cm(predictions.numpy(), nptest_labels)

### Phase 3: Create combined training set with CARER and Kaggle data, test on BERT architecture

In [None]:
#try a new bert with mixed kaggle and clean set with squished labeles
train_mix = pd.read_csv("train_20k_12k_mixed_strat_clean_kaggle_230404.csv")
val_mix = pd.read_csv("validation_20k_12k_mixed_strat_clean_kaggle_230404.csv")
test_mix = pd.read_csv("test_data_imbalanced_stratified_no_dupe.csv")

In [None]:
# Remove every training row whose text also occurs in the test set, so no test example
# is seen during training. isin() checks membership directly; building a full merged
# frame just to read back its key column is unnecessary.
in_test = train_mix['text'].isin(test_mix['text'])
train_mix = train_mix[~in_test]
train_mix.shape

In [None]:
train_text = train_mix['text'].tolist()
val_text = val_mix['text'].tolist()
test_text = test_mix['text'].tolist()

train_encodings = bert_tokenizer(train_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
valid_encodings = bert_tokenizer(val_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
test_encodings = bert_tokenizer(test_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')


In [None]:
# Encode every split with the mixed training set's category order so a given integer
# code always denotes the same emotion, even if a split lacks one of the classes;
# a label not present in training is encoded as -1.
emotion_categories = pd.Categorical(train_mix['emotions']).categories
train_labels = pd.Categorical(train_mix['emotions'], categories=emotion_categories).codes
npval_labels = pd.Categorical(val_mix['emotions'], categories=emotion_categories).codes
nptest_labels = pd.Categorical(test_mix['emotions'], categories=emotion_categories).codes

In [None]:
# Train a NEW classifier on the mixed data: re-using the existing `bert_model` would
# continue from weights already fine-tuned in the earlier experiments.
tf.keras.backend.clear_session()
bert_model = create_bert_multiclass_model()
bert_model_mix = bert_model.fit(
    [train_encodings.input_ids, train_encodings.token_type_ids, train_encodings.attention_mask],
    train_labels,
    validation_data=(
        [valid_encodings.input_ids, valid_encodings.token_type_ids, valid_encodings.attention_mask],
        npval_labels,
    ),
    batch_size=8,
    epochs=2,
)

In [None]:
predictions = bert_model.predict([test_encodings.input_ids, test_encodings.token_type_ids, test_encodings.attention_mask])
# Most probable class id per test example.
predictions = tf.argmax(predictions, axis=-1)
print(classification_report(nptest_labels, predictions.numpy()))
# cm() is declared as cm(preds, true_label): predictions go first, otherwise the
# "True"/"Predicted" axes of the heatmap are transposed.
cm(predictions.numpy(), nptest_labels)

### Now let's see if BERT can tell the difference between the Kaggle and clean data sources. (Needs tuning)

In [None]:
def create_bert_model(max_sequence_length=200,
                          hidden_size = 50,
                          dropout=0.3,
                          learning_rate=0.00005):
    """Build and compile a fine-tunable BERT binary classifier.

    The model feeds the first-position ([CLS]) vector of BERT's first output
    through a ReLU hidden layer, dropout, and a single sigmoid output unit.

    Args:
        max_sequence_length: token sequence length of the three input tensors;
            must match the length the inputs were padded/truncated to.
        hidden_size: width of the dense layer placed on top of the [CLS] vector.
        dropout: dropout rate applied after the hidden layer.
        learning_rate: Adam learning rate.

    Returns:
        A compiled tf.keras.Model taking [input_ids, token_type_ids, attention_mask]
        and returning one probability per example.
    """
    bert_encoder = TFBertModel.from_pretrained('bert-base-cased')
    # Fine-tune the whole encoder, not only the classification head.
    bert_encoder.trainable = True

    # The input length follows `max_sequence_length` instead of a hard-coded 200,
    # so the parameter actually takes effect.
    input_ids = tf.keras.layers.Input(shape=(max_sequence_length,), dtype=tf.int64, name='input_ids_layer')
    token_type_ids = tf.keras.layers.Input(shape=(max_sequence_length,), dtype=tf.int64, name='token_type_ids_layer')
    attention_mask = tf.keras.layers.Input(shape=(max_sequence_length,), dtype=tf.int64, name='attention_mask_layer')

    bert_inputs = {'input_ids': input_ids,
                   'token_type_ids': token_type_ids,
                   'attention_mask': attention_mask}

    bert_out = bert_encoder(bert_inputs)
    # bert_out[0] is the per-token output; position 0 is taken as the sequence summary.
    cls_token = bert_out[0][:, 0, :]
    hidden = tf.keras.layers.Dense(hidden_size, activation='relu', name='hidden_layer')(cls_token)
    hidden = tf.keras.layers.Dropout(dropout)(hidden)
    classification = tf.keras.layers.Dense(1, activation='sigmoid', name='classification_layer')(hidden)

    classification_model = tf.keras.Model(inputs=[input_ids, token_type_ids, attention_mask],
                                          outputs=[classification])
    # The output layer already applies sigmoid, hence from_logits=False.
    classification_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                                 loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
                                 metrics=['accuracy'])
    return classification_model

In [None]:
train_mix = pd.read_csv("train_source_20k_12k_mixed_230404.csv")
test_mix = pd.read_csv("validation_source_20k_12k_mixed_230404.csv")

In [None]:
train_mix.head()

In [None]:
train_text = train_mix['text'].tolist()
#val_text = val['text'].tolist()
test_text = test_mix['text'].tolist()

train_encodings = bert_tokenizer(train_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
#valid_encodings = bert_tokenizer(val_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')
test_encodings = bert_tokenizer(test_text, truncation=True, padding='max_length', max_length=max_length, return_tensors='tf')


In [None]:
# Encode both splits with the training split's category order so 0/1 denote the same
# data source in each; a source value not present in training is encoded as -1.
source_categories = pd.Categorical(train_mix['source']).categories
train_labels = pd.Categorical(train_mix['source'], categories=source_categories).codes
#npval_labels = pd.Categorical(val['emotions']).codes
nptest_labels = pd.Categorical(test_mix['source'], categories=source_categories).codes

In [None]:
print(train_encodings)

In [None]:
# The binary-classifier factory defined in this notebook is create_bert_model();
# `create_bert_cls_model` is not defined anywhere and would raise NameError.
bert_model_source = create_bert_model()


In [None]:
bert_source_model_history = bert_model_source.fit(
    [train_encodings.input_ids, train_encodings.token_type_ids, train_encodings.attention_mask], 
    train_labels, validation_data=([test_encodings.input_ids, test_encodings.token_type_ids, test_encodings.attention_mask], 
    nptest_labels), batch_size=8, epochs=2) 