**Imports**

In [None]:
# Core data-handling libraries
import pandas as pd
import numpy as np
# Colab-only: mount Google Drive so the paths under /content/gdrive used below are reachable
from google.colab import drive
drive.mount('/content/gdrive')

# Install / upgrade the Hugging Face libraries in the runtime (versions are not pinned)
!pip install transformers
!pip install -U datasets

from collections import defaultdict, Counter
import json
import torch

from matplotlib import pyplot as plt

# Hugging Face model/tokenizer loaders, dataset containers, and the PyTorch batch loader
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from datasets import load_dataset, DatasetDict
from torch.utils.data import DataLoader

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
# Alternative source files tried earlier (kept for reference, currently disabled):
#banktrak_df_1 = pd.read_csv('/content/gdrive/MyDrive/Group 1: DSSI Summer 2025/Data/banktrak-8K-20230501-annotated.csv')
#banktrak_df = pd.read_csv('/content/gdrive/MyDrive/Group 1: DSSI Summer 2025/Data/summary_banktrak.csv')
# Frame that feeds the resampling, splitting and training cells below.
# NOTE(review): this reads test.csv, and the TESTING section at the end of the notebook
# reads the same test.csv path for its final evaluation -- confirm that training and
# final testing are really meant to use the same file.
banktrak_df = pd.read_csv('/content/gdrive/MyDrive/Group 1: DSSI Summer 2025/Data/test.csv')

**Class Imbalance**

In [None]:
#https://imbalanced-learn.org/stable/references/generated/imblearn.over_sampling.RandomOverSampler.html#imblearn.over_sampling.RandomOverSampler
from imblearn.over_sampling import RandomOverSampler

# Features and labels as single-column DataFrames
X = banktrak_df[['text']]
y = banktrak_df[['contains_debt_instrument_information']]

# Random oversampling: rows are resampled so the classes are balanced.
# fit_resample only resamples X and y -- no model is trained in this cell.
# NOTE(review): resampling happens before the train/validation split, so copies of the
# same row can end up on both sides of a row-wise random split. Confirm the split
# keeps duplicates together (or oversample the training part only).
ros = RandomOverSampler(random_state=42)
X_resampled, y_resampled = ros.fit_resample(X, y)

X_resampled = X_resampled.reset_index(drop=True)
y_resampled = y_resampled.reset_index(drop=True)

# Replace the working frame with the resampled text + label columns
banktrak_df = pd.concat([X_resampled, y_resampled], axis=1)

# Class counts after resampling. Kept as the cell's last expression so the notebook
# actually displays it (mid-cell expressions are silently discarded).
banktrak_df['contains_debt_instrument_information'].value_counts()

In [None]:
# #https://imbalanced-learn.org/stable/references/generated/imblearn.under_sampling.RandomUnderSampler.html
# from imblearn.under_sampling import RandomUnderSampler

# X = banktrak_df[['text']]
# y = banktrak_df[['contains_debt_instrument_information']]

# #undersampling majority data
# rus = RandomUnderSampler(random_state=42)                 #this is the random undersampling
# X_resampled, y_resampled = rus.fit_resample(X, y)        #resamples X and y so the classes are balanced (no model is fitted here)
# y_resampled.contains_debt_instrument_information.value_counts()

# X_resampled = X_resampled.reset_index(drop=True)
# y_resampled = y_resampled.reset_index(drop=True)

# banktrak_df = pd.concat([X_resampled, y_resampled], axis=1)

**train_test_split**

In [None]:
# Splitting data into train / validation
from sklearn.model_selection import train_test_split, GroupShuffleSplit
from datasets import Dataset, DatasetDict

# banktrak_df may contain repeated texts (random oversampling duplicates rows). A row-wise
# random split would put copies of the same text in both train and validation and inflate
# the validation metrics. Grouping by text keeps all copies of a text on one side.
# pd.factorize gives integer group ids and tolerates missing text values.
text_groups = pd.factorize(banktrak_df['text'])[0]
splitter = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)  # ~20% of distinct texts go to validation
train_idx, val_idx = next(splitter.split(banktrak_df, groups=text_groups))

train_df = banktrak_df.iloc[train_idx].reset_index(drop=True)
# Validation keeps one copy per text so it is not artificially re-balanced.
val_df = (
    banktrak_df.iloc[val_idx]
    .drop_duplicates(subset='text')
    .reset_index(drop=True)
)

