## Fine-tuning Hugging Face models on the Financial PhraseBank dataset

## Exploring Data

In [6]:
# all-data.csv: Financial PhraseBank sentences on which >=50% of annotators agreed on the sentiment
from pathlib import Path
import pandas as pd

# Read the headerless CSV, decoding it as Windows-1252 and naming the two
# columns 'label' and 'headline' (in that order)
df = pd.read_csv(
    'all-data.csv',
    header=None,
    names=['label', 'headline'],
    encoding='windows-1252',
)

# Preview the first rows as a quick sanity check
df.head()

Unnamed: 0,label,headline
0,neutral,"According to Gran , the company has no plans t..."
1,neutral,Technopolis plans to develop in stages an area...
2,negative,The international electronic industry company ...
3,positive,With the new production plant the company woul...
4,positive,According to the company 's updated strategy f...


In [7]:
import re
# Cleaning function
def clean_text(text):
    """Normalize a headline into lowercase ASCII words separated by single spaces.

    Steps, in order:
      1. drop every non-ASCII character,
      2. lowercase the text,
      3. drop every character that is neither a word character
         (letters, digits, underscore) nor whitespace,
      4. collapse whitespace runs to one space and trim both ends.

    Note that step 3 also removes decimal points and percent signs, so
    "4.5%" becomes "45".

    Parameters:
    text (str): The raw text.

    Returns:
    str: The cleaned text.
    """
    ascii_only = re.sub(r'[^\x00-\x7F]+', '', text)
    lowered = ascii_only.lower()
    without_punctuation = re.sub(r'[^\w\s]', '', lowered)
    return re.sub(r'\s+', ' ', without_punctuation).strip()

In [8]:
# Replace each headline with its cleaned form (element-wise clean_text)
df['headline'] = df['headline'].map(clean_text)

## Fine-tuning DeBERTa

In [9]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

# The tokenizer and the sequence-classification model are loaded from the
# same Hugging Face checkpoint, so the name is written once
MODEL_CHECKPOINT = "yangheng/deberta-v3-base-absa-v1.1"

tokenizer = AutoTokenizer.from_pretrained(MODEL_CHECKPOINT)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_CHECKPOINT)

  from .autonotebook import tqdm as notebook_tqdm


## Tackling data imbalance using undersampling

In [10]:
# Count the rows per sentiment label to see how imbalanced the classes are
category_counts = df['label'].value_counts()

print(category_counts)

label
neutral     2879
positive    1363
negative     604
Name: count, dtype: int64


In [None]:
import numpy as np

def undersample_labels(df, label_col='label', random_state=42):
    """
    Undersamples the DataFrame so that each class in the label column
    has the same number of samples as the least frequent class.

    Parameters:
    df (pd.DataFrame): The input DataFrame; it is not modified.
    label_col (str): Name of the column holding the class labels.
        Defaults to 'label'.
    random_state (int): Seed used for every per-class draw and for the
        final shuffle. Defaults to 42.

    Returns:
    pd.DataFrame: The undersampled DataFrame, shuffled and re-indexed from 0.
        Rows whose label is missing are left out. If no row has a label,
        an empty DataFrame with the same columns is returned.
    """
    # value_counts() skips missing labels, so the minimum is taken over real classes only
    class_sizes = df[label_col].value_counts()
    if class_sizes.empty:
        return df.iloc[0:0].reset_index(drop=True)
    min_class_size = int(class_sizes.min())

    # dropna() keeps the classes in first-appearance order while skipping the
    # missing-label "class", which has no rows to draw from and would make
    # sample() raise
    undersampled_data = [
        df[df[label_col] == label].sample(
            n=min_class_size, random_state=random_state, replace=False
        )
        for label in df[label_col].dropna().unique()
    ]

    # Combine sampled data, shuffle, and reset index
    return (
        pd.concat(undersampled_data)
        .sample(frac=1, random_state=random_state)
        .reset_index(drop=True)
    )

In [12]:
# Balance the classes, then display the per-label counts to confirm they are equal
undersampled_df = undersample_labels(df)
undersampled_df['label'].value_counts()

label
positive    604
negative    604
neutral     604
Name: count, dtype: int64

## Train Test Split

In [54]:
import torch
from torch.utils.data import Dataset

