In [None]:
import datetime
import gc
import random
import re
import string
import time

import nltk
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import transformers
from datasets import Dataset
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler, random_split
from transformers import (
    AdamW,
    AutoConfig,
    AutoModelForSequenceClassification,
    AutoTokenizer,
    BertConfig,
    BertTokenizer,
    Trainer,
    TrainingArguments,
    get_linear_schedule_with_warmup,
    pipeline,
)


## Loading and cleaning the data

In [None]:
# Read the OpenAI-scored articles, keep only rows whose score is not the
# literal string 'Error', and expose the score column as 'Classification'.
scored_articles = pd.read_excel('OpenAI_Scored_Articles_3000.xlsx')
has_valid_score = scored_articles['OpenAI_Score'] != 'Error'
classified_data = scored_articles[has_valid_score].rename(
    columns={'OpenAI_Score': 'Classification'}
)
print(classified_data.shape)
classified_data.head()

In [None]:
# Load the full article set and keep one row per distinct, non-missing summary.
# `subset` is given as a list in both calls so it matches the other
# drop_duplicates/dropna calls in this notebook and does not depend on pandas
# accepting a bare column label for `dropna(subset=...)`.
# The chain builds a new frame instead of mutating in place, so re-running the
# cell always starts again from the file contents.
total_data = (
    pd.read_excel('combined_updated.xlsx')
    .drop_duplicates(subset=['Manual.summary'])
    .dropna(subset=['Manual.summary'])
)
print(total_data.shape)
total_data.head()

In [None]:
# Unlabeled pool: every non-missing summary from total_data, as a one-column
# frame whose column is named 'text'.
unlabeled_df = (
    total_data.dropna(subset=['Manual.summary'])[['Manual.summary']]
    .rename(columns={'Manual.summary': 'text'})
)

# Labeled set: summary text plus its score, renamed to the 'text' / 'label'
# columns used by the tokenization and evaluation cells.
# - rows without text are dropped because the later cells tokenize and
#   lower-case this column and need real strings;
# - .copy() detaches the result from classified_data so the column assignment
#   below writes to an independent frame rather than to a slice.
labeled_data = (
    classified_data[['Manual.summary', 'Classification']]
    .rename(columns={'Manual.summary': 'text', 'Classification': 'label'})
    .dropna(subset=['text'])
    .copy()
)

# Re-code the scores as class ids: -1 (dovish) -> 0, 1 (hawkish) -> 1,
# 0 (neutral) -> 2. The column may have object dtype because it originally
# contained 'Error' strings, so coerce to numbers before mapping.
score_to_class_id = {-1: 0, 1: 1, 0: 2}
numeric_scores = pd.to_numeric(labeled_data['label'], errors='coerce')
class_ids = numeric_scores.map(score_to_class_id)

# Fail with a readable message instead of an opaque integer-cast error when a
# score is outside the expected {-1, 0, 1}.
unmapped = class_ids.isna()
if unmapped.any():
    unexpected = labeled_data.loc[unmapped, 'label'].unique().tolist()
    raise ValueError(f"Unexpected classification values (expected -1, 0 or 1): {unexpected}")

# Change type to int
labeled_data['label'] = class_ids.astype(int)

## Model
### Pretrained RoBERTa Model (FOMC-RoBERTa)

In [None]:
# Checkpoint that is fine-tuned below: a RoBERTa-based sequence classifier
# configured with three output classes.
model_name = "gtfintechlab/FOMC-RoBERTa"
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=3)
# Tokenizer that matches the checkpoint above.
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [None]:
# Shuffle the labeled data and split into train (80%) and test (20%) sets.
# Both steps use a fixed random_state so the split is reproducible.
labeled_data = labeled_data.sample(frac=1, random_state=13).reset_index(drop=True)
train_df, test_df = train_test_split(labeled_data, test_size=0.2, random_state=13)

# Take independent copies: later cells add prediction columns to test_df, and
# writing to a frame that pandas still tracks as a slice of labeled_data
# triggers SettingWithCopyWarning.
train_df = train_df.copy()
test_df = test_df.copy()

# Convert to Hugging Face Dataset. preserve_index=False keeps the shuffled
# pandas index from being stored as an extra column next to 'text' and 'label'.
dataset = Dataset.from_pandas(train_df, preserve_index=False)

In [None]:
def tokenize_function(examples):
    """Tokenize the 'text' field of a batch of examples.

    Every sequence is truncated and padded to the tokenizer's maximum length,
    so all encoded rows have the same size.
    """
    encoded = tokenizer(examples["text"], truncation=True, padding="max_length")
    return encoded

