In [None]:
import pandas as pd

import numpy as np 

import matplotlib.pyplot as plt

import torch
import torch.nn as nn
#import torch.nn.functional as F
from torch.optim import Adam
from torch.utils.data import Dataset,DataLoader

from torchvision import models,transforms

from tqdm import tqdm

from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report
from sklearn.metrics import brier_score_loss

from scipy.stats import bernoulli, uniform

from transformers import BertTokenizer, BertModel

In [None]:
df_test = pd.read_csv('../input/reddit-mental-health-data/Reddit Dataset/final_datasets/final_test.csv')

In [None]:
df_test.head()

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') #Use GPU if it's available or else use CPU.
print(device)
torch.cuda.empty_cache()

In [None]:
import torch
import numpy as np
from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
labels = {'adhd':0,
          'anxiety':1,
          'bipolar':2,
          'depression':3,
          'ptsd':4,
          'none':5
          }

# Dataset

In [None]:
class Dataset(torch.utils.data.Dataset):

    def __init__(self, df, max_length, input_field):

        self.labels = [torch.eye(6)[labels[label], :] for label in df['class_name']]
        self.texts = [tokenizer(text, 
                               padding='max_length', max_length = max_length, truncation=True,
                                return_tensors="pt") for text in df[input_field]]

    def classes(self):
        return self.labels

    def __len__(self):
        return len(self.labels)

    def get_batch_labels(self, idx):
        # Fetch a batch of labels
        return np.array(self.labels[idx])

    def get_batch_texts(self, idx):
        # Fetch a batch of inputs
        return self.texts[idx]

    def __getitem__(self, idx):

        batch_texts = self.get_batch_texts(idx)
        batch_y = self.get_batch_labels(idx)

        return batch_texts, batch_y

# Model

In [None]:
from torch import nn
from transformers import BertModel

class BertClassifier(nn.Module):

    def __init__(self, dropout=0.3):

        super(BertClassifier, self).__init__()

        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.dropout = nn.Dropout(dropout)
        self.linear = nn.Linear(768, 6)
        self.sigmoid = nn.Sigmoid()

    def forward(self, input_id, mask):

        output = self.bert(input_ids= input_id, attention_mask=mask)
        last_hidden_layer = output.last_hidden_state
        del output
        pooled_output = torch.mean(last_hidden_layer, dim = 1)
        dropout_output = self.dropout(pooled_output)
        linear_output = self.linear(dropout_output)
        final_layer = self.sigmoid(linear_output)

        return final_layer

# Evaluation functions

In [None]:
def calc_bins(preds,labels_oneh):
    # Assign each prediction to a bin
    num_bins = 5
    bins = torch.linspace(0.1, 1, num_bins)
    bins = bins.to(device)
    preds = preds.to(device)
    #print(preds)
    #print(bins)
    binned = torch.bucketize(preds, bins)

    # Save the accuracy, confidence and size of each bin
    bin_accs = torch.zeros(num_bins)
    bin_confs = torch.zeros(num_bins)
    bin_sizes = torch.zeros(num_bins)

    for bin in range(num_bins):
        bin_sizes[bin] = len(preds[binned == bin])
        if bin_sizes[bin] > 0:
            bin_accs[bin] = (labels_oneh[binned==bin]).sum() / bin_sizes[bin]
            bin_confs[bin] = (preds[binned==bin]).sum() / bin_sizes[bin]

    return bins, binned, bin_accs, bin_confs, bin_sizes

In [None]:
def get_metrics(preds,labels):
    ECE = 0
    MCE = 0
    bins, _, bin_accs, bin_confs, bin_sizes = calc_bins(preds,labels)

    for i in range(len(bins)):
        abs_conf_dif = abs(bin_accs[i] - bin_confs[i])
        ECE += (bin_sizes[i] / sum(bin_sizes)) * abs_conf_dif
        MCE = max(MCE, abs_conf_dif)

    return ECE, MCE

In [None]:
def evaluate_posts(model, df_test, max_length, input_field):
    
    test = Dataset(df_test, max_length, input_field)

    test_dataloader = torch.utils.data.DataLoader(test, batch_size=16)

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    if use_cuda:

        model = model.cuda()

    model.eval()
    pred_lst = []
    gt_lst = []
    
    total_acc_test = 0
    total_loss_test = 0
    
    criterion = nn.CrossEntropyLoss()
    
    with torch.no_grad():

        for test_input, test_label in test_dataloader:

            test_label = test_label.squeeze(1).to(device)
            mask = test_input['attention_mask'].to(device)
            input_id = test_input['input_ids'].squeeze(1).to(device)

            output = model(input_id, mask)
            
            batch_loss = criterion(output, test_label)
            total_loss_test += batch_loss.item()

            acc = (output.argmax(dim=1) == test_label.argmax(dim=1)).sum().item()
            total_acc_test += acc
            
            output=output.detach().cpu().numpy()
            pred_lst.append(output)
            test_label = test_label.detach().cpu().numpy()
            gt_lst.append(test_label)
            
        pred_lst=np.concatenate(pred_lst, axis=0)
        gt_lst=np.concatenate(gt_lst, axis=0)
        
        ECE, MCE = get_metrics(torch.Tensor(pred_lst).cpu(),torch.Tensor(gt_lst).cpu())
        
        pred_lst2 = np.argmax(pred_lst, axis=1)
        gt_lst2 = np.argmax(gt_lst, axis=1)
        auc_lst=[]
        for k in range(0,5):
            tmp_gt=gt_lst[:, k]
            tmp_pred=pred_lst[:,k]
            try:
                tmp_auc=roc_auc_score(tmp_gt, tmp_pred)
            except ValueError:
                print("error")
                tmp_auc=0
            auc_lst.append(tmp_auc)

        brier_score = np.mean(np.sum((pred_lst - gt_lst)**2, axis=1))
        auc_lst=np.array(auc_lst)
        auc=np.mean(auc_lst)
        f1 = f1_score(gt_lst2, pred_lst2, average='weighted')
        print(classification_report(gt_lst2, pred_lst2, labels=[0,1, 2, 3,4,5]))
    
        print(
            f'Test Loss: {total_loss_test / len(test): .3f} \
            | Test Accuracy: {total_acc_test / len(df_test): .3f} \
            | Test AUC: {auc: .3f} \
            | Test f1 Score: {f1: .3f}\
            | Brier Score: {brier_score: .3f} \
            | Expected Calibration Score: {ECE: .3f} \
            | Maximum Calibration Score: {MCE: .3f} ')
    

# Evaluating on posts

In [None]:
epochs = 5
model = BertClassifier()
checkpoint = torch.load('../input/final-bert-models/bertpost.pt')
model.load_state_dict(checkpoint['model_state_dict'],strict=False)
del checkpoint
lr = 1e-5
batch_size = 16

evaluate_posts(model, df_test, 512, 'post')

# Evaluating on Titles

In [None]:
model = BertClassifier()
checkpoint = torch.load('../input/final-bert-models/berttitle.pt')
model.load_state_dict(checkpoint['model_state_dict'],strict=False)
del checkpoint

evaluate_posts(model, df_test, 512, 'post')

# Evaluating on Posts + Titles

In [None]:
model = BertClassifier()
checkpoint = torch.load('../input/final-bert-models/berttitle_post.pt')
model.load_state_dict(checkpoint['model_state_dict'],strict=False)
del checkpoint

evaluate_posts(model, df_test, 512, 'post')