# Sanity check: no text is shared between the two splits
assert set(train_df['text']).isdisjoint(val_df['text']), "train/validation texts overlap"

# Wrap both frames in a DatasetDict. preserve_index=False keeps the pandas index from
# being stored as an extra '__index_level_0__' column.
dataset = DatasetDict({
    'train': Dataset.from_pandas(train_df, preserve_index=False),
    'validation': Dataset.from_pandas(val_df, preserve_index=False),
})

**Tokenizer**

In [None]:
from transformers import AutoTokenizer, AutoModelForMaskedLM

# Only the tokenizer is needed at this point. The masked-LM model that used to be loaded
# here was never used: `model` is rebuilt as a sequence-classification model in the
# training cell, so loading it only cost download time and memory.
tokenizer = AutoTokenizer.from_pretrained("Lianglab/PharmBERT-cased")

tensor([[  101,  1284,  1328,  1106,  1329,   170,  3073,  4487,  9044, 22559,
         17260,   119,   102]])


In [None]:
def _tokenize_batch(examples):
    """Tokenize a batch of rows: pad/truncate every text to exactly 512 tokens."""
    return tokenizer(examples['text'], padding="max_length", truncation=True, max_length=512)


# Apply the tokenizer to both splits. batched=True hands the tokenizer lists of texts,
# which yields the same columns as per-row mapping with far fewer Python calls.
tokenized_dataset = dataset.map(_tokenize_batch, batched=True)

# Optional clean-up of unused metadata columns (disabled):
# tokenized_dataset = tokenized_dataset.remove_columns(['item', 'text', 'company','cik'])

# The model's forward pass takes the targets under the keyword `labels`
tokenized_dataset = tokenized_dataset.rename_column("contains_debt_instrument_information", "labels")

# Return PyTorch tensors when rows are accessed, so DataLoader batches can go straight to the model
tokenized_dataset.set_format("torch")

Map:   0%|          | 0/1408 [00:00<?, ? examples/s]

Map:   0%|          | 0/352 [00:00<?, ? examples/s]

**DataLoader To Batch**

In [None]:
# Mini-batch size shared by both loaders
BATCH_SIZE = 16

# Full training split, shuffled once with a fixed seed; the validation split keeps its order
train_dataset = tokenized_dataset['train'].shuffle(seed=1111)
test_dataset = tokenized_dataset['validation']

# Training batches are reshuffled by the loader; evaluation batches are served in dataset order
train_dataloader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
eval_dataloader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False)

**Training & Validation Model**

In [None]:
import os

from transformers import get_linear_schedule_with_warmup
from tqdm.notebook import tqdm
from transformers import set_seed
from torch.optim import AdamW
from transformers import AutoConfig
from transformers import AutoModelForSequenceClassification
from transformers import DistilBertConfig
from transformers import RobertaConfig

# Directory on Drive that receives one checkpoint file per improving epoch.
# (Previously the path was glued to the file name without a separator, producing
# ".../Lianglab_PharmBERTepoch_0.pt".)
model_path = "/content/gdrive/MyDrive/Lianglab_PharmBERT"
os.makedirs(model_path, exist_ok=True)

set_seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the checkpoint's own configuration and raise the dropout rates.
# The checkpoint is a BERT model; forcing RobertaConfig onto it made transformers build a
# RoBERTa network whose weights were all newly initialised (see the warning this cell used
# to print) and is the likely cause of the "index out of range" failure on 512-token inputs.
config = AutoConfig.from_pretrained("Lianglab/PharmBERT-cased")
config.num_labels = 2
config.hidden_dropout_prob = 0.2           # dropout on the output of each layer
config.attention_probs_dropout_prob = 0.3  # dropout on the self-attention probabilities

model = AutoModelForSequenceClassification.from_pretrained("Lianglab/PharmBERT-cased", config=config).to(device)

num_epochs = 2
# One optimizer/scheduler step per training batch, for every epoch
num_training_steps = num_epochs * len(train_dataloader)
optimizer = AdamW(model.parameters(), lr=5e-5, weight_decay=0.01)  # typical range 2e-5 to 5e-5

# Linear decay of the learning rate to zero over all training steps, no warm-up
lr_scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=num_training_steps)
best_val_loss = float("inf")  # starts at infinity so any real loss is smaller
progress_bar = tqdm(range(num_training_steps))


