In [None]:
pip install pytorch-pretrained-bert

In [20]:
import re
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
from pytorch_pretrained_bert import BertTokenizer, BertModel, BertConfig
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from torch.nn.utils import clip_grad_norm_
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import dataloader, Dataset, DataLoader



# --- Notebook-wide configuration -------------------------------------------

# Training schedule.
EPOCHES: int = 3        # number of passes over the training loader
batch_size: int = 32    # samples per DataLoader batch (train and test)

# Tokenisation / model configuration.
PADDING_SIZE: int = 300          # max_length passed to the review dataset
VOCAB_SIZE_CONFIG: int = 32000   # vocabulary size handed to BertConfig

def _parse_line(line):
    line = line.strip().lower()
    line = line.replace("&nbsp;", " ")
    line = re.sub(r'<br(\s\/)?>', ' ', line)
    line = re.sub(r' +', ' ', line)  # merge multiple spaces to one
    return line


class MovieReviewDataset(Dataset):
    """Torch dataset yielding BERT-encoded reviews and their sentiment labels.

    Reads the ``'Phrase'`` (text) and ``'Sentiment'`` (integer label) columns
    of ``dataframe``. Every item is encoded to exactly ``padding_size`` ids.

    Args:
        dataframe: Frame providing the ``'Phrase'`` and ``'Sentiment'`` columns.
        padding_size: Fixed sequence length (including ``[CLS]`` and ``[SEP]``).

    Raises:
        ValueError: If ``padding_size`` cannot hold the two special tokens.
    """

    def __init__(self, dataframe, padding_size: int):
        if padding_size < 2:
            raise ValueError("padding_size must be >= 2 to fit [CLS] and [SEP]")
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', do_lower_case=True)
        # Store the texts positionally. Indexing the Series with an integer is
        # a label lookup, which fails (or returns the wrong row) once the frame
        # has been shuffled by train_test_split, while `labels` is positional.
        self.reviews = dataframe['Phrase'].tolist()
        self.labels = torch.as_tensor(np.array(dataframe["Sentiment"]), dtype=torch.long)
        self.max_length = padding_size
        self.pad_token_id = self.tokenizer.convert_tokens_to_ids(['[PAD]'])[0]

    def __len__(self):
        """Return the number of reviews."""
        return len(self.reviews)

    def __getitem__(self, item):
        """Encode review ``item``.

        Returns:
            A tuple ``(input_ids, attention_mask, label)``; the first two are
            1-D long tensors of length ``max_length``. The mask is 1 for real
            tokens and 0 for padding.
        """
        # pytorch_pretrained_bert's BertTokenizer exposes tokenize() and
        # convert_tokens_to_ids(); encode_plus() belongs to the later
        # `transformers` API, so the encoding is assembled by hand.
        tokens = self.tokenizer.tokenize(self.reviews[item])
        tokens = ['[CLS]'] + tokens[:self.max_length - 2] + ['[SEP]']
        token_ids = self.tokenizer.convert_tokens_to_ids(tokens)

        n_real = len(token_ids)
        n_pad = self.max_length - n_real
        input_ids = torch.tensor(token_ids + [self.pad_token_id] * n_pad, dtype=torch.long)
        attention_mask = torch.tensor([1] * n_real + [0] * n_pad, dtype=torch.long)
        return input_ids, attention_mask, self.labels[item]
    
    

class BertLayerNorm(nn.Module):
    """Layer normalisation over the last dimension with a learnable affine map.

    Args:
        hidden_size: Size of the last dimension of the inputs.
        eps: Constant added to the variance inside the square root.
    """

    def __init__(self, hidden_size, eps=1e-12):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.bias = nn.Parameter(torch.zeros(hidden_size))
        self.variance_epsilon = eps

    def forward(self, x):
        """Normalise ``x`` along its last axis, then scale and shift it."""
        centered = x - x.mean(-1, keepdim=True)
        # Biased variance: mean of squared deviations along the last axis.
        variance = centered.pow(2).mean(-1, keepdim=True)
        normalized = centered / torch.sqrt(variance + self.variance_epsilon)
        return self.weight * normalized + self.bias
        

class BertForSequenceClassification(nn.Module):
    """Pretrained ``bert-base-uncased`` encoder with a linear classification head.

    Args:
        num_labels: Number of output classes.
    """

    def __init__(self, num_labels=3):
        super(BertForSequenceClassification, self).__init__()
        self.num_labels = num_labels
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        # Size the head from the loaded encoder's own configuration instead of
        # a module-level `config` global, which may not exist yet when this
        # class is instantiated and need not match the pretrained weights.
        bert_config = self.bert.config
        self.dropout = nn.Dropout(bert_config.hidden_dropout_prob)
        self.classifier = nn.Linear(bert_config.hidden_size, num_labels)
        nn.init.xavier_normal_(self.classifier.weight)

    def forward(self, input_ids, token_type_ids=None, attention_mask=None, labels=None):
        """Return unnormalised class scores (logits), one row per input.

        ``labels`` is accepted for call-site compatibility and is not used.

        The scores are deliberately NOT passed through softmax:
        ``nn.CrossEntropyLoss`` applies log-softmax itself, so feeding it
        probabilities would normalise twice. ``argmax`` over the logits picks
        the same class as ``argmax`` over the softmax output.
        """
        _, pooled_output = self.bert(input_ids, token_type_ids, attention_mask, output_all_encoded_layers=False)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        return logits


