In [None]:
# Install necessary libraries and clone the dataset
!git clone https://github.com/ramybaly/Article-Bias-Prediction.git  # Cloning the dataset repository

# Install required libraries
!pip install optuna transformers peft datasets scikit-learn evaluate

# Import necessary libraries
import os
import json
import pandas as pd
import numpy as np
import optuna
from datasets import Dataset
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, BertForSequenceClassification, TrainingArguments, Trainer
from peft import get_peft_model, LoraConfig, TaskType
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Change directory to the cloned dataset
%cd Article-Bias-Prediction


In [None]:
def print_number_of_trainable_model_parameters(model):
    """
    Summarise how many of a model's parameters are trainable.

    Args:
        model: An object exposing ``named_parameters()`` that yields
            ``(name, parameter)`` pairs, where every parameter offers
            ``numel()`` and a ``requires_grad`` flag (e.g. a PyTorch module).

    Returns:
        str: Three lines giving the trainable parameter count, the total
        parameter count, and the trainable share as a percentage with two
        decimals (0.00% when the model has no parameters at all).
    """
    # One pass over the parameters, keeping just (size, is_trainable)
    sizes = [(p.numel(), p.requires_grad) for _, p in model.named_parameters()]

    total = sum(count for count, _ in sizes)
    trainable = sum(count for count, is_trainable in sizes if is_trainable)

    # Guard against dividing by zero for a parameter-less model
    if total > 0:
        share = 100 * trainable / total
    else:
        share = 0

    report_lines = [
        f"Trainable model parameters: {trainable}",
        f"All model parameters: {total}",
        f"Percentage of trainable model parameters: {share:.2f}%",
    ]
    return "\n".join(report_lines)


In [None]:


# Load and prepare data
json_dir = 'data/jsons'
data_list = []

# Iterate in sorted order: os.listdir() returns entries in an arbitrary,
# filesystem-dependent order, and the seeded splits below are only
# reproducible when the rows always arrive in the same order.
for filename in sorted(os.listdir(json_dir)):
    if not filename.endswith('.json'):
        continue
    # Explicit encoding so decoding does not depend on the platform default
    with open(os.path.join(json_dir, filename), 'r', encoding='utf-8') as file:
        data = json.load(file)
    # Rename the 'bias' field to 'labels' in each JSON object
    if 'bias' in data:
        data['labels'] = data.pop('bias')
    data_list.append(data)

# Fail early with a clear message instead of a KeyError further down
if not data_list:
    raise FileNotFoundError(f"No .json files found in {json_dir!r}")

# Convert to DataFrame
df = pd.DataFrame(data_list)

# Define column names
text_column = 'content_original'
label_column = 'bias_text'

# Map bias_text to numerical labels.
# Note: the splits below take their targets from the 'labels' column (the
# renamed 'bias' field), not from this mapped column.
label_map = {'left': 0, 'center': 1, 'right': 2}
df[label_column] = df[label_column].map(label_map)

# Split the dataset into training and testing sets (15% test, stratified)
train_texts, test_texts, train_labels, test_labels = train_test_split(
    df[text_column], df['labels'], test_size=0.15, stratify=df['labels'], random_state=42
)

# Combine the texts and labels of each split into a single DataFrame
train_df = pd.DataFrame({text_column: train_texts, 'labels': train_labels}).reset_index(drop=True)
test_df = pd.DataFrame({text_column: test_texts, 'labels': test_labels}).reset_index(drop=True)

# Split the training set into training and validation sets.
# 3/17 of the remaining 85% equals 15% of the full data; stratify here as well
# so the validation set keeps the same class balance as the test set.
train_df, val_df = train_test_split(
    train_df, test_size=3/17, stratify=train_df['labels'], random_state=42
)
train_df = train_df.reset_index(drop=True)
val_df = val_df.reset_index(drop=True)

# Splits must partition the data exactly
assert len(train_df) + len(val_df) + len(test_df) == len(df)

# Display the shapes of the resulting DataFrames
print(f"Training set shape: {train_df.shape}")
print(f"Validation set shape: {val_df.shape}")
print(f"Test set shape:{test_df.shape}")

