## BERT base and ESG-BERT model bootstrap

Note: This notebook was run on Google Colab Pro+ with a high-RAM GPU runtime. It therefore installs the required packages and mounts Google Drive (MyDrive) before doing anything else.

## Set-up

In [None]:
# Install Hugging Face transformers (provides the BertModel / BertTokenizer imported below)
!pip install transformers
# Install pickle5 (imported below under the name `pickle`)
!pip3 install pickle5

### Import modules

In [None]:
import pickle
import numpy as np
import pandas as pd
import torch
import warnings
import seaborn as sns
import random
import pickle5 as pickle
from collections import defaultdict, Counter
from string import punctuation
from matplotlib import pyplot as plt
from nltk.util import bigrams
from tqdm import tqdm
from itertools import product

from sklearn.feature_extraction import _stop_words
from sklearn.manifold import TSNE
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, roc_auc_score, f1_score
from sklearn.utils import shuffle

from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader

from transformers import BertModel, BertTokenizer
from transformers import logging
logging.set_verbosity_error()

warnings.filterwarnings("ignore", category=FutureWarning)

In [None]:
# Mount Google Drive at /content/drive; every data and result path used in
# this notebook lives under /content/drive/MyDrive
from google.colab import drive
drive.mount('/content/drive')

### Ensure reproducibility by setting seed

In [None]:
seed = 42

# Seed every random number generator the notebook relies on:
# Python's stdlib RNG, NumPy's global RNG and PyTorch (CPU, plus all GPUs
# when CUDA is present)
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

# Ask cuDNN to pick deterministic algorithms
torch.backends.cudnn.deterministic = True

### Get variables & data

In [None]:
# Read the pickled training data from Drive, then wrap it in a DataFrame
with open('/content/drive/MyDrive/train.pkl', mode="rb") as fh:
    train_data = pickle.load(fh)

train = pd.DataFrame(data=train_data)

In [None]:
def get_augmented_dataset(df):
    """Append the matching back-translated rows to `df` and shuffle the result.

    Rows of the module-level `data_for_augmentation` frame whose index label
    also occurs in `df.index` are concatenated below `df`; the combined frame
    is then shuffled with a fixed `random_state` of 42.

    Args:
        df: DataFrame to augment (the training split in this notebook).

    Returns:
        The shuffled DataFrame holding `df` plus the selected back-translations.
    """
    # Only index labels present in both frames can be used for augmentation
    shared = set(data_for_augmentation.index.to_list()) & set(df.index.to_list())
    back_translations = data_for_augmentation.loc[list(shared)]

    combined = pd.concat([df, back_translations])
    return shuffle(combined, random_state=42)

# Get translated paragraphs
translated_data = pd.read_excel('/content/drive/MyDrive/Back translation with GT.xlsx')

# Keep only the rows whose 'Exact match' value is 'No', expose the
# back-translation under the 'Paragraph' column name, drop the helper columns
# and index the frame by 'Index'.
# Every step of the chain returns a new DataFrame, so nothing is modified
# in place on a filtered selection of `translated_data` (the pattern pandas
# reports as SettingWithCopyWarning), and `data_for_augmentation` is an
# independent frame that later cells can assign columns to safely.
data_for_augmentation = (
    translated_data[translated_data['Exact match'] == 'No']
    .rename(columns={'Back-translation': 'Paragraph'})
    .drop(columns=['Original Paragraph', 'Exact match'])
    .set_index('Index')
)

### Clean text

In [None]:
# Define function to clean text
def clean(text):
    """Split `text` on whitespace and strip punctuation from both ends of each word.

    Words that consist of punctuation only are dropped; punctuation inside a
    word (e.g. an apostrophe) is kept.

    Args:
        text: The raw paragraph string.

    Returns:
        List of the remaining, non-empty words.
    """
    stripped_words = (word.strip(punctuation) for word in text.strip().split())
    return [word for word in stripped_words if word]

