__Building a Sentiment Analysis Model with Transformers (BERT)__

This Python notebook contains the development process of the sentiment analysis model that we will be using to predict the sentiment of Yelp text reviews. We will be using a Transformer model, more specifically, the BERT model, to develop our sentiment analysis model. This project aims to gauge the overall sentiment expressed in large volumes of text reviews (positive, negative, or neutral), enabling businesses to quickly identify areas where customers are satisfied or dissatisfied and prioritize areas for improvement. This is particularly useful in cases where businesses receive high volumes of daily text reviews and do not have the time to individually analyse them to derive the overall sentiment towards that business.

Use Cases:
- Use text mining to analyse online reviews and determine whether they are positive, negative, or neutral
- Identify common complaints or areas for improvement
- Identify positive comments to encourage business development initiatives

Note: Due to the computational power required by Transformers, running all the code chunks in this file may take a long time. This Python notebook simply shows the code that was used to train and develop the final model. For more information about the model development and selection process, please read "BERT Model Details.pdf" for evaluation metrics.

General Imports

In [None]:
# Run this cell every time, at the start of a session.
import pandas as pd
import re
import string
from collections import Counter, defaultdict
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
import warnings
# NOTE: this silences every Python warning for the rest of the session.
warnings.filterwarnings('ignore')
import nltk
# Fetch the NLTK stopword corpus read by stopwords.words() below.
nltk.download('stopwords')
from nltk.corpus import stopwords
# English stopwords, held in a set.
stopWords_nltk = set(stopwords.words('english'))

Helper Functions

In [None]:
import re
from typing import Union, List

class CleanText():
    """Replace every character outside an allowed set with a single space.

    With the default pattern the characters that survive are ASCII and
    Turkish letters, digits, and the symbols . , ' " ( ).

    Args:
        clean_pattern: regular expression matching the characters to blank out.
    """

    def __init__(self, clean_pattern = r"[^A-ZĞÜŞİÖÇIa-zğüı'şöç0-9.\"',()]"):
        self.clean_pattern = clean_pattern

    def __call__(self, text: Union[str, list]) -> List[List[str]]:
        """Clean a single string or a list of lists of strings.

        Args:
            text: one string (wrapped as ``[[text]]``) or a nested list in
                which each inner item is a sequence of strings.

        Returns:
            A list of lists of cleaned strings, same nesting as the input.

        Raises:
            TypeError: if ``text`` is neither a ``str`` nor a ``list``.
        """
        if isinstance(text, str):
            docs = [[text]]
        elif isinstance(text, list):
            docs = text
        else:
            # Previously this fell through and failed later with an
            # UnboundLocalError on `docs`; fail early with a clear message.
            raise TypeError(
                f"CleanText expects a str or a list, got {type(text).__name__}"
            )

        return [[re.sub(self.clean_pattern, " ", sent) for sent in sents] for sents in docs]

# Built once at definition time instead of on every call.
# NOTE(review): the ranges U+24C2-U+1F251 and U+10000-U+10FFFF are very broad;
# together they match every code point from U+24C2 upwards, which includes
# non-emoji scripts (e.g. CJK). Kept as-is to preserve the cleaning behaviour.
_EMOJI_PATTERN = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # flags (iOS)
    "\U00002500-\U00002BEF"
    "\U00002702-\U000027B0"  # dingbats
    "\U000024C2-\U0001F251"
    "\U0001f926-\U0001f937"
    "\U00010000-\U0010ffff"
    "\u2640-\u2642"
    "\u2600-\u2B55"
    "\u200d"  # zero-width joiner
    "\u23cf"
    "\u23e9"
    "\u231a"
    "\ufe0f"  # variation selector-16
    "\u3030"
    "]+",
    re.UNICODE,
)


def remove_emoji(data):
    """Return ``data`` with every run of characters matched by the emoji pattern removed.

    Args:
        data: the string to clean.

    Returns:
        The string with matched characters deleted (not replaced by spaces).
    """
    return _EMOJI_PATTERN.sub('', data)