def training(train_data, model, device, criterion, optimizer, epoch):
    """Run one training epoch and report mean loss and accuracy.

    Args:
        train_data: Iterable yielding ``(input_ids, attention_mask, target)`` batches.
        model: Module called as ``model(data, token_type_ids=None,
            attention_mask=mask, labels=target)`` and returning per-class scores.
        device: Device the batches are moved to.
        criterion: Loss callable taking ``(predictions, target)``.
        optimizer: Optimizer stepping ``model``'s parameters.
        epoch: Epoch number, used only in the progress message.

    Returns:
        ``(mean_loss, accuracy_percent)``; the loss is averaged over batches.

    Raises:
        ValueError: If ``train_data`` yields no batches.
    """
    correct = 0
    data_len = 0
    total_loss = 0.0
    num_batches = 0
    start_time = time.time()
    print('Training Epoch: {}'.format(epoch))
    model.train()
    for batch_idx, (data, mask, target) in enumerate(train_data):
        if batch_idx % 250 == 0:
            print('.', end='')  # lightweight progress marker

        data = data.to(device)
        mask = mask.to(device)
        target = target.to(device)

        optimizer.zero_grad()
        target_predictions = model(data, token_type_ids=None, attention_mask=mask, labels=target)
        loss = criterion(target_predictions, target)
        loss.backward()
        # Cap the global gradient norm at 1.0 before the update.
        clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        total_loss += loss.item()
        predicted_target = target_predictions.argmax(dim=1)
        correct += (predicted_target == target).sum().item()
        data_len += len(data)
        num_batches += 1

    if num_batches == 0:
        # Previously this surfaced as an unbound `batch_idx` error.
        raise ValueError("train_data yielded no batches")

    total_loss /= num_batches
    acc = 100.00 * (correct / data_len)
    print("\nTrain Loss: {:.6f}     Train Accuracy: {:.2f}%        Training Time: {:.2f} min".format(total_loss, acc, (time.time()-start_time)/60.00))
    return total_loss, acc


def evaluation(test_data, model, device, criterion, batch_size):
    """Evaluate ``model`` on ``test_data`` without updating it.

    Args:
        test_data: Iterable yielding ``(input_ids, attention_mask, target)`` batches.
        model: Module called as ``model(data, token_type_ids=None,
            attention_mask=mask, labels=target)`` and returning per-class scores.
        device: Device the batches are moved to.
        criterion: Loss callable taking ``(predictions, target)``.
        batch_size: Kept for call-site compatibility; no longer needed because
            results are gathered per batch rather than written by offset.

    Returns:
        ``(mean_loss, accuracy_percent, targets, predictions)`` where the last
        two are 1-D CPU tensors of integer class ids with one entry per
        evaluated sample, in iteration order.

    Raises:
        ValueError: If ``test_data`` yields no batches.
    """
    correct = 0
    data_len = 0
    total_loss = 0.0
    num_batches = 0
    # Collect per-batch results and concatenate at the end. Sizing a buffer
    # with len(test_data) counts batches, not samples, and offset-based slice
    # writes break on a short final batch.
    all_targets = []
    all_predictions = []

    model.eval()
    with torch.no_grad():
        for data, mask, target in test_data:
            data = data.to(device)
            mask = mask.to(device)
            target = target.to(device)
            target_pred = model(data, token_type_ids=None, attention_mask=mask, labels=target)
            loss = criterion(target_pred, target)
            total_loss += loss.item()
            target_prediction = target_pred.argmax(dim=1)
            correct += (target_prediction == target).sum().item()
            data_len += len(data)
            num_batches += 1

            all_targets.append(target.detach().cpu())
            all_predictions.append(target_prediction.detach().cpu())

    if num_batches == 0:
        raise ValueError("test_data yielded no batches")

    total_loss /= num_batches
    acc = 100.00 * correct / data_len
    print("Test Loss : {:.6f}     Test Accuracy : {:.2f}%".format(total_loss, acc))
    print(100 * '=')

    target_out = torch.cat(all_targets)
    target_predicted_out = torch.cat(all_predictions)
    return total_loss, acc, target_out, target_predicted_out


