In [2]:
import torch
import pandas as pd
import torch.nn as nn
import numpy as np
import torch.optim as optim
from tqdm.auto import tqdm
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, f1_score
from transformers import ElectraTokenizer, ElectraForSequenceClassification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

In [3]:
# Load the datasets
df = pd.read_csv('CNN_Articels_clean.csv')

In [4]:
df.drop(labels= ['Index', 'Headline', 'Author', 'Date published', 'Section',
                 'Url', 'Keywords', 'Second headline', 'Article text'], axis= 1, inplace= True)

In [6]:
texts = df['Description'].tolist()
labels = df['Category'].tolist()

In [7]:
# Split the data into train and test sets (80% train, 20% test)
train_texts, test_texts, train_labels, test_labels = train_test_split(texts, labels, test_size=0.2, random_state=42)

# Further split the test set into dev and test sets (50% dev, 50% test)
dev_texts, test_texts, dev_labels, test_labels = train_test_split(test_texts, test_labels, test_size=0.5, random_state=42)

In [8]:
# Load the pre-trained tokenizer and model
tokenizer = ElectraTokenizer.from_pretrained('google/electra-base-discriminator')

In [9]:
# Tokenize the input texts
tokenized_train_texts = tokenizer(train_texts, padding=True, truncation=True, return_tensors='pt')

tokenized_dev_texts = tokenizer(dev_texts, padding=True, truncation=True, return_tensors='pt')

tokenized_test_texts = tokenizer(test_texts, padding=True, truncation=True, return_tensors='pt')

In [10]:
# Initialize the LabelEncoder
encoder = LabelEncoder()

# Fit the encoder on the labels from all splits to ensure all classes are known
encoder.fit(train_labels + dev_labels + test_labels)

# Transform labels to integers
train_labels_int = encoder.transform(train_labels)

dev_labels_int = encoder.transform(dev_labels)

test_labels_int = encoder.transform(test_labels)

In [11]:
# Convert the integer labels to tensors
train_labels = torch.tensor(train_labels_int)

dev_labels = torch.tensor(dev_labels_int)

test_labels = torch.tensor(test_labels_int)

In [12]:
# Define hyperparameters
learning_rate = 1e-3
batch_size = 128
weight_decay = 1e-4
num_epochs = 100

In [13]:
# Create TensorDatasets for train, dev, and test sets
train_dataset = TensorDataset(tokenized_train_texts['input_ids'], tokenized_train_texts['attention_mask'], train_labels)

dev_dataset = TensorDataset(tokenized_dev_texts['input_ids'], tokenized_dev_texts['attention_mask'], dev_labels)

test_dataset = TensorDataset(tokenized_test_texts['input_ids'], tokenized_test_texts['attention_mask'], test_labels)


# Create DataLoaders for train, dev, and test sets
train_dataloader = DataLoader(train_dataset, batch_size= batch_size, shuffle= True)

dev_dataloader = DataLoader(dev_dataset, batch_size= batch_size, shuffle= False)

test_dataloader = DataLoader(test_dataset, batch_size= batch_size, shuffle= False)

In [14]:
# Load the pre-trained model for sequence classification
model = ElectraForSequenceClassification.from_pretrained('google/electra-base-discriminator', num_labels=6)

# Freeze parameters
for param in model.base_model.parameters():
    param.requires_grad = False

Some weights of ElectraForSequenceClassification were not initialized from the model checkpoint at google/electra-base-discriminator and are newly initialized: ['classifier.out_proj.bias', 'classifier.dense.weight', 'classifier.dense.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [15]:
# Define the optimizer for training the softmax layer
optimizer = optim.Adam(model.classifier.parameters(), lr= learning_rate, weight_decay= weight_decay)

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)

# Define the loss function
criterion = nn.CrossEntropyLoss()

In [16]:
num_training_steps = num_epochs * len(train_dataloader)
lr_scheduler = get_scheduler(
    "linear",
    optimizer= optimizer,
    num_warmup_steps= 0,
    num_training_steps= num_training_steps,
)

progress_bar = tqdm(range(num_training_steps))

  0%|          | 0/78 [00:00<?, ?it/s]

In [17]:
# Train the model
best_dev_accuracy = 0.0
best_model_state_dict = None
Validation_results= []

