In [1]:
import pandas as pd
from transformers import BertTokenizer
import torch
from torch import nn
from transformers import BertModel


In [2]:
answers = pd.read_csv("data/processed_data_english.csv")

In [3]:
len(answers['TEXT'][0])

1675

In [4]:
answers['cOPN'].value_counts()

0    1532
1    1431
Name: cOPN, dtype: int64

In [5]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [6]:
max_length = 512  # BERT's maximum length

# Tokenizing
input_ids = []
attention_masks = []

for text in answers['TEXT']:
    encoded_dict = tokenizer.encode_plus(
                        text,                      # Text to encode
                        add_special_tokens = True, # Add '[CLS]' and '[SEP]'
                        max_length = max_length,   # Pad & truncate
                        pad_to_max_length = True,
                        return_attention_mask = True,   # Construct attention masks
                        return_tensors = 'pt',     # Return PyTorch tensors
                        truncation=True,
                   )

    input_ids.append(encoded_dict['input_ids'])
    attention_masks.append(encoded_dict['attention_mask'])

# Convert lists to tensors
input_ids = torch.cat(input_ids, dim=0)
attention_masks = torch.cat(attention_masks, dim=0)




In [7]:
from sklearn.model_selection import train_test_split

# Splitting the dataset for the extraversion trait (cEXT)
X_train_ext, X_test_ext, y_train_ext, y_test_ext = train_test_split(input_ids, answers['cEXT'], 
                                                                    random_state=42, test_size=0.2)

# Do the same for attention masks
train_masks_ext, test_masks_ext, _, _ = train_test_split(attention_masks, answers['cEXT'],
                                                         random_state=42, test_size=0.2)


In [8]:
y_train_ext = y_train_ext.reset_index(drop=True)
y_test_ext = y_test_ext.reset_index(drop=True)

In [9]:
y_train_ext = torch.tensor(y_train_ext)
y_test_ext = torch.tensor(y_test_ext)

In [10]:
len(y_test_ext)

593

In [11]:
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

batch_size = 8  # You can adjust batch size

# Create the DataLoader for training set
train_data = TensorDataset(X_train_ext, train_masks_ext, y_train_ext)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)

# Create the DataLoader for test set
test_data = TensorDataset(X_test_ext, test_masks_ext, y_test_ext)
test_sampler = SequentialSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=batch_size)


In [12]:
class BertBinaryClassifier(nn.Module):
    def __init__(self, freeze_bert=False):
        super(BertBinaryClassifier, self).__init__()

        # Use the 'bert-base-uncased' pre-trained BERT model
        self.bert = BertModel.from_pretrained('bert-base-uncased')

        # Enhanced classifier layers
        self.classifier = nn.Sequential(
            nn.Dropout(0.1),  # Dropout for regularization
            nn.Linear(768, 512),  # First layer
            nn.ReLU(),  # Activation function
            nn.Dropout(0.1),  # Additional dropout layer for regularization
            nn.Linear(512, 128),  # Second layer
            nn.ReLU(),  # Activation function
            nn.Linear(128, 2)  # Final layer for binary classification
        )

        if freeze_bert:
            for param in self.bert.parameters():
                param.requires_grad = False

    def forward(self, input_ids, attention_mask):
        # Pass the inputs through BERT
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        
        # Extract the last hidden state of the token `[CLS]` for classification
        cls_output = outputs.pooler_output

        # Pass the BERT output through the classifier
        logits = self.classifier(cls_output)

        return logits


In [13]:
model = BertBinaryClassifier()


In [14]:
from torch.optim import Adam

# Define the optimizer
optimizer = Adam(model.parameters(), lr=1e-5)  # Learning rate is adjustable

# Define the loss function
loss_fn = nn.CrossEntropyLoss()


In [15]:
device = torch.device("mps")
model.to(device)


BertBinaryClassifier(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, elementw

In [16]:
epochs = 4  # This can be adjusted based on your dataset and model performance


In [17]:
def train(model, train_dataloader, optimizer, loss_fn, device):
    model.train()  # Set the model to training mode
    total_loss = 0

    for batch in train_dataloader:
        # Move batch to device
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)

        # Clear previously calculated gradients
        model.zero_grad()

        # Perform a forward pass. This will return logits.
        outputs = model(input_ids, attention_mask)

        # Compute loss and accumulate the loss value
        loss = loss_fn(outputs, labels)
        total_loss += loss.item()

        # Perform a backward pass to calculate gradients
        loss.backward()

        # Update parameters
        optimizer.step()

    # Compute the average loss over the training data
    avg_train_loss = total_loss / len(train_dataloader)  
    
    return avg_train_loss