def _forward(batch):
    """Move one batch to `device` and run the model with labels so `output.loss` is set."""
    return model(
        input_ids=batch['input_ids'].to(device),
        attention_mask=batch['attention_mask'].to(device),
        labels=batch['labels'].to(device).long(),  # cross-entropy needs integer class ids
    )


for epoch in range(num_epochs):
    # ---- training: iterate over the TRAINING loader (not the validation loader) ----
    model.train()
    training_losses = []  # per-batch losses for this epoch
    for batch in train_dataloader:
        optimizer.zero_grad()

        training_loss = _forward(batch).loss
        training_losses.append(training_loss.item())

        # Back-propagate, update the weights, then advance the learning-rate schedule
        training_loss.backward()
        optimizer.step()
        lr_scheduler.step()
        progress_bar.update(1)
    print("Mean Training Loss", np.mean(training_losses))

    # ---- validation ----
    model.eval()  # disables dropout
    val_loss = 0.0
    with torch.no_grad():  # no gradient tracking needed for evaluation
        for batch in eval_dataloader:
            # .item() keeps the running sum a plain float instead of a device tensor
            val_loss += _forward(batch).loss.item()

    # Average validation loss across all batches
    avg_val_loss = val_loss / len(eval_dataloader)
    print(f"Validation loss: {avg_val_loss}")

    # Save a checkpoint only when this epoch beats the best validation loss so far
    if avg_val_loss < best_val_loss:
        print("Saving checkpoint!")
        best_val_loss = avg_val_loss
        torch.save(
            {'epoch': epoch, 'model_state_dict': model.state_dict(), 'val_loss': best_val_loss},
            os.path.join(model_path, f"epoch_{epoch}.pt"),
        )
    print()

print(f"The best validation loss after {num_epochs} epochs is: {best_val_loss}")

You are using a model of type bert to instantiate a model of type roberta. This is not supported for all configurations of models and can yield errors.
Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at Lianglab/PharmBERT-cased and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight', 'roberta.embeddings.LayerNorm.bias', 'roberta.embeddings.LayerNorm.weight', 'roberta.embeddings.position_embeddings.weight', 'roberta.embeddings.token_type_embeddings.weight', 'roberta.embeddings.word_embeddings.weight', 'roberta.encoder.layer.0.attention.output.LayerNorm.bias', 'roberta.encoder.layer.0.attention.output.LayerNorm.weight', 'roberta.encoder.layer.0.attention.output.dense.bias', 'roberta.encoder.layer.0.attention.output.dense.weight', 'roberta.encoder.layer.0.attention.self.key.bias', 'roberta.encoder.layer.0.attention.self.key.weight', 'roberta.encoder.layer.0.atte

  0%|          | 0/22 [00:00<?, ?it/s]

IndexError: index out of range in self

**Evaluate**

In [None]:
from sklearn.metrics import accuracy_score, f1_score

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model.eval()             # evaluation mode (dropout off)
test_batch_logits = []   # raw model outputs (logits), one tensor per batch
y_true = []              # ground-truth labels in the same order as the logits

# no_grad turns off gradient tracking; nothing is back-propagated during evaluation
with torch.no_grad():
    for batch in eval_dataloader:
        # Copy the model inputs to the device; labels are not passed to the model
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)

        output = model(input_ids=input_ids, attention_mask=attention_mask)

        # Keep logits on the CPU so accumulated batches do not occupy GPU memory
        test_batch_logits.append(output.logits.cpu())
        y_true.extend(batch['labels'].cpu().numpy())

# Number of collected batches vs. number of batches in the loader (should match)
print(len(test_batch_logits),len(eval_dataloader))
test_logits = torch.cat(test_batch_logits, dim=0)     # stack all batches along dimension 0

#sanity check -> dimension 0 of the logits tensor should equal the size of the validation set
print(test_logits.shape,len(tokenized_dataset['validation']),len(y_true))

In [None]:
# Predicted class = index of the largest logit in each row
y_pred = test_logits.argmax(dim=1).cpu().numpy()

# Preview the first ten labels, then the first ten predictions
for preview in (y_true[:10], y_pred[:10]):
    print(preview)

# There must be exactly one prediction per label
assert len(y_true) == len(y_pred)