# Apply the tokenizer to the whole training dataset in batches.
tokenized_datasets = dataset.map(tokenize_function, batched=True)

In [None]:
# Split the tokenized training data into a training part and a validation part.
# The split sizes are derived from the actual dataset length instead of the
# fixed index range 2000..2397, which raised an IndexError whenever the number
# of usable rows differed from 2397.
TRAIN_SIZE = 2000

# Shuffle once; the same permutation feeds both selections so the two parts
# can never overlap.
shuffled_dataset = tokenized_datasets.shuffle(seed=42)
n_total = len(shuffled_dataset)

# Keep the 2000-example training set when there is enough data; otherwise fall
# back to roughly the same train/validation proportion (about 5:1).
n_train = TRAIN_SIZE if n_total > TRAIN_SIZE else (n_total * 5) // 6

small_train_dataset = shuffled_dataset.select(range(n_train))
small_eval_dataset = shuffled_dataset.select(range(n_train, n_total))

In [None]:
# TrainingArguments and Trainer are imported in the notebook's top import cell.

# Fine-tuning configuration: three epochs, batches of 8 for both training and
# evaluation, and an evaluation pass on the validation split after each epoch.
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
)

# The trainer fine-tunes `model` in place on the training split and reports
# metrics on the validation split.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=small_train_dataset,
    eval_dataset=small_eval_dataset,
)

In [None]:
trainer.train()

In [None]:
# Report the validation metrics of the fine-tuned model.
eval_metrics = trainer.evaluate()
print(eval_metrics)

# Write the model and its tokenizer into the same directory.
finetuned_dir = "./finetuned-FOMC-RoBERTa"
trainer.save_model(finetuned_dir)
tokenizer.save_pretrained(finetuned_dir)

## Comparing predictions

In [None]:
# Reload the published checkpoint to act as the baseline for comparison.
# Note that this rebinds the notebook-level names `model` and `tokenizer`.
base_checkpoint = "gtfintechlab/FOMC-RoBERTa"
config = AutoConfig.from_pretrained(base_checkpoint)
model = AutoModelForSequenceClassification.from_pretrained(base_checkpoint, num_labels=3)
# NOTE(review): do_lower_case / do_basic_tokenize look like BERT-tokenizer
# options; confirm they have any effect for this RoBERTa checkpoint.
tokenizer = AutoTokenizer.from_pretrained(base_checkpoint, do_lower_case=True, do_basic_tokenize=True)

# Text-classification pipeline on the PyTorch backend; device=-1 selects the CPU.
classifier = pipeline('text-classification', model=model, tokenizer=tokenizer, config=config, device=-1, framework="pt")

In [None]:
# Score the held-out test texts with the baseline classifier.
# truncation=True mirrors the tokenization used for fine-tuning, so summaries
# longer than the model's maximum input length are cut instead of making the
# forward pass fail.
unlabeled_texts = test_df['text'].tolist()
predictions = classifier(unlabeled_texts, truncation=True)

# Keep only the predicted label string of each result.
test_df['predictions_roberta'] = [pred['label'] for pred in predictions]

In [None]:
# Convert the pipeline's label strings into the integer class ids used in
# test_df['label']: LABEL_0 -> 0, LABEL_1 -> 1, LABEL_2 -> 2.
pipeline_label_to_id = {'LABEL_0': 0, 'LABEL_1': 1, 'LABEL_2': 2}
test_df['predictions_roberta'] = test_df['predictions_roberta'].map(pipeline_label_to_id)
test_df.head()

In [None]:
# Evaluate the baseline predictions against the reference labels.
# pandas, numpy, confusion_matrix and classification_report come from the
# notebook's top import cell.

# Class ids in matrix order and their display names.
class_ids = [0, 1, 2]
class_names = ['Dovish', 'Hawkish', 'Neutral']

actual_labels = test_df['label']
predicted_labels = test_df['predictions_roberta']

# Create confusion matrix (rows = actual class, columns = predicted class)
cm = confusion_matrix(actual_labels, predicted_labels, labels=class_ids)

# Print classification report. The same `labels` are passed as for the
# confusion matrix so both outputs always cover the same three classes in the
# same order, and the rows are named instead of showing bare ids.
print(classification_report(actual_labels, predicted_labels, labels=class_ids, target_names=class_names))

# Create a dataframe for better visualization of the confusion matrix
cm_df = pd.DataFrame(cm, index=[f'Actual_{name}' for name in class_names],
                     columns=[f'Predicted_{name}' for name in class_names])

print("Confusion Matrix:")
print(cm_df)