def evaluate(model, validation_dataloader, loss_fn, device):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0

    for batch in validation_dataloader:
        # Move batch to device
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)

        # Forward pass, calculate logits
        with torch.no_grad():
            outputs = model(input_ids, attention_mask)

        # Compute loss and accumulate the loss value
        loss = loss_fn(outputs, labels)
        total_loss += loss.item()

    avg_val_loss = total_loss / len(validation_dataloader)

    return avg_val_loss


In [18]:
def save_checkpoint(model, optimizer, epoch, filename):
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict()
    }
    torch.save(checkpoint, filename)


In [19]:
best_val_loss = float('inf')

for epoch in range(epochs):
    print(f'Epoch {epoch + 1}/{epochs}')
    print('-' * 10)

    train_loss = train(model, train_dataloader, optimizer, loss_fn, device)
    print(f'Training loss: {train_loss}')

    val_loss = evaluate(model, test_dataloader, loss_fn, device)
    print(f'Validation loss: {val_loss}')

    # Save checkpoint if validation loss has improved
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        save_checkpoint(model, optimizer, epoch, 'best_model_checkpoint.pth')
        print("Checkpoint saved")


Epoch 1/4
----------
Training loss: 0.6926558505405079
Validation loss: 0.6938677549362182
Checkpoint saved
Epoch 2/4
----------
Training loss: 0.6944544969584404
Validation loss: 0.6932699282964071
Checkpoint saved
Epoch 3/4
----------
Training loss: 0.6931815976245637
Validation loss: 0.6934319067001343
Epoch 4/4
----------
Training loss: 0.6922610656982319
Validation loss: 0.6996817016601562


In [20]:
# def load_checkpoint(model, optimizer, filename):
#     checkpoint = torch.load(filename)
#     model.load_state_dict(checkpoint['model_state_dict'])
#     optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
#     return checkpoint['epoch']

# # Before starting training, load the checkpoint if it exists
# checkpoint_filename = 'best_model_checkpoint.pth'
# try:
#     start_epoch = load_checkpoint(model, optimizer, checkpoint_filename) + 1
#     print(f"Resuming training from epoch {start_epoch}")
# except FileNotFoundError:
#     print("No checkpoint found, starting from scratch")
#     start_epoch = 0

# # Then, adjust your training loop to start from `start_epoch`
# for epoch in range(start_epoch, epochs):
#     # Training loop continues as before


In [25]:
def predict(model, test_dataloader, device):
    model.eval()  # Set the model to evaluation mode

    predictions = []
    true_labels = []

    with torch.no_grad():
        for batch in test_dataloader:
            input_ids = batch[0].to(device)
            attention_mask = batch[1].to(device)
            labels = batch[2].to(device)

            outputs = model(input_ids, attention_mask)
            logits = outputs

            # Move logits and labels to CPU
            logits = logits.detach().cpu().numpy()
            label_ids = labels.to('cpu').numpy()

            # Store predictions and true labels
            predictions.append(logits)
            true_labels.append(label_ids)

    return predictions, true_labels


In [26]:
predictions, true_labels = predict(model, test_dataloader, device)


In [27]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import numpy as np

# Flatten the outputs
flat_predictions = np.concatenate(predictions, axis=0)
flat_true_labels = np.concatenate(true_labels, axis=0)

# Convert logits to predicted class (0 or 1)
flat_predictions = np.argmax(flat_predictions, axis=1).flatten()


In [28]:
accuracy = accuracy_score(flat_true_labels, flat_predictions)
precision, recall, f1, _ = precision_recall_fscore_support(flat_true_labels, flat_predictions, average='binary')

print(f'Accuracy: {accuracy}')
print(f'Precision: {precision}')
print(f'Recall: {recall}')
print(f'F1 Score: {f1}')


Accuracy: 0.5025295109612141
Precision: 0.5025295109612141
Recall: 1.0
F1 Score: 0.6689113355780022
