In [None]:
# only run on google runtime
!pip install tensorflow-text
!pip install tf-models-official
!pip install tensorflow-addons
!pip install scikit-learn

In [3]:
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text
from official.nlp import optimization
import tensorflow_addons as tfa
import transformers
import sklearn as sk
from sklearn.model_selection import train_test_split

In [None]:
# only run on google runtime
# update file paths with location to subtask data
from google.colab import drive

# Mount Google Drive under /content/drive; force_remount=True is passed so an
# existing mount is refreshed rather than reused.
drive.mount('/content/drive', force_remount=True)

# Folder on the mounted drive that holds the subtask A JSONL files.
input_directory = '/content/drive/MyDrive/2023-2024 School Year/Fall Semester/Natural Language Processing/Project/Data'

# Full paths of the monolingual train and dev files inside that folder.
raw_train_data = f'{input_directory}/subtaskA_train_monolingual.jsonl'
raw_dev_data = f'{input_directory}/subtaskA_dev_monolingual.jsonl'

In [5]:
import json

def extract_data(filename):
  """Read a JSON Lines file and return its texts and labels.

  Every non-blank line of ``filename`` is parsed as one JSON object, and its
  "text" and "label" fields are collected. Other fields (such as "model" or
  "source") are ignored and therefore do not have to be present.

  Args:
    filename: Path to a UTF-8 encoded JSON Lines file.

  Returns:
    A ``(text, labels)`` tuple of two parallel lists, in file order.

  Raises:
    KeyError: If a record has no "text" or "label" field.
    json.JSONDecodeError: If a non-blank line is not valid JSON.
  """
  text = []
  labels = []
  with open(filename, 'r', encoding='utf-8') as f:
    # Stream the file line by line instead of loading every line up front.
    for line in f:
      if not line.strip():
        # Tolerate blank lines, e.g. an extra newline at the end of the file.
        continue
      record = json.loads(line)
      text.append(record["text"])
      labels.append(record["label"])
  return text, labels


In [6]:
# Load the training texts and their labels from the JSONL file at raw_train_data.
train_text, train_labels = extract_data(raw_train_data)

In [None]:
# if using train set for eval
# Holds out 10% of the loaded training examples as the eval split, using a
# fixed random_state so the split is repeatable.
# NOTE(review): this overwrites train_text/train_labels with the reduced split,
# so running the cell a second time splits the already-reduced data again;
# re-run the loading cell first if this cell needs to be repeated.
train_text, eval_text, train_labels, eval_labels = train_test_split(train_text, train_labels, test_size=0.1, random_state=92)

In [7]:
# if using dev set for eval
# Shuffle texts and labels together (same permutation for both, fixed seed).
# NOTE(review): the shuffled lists replace train_text/train_labels, so
# re-running this cell shuffles the already-shuffled data again.
train_text, train_labels = sk.utils.shuffle(train_text, train_labels, random_state=92)
# Use the separate dev file as the eval split.
# NOTE(review): this looks like an alternative to the "train set for eval"
# cell; if both are run, eval_text/eval_labels from that cell are replaced here.
eval_text, eval_labels = extract_data(raw_dev_data)

In [None]:
# if using deberta
# Load the tokenizer and TensorFlow encoder for the "microsoft/deberta-base"
# checkpoint into the shared names bert_tokenizer / bert_encoder.
# NOTE(review): the BERT cell assigns the same two names, so whichever of the
# two cells runs last decides which encoder the rest of the notebook uses.
bert_tokenizer = transformers.AutoTokenizer.from_pretrained("microsoft/deberta-base")
bert_encoder = transformers.TFDebertaModel.from_pretrained("microsoft/deberta-base")

In [None]:
# if using bert
# Load the tokenizer and TensorFlow encoder for the "bert-base-uncased"
# checkpoint into the shared names bert_tokenizer / bert_encoder.
# NOTE(review): the DeBERTa cell assigns the same two names, so whichever of
# the two cells runs last decides which encoder the rest of the notebook uses.
bert_tokenizer = transformers.AutoTokenizer.from_pretrained("bert-base-uncased")
bert_encoder = transformers.TFBertModel.from_pretrained("bert-base-uncased")