In [None]:
# Clean the 'Paragraph' column of the training data and of the back-translations
for frame in (train, data_for_augmentation):
    frame['Paragraph'] = frame['Paragraph'].apply(clean)

### Define classes

In [None]:
# Define dataset class
class BERTDataset(Dataset):
    """Dataset of BERT-encoded paragraphs with their labels.

    Args:
        data: DataFrame with a 'Paragraph' column (a string or a sequence of
            words per row) and the label column.
        label_col: Name of the column in `data` holding the labels.
        hf_path: Hugging Face model name or path used to load the tokenizer.
    """

    def __init__(self, data, label_col, hf_path):

        # Initialize tokenizer
        self.tok = BertTokenizer.from_pretrained(hf_path)

        # Truncate and encode paragraphs
        self.paragraphs = [self._encode(paragraph) for paragraph in data['Paragraph']]

        # Store labels
        self.labels = list(data[label_col])

    def _encode(self, paragraph):
        """Return the token ids (at most 512, truncated) for one paragraph."""
        # A list of strings handed to `encode` is treated as a sequence of
        # ready-made vocabulary tokens: no lower-casing or WordPiece splitting
        # is applied and every word missing from the vocabulary becomes [UNK].
        # Joining the cleaned words back into one string lets the tokenizer
        # run its full pipeline.
        text = paragraph if isinstance(paragraph, str) else " ".join(paragraph)
        return self.tok.encode(text, max_length=512, truncation=True)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # Returns a (token ids, label) pair
        return self.paragraphs[idx], self.labels[idx]

In [None]:
# Define BERT classifier
class BERTClassifier(nn.Module):
    """Two-class classifier: BERT encoder, mean pooling, dropout, linear layer.

    Args:
        hf_path: Hugging Face model name or path passed to
            `BertModel.from_pretrained`.
        dropout_rate: Dropout probability applied to the pooled representation.
    """

    def __init__(self, hf_path, dropout_rate=0.2):

        # Define network layers
        super().__init__()
        self.bert = BertModel.from_pretrained(hf_path)
        # Size the output layer from the encoder's configuration instead of
        # assuming a hidden size of 768
        self.linear = nn.Linear(self.bert.config.hidden_size, 2)

        # Define dropout
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, paragraphs, masks):
        """Return the two class logits for each paragraph in the batch.

        Args:
            paragraphs: Padded token-id tensor, one row per paragraph.
            masks: Tensor of the same shape as `paragraphs`, 1 on real tokens
                and 0 on padding.

        Returns:
            Tensor with one row of two logits per paragraph.
        """
        # First element of the encoder output: one hidden vector per token
        hidden = self.bert(paragraphs, attention_mask=masks)[0]

        # Average over real tokens only. A plain mean over the sequence axis
        # would also average the vectors at padded positions, making the
        # prediction for a paragraph depend on how much padding its batch has.
        weights = masks.unsqueeze(-1).to(hidden.dtype)
        token_sum = (hidden * weights).sum(dim=1)
        token_count = weights.sum(dim=1).clamp(min=1.0)  # guard against all-padding rows
        pooled = token_sum / token_count

        return self.linear(self.dropout(pooled))

### Define functions

In [None]:
# Define collate function
def bert_collate(batch):
    """Turn a list of (token ids, label) pairs into padded batch tensors.

    Args:
        batch: List of (token-id list, label) pairs.

    Returns:
        Tuple `(paragraphs_pad, masks_pad, labels)` of long tensors: the token
        ids right-padded with 0 to the longest paragraph in the batch, the
        attention mask (1 on real tokens, 0 on padding, so that attention is
        not performed on padding positions) and the labels.
    """
    sequences = [seq for seq, _ in batch]
    labels = torch.tensor([lbl for _, lbl in batch]).long()

    # Every row is padded up to the longest paragraph in the batch
    lengths = [len(seq) for seq in sequences]
    width = max(lengths)

    id_rows = [list(seq) + [0] * (width - n) for seq, n in zip(sequences, lengths)]
    mask_rows = [[1] * n + [0] * (width - n) for n in lengths]

    paragraphs_pad = torch.tensor(id_rows).long()
    masks_pad = torch.tensor(mask_rows).long()

    return paragraphs_pad, masks_pad, labels