def tokenize(text):
    """Split text into digit runs, letter runs and single non-word characters.

    The input is converted with ``str()``, runs of spaces are collapsed, and
    the resulting pieces (minus empty strings and lone spaces) are returned
    joined by single spaces.
    """
    collapsed = re.sub(r" +", " ", str(text))
    pieces = re.split(r"(\d+|[a-zA-ZğüşıöçĞÜŞİÖÇ]+|\W)", collapsed)
    return ' '.join(piece for piece in pieces if piece not in ('', ' '))

# One compiled character class covering every character in string.punctuation.
regex = re.compile(f"[{re.escape(string.punctuation)}]")


def remove_punct(text):
    """Return ``text`` with each punctuation character replaced by one space."""
    return regex.sub(" ", text)

# Shared CleanText instance built with the default clean_pattern.
clean = CleanText()

# label encode
def label_encode(x):
    """Map a star rating to a class id.

    1 and 2 -> 0, 3 -> 1, 4 and 5 -> 2. Any other value yields ``None``.
    """
    # Checked in the same order as the rating comparisons were originally written.
    for stars, class_id in ((1, 0), (2, 0), (3, 1), (5, 2), (4, 2)):
        if x == stars:
            return class_id
    return None

# label to name
def label2name(x):
    """Map a class id (0, 1, 2) to its sentiment name; other values yield ``None``."""
    sentiment_names = ("Negative", "Neutral", "Positive")
    for class_id, sentiment in enumerate(sentiment_names):
        if x == class_id:
            return sentiment
    return None


Read Data

In [None]:
# Load the review data from a CSV file (relative path, so resolved against the working directory).
df = pd.read_csv("unique_data.csv")

In [None]:
# keep only the review text and the star rating, then give them shorter names
df = df.loc[:, ["review_text", "review_stars"]]
df = df.rename(columns={"review_text": "Review", "review_stars": "Rating"})

# show column names
print("df.columns: ", df.columns)

In [None]:
# preview the first rows of df after selecting and renaming the columns
df.head()

Labelling & Cleaning Text

In [None]:
# derive the numeric class id from the star rating, then its readable name
df["label"] = df["Rating"].apply(label_encode)
df["label_name"] = df["label"].apply(label2name)


def _clean_review(raw_review):
    """Strip emoji, lowercase, blank out disallowed characters, then replace punctuation."""
    lowered = remove_emoji(raw_review).lower()
    # clean() wraps a single string as [[text]], hence the [0][0]
    return remove_punct(clean(lowered)[0][0])


# clean text, lowercase and remove punctuation
df["Review"] = df["Review"].apply(_clean_review)

In [None]:
# preview df to confirm the "label"/"label_name" columns exist and "Review" has been cleaned
df.head()

Using a Simple Tokenizer to Tokenize Reviews and Generate Token Counts

In [None]:
# tokenize each cleaned review, then replace any punctuation tokens with spaces
df["tokenized_review"] = df["Review"].apply(tokenize).apply(remove_punct)

# number of whitespace-separated tokens in each tokenized review
df["sent_token_length"] = df["tokenized_review"].str.split().str.len()

In [None]:
# share of reviews whose simple-tokenizer token count is below 512
df["sent_token_length"].lt(512).mean()

Generating Token Counts with BERT Tokenizer for Transformer Model

In [None]:
# initiate BERT tokenizer
from transformers import BertTokenizer
# load the tokenizer of the 'bert-base-uncased' checkpoint; do_lower_case=True asks it to lowercase its input
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', do_lower_case=True)

In [None]:
# tokenize data with bert tokenizer (takes about 4-6 minutes to run this chunk)
# add_special_tokens=False: the stored length counts only the review's own token ids,
# i.e. it excludes the special tokens that are added when the data is encoded for the model
df["sent_bert_token_length"] = df["Review"].apply(lambda x: len(tokenizer(x, add_special_tokens=False)["input_ids"]))

In [None]:
# proportion of reviews that fit into a 512-token BERT input without truncation.
# sent_bert_token_length was counted with add_special_tokens=False, so two of the
# 512 positions must be reserved for the [CLS] and [SEP] tokens added at encoding time.
(df.sent_bert_token_length <= 512 - 2).mean()

BERT Preprocessing and Training Imports

In [None]:
# RUN THIS EVERY TIME AT THE START
import pandas as pd
import numpy as np
import os
import random
from pathlib import Path
import json