# One-vs-rest counts for every class at once:
#   TP = diagonal, FP = column total - TP, FN = row total - TP,
#   TN = everything that is in neither the class's row nor its column.
true_pos = np.diag(cm)
false_pos = cm.sum(axis=0) - true_pos
false_neg = cm.sum(axis=1) - true_pos
true_neg = cm.sum() - true_pos - false_pos - false_neg

# Per-class names kept so the individual counts stay available by name.
tp_dovish, tp_hawkish, tp_neutral = true_pos
fp_dovish, fp_hawkish, fp_neutral = false_pos
fn_dovish, fn_hawkish, fn_neutral = false_neg
tn_dovish, tn_hawkish, tn_neutral = true_neg

print(f"Dovish - TP: {tp_dovish}, FP: {fp_dovish}, FN: {fn_dovish}, TN: {tn_dovish}")
print(f"Hawkish - TP: {tp_hawkish}, FP: {fp_hawkish}, FN: {fn_hawkish}, TN: {tn_hawkish}")
print(f"Neutral - TP: {tp_neutral}, FP: {fp_neutral}, FN: {fn_neutral}, TN: {tn_neutral}")


In [None]:
# Load the fine-tuned model and tokenizer from the directory written by the
# save step earlier in this notebook. The path is relative so the notebook is
# not tied to one machine's folder layout.
model_path = "./finetuned-FOMC-RoBERTa"
fine_tuned_model = AutoModelForSequenceClassification.from_pretrained(model_path)
fine_tuned_tokenizer = AutoTokenizer.from_pretrained(model_path)

# Create a pipeline for sentiment analysis
classifier2 = pipeline("sentiment-analysis", model=fine_tuned_model, tokenizer=fine_tuned_tokenizer)

# Make predictions. truncation=True mirrors the tokenization used for
# fine-tuning, so over-long summaries are cut instead of making the forward
# pass fail.
unlabeled_texts = test_df['text'].tolist()
predictions2 = classifier2(unlabeled_texts, truncation=True)

# Add predictions to the dataframe
test_df['predictions_roberta_ft'] = [pred['label'] for pred in predictions2]

In [None]:
# Convert the pipeline's label strings into the integer class ids used in
# test_df['label']: LABEL_0 -> 0, LABEL_1 -> 1, LABEL_2 -> 2.
pipeline_label_to_id = {'LABEL_0': 0, 'LABEL_1': 1, 'LABEL_2': 2}
test_df['predictions_roberta_ft'] = test_df['predictions_roberta_ft'].map(pipeline_label_to_id)

In [None]:
# Evaluate the fine-tuned model's predictions against the reference labels.

# Class ids in matrix order and their display names.
class_ids = [0, 1, 2]
class_names = ['Dovish', 'Hawkish', 'Neutral']

actual_labels = test_df['label']
predicted_labels = test_df['predictions_roberta_ft']

# Create confusion matrix (rows = actual class, columns = predicted class)
cm = confusion_matrix(actual_labels, predicted_labels, labels=class_ids)

# Print classification report. The same `labels` are passed as for the
# confusion matrix so both outputs always cover the same three classes in the
# same order, and the rows are named instead of showing bare ids.
print(classification_report(actual_labels, predicted_labels, labels=class_ids, target_names=class_names))

# Create a dataframe for better visualization of the confusion matrix
cm_df = pd.DataFrame(cm, index=[f'Actual_{name}' for name in class_names],
                     columns=[f'Predicted_{name}' for name in class_names])

print("Confusion Matrix:")
print(cm_df)

# One-vs-rest counts for every class at once:
#   TP = diagonal, FP = column total - TP, FN = row total - TP,
#   TN = everything that is in neither the class's row nor its column.
true_pos = np.diag(cm)
false_pos = cm.sum(axis=0) - true_pos
false_neg = cm.sum(axis=1) - true_pos
true_neg = cm.sum() - true_pos - false_pos - false_neg

# Per-class names kept so the individual counts stay available by name.
tp_dovish, tp_hawkish, tp_neutral = true_pos
fp_dovish, fp_hawkish, fp_neutral = false_pos
fn_dovish, fn_hawkish, fn_neutral = false_neg
tn_dovish, tn_hawkish, tn_neutral = true_neg

print(f"Dovish - TP: {tp_dovish}, FP: {fp_dovish}, FN: {fn_dovish}, TN: {tn_dovish}")
print(f"Hawkish - TP: {tp_hawkish}, FP: {fp_hawkish}, FN: {fn_hawkish}, TN: {tn_hawkish}")
print(f"Neutral - TP: {tp_neutral}, FP: {fp_neutral}, FN: {fn_neutral}, TN: {tn_neutral}")

