<a href="https://colab.research.google.com/github/navrgithub/NLP_Authorship_Attribution/blob/main/LIWC_DeBERTa_CNN_task3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import DebertaTokenizer, DebertaModel
from sklearn.model_selection import train_test_split

In [None]:
# Define the LIWC+DeBERTa-CNN model
class LIWCDebertaCNN(nn.Module):
    """Classifier that fuses LIWC category embeddings with DeBERTa token states.

    Each token position is described by the concatenation of a learned LIWC
    category embedding and the DeBERTa hidden state for that position. A 1-D
    convolution runs over the sequence axis, the result is max-pooled over
    time, and a linear layer produces the class logits.

    Args:
        num_classes: Number of output classes (size of the logits vector).
        num_liwc_categories: Number of distinct LIWC category ids, i.e. the
            vocabulary size of the LIWC embedding table. The default is only
            a placeholder so that ``LIWCDebertaCNN(num_classes)`` keeps
            working; set it to the real size of the LIWC id space in use.
    """

    def __init__(self, num_classes, num_liwc_categories=100):
        super().__init__()
        self.liwc_features = 100  # Dimension of each LIWC category embedding
        self.deberta_hidden_size = 768  # Hidden size of the DeBERTa model
        self.conv_filters = 256  # Number of CNN filters
        self.kernel_size = 3  # CNN kernel size

        # The original code read `num_liwc_categories` from an undefined
        # global, which raised NameError; it is now an explicit argument.
        self.liwc_embedding = nn.Embedding(num_liwc_categories, self.liwc_features)
        self.deberta = DebertaModel.from_pretrained('microsoft/deberta-base')
        self.cnn = nn.Conv1d(
            self.liwc_features + self.deberta_hidden_size,
            self.conv_filters,
            self.kernel_size,
        )
        self.fc = nn.Linear(self.conv_filters, num_classes)

    def forward(self, liwc_inputs, deberta_inputs):
        """Compute class logits for a batch.

        Args:
            liwc_inputs: Integer tensor of LIWC category ids, one per token,
                shaped (batch, seq_len). It must have the same seq_len as the
                DeBERTa inputs so both feature sets can be concatenated.
            deberta_inputs: Mapping of keyword arguments forwarded to the
                DeBERTa model (e.g. ``input_ids`` and ``attention_mask``).

        Returns:
            Tensor of shape (batch, num_classes) holding unnormalised logits.
        """
        # (batch, seq_len, liwc_features)
        liwc_embedded = self.liwc_embedding(liwc_inputs)
        # (batch, seq_len, deberta_hidden_size)
        deberta_outputs = self.deberta(**deberta_inputs).last_hidden_state

        # Concatenate along the feature axis while both tensors are still
        # (batch, seq_len, features); permuting before this step makes the
        # shapes incompatible.
        combined_features = torch.cat((liwc_embedded, deberta_outputs), dim=2)

        # Conv1d expects channels first: (batch, features, seq_len).
        combined_features = combined_features.permute(0, 2, 1)

        # Apply CNN over the sequence axis
        cnn_outputs = self.cnn(combined_features)

        # Global max pooling over time -> (batch, conv_filters)
        pooled_outputs = F.max_pool1d(cnn_outputs, cnn_outputs.size(2)).squeeze(2)

        # Fully connected layer
        logits = self.fc(pooled_outputs)

        return logits