import torch
# prefer the GPU when CUDA is available, otherwise fall back to the CPU
device_name = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using {}.".format(device_name))
from tqdm.notebook import tqdm

from transformers import BertTokenizer
from torch.utils.data import TensorDataset

from transformers import BertForSequenceClassification

BERT Preprocessing and Training

In [6]:
class Config:
    """Hyper-parameters and tokenizer options shared by the cells below."""

    # --- reproducibility and hardware ---
    seed_val = 17
    random_state = 42
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # --- model and optimisation ---
    pretrained_model = 'bert-base-uncased'
    epochs = 5
    batch_size = 6
    lr = 2e-5
    eps = 1e-8
    test_size = 0.15

    # --- tokenizer / encoding options ---
    seq_length = 512
    add_special_tokens = True
    return_attention_mask = True
    pad_to_max_length = True
    # NOTE(review): False although the checkpoint is an "uncased" one; the reviews
    # are lowercased during cleaning, so this may be intentional - confirm.
    do_lower_case = False
    return_tensors = 'pt'


config = Config()

In [None]:
# params will be saved after training
# snapshot of the config values, in a fixed key order
params = {
    name: getattr(config, name)
    for name in (
        "seed_val",
        "device",
        "epochs",
        "batch_size",
        "seq_length",
        "lr",
        "eps",
        "pretrained_model",
        "test_size",
        "random_state",
        "add_special_tokens",
        "return_attention_mask",
        "pad_to_max_length",
        "do_lower_case",
        "return_tensors",
    )
}
# the device is stored in its string form rather than as a torch.device object
params["device"] = str(config.device)

In [10]:
# set random seed and device
import random

device = config.device

# seed Python, NumPy, PyTorch and all CUDA devices with the same value
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_fn(config.seed_val)

In [None]:
# preview df to confirm the tokenized-review and token-length columns were added
df.head()

Train, Validation, & Test Split

In [None]:
# split train, validation, and test according to the proportion 60/20/20 respectively
# (rows are taken in their existing order; nothing is shuffled here)
# train_size and val_size are positional cut-offs; per the original note they are
# derived from "sentiment analysis preprocess.ipynb" so that all 3 sentiment analysis
# models are trained and evaluated on the same rows
train_size = int(len(df) * 0.6)
val_size = int(len(df) * 0.2)

# .copy() makes each split an independent frame, so columns added to a split later
# (e.g. predictions) are not written into a slice of df
train_df = df.iloc[:train_size].copy()
val_df = df.iloc[train_size:train_size + val_size].copy()
test_df = df.iloc[train_size + val_size:].copy()

In [None]:
# preview the first rows of the training split
train_df.head()

In [None]:
# for the train, validation and test sets (in that order): print the number of
# distinct labels present, followed by the shape of the split
for _split_df in (train_df, val_df, test_df):
    print(len(_split_df['label'].unique()))
    print(_split_df.shape)

Encoding Training & Validation Data with BERT Tokenizer

In [None]:
# this whole chunk takes around 4m 8.5s to run
# create tokenizer
tokenizer = BertTokenizer.from_pretrained(config.pretrained_model, do_lower_case=config.do_lower_case)


def _encode_reviews(frame):
    """Encode the "Review" column of ``frame`` with the BERT tokenizer.

    Every review is padded (when config.pad_to_max_length is set) and truncated
    to config.seq_length so all rows share one tensor shape.
    """
    return tokenizer.batch_encode_plus(
        frame.Review.tolist(),
        add_special_tokens=config.add_special_tokens,
        return_attention_mask=config.return_attention_mask,
        # `pad_to_max_length` is deprecated; `padding="max_length"` is its replacement
        padding="max_length" if config.pad_to_max_length else False,
        # request truncation explicitly instead of relying on the implicit fallback
        truncation=True,
        max_length=config.seq_length,
        return_tensors=config.return_tensors,
    )


# encode data with BERT tokenizer
encoded_data_train = _encode_reviews(train_df)
encoded_data_val = _encode_reviews(val_df)

# create input ids, attention masks and label tensors
input_ids_train = encoded_data_train['input_ids']
attention_masks_train = encoded_data_train['attention_mask']
labels_train = torch.tensor(train_df.label.values)