In [9]:
def hugging_face_bert_encode(text_data, encode_ending=False):
  """Tokenize texts with the notebook-level ``bert_tokenizer``.

  Each text is tokenized on its own, requesting padding to ``max_length`` and
  truncation with ``max_length=512``.

  Args:
    text_data: Iterable of strings to encode.
    encode_ending: When true, each text is first reduced to its last 384
      whitespace-separated words (re-joined with single spaces) before it is
      passed to the tokenizer.

  Returns:
    A two-element list ``[input_ids, attention_masks]`` of NumPy arrays built
    from the tokenizer's 'input_ids' and 'attention_mask' outputs, one row per
    text (presumably 512 columns each, given the padding settings).
  """
  encodings = []
  for document in text_data:
    if encode_ending:
      # A negative slice start already clamps to the beginning of short texts.
      document = ' '.join(document.split()[-384:])
    encodings.append(
        bert_tokenizer(document, padding='max_length', max_length=512, truncation=True))
  ids = np.array([encoding['input_ids'] for encoding in encodings])
  masks = np.array([encoding['attention_mask'] for encoding in encodings])
  return [ids, masks]
# False: encode each text as given; True: keep only the end of each text.
encode_from_end = False

# Tokenize the train split first, then the eval split, with the same setting.
hugging_face_train_text_data, hugging_face_test_text_data = (
    hugging_face_bert_encode(split_text, encode_from_end)
    for split_text in (train_text, eval_text)
)

# Labels become NumPy arrays so they can be fed to Keras alongside the inputs.
hugging_face_train_labels, hugging_face_test_labels = (
    np.array(split_labels) for split_labels in (train_labels, eval_labels)
)

In [10]:
# text classification modeling was based in part on TensorFlow's guide to text classification with BERT: https://www.tensorflow.org/text/tutorials/classify_text_with_bert
def build_model():
  """Build the binary classifier on top of the notebook-level ``bert_encoder``.

  The model takes two int32 inputs of length 512 (token ids and attention
  mask), feeds them to ``bert_encoder``, max-pools its 'last_hidden_state'
  over the sequence axis, applies dropout (rate 0.2) and ends in a single
  sigmoid unit named 'output'.

  Returns:
    An uncompiled ``tf.keras.Model`` mapping ``[input_ids, attention_masks]``
    to the sigmoid output.
  """
  token_ids = tf.keras.Input(shape=(512,), dtype='int32')
  mask = tf.keras.Input(shape=(512,), dtype='int32')

  hidden_states = bert_encoder([token_ids, mask])['last_hidden_state']

  # NOTE(review): the attention mask is given to the encoder but not to the
  # pooling layer, so padded positions presumably take part in the max —
  # confirm this is intended.
  pooled = tf.keras.layers.GlobalMaxPool1D()(hidden_states)
  regularized = tf.keras.layers.Dropout(0.2)(pooled)
  probability = tf.keras.layers.Dense(1, activation='sigmoid', name='output')(regularized)

  return tf.keras.Model(inputs=[token_ids, mask], outputs=probability)

In [11]:
# Select the Hugging Face encodings as the data used for training ...
final_train_data, final_train_labels = hugging_face_train_text_data, hugging_face_train_labels
# ... and for evaluation.
final_test_data, final_test_labels = hugging_face_test_text_data, hugging_face_test_labels

In [None]:
model = build_model()

# Optimizer.
batch_size = 4
epochs = 1
# The learning-rate schedule is parameterised by the total number of optimizer
# steps, so it must be sized from the data model.fit trains on (the training
# set), not from the eval set. Ceiling division counts the final, partially
# filled batch as a step as well.
num_train_examples = final_train_data[0].shape[0]
steps_per_epoch = (num_train_examples + batch_size - 1) // batch_size
num_train_steps = steps_per_epoch * epochs
# Spend the first 10% of the training steps on warmup.
num_warmup_steps = int(0.1*num_train_steps)

init_lr = 3e-5
optimizer = optimization.create_optimizer(init_lr=init_lr,
                                          num_train_steps=num_train_steps,
                                          num_warmup_steps=num_warmup_steps,
                                          optimizer_type='adamw')

# Loss: the model ends in a sigmoid, so it outputs probabilities, not logits.
loss = tf.keras.losses.BinaryCrossentropy(from_logits=False)

# Metrics: F1 and accuracy, with predictions thresholded at 0.5 for F1.
metrics = [
    tfa.metrics.F1Score(num_classes=1, average="micro", name='micro_f1', threshold=0.5),
    tf.keras.metrics.BinaryAccuracy()
]
model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

# Train on the selected training data and evaluate on the selected eval data
# at the end of each epoch.
model.fit(x=(final_train_data[0], final_train_data[1]), y=final_train_labels, validation_data = (final_test_data, final_test_labels), batch_size=batch_size, epochs=epochs)