In [None]:
# Fetch the NLTK resources used by the text-cleaning function below:
# tokenizer models, the stop-word lists and the WordNet data for lemmatizing.
for nltk_resource in ('punkt', 'stopwords', 'wordnet'):
    nltk.download(nltk_resource)

# Resources shared by every clean_text call. They are built once here instead
# of inside the function, which previously re-read the stop-word list and
# created a new lemmatizer for every single text.
_STOP_WORDS = frozenset(stopwords.words('english'))
_LEMMATIZER = WordNetLemmatizer()
_PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)
_DIGIT_RUN = re.compile(r'\d+')


# Define a text cleaning function
def clean_text(text):
    """Return a normalized version of ``text``.

    Steps, in order: lowercase, strip ASCII punctuation, remove digit runs,
    tokenize, drop English stop words, lemmatize each remaining token, and
    join the tokens back together with single spaces.

    Args:
        text: The string to clean.

    Returns:
        The cleaned text as a single space-separated string.
    """
    # Lowercase and remove punctuation
    text = text.lower().translate(_PUNCTUATION_TABLE)
    # Remove digits
    text = _DIGIT_RUN.sub('', text)
    # Tokenize, then drop stop words and lemmatize what remains
    tokens = word_tokenize(text)
    kept_tokens = [_LEMMATIZER.lemmatize(token) for token in tokens if token not in _STOP_WORDS]
    # Rejoin tokens to form the cleaned text
    return ' '.join(kept_tokens)


# Apply text cleaning to the held-out test texts
unlabeled_texts = test_df['text'].tolist()
cleaned_texts = [clean_text(text) for text in unlabeled_texts]

# Make predictions on the cleaned texts with the fine-tuned classifier.
# truncation=True mirrors the tokenization used for fine-tuning, so over-long
# inputs are cut instead of making the forward pass fail.
predictions2 = classifier2(cleaned_texts, truncation=True)

# Add predictions to the dataframe
test_df['predictions_roberta_ft_cleaned'] = [pred['label'] for pred in predictions2]

In [None]:
# Convert the pipeline's label strings into the integer class ids used in
# test_df['label']: LABEL_0 -> 0, LABEL_1 -> 1, LABEL_2 -> 2.
pipeline_label_to_id = {'LABEL_0': 0, 'LABEL_1': 1, 'LABEL_2': 2}
test_df['predictions_roberta_ft_cleaned'] = test_df['predictions_roberta_ft_cleaned'].map(pipeline_label_to_id)

### Checking whether cleaning the text improves the results

In [None]:
# Evaluate the fine-tuned model's predictions on the cleaned texts against the
# reference labels.

# Class ids in matrix order and their display names.
class_ids = [0, 1, 2]
class_names = ['Dovish', 'Hawkish', 'Neutral']

actual_labels = test_df['label']
predicted_labels = test_df['predictions_roberta_ft_cleaned']

# Create confusion matrix (rows = actual class, columns = predicted class)
cm = confusion_matrix(actual_labels, predicted_labels, labels=class_ids)

# Print classification report. The same `labels` are passed as for the
# confusion matrix so both outputs always cover the same three classes in the
# same order, and the rows are named instead of showing bare ids.
print(classification_report(actual_labels, predicted_labels, labels=class_ids, target_names=class_names))

# Create a dataframe for better visualization of the confusion matrix
cm_df = pd.DataFrame(cm, index=[f'Actual_{name}' for name in class_names],
                     columns=[f'Predicted_{name}' for name in class_names])

print("Confusion Matrix:")
print(cm_df)

# One-vs-rest counts for every class at once:
#   TP = diagonal, FP = column total - TP, FN = row total - TP,
#   TN = everything that is in neither the class's row nor its column.
true_pos = np.diag(cm)
false_pos = cm.sum(axis=0) - true_pos
false_neg = cm.sum(axis=1) - true_pos
true_neg = cm.sum() - true_pos - false_pos - false_neg

# Per-class names kept so the individual counts stay available by name.
tp_dovish, tp_hawkish, tp_neutral = true_pos
fp_dovish, fp_hawkish, fp_neutral = false_pos
fn_dovish, fn_hawkish, fn_neutral = false_neg
tn_dovish, tn_hawkish, tn_neutral = true_neg

print(f"Dovish - TP: {tp_dovish}, FP: {fp_dovish}, FN: {fn_dovish}, TN: {tn_dovish}")
print(f"Hawkish - TP: {tp_hawkish}, FP: {fp_hawkish}, FN: {fn_hawkish}, TN: {tn_hawkish}")
print(f"Neutral - TP: {tp_neutral}, FP: {fp_neutral}, FN: {fn_neutral}, TN: {tn_neutral}")