input_ids_val = encoded_data_val['input_ids']
attention_masks_val = encoded_data_val['attention_mask']
labels_val = torch.tensor(val_df.label.values)

# bundle the tensors row-wise so a DataLoader can batch them together
dataset_train = TensorDataset(input_ids_train, attention_masks_train, labels_train)
dataset_val = TensorDataset(input_ids_val, attention_masks_val, labels_val)

Creating the Base Model

In [None]:
# load the pre-trained checkpoint named in config.pretrained_model ('bert-base-uncased')
# with a sequence-classification head
# using num_labels=3 to indicate the number of output labels (positive/neutral/negative).
# attentions and hidden states are not requested in the model output
model = BertForSequenceClassification.from_pretrained(config.pretrained_model,
                                                      num_labels=3,
                                                      output_attentions=False,
                                                      output_hidden_states=False)

Creating Data Loaders for Train and Validation Sets

In [None]:
# build iterable batch loaders over the encoded train and validation datasets
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler

# training batches are drawn in random order
train_sampler = RandomSampler(dataset_train)
dataloader_train = DataLoader(
    dataset_train,
    sampler=train_sampler,
    batch_size=config.batch_size,
)

# validation batches keep the dataset order
val_sampler = SequentialSampler(dataset_val)
dataloader_validation = DataLoader(
    dataset_val,
    sampler=val_sampler,
    batch_size=config.batch_size,
)

Defining Optimizer and Scheduler

In [None]:
# transformers' own AdamW is deprecated in favour of the PyTorch implementation
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup

# weight_decay=0.0 keeps the default of the transformers AdamW this replaces
# (torch.optim.AdamW would otherwise default to 0.01)
optimizer = AdamW(model.parameters(),
                  lr=config.lr,
                  eps=config.eps,
                  weight_decay=0.0)

# linear decay of the learning rate over every optimisation step, with no warm-up
scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps=0,
                                            num_training_steps=len(dataloader_train)*config.epochs)

Defining Performance Metric Functions

In [None]:
# we will use f1 score and accuracy per class as performance metrics to evaluate our model
from sklearn.metrics import f1_score

# function to return f1 score
def f1_score_func(preds, labels):
    """Return the weighted F1 score of argmax-decoded predictions against the labels.

    ``preds`` holds one row of class scores per sample; the predicted class is
    the index of the largest score in each row.
    """
    predicted_classes = np.argmax(preds, axis=1).flatten()
    return f1_score(labels.flatten(), predicted_classes, average='weighted')

# function to return accuracy per class (positive/neutral/negative)
def accuracy_per_class(preds, labels, label_dict):
    """Print, for each class present in ``labels``, how many of its samples were predicted correctly.

    Args:
        preds: one row of class scores per sample; argmax over axis 1 gives the predicted class.
        labels: array of true class ids.
        label_dict: mapping of class name -> class id, used to print the class name.
    """
    id_to_name = {class_id: name for name, class_id in label_dict.items()}

    predicted = np.argmax(preds, axis=1).flatten()
    actual = labels.flatten()

    for class_id in np.unique(actual):
        in_class = actual == class_id
        n_total = int(np.count_nonzero(in_class))
        n_correct = int(np.count_nonzero(predicted[in_class] == class_id))
        print(f'Class: {id_to_name[class_id]}')
        print(f'Accuracy: {n_correct}/{n_total}\n')

Training the Model

In [None]:
def evaluate(dataloader_val):
    """Run the global ``model`` over ``dataloader_val`` without gradient tracking.

    Each batch is expected to provide input ids, attention mask and labels at
    positions 0, 1 and 2. The model is switched to eval mode first.

    Returns:
        A tuple ``(average loss per batch, logits for all samples, true labels
        for all samples)``; the last two are NumPy arrays concatenated along axis 0.
    """
    model.eval()

    running_loss = 0
    logit_batches = []
    label_batches = []

    for batch in dataloader_val:
        on_device = tuple(tensor.to(config.device) for tensor in batch)
        labels = on_device[2]

        with torch.no_grad():
            outputs = model(input_ids=on_device[0],
                            attention_mask=on_device[1],
                            labels=labels)

        # with labels supplied, element 0 is the loss and element 1 the logits
        running_loss += outputs[0].item()
        logit_batches.append(outputs[1].detach().cpu().numpy())
        label_batches.append(labels.cpu().numpy())

    # calculate average val loss (per batch)
    loss_val_avg = running_loss / len(dataloader_val)

    return (loss_val_avg,
            np.concatenate(logit_batches, axis=0),
            np.concatenate(label_batches, axis=0))

