## Importing Libraries

In [None]:
# Pre-Processing
import pandas as pd
import re
import emoji
import contractions

from sklearn.model_selection import train_test_split

from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from datasets import Dataset

import evaluate

import numpy as np

## Importing Data

In [None]:
# Relative path to the SenWave CSV (presumably the labelled English subset, going by the file name).
# Being relative, it is resolved against the notebook's working directory.
senwave_fp = 'twitter_data/custom_data/SenWaveDataset/labeledEn.csv'

In [None]:
# Load the CSV into a DataFrame. Later cells read its 'Tweet' column and the
# per-emotion columns named in the positive/negative/neutral label lists.
senwave_df = pd.read_csv(senwave_fp)

In [None]:
# Removal of links
def remove_urls(doc):
    """Delete every run of non-whitespace characters that begins with ``http``.

    Args:
        doc: Text to clean.

    Returns:
        ``doc`` with each ``http...`` token removed; surrounding whitespace is kept.
    """
    url_pattern = re.compile(r'http\S+')
    return url_pattern.sub('', doc)

def convert_emojis(doc):
    """Delete every emoji from ``doc``.

    Despite the name, emojis are not converted into text descriptions: each
    one is replaced with the empty string.

    Args:
        doc: Text to clean.

    Returns:
        ``doc`` with all emojis replaced by ``''``.
    """
    without_emojis = emoji.replace_emoji(doc, replace='')
    return without_emojis

def remove_hashtags(doc):
    """Drop the ``#`` symbol while keeping the hashtag's text.

    Args:
        doc: Text to clean.

    Returns:
        ``doc`` with every ``#`` character deleted (``'#covid'`` -> ``'covid'``).
    """
    # Mapping a code point to None deletes that character.
    return doc.translate({ord('#'): None})

def remove_numbers(doc):
    """Delete every run of digits from ``doc``.

    Args:
        doc: Text to clean.

    Returns:
        ``doc`` with all digit sequences removed; other characters are untouched.
    """
    digit_run = re.compile(r'\d+')
    return digit_run.sub('', doc)

def remove_user_mentions(doc):
    """Delete ``@name`` mentions (an ``@`` followed by word characters).

    Args:
        doc: Text to clean.

    Returns:
        ``doc`` with each mention removed; a lone ``@`` is left in place.
    """
    mention_pattern = re.compile(r'@\w+')
    return mention_pattern.sub('', doc)

def fix_contractions(doc):
    """Run ``doc`` through ``contractions.fix`` and return the result.

    Args:
        doc: Text to process.

    Returns:
        Whatever ``contractions.fix`` returns for ``doc``.
    """
    expanded = contractions.fix(doc)
    return expanded

def remove_punctuation(doc):
    """Delete every character that is neither a word character nor whitespace.

    Args:
        doc: Text to clean.

    Returns:
        ``doc`` keeping only letters, digits, underscores and whitespace.
    """
    non_word_non_space = re.compile(r'[^\w\s]')
    return non_word_non_space.sub('', doc)

def remove_amp(doc):
    """Delete the standalone word ``amp`` and trim the ends of the text.

    Args:
        doc: Text to clean.

    Returns:
        ``doc`` without whole-word occurrences of ``amp``, with leading and
        trailing whitespace stripped from the full string.
    """
    amp_word = re.compile(r'\bamp\b')
    without_amp = amp_word.sub('', doc)
    # strip() trims the whole document, not just the space around each match.
    return without_amp.strip()

def remove_special_character_combinations(doc):
    """Collapse line breaks and non-breaking spaces into a single space.

    Every run of carriage returns, newlines and ``\\xa0`` characters (in any
    order) is replaced by one space. Substituting a space rather than the
    empty string keeps the words on either side of a line break separate
    (``'hello\\nworld'`` -> ``'hello world'``, not ``'helloworld'``).

    Args:
        doc: Text to clean.

    Returns:
        ``doc`` with each such run replaced by a single space.
    """
    cleaned_text = re.sub(r'[\r\n\xa0]+', ' ', doc)
    return cleaned_text