def compute_confusion_matrix(True_Class, Predict_Class):
    """Plot the confusion matrix of true vs. predicted classes as a heatmap.

    Args:
        True_Class: 1-D tensor of ground-truth class ids.
        Predict_Class: 1-D tensor of predicted class ids, aligned with ``True_Class``.
    """
    target_true = True_Class.detach().cpu().numpy()
    target_predicted = Predict_Class.detach().cpu().numpy()

    # Derive the class set from the data so the axis labels always match the
    # matrix shape (a fixed list of three names fails for any other count).
    class_ids = np.union1d(target_true, target_predicted)
    class_names = ['lab{}'.format(pos + 1) for pos in range(len(class_ids))]

    conf_matrix = confusion_matrix(target_true, target_predicted, labels=class_ids)
    conf_matrix_df = pd.DataFrame(conf_matrix, index=class_names, columns=class_names)

    plt.figure(figsize=(8, 8), dpi=100)
    sns.set(font_scale=1.5)
    sns.heatmap(conf_matrix_df, annot=True, annot_kws={"size": 14}, cmap=plt.cm.Reds, fmt="d")
    plt.ylabel('True Classes')
    plt.xlabel('Prediction Classes')
    plt.title('Confusion Matrix for Bert Sentiment Analysis', color='darkblue')
    plt.show()


def show_plot(train_loss, test_loss, train_accuracy, test_accuracy):
    """Draw two figures: train/test loss, then train/test accuracy.

    Each argument is a sequence of per-epoch values; the train curve is drawn
    in blue and the test curve in orange.
    """
    # (title, train series, train legend, test series, test legend)
    panels = (
        ('Bert Train and Test Loss',
         train_loss, 'Train Loss', test_loss, 'Test Loss'),
        ('Bert Accuracy During Time Training',
         train_accuracy, 'Train Acc', test_accuracy, 'Test Acc'),
    )
    for heading, train_series, train_label, test_series, test_label in panels:
        plt.figure(figsize=(10, 8), dpi=100)
        plt.title(heading, color='darkblue')
        plt.plot(train_series, color='blue', label=train_label)
        plt.plot(test_series, color='orange', label=test_label)
        plt.legend()
        plt.show()

In [11]:
# Use the GPU when one is available; otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0" if use_cuda else "cpu")
print(device)
kwargs = {'num_workers': 0, 'pin_memory': True} if use_cuda else {}

# Fix the seeds so the split and the shuffling order are reproducible on a
# fresh kernel.
SEED = 42
torch.manual_seed(SEED)

dataframe = pd.read_csv("train.tsv", sep="\t")
# Optional text cleanup, currently disabled:
# dataframe["Phrase_filter"] = dataframe.Phrase.apply(lambda rec: _parse_line(rec))
train_df, test_df = train_test_split(dataframe, test_size=0.2, shuffle=True, random_state=SEED)
# Drop the shuffled original index so rows are addressable by position 0..n-1.
train_df = train_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

train_data = MovieReviewDataset(train_df, PADDING_SIZE)
test_data = MovieReviewDataset(test_df, PADDING_SIZE)

train_loader = DataLoader(train_data, batch_size, shuffle=True, **kwargs)
# Evaluation does not benefit from shuffling; keep a stable order.
test_loader = DataLoader(test_data, batch_size, shuffle=False, **kwargs)

In [20]:
# Per-epoch history arrays. Slot 0 holds a placeholder for "before training"
# so the curves start at epoch 0; slots 1..EPOCHES are filled by the loop.
# (Each array is created separately: unpacking one np.empty() result would
# yield scalars, not arrays.)
train_loss = np.empty(EPOCHES + 1)
test_loss = np.empty(EPOCHES + 1)
train_accuracy = np.empty(EPOCHES + 1)
test_accuracy = np.empty(EPOCHES + 1)
train_loss[0] = test_loss[0] = 1.5
train_accuracy[0] = test_accuracy[0] = 0

# Model
# Module-level `config` is kept for code that reads it as a global.
config = BertConfig(vocab_size_or_config_json_file=VOCAB_SIZE_CONFIG)
# nn.CrossEntropyLoss requires class ids in [0, num_labels - 1], so size the
# classification head from the labels actually present in the data.
num_labels = int(dataframe["Sentiment"].max()) + 1
model = BertForSequenceClassification(num_labels).to(device)

criterion = nn.CrossEntropyLoss().to(device)
# The freshly initialised classifier head gets a larger learning rate than
# the pretrained encoder.
optimizer = torch.optim.Adam([
    {"params": model.bert.parameters(), "lr": 1e-4},
    {"params": model.classifier.parameters(), "lr": 1e-3}])
# Multiply both learning rates by 0.1 after every epoch.
scheduler = StepLR(optimizer, step_size=1, gamma=0.1)

for i in range(EPOCHES):
    train_loss[i+1], train_accuracy[i+1] = training(train_loader, model, device, criterion, optimizer, i+1)
    test_loss[i+1], test_accuracy[i+1], True_Class, Predict_Class = evaluation(test_loader, model, device, criterion, batch_size)
    scheduler.step()

# Save Trained Model
torch.save(model.state_dict(), "Bert_Model.pt")

In [None]:
# Show The Results
# Uses the helper and history names actually defined in this notebook
# (compute_confusion_matrix, train_accuracy, test_accuracy).
compute_confusion_matrix(True_Class, Predict_Class)
show_plot(train_loss, test_loss, train_accuracy, test_accuracy)