In [None]:
# Mount Google Drive at /content/drive; the paths used further down in this
# notebook all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os

# ANSI escape sequences keyed by a short name; 'reset' restores the default
# terminal style and 'bold' can be combined with any of the color codes.
COLORS = dict(
    red='\033[91m',
    green='\033[92m',
    yellow='\033[93m',
    blue='\033[94m',
    magenta='\033[95m',
    cyan='\033[96m',
    bold='\033[1m',
    reset='\033[0m',
)

# Project folder on the mounted Drive and the sub-folder that receives log files.
DEMET_PATH = '/content/drive/MyDrive/core/'
LOGPATH = DEMET_PATH + 'logs/'
# exist_ok=True makes this a no-op when the folder is already there, so no
# separate (and racy) os.path.exists() check is needed.
os.makedirs(LOGPATH, exist_ok=True)
import os
import time

class Logger:
    """Timestamped logger that writes colored lines to stdout and, optionally, plain lines to a file.

    Console lines have the form ``[HH:MM:SS]: message`` where the bracketed
    timestamp is wrapped in ANSI color codes taken from ``COLORS``. File lines
    use the same layout without any color codes.
    """

    def __init__(self, to_file = False):
        """Create a logger.

        Args:
            to_file: When True, ``log`` and ``print_and_log`` append to a
                per-instance file under ``LOGPATH``. When False those methods
                never touch the filesystem.
        """
        self.to_file = to_file
        # Creation timestamp; it is only used to build this instance's file name.
        self.data = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime())
        self.path = os.path.join(LOGPATH, 'log-' + self.data + '.log')
        self.colors = COLORS

    def __str__(self):
        """Describe where this logger sends its output."""
        # __str__ must always return a string: returning None (as happened for
        # console-only loggers) makes str(logger) raise TypeError.
        if self.to_file:
            return 'Logging to file'
        return 'Logging to console'

    def _prefix(self):
        """Return the ``[HH:MM:SS]: `` prefix shared by console and file lines."""
        return '[' + self.string_by_time() + ']: '

    def print(self, message, color = 'reset', bold = False):
        """Print ``message`` to stdout with a colored timestamp prefix.

        Args:
            message: Text to print; non-string values are converted with ``str``.
            color: Key into ``self.colors`` selecting the prefix color.
            bold: When True the prefix is additionally rendered bold.

        Raises:
            KeyError: If ``color`` is not a key of ``self.colors``.
        """
        weight = self.colors['bold'] if bold else ''
        # Only the prefix is colored; the reset code precedes the message itself.
        print(f"{weight}{self.colors[color]}{self._prefix()}{self.colors['reset']}{message}")

    def log(self, message):
        """Append ``message`` to the log file; does nothing unless ``to_file`` is set."""
        if self.to_file:
            # Explicit UTF-8 so the result does not depend on the platform default.
            with open(self.path, 'a', encoding='utf-8') as file:
                file.write(f"{self._prefix()}{message}\n")

    def print_and_log(self, message, color = 'reset', bold = False):
        """Print ``message`` (see ``print``) and also append it to the log file (see ``log``)."""
        self.print(message, color, bold)
        self.log(message)

    def string_by_time(self):
        """Return the current local time formatted as ``HH:MM:SS``."""
        return time.strftime('%H:%M:%S', time.localtime())

# Notebook-wide logger with file output enabled.
logger = Logger(to_file=True)

In [None]:
# All locations are derived from the single project root on the mounted Drive.
CORE_PATH = '/content/drive/MyDrive/core/'
MODEL_PATH = CORE_PATH + 'models/'
DATA_PATH = CORE_PATH + 'shuffled_data.csv'