def remove_non_english_characters(doc):
    """Delete every character outside the 7-bit ASCII range.

    Args:
        doc: Text to clean.

    Returns:
        ``doc`` containing only code points ``\\x00``-``\\x7F``.
    """
    non_ascii_run = re.compile(r'[^\x00-\x7F]+')
    return non_ascii_run.sub('', doc)

In [None]:
# This function chains all of the cleaning helpers defined above
def preprocess_tweet(doc):
    """Run a tweet through every cleaning step, in a fixed order.

    Order matters: contractions are expanded before punctuation (and with it
    the apostrophes) is stripped, and ``remove_amp`` runs after punctuation
    removal.

    Args:
        doc: Raw tweet text.

    Returns:
        The cleaned tweet text.
    """
    cleaning_steps = (
        remove_urls,
        convert_emojis,
        remove_hashtags,
        remove_numbers,
        remove_user_mentions,
        fix_contractions,
        remove_punctuation,
        remove_amp,
        remove_special_character_combinations,
        remove_non_english_characters,
    )
    for step in cleaning_steps:
        doc = step(doc)
    return doc

In [None]:
# This applies the pre-processing
# The raw 'Tweet' text is overwritten with its cleaned version, so the original
# text is no longer available in senwave_df after this cell.
# NOTE(review): the cleaning helpers call string/regex methods on each value —
# confirm the 'Tweet' column contains no missing (non-string) entries.
senwave_df['Tweet'] = senwave_df['Tweet'].apply(preprocess_tweet)

## Creating Train and Test Dataframes

In [None]:
# Define the mapping rules
# Each list names emotion columns of senwave_df. re_label sums these columns
# per row to derive a 3-class label: 0 = negative, 1 = neutral/mixed, 2 = positive.
positive_labels = ['Optimistic', 'Thankful', 'Empathetic']
negative_labels = ['Pessimistic', 'Anxious', 'Sad', 'Annoyed', 'Denial']
# A row with this column set is always labelled 1 (neutral) by re_label.
neutral_label = ['Official report']

In [None]:
def re_label(row):
    """Collapse a row's emotion flags into a 3-class sentiment label.

    Args:
        row: Mapping-like row indexable by the emotion column names listed in
            ``positive_labels``, ``negative_labels`` and ``neutral_label``.

    Returns:
        ``2`` for positive-only rows, ``0`` for negative-only rows, and ``1``
        for everything else (official report, mixed emotions, or no listed
        emotion at all).
    """
    neutral_total = sum(row[name] for name in neutral_label)
    has_positive = sum(row[name] for name in positive_labels) > 0
    has_negative = sum(row[name] for name in negative_labels) > 0

    # Official reports are neutral regardless of any other flag.
    if neutral_total > 0:
        return 1
    # Both polarities present -> mixed -> neutral.
    if has_positive and has_negative:
        return 1
    if has_positive:
        return 2
    if has_negative:
        return 0
    # No listed emotion is set.
    return 1

In [None]:
# Apply the re_label function to the dataframe
# axis=1 passes each row to re_label; the result is stored in a new 'label'
# column holding 0 (negative), 1 (neutral/mixed) or 2 (positive).
senwave_df['label'] = senwave_df.apply(re_label, axis=1)

In [None]:
# Number of rows labelled positive (2). The sampling cell that follows draws
# 1500 rows from each class, so this count is worth checking first.
len(senwave_df[senwave_df['label'] == 2])

In [None]:
# Draw a class-balanced subset: 1500 rows per label, with a fixed seed so the
# same rows are selected on every run.
senwave_pos, senwave_neu, senwave_neg = (
    senwave_df[senwave_df['label'] == class_id].sample(n=1500, random_state=23)
    for class_id in (2, 1, 0)  # positive, neutral, negative
)

# Stack the three samples (positive, neutral, negative) with a fresh 0..N-1 index.
filtered_df = pd.concat([senwave_pos, senwave_neu, senwave_neg], ignore_index=True)

In [None]:
# Perform a 60/20/20 train/validation/test split:
# first hold out 40% of the rows, then divide that hold-out evenly into
# validation and test sets. Both calls use a fixed random_state.
train_df, test_val_df = train_test_split(filtered_df, test_size=0.4, random_state=23)
val_df, test_df = train_test_split(test_val_df, test_size=0.5, random_state=23)