In [None]:
# move the model onto the device chosen in the config
model.to(config.device)

In [None]:
# ALREADY TRAINED ONCE - NO NEED TO TRAIN AGAIN (TRAINING TAKES A LONG TIME TO RUN)
# trains for 5 epochs and saves a checkpoint after each one, giving 5 candidate models (choose the best of the 5 based on validation metrics)
# pip install ipywidgets (install this beforehand so that this chunk can run; it allows us to display the progress bar)

#for epoch in tqdm(range(1, config.epochs+1)):

#    model.train()

#    loss_train_total = 0
    # allows you to see the progress of the training
#    progress_bar = tqdm(dataloader_train, desc='Epoch {:1d}'.format(epoch), leave=False, disable=False)

#    for batch in progress_bar:

#        model.zero_grad()

#        batch = tuple(b.to(config.device) for b in batch)


#        inputs = {'input_ids':      batch[0],
#                  'attention_mask': batch[1],
#                  'labels':         batch[2],
#                 }

#        outputs = model(**inputs)

#        loss = outputs[0]
#        loss_train_total += loss.item()
#        loss.backward()

#        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

#        optimizer.step()
#        scheduler.step()

#        progress_bar.set_postfix({'training_loss': '{:.3f}'.format(loss.item()/len(batch))})


#    torch.save(model.state_dict(), f'_BERT_epoch_{epoch}.model')

#    tqdm.write(f'\nEpoch {epoch}')

#    loss_train_avg = loss_train_total/len(dataloader_train)
#    tqdm.write(f'Training loss: {loss_train_avg}')

#    val_loss, predictions, true_vals = evaluate(dataloader_validation)
#    val_f1 = f1_score_func(predictions, true_vals)
#    tqdm.write(f'Validation loss: {val_loss}')

#    tqdm.write(f'F1 Score (Weighted): {val_f1}');
# save model params and other configs
#with Path('params.json').open("w") as f:
#    json.dump(params, f, ensure_ascii=False, indent=4)

Validation Error Analysis for Chosen Model

In [None]:
# load chosen model (choose bert_epoch_2 model)
# the checkpoint is read onto the CPU, then its weights are loaded into the existing model
model.load_state_dict(torch.load('_BERT_epoch_2.model', map_location='cpu'))

In [None]:
# step by step predictions on validation set dataframe
# we do this to view predictions in the pandas dataframe and easily filter them and perform validation error analysis

pred_final = []

# make sure dropout is disabled, whatever mode the model was left in
model.eval()

for review in tqdm(val_df["Review"], total=len(val_df)):
    encoded_review = tokenizer.batch_encode_plus(
        [review],
        add_special_tokens=config.add_special_tokens,
        return_attention_mask=config.return_attention_mask,
        # `pad_to_max_length` is deprecated; `padding="max_length"` is its replacement
        padding="max_length" if config.pad_to_max_length else False,
        # cut reviews longer than seq_length explicitly instead of relying on the implicit fallback
        truncation=True,
        max_length=config.seq_length,
        return_tensors=config.return_tensors,
    )

    with torch.no_grad():
        outputs = model(input_ids=encoded_review['input_ids'].to(device),
                        attention_mask=encoded_review['attention_mask'].to(device))

    # no labels were passed, so the first output element holds the logits;
    # the predicted class is the index of the largest logit
    pred_final.append(int(outputs[0].argmax(dim=1).item()))

In [None]:
# work on an independent copy so the new columns are not written into a slice of df
val_df = val_df.copy()

# add predicted label as "pred" into val_df
val_df["pred"] = pred_final

# label name to label id mapping
name2label = {"Negative":0,
              "Neutral":1,
             "Positive":2
             }