# Checkpoint names that the run cell iterates over.
# Entries the author disabled are listed in the trailing comments:
#   marked "NOT WORKING": 'xlm-roberta-base', 'llama-uncased'
#   switched off:         'bert-large-uncased', 'bert-large-cased'
BERT_VARIANTS = [
    'FacebookAI/roberta-base',
    'distilbert/distilbert-base-cased',
    'bert-base-cased',
]
# Extra marker tokens registered with the tokenizer. The order is significant:
# tokens are added in list order, so reordering would change their ids.
# NOTE(review): the names look like CHAT-transcript annotations - confirm with the data source.
CHA_TOKENS = [
    '[CHA REPETITION]', '[CHA RETRACING]',
    '[CHA SHORT PAUSE]', '[CHA MEDIUM PAUSE]', '[CHA LONG PAUSE]',
    '[CHA TRAILING OFF]',
    '[CHA PHONOLOGICAL FRAGMENT]', '[CHA INTERPOSED WORD]', '[CHA FILLER]',
    '[CHA NON COMPLETION OF WORD]',
    '[CHA BELCHES]', '[CHA HISSES]', '[CHA GRUNTS]', '[CHA WHINES]',
    '[CHA COUGHS]', '[CHA HUMS]', '[CHA ROARS]', '[CHA WHISTLES]',
    '[CHA CRIES]', '[CHA LAUGHS]', '[CHA SNEEZES]', '[CHA WHIMPERS]',
    '[CHA GASPS]', '[CHA MOANS]', '[CHA SIGHS]', '[CHA YAWNS]',
    '[CHA GROANS]', '[CHA MUMBLES]', '[CHA SINGS]', '[CHA YELLS]',
    '[CHA GROWLS]', '[CHA PANTS]', '[CHA SQUEALS]', '[CHA VOCALIZES]',
    '[CHA TRAILING OFF QUESTION]', '[CHA QUESTION WITH EXCLAMATION]',
    '[CHA INTERRUPTION]', '[CHA INTERRUPTION OF QUESTION]',
    '[CHA SELFINTERRUPTION]', '[CHA SELFINTERRUPTED QUESTION]',
]

class ModelBaseConfig:
    """Configuration holder: which checkpoint to load and which extra tokens to register."""

    def __init__(self, model_name, extra_tokens=None):
        """Store the configuration.

        Args:
            model_name: Checkpoint name handed to the model/tokenizer loaders.
            extra_tokens: Optional iterable of tokens to add to the tokenizer.
                Defaults to ``CHA_TOKENS``. Pass an empty list to add none.
        """
        self.model_name = model_name
        # Copy the tokens so that later changes to this configuration (or to the
        # caller's list) never leak into the shared module-level CHA_TOKENS.
        self.extra_tokens = list(CHA_TOKENS if extra_tokens is None else extra_tokens)

    def get_model_name(self):
        """Return the configured checkpoint name."""
        return self.model_name

    def get_extra_tokens(self):
        """Return the list of extra tokens for this configuration."""
        return self.extra_tokens



In [None]:
import torch
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
from transformers import BertForSequenceClassification, BertTokenizer, RobertaForSequenceClassification, RobertaTokenizer
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix
from torch.utils.data import DataLoader, RandomSampler, TensorDataset, Subset
from transformers import AdamW
from sklearn.model_selection import KFold
import pandas as pd

# Rebinds the notebook-wide `logger` to a console-only instance: from this
# cell on, log() / print_and_log() no longer write to a file.
logger = Logger(to_file=False)