print(train_df.head())


In [None]:
from transformers import DistilBertForSequenceClassification, AutoTokenizer
from peft import LoraConfig, TaskType, get_peft_model

# Single checkpoint name shared by the classifier and its tokenizer
base_checkpoint = 'distilbert-base-cased'

# DistilBERT with a 3-way sequence-classification head, plus matching tokenizer
model = DistilBertForSequenceClassification.from_pretrained(base_checkpoint, num_labels=3)
tokenizer = AutoTokenizer.from_pretrained(base_checkpoint)

# Parameter counts of the plain model, for comparison with the LoRA version
baseline_report = print_number_of_trainable_model_parameters(model)
print(baseline_report)

# LoRA hyper-parameters
lora_settings = dict(
    r=64,                                        # rank of the low-rank update
    lora_alpha=64,                               # scaling factor
    lora_dropout=0.0,                            # dropout applied inside the LoRA layers
    target_modules=["q_lin", "k_lin", "v_lin"],  # modules that receive LoRA adapters
    bias='none',                                 # no bias terms in the LoRA layers
    task_type=TaskType.SEQ_CLS,                  # sequence classification
)
lora_config = LoraConfig(**lora_settings)

# Wrap the base model with the LoRA adapters
peft_model = get_peft_model(model, lora_config)

# Parameter counts after adaptation
adapted_report = print_number_of_trainable_model_parameters(peft_model)
print(adapted_report)

In [None]:
# Tokenize function
def tokenize_func(data):
    """
    Tokenize a batch of articles.

    Args:
        data: A batch as passed by ``Dataset.map(..., batched=True)``; the raw
            text is read from ``data[text_column]``.

    Returns:
        The tokenizer output for the batch, truncated to at most 512 tokens per
        example and left unpadded.
    """
    return tokenizer(
        data[text_column],
        max_length=512,
        truncation=True,
        # No padding here: the Trainer's DataCollatorWithPadding(padding="longest")
        # pads each batch to its own longest sequence. Padding every example to
        # 512 up front would make that dynamic padding a no-op and waste
        # memory and compute on short articles.
        padding=False,
        return_attention_mask=True,
    )
from datasets import Dataset


def _tokenized_dataset(frame):
    """Wrap a DataFrame in a ``Dataset`` and tokenize it, dropping the raw text column."""
    return Dataset.from_pandas(frame).map(
        tokenize_func,
        batched=True,
        remove_columns=[text_column],
    )


# Convert each split to a Dataset and tokenize it the same way
train_dataset = _tokenized_dataset(train_df)
val_dataset = _tokenized_dataset(val_df)
test_dataset = _tokenized_dataset(test_df)




In [None]:
import inspect
import math

import torch
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
# AdamW comes from PyTorch: the copy that used to live in `transformers` was
# deprecated and is no longer importable from recent releases.
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup, DataCollatorWithPadding, Adafactor


# Define evaluation metrics
def metrics(eval_prediction):
    """
    Compute accuracy and macro-averaged F1 for the Trainer.

    Args:
        eval_prediction: A ``(logits, labels)`` pair; the predicted class is the
            arg-max of the logits along axis 1.

    Returns:
        dict: ``{"accuracy": ..., "f1": ...}``.
    """
    logits, labels = eval_prediction
    pred = np.argmax(logits, axis=1)
    # Only the F1 component is reported; precision/recall/support are discarded
    _, _, f1, _ = precision_recall_fscore_support(labels, pred, average='macro')
    accuracy = accuracy_score(labels, pred)
    return {"accuracy": accuracy, "f1": f1}


# Training parameters
train_batch_size = 32
eval_batch_size = 32
lr = 5e-4