In [None]:
from sklearn.metrics import precision_score, recall_score

# average='binary' scores the positive class (label 1) only, which suits a two-class problem
val_f1 = f1_score(y_true, y_pred, average='binary')
print('F1 Score:', val_f1)

# Fraction of validation examples classified correctly
val_accuracy = accuracy_score(y_true, y_pred)
print('Accuracy Score:', val_accuracy)

# Precision and recall for the positive class
val_precision = precision_score(y_true, y_pred)
print('Precision Score:', val_precision)
val_recall = recall_score(y_true, y_pred)
print('Recall Score:', val_recall)

In [None]:
#https://scikit-learn.org/stable/modules/generated/sklearn.metrics.confusion_matrix.html
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.colors import LinearSegmentedColormap
from sklearn.datasets import make_classification
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC

# Rows are the actual classes, columns the predicted classes
cm = confusion_matrix(y_true, y_pred)
print("Confusion Matrix:\n", cm)

# Alternative built-in rendering (disabled):
# disp = ConfusionMatrixDisplay(confusion_matrix=cm)
# disp.plot()
# plt.show()

# Project colour palette, blended into a continuous 256-step colormap
hex_colors = [
    "#4c69b2", "#334157", "#ff4949", "#ff7a78",
    "#ff9d35", "#6389eb", "#ffc53a", "#cbddfa"
]
custom_cmap = LinearSegmentedColormap.from_list("custom_blend", hex_colors, N=256)

# Annotated heatmap of the confusion matrix on an explicit figure/axes pair
fig, ax = plt.subplots(figsize=(5, 5))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap=custom_cmap,
    xticklabels=['Predicted Non-Debt.', 'Predicted Debt'],
    yticklabels=['Actual Non-Debt', 'Actual Debt'],
    ax=ax,
)
ax.set_title('Confusion Matrix')
ax.set_ylabel('Actual')
ax.set_xlabel('Predicted')
plt.show()

# Takeaway: false negatives are the costly error here

In [None]:
from wordcloud import WordCloud
import random

custom_colors = ["#4c69b2", "#ff4949", "#ff7a78", "#ff9d35", "#6389eb", "#ffc53a"]

def random_color_func(word=None, font_size=None, position=None, orientation=None, font_path=None, random_state=None):
    """Colour callback for WordCloud: ignore all arguments and pick a random palette colour."""
    return random.choice(custom_colors)


def _draw_cloud(ax, text, title):
    """Render a word cloud of `text` on `ax`, or a placeholder when there is no text."""
    if text.strip():
        cloud = WordCloud(width=800, height=400, background_color='white', color_func=random_color_func).generate(text)
        ax.imshow(cloud, interpolation='bilinear')
    else:
        # WordCloud.generate raises on input without words; show a note instead of crashing
        ax.text(0.5, 0.5, 'No examples', ha='center', va='center')
    ax.set_title(title)
    ax.axis('off')


# Per-example table of validation text, true label and predicted label.
# `csv_file` was not defined anywhere in the visible notebook, so it is built here from
# the validation frame and the predictions computed above. The validation loader is not
# shuffled, so row i of val_df lines up with y_true[i] / y_pred[i].
# NOTE(review): run this cell right after the validation evaluation cells; the TESTING
# section further down overwrites y_true / y_pred with test-set values.
assert len(val_df) == len(y_true) == len(y_pred), "predictions do not match the validation frame"
csv_file = pd.DataFrame({
    "text": val_df["text"].to_numpy(),
    "y_true": np.asarray(y_true).astype(int),
    "y_pred": np.asarray(y_pred).astype(int),
})

# Subsets: false positives and false negatives
fp = csv_file[(csv_file["y_pred"] == 1) & (csv_file["y_true"] == 0)]
fn = csv_file[(csv_file["y_pred"] == 0) & (csv_file["y_true"] == 1)]

# Combine all FP/FN texts (missing values dropped so join() only sees strings)
fp_text = " ".join(fp['text'].dropna().astype(str))
fn_text = " ".join(fn['text'].dropna().astype(str))

# Plot both clouds side by side
fig, (ax_fp, ax_fn) = plt.subplots(1, 2, figsize=(12, 6))
_draw_cloud(ax_fp, fp_text, 'False Positives/Non-Debt Related')
_draw_cloud(ax_fn, fn_text, 'False Negatives/Debt Related')
plt.show()