class CustomDataset(Dataset):
    """Map-style dataset that tokenizes one text per item, on access.

    Parameters:
    texts: Iterable of texts; each item is converted with ``str()`` before tokenizing.
    labels: Iterable of labels, one per text, each convertible by ``torch.tensor``.
    tokenizer: Callable invoked as
        ``tokenizer(text, truncation=True, padding="max_length", max_length=max_len)``
        whose result exposes ``'input_ids'`` and ``'attention_mask'``.
    max_len (int): Length every encoded text is truncated/padded to. Defaults to 512.

    Raises:
    ValueError: If ``texts`` and ``labels`` differ in length.
    """

    def __init__(self, texts, labels, tokenizer, max_len=512):
        # Materialize to plain lists so __getitem__ always indexes by position.
        # A pandas Series with a shuffled index would otherwise be looked up by
        # index label, raising KeyError or returning the wrong row.
        self.texts = list(texts)
        self.labels = list(labels)
        if len(self.texts) != len(self.labels):
            raise ValueError(
                f"texts and labels must have the same length, "
                f"got {len(self.texts)} and {len(self.labels)}"
            )
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of (text, label) pairs."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tensors for the pair at position ``idx``.

        Returns:
        dict: ``'input_ids'``, ``'attention_mask'`` and ``'labels'`` tensors.
        """
        text = str(self.texts[idx])
        label = torch.tensor(self.labels[idx])

        encoding = self.tokenizer(text, truncation=True, padding="max_length",
                                  max_length=self.max_len)

        return {
            'input_ids': torch.tensor(encoding['input_ids']),
            'attention_mask': torch.tensor(encoding['attention_mask']),
            'labels': label
        }

In [55]:
# Class ids must form the contiguous range 0..num_labels-1: the cross-entropy
# loss applied to integer labels expects class indices, and compute_metrics
# compares the labels against argmax outputs, which are never negative.
# (The previous -1/0/1 encoding satisfied neither.)
# NOTE(review): this order is presumed to match the checkpoint's own
# model.config.id2label -- confirm before relying on the pretrained head.
label2id = {"negative": 0, "neutral": 1, "positive": 2}
id2label = {idx: name for name, idx in label2id.items()}

X = undersampled_df['headline'].tolist()

# Fail loudly on labels that have no id instead of silently producing NaN targets
encoded_labels = undersampled_df['label'].map(label2id)
if encoded_labels.isna().any():
    unknown = sorted(
        undersampled_df.loc[encoded_labels.isna(), 'label'].astype(str).unique()
    )
    raise ValueError(f"Labels without an id in label2id: {unknown}")
y = encoded_labels.astype(int).tolist()

In [56]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the samples; stratify on the labels so the train and test
# splits keep the same class proportions
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Create the datasets
# NOTE(review): test_dataset is also passed to the Trainer as eval_dataset with
# load_best_model_at_end=True, so the checkpoint is selected on the test split;
# consider carving out a separate validation split.
train_dataset = CustomDataset(X_train, y_train, tokenizer)
test_dataset = CustomDataset(X_test, y_test, tokenizer)

## Training

In [57]:
from sklearn.metrics import accuracy_score, f1_score

def compute_metrics(example):
    """Compute accuracy and weighted F1 for one evaluation pass.

    Parameters:
    example: Object exposing ``label_ids`` (true labels) and ``predictions``
        (scores; the predicted class is the argmax over the last axis).

    Returns:
    dict: ``{'accuracy': ..., 'f1': ...}`` where F1 uses ``average="weighted"``.
    """
    y_true = example.label_ids
    y_pred = example.predictions.argmax(-1)

    return {
        'accuracy': accuracy_score(y_true, y_pred),
        "f1": f1_score(y_true, y_pred, average="weighted"),
    }

In [58]:
from transformers import Trainer, TrainingArguments

batch_size = 32
# NOTE(review): model_name is not referenced in the cells visible here
model_name = "Fin_DeBerta"

args = TrainingArguments(
    output_dir="output",
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    learning_rate=2e-5,
    weight_decay=0.01,
    num_train_epochs=3,
    # Evaluate and save once per epoch; load_best_model_at_end needs the two
    # strategies to line up
    evaluation_strategy='epoch',
    save_strategy="epoch",
    load_best_model_at_end=True,
    # Mixed precision is only enabled when a CUDA device is present, so the
    # notebook also runs (in full precision) on CPU-only machines
    fp16=torch.cuda.is_available(),
    # Gradients are accumulated over 2 batches before each optimizer step
    gradient_accumulation_steps=2,
    lr_scheduler_type="cosine",
    warmup_ratio=0.1,
    logging_steps=50,
    report_to="none"
)



In [59]:
# Wire the model, training arguments, datasets and metric function together.
# NOTE(review): the warning captured under this cell points at this call;
# it may be the `tokenizer=` deprecation in newer transformers releases
# (replaced by `processing_class=`) -- confirm against the installed version.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

  trainer = Trainer(model=model,


In [None]:
# Run fine-tuning with the arguments configured above (3 epochs, evaluated each epoch)
trainer.train()

Epoch,Training Loss,Validation Loss