In [None]:
# Keep only the text and the target label in each split.
model_columns = ['Tweet', 'label']
train_df, val_df, test_df = (
    split_df[model_columns] for split_df in (train_df, val_df, test_df)
)

## Fine-tune BERT (`bert-base-uncased`) on the tweet dataset

In [None]:
# Convert each pandas split into a HuggingFace Dataset
train_dataset, val_dataset, test_dataset = map(
    Dataset.from_pandas, (train_df, val_df, test_df)
)

In [None]:
# Tokenize the datasets
# The tokenizer uses the same checkpoint name ('bert-base-uncased') as the
# model loaded further down.
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')

def tokenize_function(doc):
    """Tokenize the 'Tweet' field of a batch with padding and truncation.

    Args:
        doc: Batch handed over by ``Dataset.map`` (called with
            ``batched=True`` below); its 'Tweet' entry is passed to the tokenizer.

    Returns:
        The tokenizer output for the batch, padded with ``padding='max_length'``
        and truncated.
    """
    return tokenizer(doc['Tweet'], padding='max_length', truncation=True)

train_dataset = train_dataset.map(tokenize_function, batched=True)
val_dataset = val_dataset.map(tokenize_function, batched=True)
# NOTE(review): test_dataset is not tokenized in this cell — confirm it is
# handled before any evaluation on the test split.

In [None]:
# Expose only the model inputs and the label as PyTorch tensors
torch_columns = ('input_ids', 'attention_mask', 'label')
for tokenized_split in (train_dataset, val_dataset):
    tokenized_split.set_format(type='torch', columns=list(torch_columns))

In [None]:
# Pre-trained BERT model ('bert-base-uncased', same checkpoint as the tokenizer),
# configured for 3 output labels: 0 = negative, 1 = neutral, 2 = positive.
# NOTE(review): this comment previously said BERTweet, but the checkpoint loaded
# here is plain BERT — confirm which model is intended.
model = AutoModelForSequenceClassification.from_pretrained('bert-base-uncased', 
                                                           num_labels=3 
                                                           #,hidden_dropout_prob=0.3
                                                          )

In [None]:
# Accuracy metric used by compute_metrics during evaluation
accuracy = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    """Score a batch of predictions with the accuracy metric.

    Args:
        eval_pred: Pair ``(logits, labels)`` as unpacked below.

    Returns:
        The result of ``accuracy.compute`` for the arg-max class of each row
        of logits against the reference labels.
    """
    raw_scores, gold_labels = eval_pred
    # The predicted class is the index of the largest logit on the last axis.
    predicted_classes = np.argmax(raw_scores, axis=-1)
    return accuracy.compute(predictions=predicted_classes, references=gold_labels)

In [None]:
# Define training arguments
# Checkpoints go to output_dir and logs to logging_dir. Evaluation and
# checkpoint saving both run once per epoch; with load_best_model_at_end set,
# metric_for_best_model ("accuracy") decides which checkpoint is restored
# when training finishes.
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` — confirm against the installed version.
# NOTE(review): a checkpoint is saved every epoch for 20 epochs with no
# save_total_limit — confirm the disk usage is acceptable.
training_args = TrainingArguments(
    output_dir='./senwave_model_results',
    num_train_epochs=20,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
    evaluation_strategy='epoch',  # Evaluate at the end of each epoch
    save_strategy='epoch',  # Save checkpoints at the end of each epoch
    load_best_model_at_end=True,
    metric_for_best_model="accuracy"
)


In [None]:
# Initialize Trainer
# Trains `model` on the tokenized training split and evaluates on the
# validation split, reporting the metrics returned by compute_metrics.
# NOTE(review): recent transformers releases deprecate the `tokenizer=`
# argument in favour of `processing_class=` — confirm against the installed version.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

In [None]:
# Train the model
# Runs the training loop configured in training_args (20 epochs, with
# evaluation and checkpointing once per epoch).
trainer.train()

In [None]:
# Inspect the training split (only the 'Tweet' and 'label' columns remain).
train_df