**TESTING**

In [None]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import torch

# Saved checkpoint directory on Drive that is evaluated in this section
model_path = '/content/gdrive/MyDrive/Distilroberta_trainer_w_dropout/checkpoint-54'

# Restore the classifier and its tokenizer from that directory
model = AutoModelForSequenceClassification.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)

# Use the GPU when one is available, otherwise stay on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Switch to inference mode; as the last expression this also displays the model
model.eval()

In [None]:
from datasets import Dataset, DatasetDict
from transformers import AutoTokenizer

# Read the held-out file and keep only the text and label columns
test = pd.read_csv('/content/gdrive/MyDrive/Group 1: DSSI Summer 2025/Data/test.csv')[
    ['text', 'contains_debt_instrument_information']
]

# Single-split DatasetDict holding the whole test frame
dataset = DatasetDict({'test': Dataset.from_pandas(test)})

# NOTE(review): this replaces the tokenizer loaded from the checkpoint directory in the
# previous cell -- confirm that both use the same vocabulary.
name = "typeform/distilroberta-base-v2"
tokenizer = AutoTokenizer.from_pretrained(name)

# Smoke test: tokenize one sentence and print its padded input ids
sample_input = "We want to use a pretrained tokenizer."
tokenized_inputs = tokenizer(
    sample_input,
    padding="max_length",
    truncation=True,
    max_length=512,
    return_tensors="pt",
)
print(tokenized_inputs["input_ids"])


def _has_text(example):
    """Keep only rows whose text field is present."""
    return example['text'] is not None


def _tokenize(example):
    """Tokenize one row, padding/truncating to 512 tokens."""
    return tokenizer(example['text'], padding="max_length", truncation=True, max_length=512)


# Drop rows without text, then tokenize every remaining row
dataset = dataset.filter(_has_text)
tokenized_dataset = dataset.map(_tokenize)

# Optional clean-up of unused metadata columns (disabled):
# tokenized_dataset = tokenized_dataset.remove_columns(['item', 'text', 'company','cik'])

# Rename the target column to `labels`, then serve rows as PyTorch tensors
tokenized_dataset = tokenized_dataset.rename_column("contains_debt_instrument_information", "labels")
tokenized_dataset.set_format("torch")

In [None]:
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Serve the tokenized test split in order, ten examples per batch
eval_dataloader = DataLoader(tokenized_dataset['test'], batch_size=10)

model.eval()
test_batch_logits, y_true = [], []

# Gradient tracking is switched off for the whole pass
with torch.no_grad():
    for batch in eval_dataloader:
        # Inputs go to the model's device; labels stay on the CPU as NumPy values
        ids_on_device = batch['input_ids'].to(device)
        mask_on_device = batch['attention_mask'].to(device)
        batch_labels = batch['labels'].numpy()

        # Forward pass without labels, so only logits are produced
        output = model(input_ids=ids_on_device, attention_mask=mask_on_device, labels=None)
        test_batch_logits.append(output.logits)
        y_true.extend(batch_labels)

# Number of collected batches vs. number of batches in the loader
print(len(test_batch_logits), len(eval_dataloader))
test_logits = torch.cat(test_batch_logits, dim=0)

# Sanity check: dimension 0 of the logits should equal the size of the test set
print(test_logits.shape, len(tokenized_dataset['test']), len(y_true))

# Predicted class = index of the largest logit in each row
y_pred = test_logits.argmax(dim=1).cpu().numpy()

# Preview the first twenty labels, then the first twenty predictions
for preview in (y_true[:20], y_pred[:20]):
    print(preview)

# There must be exactly one prediction per label
assert len(y_true) == len(y_pred)

In [None]:
# Convert labels and predictions to plain Python ints before scoring
y_true = list(map(int, y_true))
y_pred = list(map(int, y_pred))

# F1 for the positive class
test_f1 = f1_score(y_true, y_pred)
print('F1 Score:', test_f1)

# Accuracy, precision and recall on the test set
test_accuracy = accuracy_score(y_true, y_pred)
print('Accuracy Score:', test_accuracy)
test_precision = precision_score(y_true, y_pred)
print('Precision:', test_precision)
test_recall = recall_score(y_true, y_pred)
print('Recall:', test_recall)