In [13]:
# torch libraries
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# pytorch lightning and torchmetrics libraries
import pytorch_lightning as pl
from torchmetrics import F1Score
from torchmetrics.functional import accuracy, auroc #F1Score #f1
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger

# transformers libraries
from transformers import AutoTokenizer, DebertaV2Model, AdamW, get_linear_schedule_with_warmup

import tqdm

import pandas as pd

In [None]:
# Set the working directory to the downloaded folder so that the relative paths
# used by later cells (./example_data.csv, ./model/pytorch_model.pt) resolve.
# Replace the placeholder path below with the folder's location on your machine.
import os
os.chdir('/path/to/your/directory')

In [None]:
# Shared configuration for the model class and the inference helper defined below.
# Names of the output label columns; position i corresponds to the i-th classifier output.
LABEL_COLUMNS = ['anger_v2', 'fear_v2', 'disgust_v2', 'sadness_v2', 'joy_v2', 'enthusiasm_v2', 'pride_v2', 'hope_v2']
# Pretrained checkpoint name passed to both AutoTokenizer and DebertaV2Model.
BASE_MODEL_NAME = "microsoft/mdeberta-v3-base"
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL_NAME)
# Number of sentences sent through the model per forward pass in predict_labels.
batch_size = 8
# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"


class CrowdCodedTagger(pl.LightningModule):
  """Multi-label classifier: a pretrained DeBERTa-v2 encoder plus one linear layer.

  The linear layer maps the encoder's hidden state at sequence position 0 to
  ``n_classes`` outputs. A sigmoid turns every output into an independent
  probability and binary cross-entropy is used as the loss.

  Args:
    n_classes: number of output labels.
    n_training_steps: total number of optimisation steps, forwarded to the
      linear warmup scheduler in ``configure_optimizers`` (training only).
    n_warmup_steps: number of warmup steps for that scheduler (training only).
  """

  def __init__(self, n_classes: int, n_training_steps=None, n_warmup_steps=None):
    super().__init__()
    # The attribute names below determine the state_dict keys, so they must
    # stay as they are for existing checkpoints to load.
    self.bert = DebertaV2Model.from_pretrained(BASE_MODEL_NAME, return_dict=True)
    self.classifier = nn.Linear(self.bert.config.hidden_size, n_classes)
    self.n_training_steps = n_training_steps
    self.n_warmup_steps = n_warmup_steps
    # BCELoss expects probabilities, which is why forward() applies the sigmoid first.
    self.criterion = nn.BCELoss()

  def forward(self, input_ids, attention_mask, labels=None, token_type_ids=None):
    """Run the encoder and the classification head.

    Args:
      input_ids: token ids passed to the encoder.
      attention_mask: attention mask passed to the encoder.
      labels: optional targets; when given, the BCE loss against the
        probabilities is computed.
      token_type_ids: accepted but unused (presumably so that a tokenizer's
        output can be unpacked straight into the call -- confirm).

    Returns:
      ``(loss, probabilities)``; ``loss`` is the integer 0 when no labels are given.
    """
    encoded = self.bert(input_ids, attention_mask=attention_mask)
    # Classify from the hidden state at position 0 of every sequence.
    logits = self.classifier(encoded.last_hidden_state[:, 0])
    probabilities = torch.sigmoid(logits)
    loss = 0
    if labels is not None:
      loss = self.criterion(probabilities, labels)
    return loss, probabilities

  def _shared_step(self, batch, log_name):
    """Forward one batch, log its loss under ``log_name``, return (loss, outputs, labels)."""
    labels = batch["labels"]
    loss, outputs = self(batch["input_ids"], batch["attention_mask"], labels)
    self.log(log_name, loss, prog_bar=True, logger=True)
    return loss, outputs, labels

  def training_step(self, batch, batch_idx):
    """Compute and log the training loss; also hand predictions and labels to the epoch-end hook."""
    loss, outputs, labels = self._shared_step(batch, "train_loss")
    # Only the loss needs gradients. The predictions are collected for
    # training_epoch_end, so they are detached here to make sure the collected
    # outputs never hold on to a batch's autograd graph.
    return {"loss": loss, "predictions": outputs.detach(), "labels": labels}

  def validation_step(self, batch, batch_idx):
    """Compute and log the validation loss for one batch."""
    loss, _, _ = self._shared_step(batch, "val_loss")
    return loss

  def test_step(self, batch, batch_idx):
    """Compute and log the test loss for one batch."""
    loss, _, _ = self._shared_step(batch, "test_loss")
    return loss

  def training_epoch_end(self, outputs):
    """Log one training ROC-AUC value per label column for the finished epoch.

    ``outputs`` is the list of dictionaries returned by ``training_step``.

    NOTE(review): ``training_epoch_end`` and the positional
    ``auroc(preds, target)`` call look like older pytorch_lightning /
    torchmetrics APIs -- confirm against the installed versions.
    """
    # Join the per-batch tensors along the batch axis (one row per sample).
    labels = torch.cat([step["labels"].detach().cpu() for step in outputs]).int()
    predictions = torch.cat([step["predictions"].detach().cpu() for step in outputs])

    for i, name in enumerate(LABEL_COLUMNS):
      class_roc_auc = auroc(predictions[:, i], labels[:, i])
      # add_scalar is called on the logger's experiment object (a TensorBoard
      # writer when TensorBoardLogger is used).
      self.logger.experiment.add_scalar(f"{name}_roc_auc/Train", class_roc_auc, self.current_epoch)

  def configure_optimizers(self):
    """Return AdamW (lr 2e-5) with a linear warmup schedule stepped every batch.

    NOTE(review): ``AdamW`` is the name imported from transformers at the top
    of the notebook; recent transformers releases deprecate it in favour of
    ``torch.optim.AdamW`` -- confirm.
    """
    optimizer = AdamW(self.parameters(), lr=2e-5) #DEFINING LEARNING RATE

    scheduler = get_linear_schedule_with_warmup(
      optimizer,
      num_warmup_steps=self.n_warmup_steps,
      num_training_steps=self.n_training_steps
    )

    return dict(
      optimizer=optimizer,
      lr_scheduler=dict(
        scheduler=scheduler,
        interval='step'
      )
    )