label_names = list(name2label.keys())
# inverse mapping (id -> name); deliberately NOT called `label2name`, which would
# overwrite the helper function of that name and break re-running the labelling cell
id2name = {v: k for k, v in name2label.items()}

# add predicted label name as "pred_name" into val_df
val_df["pred_name"] = val_df["pred"].map(id2name.get)

In [None]:
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# print classification report for val_df
print(classification_report(val_df.label_name.values, val_df.pred_name.values))

# print confusion matrix for val_df
# labels=label_names pins the row/column order to the tick labels used below and keeps
# the matrix 3x3 even if a class is missing from the data (the default order is the
# sorted set of labels that actually occur)
confmat_df = pd.DataFrame(
    confusion_matrix(val_df.label_name.values, val_df.pred_name.values, labels=label_names),
    index=label_names,
    columns=label_names,
)
hmap = sns.heatmap(confmat_df, annot=True, fmt="d", cmap="Blues")
hmap.yaxis.set_ticklabels(hmap.yaxis.get_ticklabels(), rotation=0, ha='right')
hmap.xaxis.set_ticklabels(hmap.xaxis.get_ticklabels(), rotation=30, ha='right')
plt.ylabel('Actual Sentiment')
plt.xlabel('Predicted Sentiment');

Testing and Evaluating our Chosen Model

In [None]:
# preview the first rows of the test split
test_df.head()

In [None]:
# load chosen model (choose bert_epoch_2 model)
# the checkpoint is read onto the CPU, then its weights are loaded into the existing model
model.load_state_dict(torch.load('_BERT_epoch_2.model', map_location='cpu'))

In [None]:
# step by step predictions on test set dataframe same as above for validation set
# we do this to view predictions in the pandas dataframe and easily filter them and perform test error analysis

pred_final = []

# make sure dropout is disabled, whatever mode the model was left in
model.eval()

for review in tqdm(test_df["Review"], total=len(test_df)):
    encoded_review = tokenizer.batch_encode_plus(
        [review],
        add_special_tokens=config.add_special_tokens,
        return_attention_mask=config.return_attention_mask,
        # `pad_to_max_length` is deprecated; `padding="max_length"` is its replacement
        padding="max_length" if config.pad_to_max_length else False,
        # cut reviews longer than seq_length explicitly instead of relying on the implicit fallback
        truncation=True,
        max_length=config.seq_length,
        return_tensors=config.return_tensors,
    )

    with torch.no_grad():
        outputs = model(input_ids=encoded_review['input_ids'].to(device),
                        attention_mask=encoded_review['attention_mask'].to(device))

    # no labels were passed, so the first output element holds the logits;
    # the predicted class is the index of the largest logit
    pred_final.append(int(outputs[0].argmax(dim=1).item()))

In [None]:
# work on an independent copy so the new columns are not written into a slice of df
test_df = test_df.copy()

# add predicted label as "pred" into test_df
test_df["pred"] = pred_final

# label name to label id mapping
name2label = {"Negative":0,
              "Neutral":1,
             "Positive":2
             }
label_names = list(name2label.keys())
# inverse mapping (id -> name); deliberately NOT called `label2name`, which would
# overwrite the helper function of that name and break re-running the labelling cell
id2name = {v: k for k, v in name2label.items()}

# add predicted label name as "pred_name" into test_df
test_df["pred_name"] = test_df["pred"].map(id2name.get)

In [None]:
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# print classification report for test_df
print(classification_report(test_df.label_name.values, test_df.pred_name.values))

# print confusion matrix for test_df
# labels=label_names pins the row/column order to the tick labels used below and keeps
# the matrix 3x3 even if a class is missing from the data (the default order is the
# sorted set of labels that actually occur)
confmat_df = pd.DataFrame(
    confusion_matrix(test_df.label_name.values, test_df.pred_name.values, labels=label_names),
    index=label_names,
    columns=label_names,
)
hmap = sns.heatmap(confmat_df, annot=True, fmt="d", cmap="Blues")
hmap.yaxis.set_ticklabels(hmap.yaxis.get_ticklabels(), rotation=0, ha='right')
hmap.xaxis.set_ticklabels(hmap.xaxis.get_ticklabels(), rotation=30, ha='right')
plt.ylabel('Actual Sentiment')
plt.xlabel('Predicted Sentiment');