In [None]:
def bert_finetuning_stat_test(hf_path, train, test, col_name, lr=1e-5, epochs=20,
                              dropout_rate=.2):
    """Fine-tune a BERT classifier on `train` and predict the labels of `test`.

    Args:
        hf_path: Hugging Face model name or path, used for tokenizer and encoder.
        train: DataFrame with the training paragraphs and labels.
        test: DataFrame with the evaluation paragraphs and labels.
        col_name: Name of the label column in both frames.
        lr: Learning rate for the Adam optimizer.
        epochs: Number of passes over the training data.
        dropout_rate: Dropout probability of the classifier.

    Returns:
        Tuple `(y_true, y_pred)` of lists with the true and the predicted
        labels of the test set, in loader order.
    """
    # Data loaders (batches of 16; only the training data is shuffled)
    train_loader = DataLoader(BERTDataset(train, col_name, hf_path),
                              batch_size=16, collate_fn=bert_collate, shuffle=True)
    test_loader = DataLoader(BERTDataset(test, col_name, hf_path),
                             batch_size=16, collate_fn=bert_collate)

    # Model, optimizer and training objective
    model = BERTClassifier(hf_path, dropout_rate=dropout_rate)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # Use CUDA when it is available
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    # Training
    for _ in range(epochs):
        model.train()
        for batch in train_loader:
            optimizer.zero_grad()
            paragraphs, masks, lbls = (t.to(device) for t in batch)

            # Forward pass, backpropagation and weight update
            loss = criterion(model(paragraphs, masks), lbls)
            loss.backward()
            optimizer.step()

    # Evaluation on the test data
    model.eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for batch in test_loader:
            paragraphs, masks, lbls = (t.to(device) for t in batch)
            predictions = model(paragraphs, masks).argmax(dim=1)
            y_true.extend(lbls.tolist())
            y_pred.extend(predictions.tolist())

    return y_true, y_pred

In [None]:
def run_model_n30(which_model, best_params, data, col, n_runs=30, epochs=3,
                  test_size=0.2, stratify_col='All4'):
    """Repeat a stratified split + fine-tune + evaluate cycle and collect macro F1.

    Run `i` uses `random_state=i` for the split, so the splits are the same
    on every call.

    Args:
        which_model: 'BERT' selects 'bert-base-uncased'; any other value
            selects 'nbroad/ESG-BERT'.
        best_params: Dict with the keys 'lr', 'dropout' and 'augmented'.
        data: DataFrame that is split into train and test for every run.
        col: Name of the label column to predict.
        n_runs: Number of repetitions (default 30).
        epochs: Fine-tuning epochs per repetition (default 3).
        test_size: Fraction of `data` held out for testing (default 0.2).
        stratify_col: Column of `data` used to stratify the split
            (default 'All4').

    Returns:
        List with one macro-averaged F1 score per run.
    """
    # Get hf_path (path to right Hugging Face model)
    hf_path = 'bert-base-uncased' if which_model == 'BERT' else 'nbroad/ESG-BERT'

    lr = best_params['lr']
    dropout = best_params['dropout']
    augmented = best_params['augmented']

    f1_scores = []
    for i in tqdm(range(n_runs)):
        train_stat, test_stat = train_test_split(data,
                                                 test_size=test_size,
                                                 random_state=i,
                                                 stratify=data[stratify_col])
        # The comparison with True is kept on purpose: the type of the stored
        # flag is not visible here, and a plain truthiness test would behave
        # differently for values such as non-empty strings.
        if augmented == True:  # noqa: E712
            train_stat = get_augmented_dataset(train_stat)  # augment train with back-translations
        y_true, y_pred = bert_finetuning_stat_test(hf_path, train_stat, test_stat,
                                                   col, lr=lr, epochs=epochs,
                                                   dropout_rate=dropout)
        f1_scores.append(f1_score(y_true, y_pred, average='macro'))

    return f1_scores