# Define function for inference
def predict_labels(df, text_column='sentence', threshold=0.65):
    """Classify every row of ``df`` and return it with one 0/1 column per label.

    Uses the module-level ``model``, ``tokenizer``, ``device``, ``batch_size``
    and ``LABEL_COLUMNS``; all of them must be defined before the call.

    Args:
        df: input DataFrame holding the texts to classify.
        text_column: name of the column that contains the texts.
        threshold: a label is set to 1 when its predicted probability is
            greater than or equal to this value, otherwise to 0.

    Returns:
        A new DataFrame with a fresh 0..n-1 index: the columns of ``df``
        followed by the columns named in ``LABEL_COLUMNS``. An empty ``df``
        yields an empty result with the same columns.
    """
    texts = df[text_column].tolist()
    probabilities = []

    # Inference only: without no_grad every batch would build an autograd graph.
    with torch.no_grad():
        for start in tqdm.tqdm(range(0, len(texts), batch_size)):
            batch = texts[start:start + batch_size]
            encoded_input = tokenizer(batch, padding=True, truncation=True, max_length=512, return_tensors='pt')
            outputs = model(**encoded_input.to(device))
            # The model returns (loss, probabilities); keep one list of floats per text.
            probabilities.extend(outputs[1].tolist())

    # dtype=float keeps the comparison well defined when there are no rows at all.
    labels_df = pd.DataFrame(probabilities, columns=LABEL_COLUMNS, dtype=float)
    labels_df = (labels_df >= threshold).astype('int64')

    # Both frames carry a 0..n-1 index here, so the columns line up row by row.
    return pd.concat([df.reset_index(drop=True), labels_df], axis=1)

In [None]:
# Provide an input DataFrame, for example by loading a CSV file.
# predict_labels reads the texts to classify from the column named "sentence"
# (the column name is set inside predict_labels above).
input_df = pd.read_csv("./example_data.csv")
input_df

In [None]:
# Build the classifier, load the fine-tuned weights and put the model into evaluation mode.
model = CrowdCodedTagger(n_classes=len(LABEL_COLUMNS))
# map_location lets a checkpoint that was saved on a GPU load on a CPU-only machine.
# NOTE(review): torch.load unpickles the file -- only load checkpoints from a trusted source.
state_dict = torch.load("./model/pytorch_model.pt", map_location=device)
# strict=False tolerates key mismatches; report them so that a partially
# loaded model does not go unnoticed.
load_result = model.load_state_dict(state_dict, strict=False)
if load_result.missing_keys or load_result.unexpected_keys:
    print(f"Checkpoint key mismatch - missing: {load_result.missing_keys}, unexpected: {load_result.unexpected_keys}")
model.to(device)
model.eval()

In [None]:
# Apply predict_labels to input_df: the result holds the input columns plus
# one 0/1 column for every entry in LABEL_COLUMNS.
results = predict_labels(input_df)
print(results)

In [None]:
# Save the predictions (`results` from the previous cell) as a CSV file in the working directory.
results.to_csv('example_results.csv', index=False)