for epoch in range(num_epochs):
    # Training loop
    model.train()
    for batch in train_dataloader:
        input_ids, attention_mask, batch_labels = batch
        
        input_ids= input_ids.to(device)
        attention_mask= attention_mask.to(device)
        batch_labels= batch_labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(input_ids= input_ids, attention_mask= attention_mask, labels= batch_labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        #### to show progress_bar
        lr_scheduler.step()
        progress_bar.update(1)
        
        
    # Validation loop
    model.eval()
    dev_correct = 0
    total_dev = 0
    y_true = []
    y_pred = []
    loss_epoch= []
    with torch.no_grad():
        for batch in dev_dataloader:
            input_ids, attention_mask, batch_labels = batch
            
            input_ids= input_ids.to(device)
            attention_mask= attention_mask.to(device)
            batch_labels= batch_labels.to(device)
            
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            _, predicted = torch.max(logits, 1)

            # Append true labels and predicted labels for later use
            y_true.extend(batch_labels.tolist())
            y_pred.extend(predicted.tolist())
            
            # Calculate the loss
            loss = criterion(logits, batch_labels)
            loss_epoch.append(loss)
        
    # Calculate accuracy and F1 score
    f1 = f1_score(y_true, y_pred, average='weighted')
    accuracy = accuracy_score(y_true, y_pred)
    
    # Calculate the average loss
    loss_epoch_np = [tensor.cpu().detach().numpy() for tensor in loss_epoch]
    average_loss= np.mean(loss_epoch_np)
    print(f'epoch No. : {epoch}, Devset Accuracy : {round(accuracy,5)}, Devset f1_score : {round(f1,5)}, Average loss: {round(average_loss.tolist(),5)}')
    
    Validation_results.append([accuracy, f1, average_loss])
    
    if accuracy > best_dev_accuracy:
        best_dev_accuracy = accuracy
        # Save the best model (optional)
        best_model_state_dict = model.state_dict()

epoch No. : 0, Devset Accuracy : 0.71814, Devset f1_score : 0.68638, Average loss: 0.84379
epoch No. : 1, Devset Accuracy : 0.73529, Devset f1_score : 0.70395, Average loss: 0.80381
epoch No. : 2, Devset Accuracy : 0.73039, Devset f1_score : 0.69952, Average loss: 0.80934


In [19]:
# Saving Testset Results
data = {
    'Validation_results': Validation_results,
}
df = pd.DataFrame(data)
df.to_csv('Model-3_Validation_results.csv', index= False)

In [20]:
# Load the best model state dict
if best_model_state_dict is not None:
    model.load_state_dict(best_model_state_dict)
    
    # Define the directory path to save the model
    save_path = 'Model-3.pth'  

    # Save the model state dictionary and other relevant information
    torch.save({
        'model_state_dict': best_model_state_dict,
        'tokenizer': tokenizer  
    }, save_path)

In [21]:
# Evaluate on the test set
model.eval()
y_true_test = []
y_pred_test = []
loss_epoch= []

with torch.no_grad():
    for batch in test_dataloader:
        input_ids, attention_mask, batch_labels = batch
        
        input_ids= input_ids.to(device)
        attention_mask= attention_mask.to(device)
        batch_labels= batch_labels.to(device)
        
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        _, predicted = torch.max(logits, 1)

        # Append true labels and predicted labels for later use
        y_true_test.extend(batch_labels.tolist())
        y_pred_test.extend(predicted.tolist())
        
        # Calculate the loss
        loss = criterion(logits, batch_labels)
        loss_epoch.append(loss)

# Calculate accuracy and F1 score for the test set
test_accuracy = accuracy_score(y_true_test, y_pred_test)
test_f1 = f1_score(y_true_test, y_pred_test, average='weighted')

# Calculate the average loss
loss_epoch_np = [tensor.cpu().detach().numpy() for tensor in loss_epoch]
average_loss= np.mean(loss_epoch_np)

print(f"Testset accuracy: {round(test_accuracy,5)} , Testset F1 score: {round(test_f1,5)}, Average loss: {round(average_loss.tolist(),5)}")
Test_results= [test_accuracy, test_f1, average_loss]

Testset accuracy: 0.71569 , Testset F1 score: 0.68111, Average loss: 0.9027


In [23]:
# Saving Testset Results
data = {
    'Test_results': Test_results
}
df = pd.DataFrame(data)
df.to_csv('Model-3_Test_results.csv', index= False)