In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
import numpy as np
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import matplotlib.pyplot as plt

In [4]:
def load_and_preprocess_data(file_path):
    """Load the labeled CSV and integer-encode both label columns with one shared mapping.

    Args:
        file_path: Path to a CSV file containing the columns `content`,
            `majority_voting` and `label`.

    Returns:
        A tuple `(texts, encoded_labels, encoded_original_labels)` where
        `texts` is the `content` column as a list, `encoded_labels` is the
        integer-encoded `majority_voting` column and `encoded_original_labels`
        is the `label` column encoded with the *same* value-to-integer mapping.

    Raises:
        ValueError: If the `label` column contains a value that never occurs
            in `majority_voting` (raised by `LabelEncoder.transform`).
    """
    dataframe = pd.read_csv(file_path)

    texts = dataframe['content'].tolist()
    labels = dataframe['majority_voting'].tolist()
    original_labels = dataframe['label'].tolist()

    # Fit the encoder once, on the labels used as training targets, and reuse
    # that mapping for the original labels. Re-fitting on the second column
    # (as a second `fit_transform` would) builds an independent mapping, so the
    # same class could receive different integer ids in the two arrays whenever
    # the columns do not contain exactly the same set of values.
    label_encoder = LabelEncoder()
    encoded_labels = label_encoder.fit_transform(labels)
    encoded_original_labels = label_encoder.transform(original_labels)

    return texts, encoded_labels, encoded_original_labels

# Load the article texts plus both encoded label columns from the merged ISOT CSV.
texts, labels, original_labels = load_and_preprocess_data('datasets/ISOT/merged/dataset_labeled.csv')

100%|██████████| 9/9 [00:01<00:00,  5.42it/s]


In [None]:
def extract_embeddings(texts, batch_size=32):
    """Return DistilBERT per-token hidden states for every text.

    The texts are tokenized in one call (so every sequence is padded to the
    same length), but the model is run in mini-batches so that peak memory of
    the forward pass does not grow with the size of the corpus.

    Args:
        texts: List of input strings.
        batch_size: Number of texts pushed through the model per forward pass.

    Returns:
        A NumPy array holding `last_hidden_state` for all texts, stacked along
        the first axis in the same order as `texts`.

    Raises:
        ValueError: If `texts` is empty or `batch_size` is smaller than 1.
    """
    if len(texts) == 0:
        raise ValueError("texts must contain at least one string")
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    # NOTE(review): DistilBertTokenizer / DistilBertModel are not imported in
    # any visible cell; they presumably come from `transformers` - confirm and
    # add the import to the imports cell so the notebook survives a restart.
    tokenizer = DistilBertTokenizer.from_pretrained(
        'distilbert-base-uncased-finetuned-sst-2-english'
    )
    model = DistilBertModel.from_pretrained(
        'distilbert-base-uncased-finetuned-sst-2-english'
    )
    # Make inference mode explicit for layers that behave differently in training.
    model.eval()

    # Tokenize everything at once: padding=True pads to the longest sequence of
    # the whole corpus, so all mini-batches below share one sequence length and
    # their outputs can be concatenated.
    encodings = tokenizer(
        texts,
        truncation=True,
        padding=True,
        return_tensors='pt'
    )
    input_ids = encodings['input_ids']
    attention_mask = encodings['attention_mask']

    hidden_states = []
    with torch.no_grad():
        for start in range(0, input_ids.shape[0], batch_size):
            stop = start + batch_size
            outputs = model(
                input_ids=input_ids[start:stop],
                attention_mask=attention_mask[start:stop]
            )
            hidden_states.append(outputs.last_hidden_state)

    embeddings = torch.cat(hidden_states, dim=0).numpy()

    return embeddings

In [None]:
# Compute the DistilBERT hidden states for every loaded text.
embeddings = extract_embeddings(texts)