# Define training arguments
training_kwargs = dict(
    output_dir='./result-distilbert-lora',
    logging_dir='./logs-distilbert-lora',
    learning_rate=lr,
    per_device_train_batch_size=train_batch_size,  # Adjust based on GPU memory
    per_device_eval_batch_size=eval_batch_size,    # Adjust based on GPU memory
    num_train_epochs=5,
    logging_steps=500,
    eval_steps=500,
    weight_decay=0.01,
    seed=42,
    fp16=torch.cuda.is_available(),  # mixed precision needs a CUDA GPU; fall back to fp32 otherwise
    report_to='none'
)
# The evaluation-schedule argument was renamed from `evaluation_strategy` to
# `eval_strategy`; use whichever name the installed transformers accepts.
_training_arg_names = inspect.signature(TrainingArguments).parameters
_eval_strategy_key = 'eval_strategy' if 'eval_strategy' in _training_arg_names else 'evaluation_strategy'
training_kwargs[_eval_strategy_key] = 'steps'

peft_training_args = TrainingArguments(**training_kwargs)

# Define optimizer.
# Only parameters that require gradients are handed to the optimizer. The
# weight decay is forwarded explicitly because TrainingArguments.weight_decay
# is not applied when a custom optimizer is passed to the Trainer.
trainable_params = [p for p in peft_model.parameters() if p.requires_grad]
optimizer = AdamW(trainable_params, lr=lr, weight_decay=peft_training_args.weight_decay)

# Optionally replace AdamW with Adafactor
# optimizer = Adafactor(
#     peft_model.parameters(),
#     lr=lr,
#     eps=(1e-30, 1e-3),
#     clip_threshold=1.0,
#     decay_rate=-0.8,
#     beta1=None,
#     weight_decay=0.0,
#     relative_step=False,
#     scale_parameter=False,
#     warmup_init=False,
# )

# Define scheduler.
# `train_batch_size` on the arguments object is the effective per-step batch
# size (per-device size times the number of GPUs), so the schedule length
# matches the number of optimizer steps the Trainer will actually take.
n_epochs = peft_training_args.num_train_epochs
steps_per_epoch = math.ceil(len(train_dataset) / peft_training_args.train_batch_size)
total_steps = int(n_epochs * steps_per_epoch)
lr_scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps
)

# Data collator: pads every batch to the longest sequence in that batch
collator = DataCollatorWithPadding(
    tokenizer=tokenizer,
    padding="longest"
)

# Define Trainer
trainer_kwargs = dict(
    model=peft_model,
    args=peft_training_args,
    train_dataset=train_dataset,  # Training Data
    eval_dataset=val_dataset,     # Evaluation Data
    compute_metrics=metrics,
    optimizers=(optimizer, lr_scheduler),
    data_collator=collator
)
# Newer transformers releases take the tokenizer as `processing_class`;
# older ones only know `tokenizer`.
_trainer_arg_names = inspect.signature(Trainer.__init__).parameters
_tokenizer_key = 'processing_class' if 'processing_class' in _trainer_arg_names else 'tokenizer'
trainer_kwargs[_tokenizer_key] = tokenizer

peft_trainer = Trainer(**trainer_kwargs)

print(f"Total Steps: {total_steps}")

# Path to save the fine-tuned model
peft_model_path = "./"

# Train the model
peft_trainer.train()

# Save the fine-tuned model and tokenizer
peft_trainer.model.save_pretrained(peft_model_path)
tokenizer.save_pretrained(peft_model_path)


In [None]:
# Evaluate the model on the test dataset.
# A single predict() call returns the logits, the gold labels and the metrics,
# so the test set is pushed through the model only once. The "eval" prefix
# keeps the metric names in `test_results` in the `eval_*` form.
test_output = peft_trainer.predict(test_dataset, metric_key_prefix="eval")
test_results = test_output.metrics
print(f"Test Results: {test_results}")

predictions = test_output.predictions
labels = test_output.label_ids

# Convert predictions to class labels
preds = np.argmax(predictions, axis=1)

# Per-class precision / recall / F1
print(classification_report(labels, preds, digits=4))

# Calculate the confusion matrix (rows: true class, columns: predicted class)
cm = confusion_matrix(labels, preds)

# Display the confusion matrix
fig, ax = plt.subplots()
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(ax=ax)
ax.set_title("Test-set confusion matrix")
plt.show()