class ModelBase:
    """A two-label sequence-classification model bundled with its tokenizer and train/eval helpers."""

    def __init__(self,ModelBaseConfig):
        """Load the model and tokenizer named by the configuration.

        Args:
            ModelBaseConfig: Configuration object exposing ``get_model_name()``
                and ``get_extra_tokens()``. The parameter name shadows the
                class of the same name inside this method; it is kept so that
                existing keyword callers continue to work.
        """
        # Imported locally so this class does not depend on names beyond the
        # ones the notebook's import cell already provides.
        from transformers import AutoModelForSequenceClassification, AutoTokenizer

        self.model_name = ModelBaseConfig.get_model_name()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.extra_tokens = ModelBaseConfig.get_extra_tokens()

        if 'roberta' in self.model_name:
            logger.print("Initializing RoBERTa Model ...", "green")
        else:
            logger.print("Initializing BERT Model ...", "green")

        # The Auto* loaders pick the architecture recorded in the checkpoint's
        # own config. Hard-wiring the BERT classes for every non-RoBERTa name
        # loaded DistilBERT checkpoints into a BERT architecture, so their
        # pretrained weights did not map onto the model.
        self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name, num_labels=2).to(self.device)
        # use_fast=False requests the Python tokenizer implementations, i.e. the
        # same tokenizer classes the BERT/RoBERTa branches used before.
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name, use_fast=False)

        if self.extra_tokens:
            self.add_tokens()

    def add_tokens(self):
        """Register the extra tokens with the tokenizer and resize the model's embeddings to match.

        Raises:
            Exception: Any failure is logged and then re-raised. Continuing
                after a partial failure could leave the tokenizer producing
                ids that the embedding matrix does not contain.
        """
        try:
            logger.print("Adding Extra Tokens ...", "green")
            self.tokenizer.add_tokens(self.extra_tokens, special_tokens = True)
            self.model.resize_token_embeddings(len(self.tokenizer))
        except Exception as e:
            logger.print("Error: " + str(e), "red")
            raise

    def train_epoch(self,dataloader,optimizer):
        """Run one optimisation pass over ``dataloader``.

        Args:
            dataloader: Sized iterable yielding ``(input_ids, attention_mask, labels)`` batches.
            optimizer: Optimizer bound to ``self.model``'s parameters.

        Returns:
            The mean of the per-batch losses.
        """
        self.model.train()
        total_loss = 0
        for batch in dataloader:
            optimizer.zero_grad()
            input_ids, attention_mask, labels = batch
            input_ids = input_ids.to(self.device)
            attention_mask = attention_mask.to(self.device)
            labels = labels.to(self.device)
            outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            total_loss += loss.item()
            loss.backward()
            # Cap the gradient norm at 1.0 before stepping.
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
            optimizer.step()
        return total_loss / len(dataloader)

    def evaluate(self, dataloader):
        """Score the model on ``dataloader`` and log the metrics.

        Args:
            dataloader: Iterable yielding ``(input_ids, attention_mask, labels)`` batches.

        Returns:
            Tuple ``(accuracy, precision, recall, f1, matrix)`` where ``matrix``
            is the 2x2 confusion matrix over labels 0 and 1.
        """
        self.model.eval()
        predictions = []
        actual_labels = []
        with torch.no_grad():
            for batch in dataloader:
                input_ids, attention_mask, labels = batch
                input_ids = input_ids.to(self.device)
                attention_mask = attention_mask.to(self.device)
                labels = labels.to(self.device)
                outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
                val_output = torch.argmax(outputs.logits, dim=1).cpu().detach().numpy()
                val_labels = labels.cpu().detach().numpy()
                predictions.extend(val_output)
                actual_labels.extend(val_labels)
        accuracy = accuracy_score(actual_labels, predictions)
        # zero_division=0 keeps the score at 0 (without a warning) when a class
        # is never predicted or never present.
        precision = precision_score(actual_labels, predictions, zero_division=0)
        recall = recall_score(actual_labels, predictions, zero_division=0)
        f1 = f1_score(actual_labels, predictions, zero_division=0)
        # Fixing the label set keeps the matrix 2x2 even when a split contains
        # (and the model predicts) only one of the two classes.
        matrix = confusion_matrix(actual_labels, predictions, labels=[0, 1])
        logger.print("Evaluation Results.", "green")
        logger.print("Accuracy: " + str(accuracy), "green")
        logger.print("Precision: " + str(precision), "green")
        logger.print("Recall: " + str(recall), "green")
        logger.print("F1 Score: " + str(f1), "green")
        return accuracy, precision, recall, f1, matrix

    def save_model(self, path):
        """Save both the model and the tokenizer into the directory ``path``."""
        self.model.save_pretrained(path)
        self.tokenizer.save_pretrained(path)
        logger.print("Model Saved Successfully.", "green")