## Run statistical test

In [None]:
# Initialize dict to store results: one (initially empty) inner dict per module
model_stat_test = {
    module: {}
    for module in ('Process_action', 'Market_action', 'Social', 'Environment')
}

In [None]:
# Load the best BERT parameters (a pickled dict) from Drive
import pickle

with open("/content/drive/MyDrive/bert_dict.txt", mode="rb") as f:
    bert_best_parameters = pickle.load(f)

In [None]:
# BERT statistical test
for col, inner_dict in bert_best_parameters.items():
    print(f"Starting {col}")
    best_params = inner_dict['Best_params']

    # Keys ending in 'AUG' hold the parameters of the augmented variant; the
    # last four characters are cut off to recover the label column name
    approach = 'BERT'
    if col.endswith('AUG'):
        col, approach = col[:-4], 'BERT_AUG'

    f1_scores = run_model_n30('BERT', best_params, train, col)
    model_stat_test[col][approach] = f1_scores

    print(f"F1 scores: {f1_scores}")

    # Save (interim) dictionary for later
    with open("/content/drive/MyDrive/bert_stat.txt", "wb") as f:
        pickle.dump(model_stat_test, f)
    print(f"{col} successfully completed and (interim) dict saved\n")

In [None]:
# Turn dict into a long-format DataFrame with one row per
# (Approach, Module, F1 score)
model_stat_test_df = (
    pd.DataFrame(model_stat_test)
    .reset_index()
    .rename(columns={'index': 'Approach'})
    .melt(id_vars=['Approach'], var_name='Module', value_name='F1')
    .dropna()
    .explode('F1')
    .reset_index(drop=True)
)
model_stat_test_df.head()

In [None]:
# Persist the results DataFrame to Drive as a pickle
model_stat_test_df.to_pickle(path="/content/drive/MyDrive/bert_model_stat_test.pkl")

In [None]:
# Load the best ESG-BERT parameters (a pickled dict) from Drive
with open("/content/drive/MyDrive/esgbert_dict.txt", mode="rb") as f:
    esgbert_best_parameters = pickle.load(f)

In [None]:
# ESG-BERT statistical test
for col, inner_dict in esgbert_best_parameters.items():
    print(f"Starting {col}")
    best_params = inner_dict['Best_params']

    # Keys ending in 'AUG' hold the parameters of the augmented variant; the
    # last four characters are cut off to recover the label column name
    approach = 'ESG-BERT'
    if col.endswith('AUG'):
        col, approach = col[:-4], 'ESG-BERT_AUG'

    # Both variants must be run with the ESG-BERT checkpoint. Passing 'BERT'
    # for the non-augmented case would store bert-base-uncased scores under
    # the 'ESG-BERT' key.
    f1_scores = run_model_n30('ESG-BERT', best_params, train, col)
    model_stat_test[col][approach] = f1_scores

    print(f"F1 scores: {f1_scores}")

    # Save (interim) dictionary for later; model_stat_test is dumped as a
    # whole, so the file contains everything stored in the dict so far
    with open("/content/drive/MyDrive/bert_stat.txt", "wb") as f:
        pickle.dump(model_stat_test, f)
    print(f"{col} successfully completed and (interim) dict saved\n")

In [None]:
# Turn dict into a long-format DataFrame with one row per
# (Approach, Module, F1 score)
model_stat_test_df = (
    pd.DataFrame(model_stat_test)
    .reset_index()
    .rename(columns={'index': 'Approach'})
    .melt(id_vars=['Approach'], var_name='Module', value_name='F1')
    .dropna()
    .explode('F1')
    .reset_index(drop=True)
)
model_stat_test_df.head()

In [None]:
# Persist the results DataFrame to Drive as a pickle
model_stat_test_df.to_pickle(path="/content/drive/MyDrive/bert_model_stat_test.pkl")