In [58]:
class CNNModel(nn.Module):
    """2-D CNN classifier: four convolutional blocks followed by a dense head.

    Every block is Conv2d(3x3, padding 1) -> ReLU -> BatchNorm2d; blocks 2-4
    additionally end with a 2x2 max pooling (stride 2), so height and width are
    halved three times before flattening.

    Args:
        input_shape: `(channels, height, width)` of one sample, without the
            batch dimension.
        num_classes: Number of output logits.

    Attributes set in `__init__`:
        flattened_size: Number of features produced by the conv stack for one
            sample of `input_shape`; used as the input width of the dense head.
    """

    def __init__(self, input_shape, num_classes):
        super(CNNModel, self).__init__()

        # Convolutional Layers
        self.conv_block1 = nn.Sequential(
            nn.Conv2d(input_shape[0], 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(256)
        )

        self.conv_block2 = nn.Sequential(
            nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(256),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.conv_block3 = nn.Sequential(
            nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.conv_block4 = nn.Sequential(
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        # Determine the size of the flattened features
        self._initialize_feature_extractor(input_shape)

        # Fully Connected Layers
        self.fc_layers = nn.Sequential(
            nn.Linear(self.flattened_size, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.BatchNorm1d(256),
            nn.Linear(256, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )

    def _extract_features(self, x):
        """Run `x` through the four convolutional blocks, in order."""
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = self.conv_block3(x)
        x = self.conv_block4(x)
        return x

    def _initialize_feature_extractor(self, input_shape):
        """Set `self.flattened_size` by probing the conv stack with a zero input.

        The probe runs in eval mode: in training mode BatchNorm updates its
        running statistics on every forward pass (even under `torch.no_grad()`),
        so an all-zero dummy batch would leak into the statistics that are
        later used at evaluation time. The previous train/eval mode is
        restored afterwards.
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                # Create a dummy input to calculate the flattened feature size
                dummy = torch.zeros(1, *input_shape)
                features = self._extract_features(dummy)
                self.flattened_size = features.view(1, -1).size(1)
        finally:
            self.train(was_training)

    def forward(self, x):
        """Return the raw class logits for a batch shaped `(batch, *input_shape)`."""
        x = self._extract_features(x)

        x = x.view(x.size(0), -1)  # Flatten
        x = self.fc_layers(x)

        return x

In [None]:
def _train_fold_model(model, train_loader, device, num_epochs):
    """Train `model` in place with Adam + cross-entropy, printing the mean batch loss per epoch."""
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters())

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        for batch_X, batch_y in train_loader:
            # Move one batch at a time so the full split never has to fit on the device.
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)

            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        print(f"Epoch {epoch+1}, Loss: {train_loss/len(train_loader):.4f}")


def _predict_fold(model, data_loader, device):
    """Return `(y_true, y_pred)` NumPy arrays for every sample yielded by `data_loader`."""
    model.eval()
    predicted_batches = []
    target_batches = []

    with torch.no_grad():
        for batch_X, batch_y in data_loader:
            outputs = model(batch_X.to(device))
            predicted_batches.append(outputs.argmax(dim=1).cpu())
            target_batches.append(batch_y.cpu())

    return torch.cat(target_batches).numpy(), torch.cat(predicted_batches).numpy()


def cross_validate(X, y, num_classes, original_labels, k_folds=5, random_state=42, device=None,
                   num_epochs=3, batch_size=32):
    """Stratified k-fold cross-validation of `CNNModel`.

    For every fold a fresh model is trained on `y` (training indices) and then
    scored against `original_labels` (validation indices), i.e. the training
    targets and the evaluation ground truth come from different label arrays.

    Args:
        X: 3-D tensor or array `(n_samples, dim1, dim2)`; a channel axis of
            size 1 is inserted before it is fed to the 2-D convolutions.
        y: Integer class labels used as training targets and for stratification.
        num_classes: Number of output classes of the model.
        original_labels: Integer class labels used as validation ground truth.
        k_folds: Number of folds.
        random_state: Seed of the fold shuffling. It is only passed to
            `StratifiedKFold`; weight initialisation and batch shuffling are
            not seeded here.
        device: Torch device; defaults to CUDA when available, else CPU.
        num_epochs: Training epochs per fold.
        batch_size: Mini-batch size for training and validation.

    Returns:
        `(fold_metrics, detailed_results)`: a dict mapping `accuracy`,
        `precision`, `recall` and `f1_score` to per-fold lists, and a list with
        one dict per fold holding `fold`, `y_true`, `y_pred` and `metrics`.

    Raises:
        ValueError: If `X` is not 3-D or the three inputs differ in length.
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Ensure inputs are torch tensors. as_tensor avoids copying an array that
    # already has the requested dtype, which matters for large embedding arrays.
    X = torch.as_tensor(X, dtype=torch.float32)
    y = torch.as_tensor(y, dtype=torch.long)
    original_labels = torch.as_tensor(original_labels, dtype=torch.long)

    if X.dim() != 3:
        raise ValueError(f"X must be 3-dimensional (n_samples, dim1, dim2), got shape {tuple(X.shape)}")
    if not (X.shape[0] == y.shape[0] == original_labels.shape[0]):
        raise ValueError("X, y and original_labels must contain the same number of samples")

    # Reshape embeddings for 2D convolution
    X_2d = X.reshape(X.shape[0], 1, X.shape[1], X.shape[2])
    input_shape = (1, X_2d.shape[2], X_2d.shape[3])

    skf = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=random_state)

    fold_metrics = {
        'accuracy': [],
        'precision': [],
        'recall': [],
        'f1_score': []
    }

    detailed_results = []

    # StratifiedKFold only needs the sample count from its first argument, so a
    # placeholder is passed instead of the full feature tensor.
    y_np = y.cpu().numpy()
    placeholder = np.zeros(len(y_np))

    for fold, (train_idx, val_idx) in enumerate(skf.split(placeholder, y_np), 1):
        print(f"\nFold {fold}")

        # Prepare train and validation data (kept off-device; batches are moved lazily)
        train_dataset = TensorDataset(X_2d[train_idx], y[train_idx])
        val_dataset = TensorDataset(X_2d[val_idx], original_labels[val_idx])

        # A trailing batch holding a single sample would make the BatchNorm1d
        # layers of the model raise in training mode, so drop it in that case.
        n_train = len(train_dataset)
        drop_last = n_train > batch_size and n_train % batch_size == 1

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=drop_last)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

        # Initialize model
        model = CNNModel(input_shape=input_shape, num_classes=num_classes).to(device)

        _train_fold_model(model, train_loader, device, num_epochs)

        # Validation
        y_val_np, y_pred_classes = _predict_fold(model, val_loader, device)

        # Calculate metrics. zero_division=0 yields the same values as the
        # default but without a warning for classes that were never predicted.
        accuracy = accuracy_score(y_val_np, y_pred_classes)
        precision = precision_score(y_val_np, y_pred_classes, average='weighted', zero_division=0)
        recall = recall_score(y_val_np, y_pred_classes, average='weighted', zero_division=0)
        f1 = f1_score(y_val_np, y_pred_classes, average='weighted', zero_division=0)

        # Store metrics
        fold_metrics['accuracy'].append(accuracy)
        fold_metrics['precision'].append(precision)
        fold_metrics['recall'].append(recall)
        fold_metrics['f1_score'].append(f1)

        # Store detailed results
        detailed_results.append({
            'fold': fold,
            'y_true': y_val_np,
            'y_pred': y_pred_classes,
            'metrics': {
                'accuracy': accuracy,
                'precision': precision,
                'recall': recall,
                'f1_score': f1
            }
        })

        # Print classification report
        print(classification_report(y_val_np, y_pred_classes, zero_division=0))

    # Print cross-validation summary
    print("\nCross-Validation Summary:")
    for metric, values in fold_metrics.items():
        print(f"{metric.capitalize()} - Mean: {np.mean(values):.4f}, Std: {np.std(values):.4f}")

    return fold_metrics, detailed_results


# The class count is the number of distinct encoded majority-vote labels (the training targets).
num_classes = len(np.unique(labels))
# Train on `labels`; every fold is scored against `original_labels`.
metrics, results = cross_validate(embeddings, labels, num_classes, original_labels)

In [None]:
def plot_cross_validation_results(metrics):
    """Draw one bar chart per metric showing its value in every fold.

    Each panel contains one bar per fold (x positions 1..n), a dashed red line
    at the mean over folds with a text label, and the numeric value above each
    bar. Panels are laid out in up to two columns with as many rows as needed.

    Args:
        metrics: Mapping from metric name to a list of per-fold values, as
            returned by `cross_validate`. Nothing is drawn when it is empty.
    """
    if not metrics:
        return

    n_metrics = len(metrics)
    n_cols = min(2, n_metrics)
    n_rows = (n_metrics + n_cols - 1) // n_cols  # ceiling division

    # 6x3 inches per panel reproduces the 12x6 figure for the usual four metrics.
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(6 * n_cols, 3 * n_rows), squeeze=False)
    flat_axes = axes.ravel()

    for ax, (metric, values) in zip(flat_axes, metrics.items()):
        fold_numbers = range(1, len(values) + 1)

        # Plot bars for each fold
        bars = ax.bar(fold_numbers, values)

        # Calculate and plot mean line
        mean_value = np.mean(values)
        ax.axhline(y=mean_value, color='r', linestyle='--', alpha=0.8)

        # Add mean value text annotation. Bars sit at x = 1..n, so the
        # horizontal centre of the panel is (n + 1) / 2.
        ax.text((len(values) + 1) / 2, mean_value, f'Mean: {mean_value:.3f}',
                horizontalalignment='center', verticalalignment='bottom',
                color='r', bbox=dict(facecolor='white', alpha=0.8))

        ax.set_title(f"{metric.capitalize()} per Fold")
        ax.set_xlabel("Fold")
        ax.set_ylabel(metric.capitalize())

        # Add individual values on top of each bar
        for bar in bars:
            height = bar.get_height()
            ax.text(bar.get_x() + bar.get_width()/2., height,
                    f'{height:.3f}',
                    ha='center', va='bottom')

    # Hide panels left over when the metric count does not fill the grid.
    for unused_ax in flat_axes[n_metrics:]:
        unused_ax.set_visible(False)

    fig.tight_layout()
    plt.show()

# Visualise the per-fold metrics returned by cross_validate.
plot_cross_validation_results(metrics)