def cross_validation(model_name, data, labels, epochs, batch_size, learning_rate, n_splits, random_state=None):
    """Run K-fold cross-validation, training a freshly loaded model for every fold.

    For each fold the per-epoch validation metrics are appended to
    ``<CORE_PATH>scores/<family>/scores_<fold>.csv``, a confusion-matrix plot
    is written after every epoch, and the model is saved once the fold's last
    epoch has finished.

    Args:
        model_name: Checkpoint name used for the model and tokenizer.
        data: List of input texts.
        labels: List of integer class labels aligned with ``data``.
        epochs: Number of training epochs per fold.
        batch_size: Batch size for the training and validation loaders.
        learning_rate: Learning rate passed to the optimizer.
        n_splits: Number of folds.
        random_state: Optional seed for the fold shuffling. ``None`` (the
            default) keeps the previous, unseeded behaviour.

    Returns:
        Array of shape ``(n_splits, epochs)`` holding the validation accuracy
        of every fold/epoch.
    """
    kfold = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    # A ModelBase is built here only to get a tokenizer that already knows the
    # extra tokens. Drop it straight after tokenizing so its model does not
    # occupy device memory for the whole run.
    tokenizer_holder = ModelBase(ModelBaseConfig(model_name))
    encodings = tokenizer_holder.tokenizer(data, truncation=True, padding=True, return_tensors="pt")
    del tokenizer_holder
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    dataset = TensorDataset(encodings['input_ids'], encodings['attention_mask'], torch.tensor(labels))

    scores_matrix = np.zeros((n_splits, epochs))

    # Scores are grouped into one directory per model family.
    scores_path = CORE_PATH + str("scores")
    if 'roberta' in model_name:
        scores_dir = scores_path + '/roberta'
    elif 'distil' in model_name:
        scores_dir = scores_path + '/distil'
    else:
        scores_dir = scores_path + '/base'
    os.makedirs(scores_dir, exist_ok=True)

    for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset)):
        scores_file = f"{scores_dir}/scores_{fold}.csv"
        # The file is opened in append mode, so only write the header when the
        # file is new; otherwise every re-run would insert another header row.
        needs_header = not os.path.exists(scores_file) or os.path.getsize(scores_file) == 0
        with open(scores_file, 'a') as f:
            if needs_header:
                f.write("accuracy,precision,recall,f1\n")

            config = ModelBaseConfig(model_name)
            model = ModelBase(config)
            logger.print(f"Training Model - Fold {fold+1}/{n_splits} ...","green")

            train_subset = Subset(dataset, train_idx)
            val_subset = Subset(dataset, val_idx)

            train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
            val_loader = DataLoader(val_subset, batch_size=batch_size)

            optimizer = AdamW(model.model.parameters(), lr=learning_rate, weight_decay=0.01)

            for epoch in range(epochs):
                logger.print(f"Epoch {epoch + 1}/{epochs}","green")
                train_loss = model.train_epoch(train_loader, optimizer)
                logger.print(f"Train loss: {train_loss}","green")

                accuracy, precision, recall, f1, matrix = model.evaluate(val_loader)
                f.write(f"{accuracy},{precision},{recall},{f1}\n")
                # Flush per epoch so finished epochs survive an interrupted run.
                f.flush()
                plot_confusion(matrix, ['Non-Dementia', 'Dementia'], model_name, fold, epoch)

                logger.print(f"Validation accuracy: {accuracy}","green")
                scores_matrix[fold, epoch] = accuracy

            model.save_model(f"{CORE_PATH}models/{model_name}_{fold}")

        # Release this fold's model and optimizer state before the next fold
        # loads a new model.
        del model, optimizer
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
    return scores_matrix



def plot_confusion(conf_matrix, labels, model_name, fold, epoch):
    """Render a confusion matrix as a heatmap and save it as a PNG under ``<CORE_PATH>plots/``.

    Args:
        conf_matrix: Square integer matrix to draw (cells are formatted with ``'d'``).
        labels: Tick labels, used for both axes.
        model_name: Model name; part of the title and of the file name.
        fold: Fold index; part of the title and of the file name.
        epoch: Epoch index; part of the title and of the file name.
    """
    title = f"{model_name} - Fold {fold} - Epoch {epoch}"
    path = f"{CORE_PATH}plots/{model_name}_fold_{fold}_epoch_{epoch}.png"
    # Model names may contain '/', which puts the file in a sub-directory of
    # plots/; create whatever directory the final path needs.
    os.makedirs(os.path.dirname(path), exist_ok=True)

    fig = plt.figure(figsize=(10, 7))
    try:
        sns.heatmap(conf_matrix, annot=True, fmt='d', xticklabels=labels, yticklabels=labels)
        plt.title(title)
        # NOTE(review): assumes rows are true labels and columns predictions,
        # as produced by sklearn's confusion_matrix - confirm for other callers.
        plt.xlabel("Predicted")
        plt.ylabel("Actual")
        plt.savefig(path)
    finally:
        # This function is called once per epoch of every fold; close the
        # figure so open figures do not pile up in memory.
        plt.close(fig)

def save_scores(scores_matrix, model_name):
    """Write a score matrix to ``scores/<model_name>_scores.csv`` (relative to the working directory).

    Args:
        scores_matrix: 2-D array-like of scores; it is wrapped in a DataFrame.
        model_name: Model name used in the file name. It may contain ``/``,
            in which case the file ends up in a sub-directory of ``scores/``.
    """
    path = f"scores/{model_name}_scores.csv"
    # to_csv does not create missing directories, so make sure the target exists.
    os.makedirs(os.path.dirname(path), exist_ok=True)
    pd.DataFrame(scores_matrix).to_csv(path)

In [None]:
# Load the dataset and pull the text column and the 'gt' label column out as
# plain Python lists.
df = pd.read_csv(DATA_PATH)
data, labels = (df[column].tolist() for column in ('text', 'gt'))

In [None]:
%%capture
for model_name in BERT_VARIANTS:
    epochs = 5
    batch_size = 16
    learning_rate = 1e-5
    n_splits = 5

    cross_validation(model_name, data, labels, epochs, batch_size, learning_rate, n_splits)

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at FacebookAI/roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at FacebookAI/roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at FacebookAI/roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weig