In [None]:
# Dataset wrapper that tokenizes one text sample at a time
class LIWCDebertaDataset(Dataset):
    """Map-style dataset yielding tokenized text together with its label.

    Args:
        data: Table exposing 'Generation' (text) and 'Label' columns that
            support positional ``.iloc`` access.
        tokenizer: Callable applied to each text; it is invoked with
            truncation, max-length padding and ``return_tensors='pt'``.
        max_length: Length every sample is padded/truncated to.
    """

    def __init__(self, data, tokenizer, max_length):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of rows in the underlying table."""
        return len(self.data)

    def __getitem__(self, index):
        """Return a dict with 'input_ids', 'attention_mask' and 'label'."""
        text = self.data['Generation'].iloc[index]
        target = self.data['Label'].iloc[index]

        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )

        # squeeze(0) removes the first axis of each tokenizer output
        # (presumably the batch axis added by return_tensors='pt'), so the
        # DataLoader can re-batch the samples itself.
        sample = {
            field: encoding[field].squeeze(0)
            for field in ('input_ids', 'attention_mask')
        }
        sample['label'] = target
        return sample

In [None]:
# Prepare the data
# pandas is used below as `pd` but was never imported in this notebook, so
# the cell raised NameError; import it here so the cell runs on its own.
import pandas as pd

# LIWCDebertaDataset reads the 'Generation' (text) and 'Label' columns of
# this table.
df = pd.read_csv('final_task3_data.csv')

# Split the data into train and test sets (80/20, fixed seed so the split is
# reproducible between runs)
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

In [None]:
# Load the pretrained DeBERTa tokenizer from the same 'microsoft/deberta-base'
# checkpoint name that the model class passes to DebertaModel.from_pretrained
tokenizer = DebertaTokenizer.from_pretrained(
    'microsoft/deberta-base'
)

In [None]:
# Training configuration
batch_size, num_epochs = 16, 10  # samples per step, passes over the train set
learning_rate = 1e-4  # step size handed to the Adam optimizer
num_classes = 2  # Number of label classes

In [None]:
# Wrap both splits in LIWCDebertaDataset; every sample is padded/truncated to
# 128 tokens
train_dataset = LIWCDebertaDataset(data=train_df, tokenizer=tokenizer, max_length=128)
test_dataset = LIWCDebertaDataset(data=test_df, tokenizer=tokenizer, max_length=128)

# Batch the datasets: the training split is reshuffled every epoch, the test
# split keeps its order
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Build the classifier with `num_classes` output logits
model = LIWCDebertaCNN(num_classes=num_classes)

# Cross-entropy over the logits, optimised with Adam on every model parameter
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
# Training loop
# Each epoch: one full pass over the training loader, then one evaluation
# pass over the test loader, then a single summary line.
#
# Fix: the epoch statistics, evaluation and print used to be indented at the
# same level as the batch body, so they ran after *every* batch and left the
# model in eval mode for the rest of the epoch. They now run once per epoch.
for epoch in range(num_epochs):
    # ---- Training phase ----
    model.train()
    running_loss = 0.0
    correct_predictions = 0
    for batch in train_loader:
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        labels = batch['label']

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        # NOTE(review): LIWCDebertaCNN.forward is declared as
        # forward(liwc_inputs, deberta_inputs) and unpacks its second
        # argument with `**`, but this call passes token ids and the
        # attention-mask tensor. The dataset does not yield LIWC ids, so the
        # call is left as written; confirm the intended inputs.
        logits = model(input_ids, attention_mask)
        loss = criterion(logits, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Update statistics; the batch-mean loss is weighted by batch size so
        # the epoch average stays correct when the last batch is smaller.
        running_loss += loss.item() * input_ids.size(0)
        _, predictions = torch.max(logits, dim=1)
        correct_predictions += torch.sum(predictions == labels).item()

    # Calculate epoch statistics
    epoch_loss = running_loss / len(train_dataset)
    epoch_accuracy = correct_predictions / len(train_dataset)

    # ---- Evaluation on test set ----
    model.eval()
    test_loss = 0.0
    test_correct_predictions = 0

    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['input_ids']
            attention_mask = batch['attention_mask']
            labels = batch['label']

            logits = model(input_ids, attention_mask)
            loss = criterion(logits, labels)

            test_loss += loss.item() * input_ids.size(0)
            _, predictions = torch.max(logits, dim=1)
            test_correct_predictions += torch.sum(predictions == labels).item()

    # Calculate test set statistics
    test_loss /= len(test_dataset)
    test_accuracy = test_correct_predictions / len(test_dataset)

    # Print epoch results
    print(f'Epoch {epoch + 1}/{num_epochs} | '
          f'Training Loss: {epoch_loss:.4f} | Training Accuracy: {epoch_accuracy:.4f} | '
          f'Test Loss: {test_loss:.4f} | Test Accuracy: {test_accuracy:.4f}')


In [None]:
# Persist only the learned parameters (the state dict), not the module object
torch.save(
    model.state_dict(),
    'liwc_deberta_cnn_model.pth',
)

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score

# Final evaluation: collect predicted and true labels over the whole test set
model.eval()
predictions = []
true_labels = []

with torch.no_grad():  # inference only, no gradients needed
    for batch in test_loader:
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        labels = batch['label']

        logits = model(input_ids, attention_mask)
        # Predicted class = index of the largest logit in each row
        _, batch_predictions = logits.max(dim=1)

        predictions += batch_predictions.tolist()
        true_labels += labels.tolist()

# Support-weighted averages of the per-class scores
f1 = f1_score(true_labels, predictions, average='weighted')
precision = precision_score(true_labels, predictions, average='weighted')
recall = recall_score(true_labels, predictions, average='weighted')

In [None]:
# Report the three weighted metrics, one per line, to four decimals
print(
    f'F1 Score: {f1:.4f}',
    f'Precision: {precision:.4f}',
    f'Recall: {recall:.4f}',
    sep='\n',
)