# 1. DATA PREPROCESSING
This section performs a thorough cleaning and preprocessing of the dataset, which is essential for consistent and robust model training.

In [None]:
# Utilities and typing
import json
import math
import re
from collections import Counter
from itertools import chain
from typing import List, Dict, Tuple, Any, Optional

# Numerical computing
import numpy as np

# Data manipulation and Spark
from pyspark.sql import SparkSession, DataFrame, Window, functions as F
from pyspark.sql.types import ArrayType, StringType

# Machine Learning and metrics
from sklearn.metrics import confusion_matrix, accuracy_score, precision_recall_fscore_support, f1_score
from sklearn.model_selection import KFold

# NLP and text processing
import spacy
from bs4 import BeautifulSoup

# Deep Learning and model handling
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from transformers import AdamW, RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments

# Hyperparameter tuning
import optuna

In [None]:
def load_dataset(file_path: str) -> DataFrame:
    """
    Reads a Parquet dataset into Spark and prints its schema and record count.

    Parameters:
    - file_path (str): Path to the Parquet data to read.

    Returns:
    - DataFrame: The dataset as a Spark DataFrame.
    """
    # getOrCreate() reuses the active Spark session when there is one
    session = SparkSession.builder.appName("NewsClassification").getOrCreate()

    dataset = session.read.parquet(file_path)

    # Verification output: schema first, then the number of records
    dataset.printSchema()
    record_total = dataset.count()
    print(f"Loaded dataset with {record_total} records.")

    return dataset

Clean Categories - This step removes special characters, trims whitespace, lowercases each category, and handles multi-label entries by splitting them on common delimiters (comma, semicolon, or slash).

In [None]:

def clean_categories(categories: List[str]) -> List[str]:
    """
    Cleans and standardizes a list of categories.

    Each entry is first split on the delimiters ',', ';' and '/', so a
    multi-label entry such as "Sports, Politics" yields two categories. Every
    resulting piece is then stripped of special characters, lowercased and
    trimmed. Pieces that end up empty are discarded, as are None entries.

    Parameters:
    - categories (List[str]): List of category strings to clean.

    Returns:
    - List[str]: Flat list of cleaned categories, in input order. Its length can
      differ from the input because entries may be split or dropped.
    """
    cleaned_categories = []

    for category in categories:
        if category is None:
            continue

        # Split BEFORE removing special characters: the delimiters are special
        # characters themselves, so stripping first would make the split a no-op.
        for piece in re.split(r'[,/;]', category):
            clean_piece = re.sub(r'[^\w\s]', '', piece).lower().strip()
            if clean_piece:  # Avoid appending empty strings
                cleaned_categories.append(clean_piece)

    return cleaned_categories

Standardize Categories - Once cleaned, each category is normalized to a lemma with spaCy, and categories whose frequency falls below a percentage threshold are filtered out. This improves model generalization by reducing redundancy in labels.

In [None]:
# Load the spaCy pipeline once at module level; map_category() below reads this global.
# Other English pipelines that can be substituted here: en_core_web_sm, en_core_web_md, en_core_web_lg.
nlp = spacy.load("en_core_web_trf")

def standardize_categories(cleaned_dataset: DataFrame, threshold_percentage: float = 0.005) -> DataFrame:
    """
    Normalizes category names with spaCy and drops categories that are too rare.

    Every value of the 'categories' column is mapped to the lemma of its first
    token and stored in 'standardized_category'. Standardized categories that
    occur fewer than `total rows * threshold_percentage` times (rounded down)
    are removed together with their rows. The 'categories' column is dropped
    from the result.

    Parameters:
    - cleaned_dataset (DataFrame): Spark DataFrame with a cleaned 'categories' column.
    - threshold_percentage (float): Minimum share of all rows a category needs to be
      kept (e.g., 0.005 for 0.5%).

    Returns:
    - DataFrame: Rows belonging to frequent categories, with 'standardized_category'
      in place of 'categories'.
    """

    def map_category(category: str) -> str:
        """
        Returns the lemma of the first token of the category, or the category
        unchanged when spaCy produces no tokens for it.

        Parameters:
        - category (str): Category name to standardize.

        Returns:
        - str: Standardized category name.
        """
        parsed = nlp(category)
        if not parsed:
            return category
        return parsed[0].lemma_

    # Absolute row threshold derived from the relative one (int() rounds down)
    min_frequency = int(cleaned_dataset.count() * threshold_percentage)

    # Wrap the spaCy mapping in a UDF so Spark can run it on the column
    lemmatize_udf = F.udf(map_category)
    with_standardized = cleaned_dataset.withColumn(
        "standardized_category", lemmatize_udf(F.col("categories"))
    )

    # Standardized categories that reach the minimum frequency
    frequent_categories = (
        with_standardized
        .groupBy("standardized_category")
        .count()
        .where(F.col("count") >= min_frequency)
        .select("standardized_category")
    )

    # The inner join keeps only rows whose category is frequent enough
    kept_rows = with_standardized.join(
        frequent_categories,
        on="standardized_category",
        how="inner",
    )

    # The raw category column is superseded by 'standardized_category'
    return kept_rows.drop("categories")


Text Cleaning - Removes unwanted elements like special characters and extra spaces, enhancing text uniformity.

In [None]:
def clean_text(text: str) -> str:
    """
    Cleans a given text by removing HTML tags and special characters, converting
    to lowercase, and collapsing extra whitespace.

    Parameters:
    - text (str): The text to clean. None (e.g. a NULL cell passed in through a
      Spark UDF) and the empty string are accepted.

    Returns:
    - str: Cleaned text; the empty string when the input is None or empty.
    """
    # Missing values would make the HTML parser raise, so short-circuit them
    if not text:
        return ""

    # Step 1: Remove HTML tags
    text = BeautifulSoup(text, "html.parser").get_text()

    # Step 2: Remove special characters (keep ASCII letters, digits and whitespace only)
    text = re.sub(r'[^A-Za-z0-9\s]', '', text)

    # Step 3: Convert text to lowercase
    text = text.lower()

    # Step 4: Collapse whitespace runs into single spaces and trim the ends
    return re.sub(r'\s+', ' ', text).strip()


In [None]:
def print_category_distribution(categories: List[str]) -> None:
    """
    Prints how often each category occurs, as an absolute count and as a
    percentage of all entries.

    Parameters:
    - categories (List[str]): Category labels to tally (one entry per record).

    Returns:
    - None
    """
    # Counter keeps categories in order of first appearance
    tally = Counter(categories)
    grand_total = sum(tally.values())

    print("Category Distribution:")
    for name in tally:
        occurrences = tally[name]
        share = (occurrences / grand_total) * 100
        print(f"{name}: {occurrences} ({share:.2f}%)")


In [None]:
def preprocess_data(dataset: DataFrame) -> DataFrame:
    """
    Main procedure for data preprocessing: cleans and standardizes categories,
    cleans title and text, builds the combined 'full_text' column and removes
    duplicate rows.

    Notes on the resulting schema:
    - 'categories' holds ONE cleaned, standardized category per row. Rows whose
      raw value contained several categories are duplicated, one row per
      category; rows without any usable category are dropped.
    - 'title' and 'plain_text' are cleaned in place and kept, because the later
      text-composition steps in this file read them.
    - 'full_text' is "<title> [SEP] <plain_text>".

    Parameters:
    - dataset (DataFrame): Spark DataFrame containing raw data. Assumes the columns
      'categories', 'title' and 'plain_text' exist.

    Returns:
    - DataFrame: Processed Spark DataFrame with cleaned and standardized columns.
    """

    def _clean_row_categories(raw):
        """Cleans the category value of a single row and returns a list of categories."""
        if raw is None:
            return []
        # Accept both a single string and an array of strings per row
        values = [raw] if isinstance(raw, str) else list(raw)
        cleaned = clean_categories([value for value in values if value is not None])
        return [category for category in cleaned if category]

    def _clean_or_null(value):
        """Applies clean_text to non-null values and leaves NULL cells untouched."""
        return clean_text(value) if value is not None else None

    # Step 1: Clean the categories row by row (the cleaning must not mix rows),
    # then emit one row per cleaned category so each row carries a single label
    clean_categories_udf = F.udf(_clean_row_categories, ArrayType(StringType()))
    dataset = dataset.withColumn("categories", clean_categories_udf(F.col("categories")))
    dataset = dataset.withColumn("categories", F.explode(F.col("categories")))

    # Step 2: Standardize categories based on frequency threshold.
    # standardize_categories() replaces 'categories' with 'standardized_category';
    # rename it back because the rest of the pipeline reads 'categories'.
    standardized_dataset = standardize_categories(dataset)
    standardized_dataset = standardized_dataset.withColumnRenamed("standardized_category", "categories")

    # Steps 3 and 4: Clean the article body and the title with the same UDF
    clean_text_udf = F.udf(_clean_or_null, StringType())
    for column_name in ("plain_text", "title"):
        standardized_dataset = standardized_dataset.withColumn(
            column_name, clean_text_udf(F.col(column_name))
        )

    # Step 5: Concatenate title and text into 'full_text' (concat_ws skips NULL parts)
    separator = " [SEP] "
    standardized_dataset = standardized_dataset.withColumn(
        "full_text",
        F.concat_ws(separator, F.col("title"), F.col("plain_text"))
    )

    # Step 6: Remove exact duplicate rows
    standardized_dataset = standardized_dataset.dropDuplicates()

    # Step 7: Print category distribution for analysis
    category_values = [row[0] for row in standardized_dataset.select("categories").collect()]
    print_category_distribution(category_values)

    return standardized_dataset


# 2. TEXT COMPOSITION VALIDATION
This section identifies the best text structure for model input, which is crucial for maximizing classification accuracy.

Validation Set Split - A subset of data is allocated to evaluate different text composition strategies.

In [None]:
def split_validation_set(dataset: DataFrame, size: float) -> DataFrame:
    """
    Draws a random validation subset of roughly the requested proportion.

    Parameters:
    - dataset (DataFrame): Spark DataFrame to sample from.
    - size (float): Proportion of rows for the validation subset, strictly between
      0 and 1 (e.g., 0.1 for 10%).

    Returns:
    - DataFrame: The validation subset; the remaining rows are discarded.

    Raises:
    - ValueError: If size is not strictly between 0 and 1.
    """
    if not (size > 0 and size < 1):
        raise ValueError("Size parameter must be between 0 and 1 (exclusive)")

    # randomSplit returns one DataFrame per weight; the fixed seed keeps the split reproducible
    partitions = dataset.randomSplit([1 - size, size], seed=42)

    # Index 1 corresponds to the `size` weight, i.e. the validation part
    return partitions[1]


In [None]:

# Shared "roberta-base" tokenizer; the tokenization code in this file reads this global
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def compute_metrics(pred) -> dict:
    """
    Turns raw model outputs into accuracy and weighted F1.

    Parameters:
    - pred: Object exposing `predictions` (per-class scores; the arg-max over the
      last axis is taken as the predicted class) and `label_ids` (true labels).

    Returns:
    - dict: {"accuracy": ..., "f1": ...}, where F1 is averaged with class-support weights.
    """
    predicted = pred.predictions.argmax(-1)
    expected = pred.label_ids
    return {
        "accuracy": accuracy_score(expected, predicted),
        "f1": f1_score(expected, predicted, average="weighted"),
    }

class _EncodedTextDataset(Dataset):
    """Map-style dataset pairing tokenizer outputs with integer class labels."""

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item["labels"] = self.labels[idx]
        return item

    def __len__(self):
        return len(self.labels)


def train_small_model(
    text_column: str,
    validation_set: Optional[DataFrame] = None,
    label_column: str = "label",
    eval_fraction: float = 0.2,
) -> float:
    """
    Trains a small RoBERTa model on the specified text composition strategy and
    returns the weighted F1 score measured on a held-out part of the data.

    The rows are split once (fixed seed) into a training part and an evaluation
    part; the model is fine-tuned for one epoch on the former and scored on the
    latter, so the score is not computed on data the model was trained on.

    Parameters:
    - text_column (str): The name of the column in the DataFrame containing the text
                         composition strategy to evaluate.
    - validation_set (DataFrame, optional): Data to train and evaluate on. When omitted,
                         a module-level variable named `validation_set` is used, which
                         is what earlier versions of this function relied on.
    - label_column (str): Column holding the class of each row. Values may be integers
                         or strings; they are re-indexed to 0..k-1 internally.
    - eval_fraction (float): Share of the rows held out for evaluation (between 0 and 1).

    Returns:
    - float: The weighted F1 score on the held-out rows.

    Raises:
    - ValueError: If no data is available, eval_fraction is out of range, or fewer than
                  two usable rows remain after dropping NULL texts/labels.
    """
    if validation_set is None:
        validation_set = globals().get("validation_set")
        if validation_set is None:
            raise ValueError("No validation_set was provided to train_small_model.")
    if not 0 < eval_fraction < 1:
        raise ValueError("eval_fraction must be between 0 and 1 (exclusive)")

    # Step 1: Collect text and label together so they cannot get out of step,
    # skipping rows the tokenizer or the loss could not handle
    rows = (
        validation_set
        .select(text_column, label_column)
        .where(F.col(text_column).isNotNull() & F.col(label_column).isNotNull())
        .collect()
    )
    if len(rows) < 2:
        raise ValueError("At least two rows with text and label are required.")
    texts = [row[0] for row in rows]
    raw_labels = [row[1] for row in rows]

    # Re-index labels to 0..k-1: the classification head needs contiguous ids,
    # and a subset of the data may not contain every original label value
    class_index = {value: idx for idx, value in enumerate(sorted(set(raw_labels)))}
    labels = torch.tensor([class_index[value] for value in raw_labels])

    encodings = tokenizer(texts, truncation=True, padding=True, max_length=512)
    encodings = {key: torch.tensor(val) for key, val in encodings.items()}

    # Step 2: Reproducible shuffle, then hold out `eval_fraction` of the rows
    shuffled = torch.randperm(len(rows), generator=torch.Generator().manual_seed(42))
    eval_size = min(max(1, int(round(len(rows) * eval_fraction))), len(rows) - 1)
    eval_idx, train_idx = shuffled[:eval_size], shuffled[eval_size:]

    train_dataset = _EncodedTextDataset(
        {key: val[train_idx] for key, val in encodings.items()}, labels[train_idx]
    )
    eval_dataset = _EncodedTextDataset(
        {key: val[eval_idx] for key, val in encodings.items()}, labels[eval_idx]
    )

    # Step 3: Initialize model for classification. At least two outputs are requested
    # because the library treats a single output as a regression problem.
    model = RobertaForSequenceClassification.from_pretrained(
        "roberta-base", num_labels=max(2, len(class_index))
    )
    model.to(device)

    # Step 4: Set up Trainer and Training Arguments
    training_args = TrainingArguments(
        output_dir="./results",
        evaluation_strategy="epoch",
        save_strategy="no",  # throwaway comparison models: do not write checkpoints
        logging_dir="./logs",
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        num_train_epochs=1,
        seed=42,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics
    )

    # Step 5: Fine-tune, then evaluate on the held-out rows
    trainer.train()
    eval_result = trainer.evaluate()

    return eval_result["eval_f1"]


def compare_strategies(validation_set: DataFrame) -> Dict[str, float]:
    """
    Compares various text composition strategies by training small models and evaluates
    their performance on the validation set.

    The input columns 'title', 'plain_text' and 'publisher' are left untouched; every
    strategy gets its own derived column. Parts are joined with concat_ws, so a NULL
    part is skipped instead of turning the whole text into NULL. An existing
    'full_text' column is reused for "Title + [SEP] + Text". The three ablation
    entries use exactly the same text as an already evaluated strategy, so their
    score is reused rather than retrained.

    Parameters:
    - validation_set (DataFrame): Spark DataFrame used for model validation. Assumes the
      columns 'title', 'plain_text' and 'publisher' exist, plus a class column named
      'label' (or 'categories' when there is no 'label' column).

    Returns:
    - Dict[str, float]: A dictionary with strategy names as keys and their respective
                        performance metrics (weighted F1) as values.
    """
    separator = " [SEP] "
    title, body, publisher = F.col("title"), F.col("plain_text"), F.col("publisher")

    # Derived column name -> expression that builds the model input text
    compositions = {
        "text_only": body,
        "title_only": title,
        "title_text": F.concat_ws(" ", title, body),
        "full_text": F.concat_ws(separator, title, body),
        "publisher_title_text": F.concat_ws(separator, publisher, title, body),
        "title_text_publisher": F.concat_ws(separator, title, body, publisher),
    }

    # Strategy name -> derived column, in reporting order
    strategy_columns = [
        ("Only Text", "text_only"),
        ("Only Title", "title_only"),
        ("Title + Text", "title_text"),
        ("Title + [SEP] + Text", "full_text"),
        ("Publisher + Title + Text", "publisher_title_text"),
        ("Title + Text + Publisher", "title_text_publisher"),
        ("Without Title", "text_only"),
        ("Without Publisher", "full_text"),
        ("Without Title & Publisher", "text_only"),
    ]

    frame = validation_set
    for column_name, expression in compositions.items():
        if column_name == "full_text" and "full_text" in frame.columns:
            continue  # keep the pre-existing column
        frame = frame.withColumn(column_name, expression)

    label_column = "label" if "label" in frame.columns else "categories"

    scores_by_column = {}
    strategies_results = {}
    for strategy_name, column_name in strategy_columns:
        if column_name not in scores_by_column:
            scores_by_column[column_name] = train_small_model(
                column_name, validation_set=frame, label_column=label_column
            )
        strategies_results[strategy_name] = scores_by_column[column_name]

    return strategies_results


Select Best Strategy - Picks the text composition strategy that achieved the highest F1 score during validation.

In [None]:
def select_best_strategy(strategies_results: Dict[str, float]) -> str:
    """
    Returns the name of the strategy with the highest score and prints it.

    When several strategies share the highest score, the one that appears first
    in the dictionary wins.

    Parameters:
    - strategies_results (Dict[str, float]): Strategy names mapped to their
                                             performance metrics (F1 scores).

    Returns:
    - str: The name of the best performing strategy.

    Raises:
    - ValueError: If strategies_results is empty.
    """
    if not strategies_results:
        raise ValueError("The strategies_results dictionary is empty. Ensure it has valid entries.")

    # Pick the (name, score) pair with the largest score
    best_strategy, best_score = max(strategies_results.items(), key=lambda pair: pair[1])

    print(f"Best Strategy: '{best_strategy}' with an F1 score of {best_score:.4f}")

    return best_strategy


In [None]:
def validate_text_composition(dataset: DataFrame, validation_split: float = 0.1) -> str:
    """
    Runs the text composition validation end to end: draws a validation subset,
    scores every composition strategy on it and returns the winner.

    Parameters:
    - dataset (DataFrame): Spark DataFrame containing the dataset.
    - validation_split (float): Proportion of the data used for validation.
                                Default is 0.1 (10%).

    Returns:
    - str: Name of the best performing strategy.
    """
    # Subset used for the comparison, then one score per strategy
    held_out = split_validation_set(dataset, validation_split)
    scores = compare_strategies(held_out)

    # Highest score wins
    best_strategy = select_best_strategy(scores)
    print(f"Best text composition strategy selected: {best_strategy}")

    return best_strategy


# 3. DATA PREPARATION
This step finalizes the dataset for training by encoding categories, tokenizing texts, and splitting into training, validation, and test sets.

Apply Best Composition Strategy - Incorporates the selected composition strategy across the dataset for consistency.

In [None]:
def apply_best_text_composition(dataset: DataFrame, strategy: str) -> DataFrame:
    """
    Applies the specified text composition strategy to the full dataset by building
    the 'best_text' column from the columns the strategy uses.

    Multi-column strategies are joined with concat_ws, which skips NULL parts, so a
    row with a missing title or publisher still gets a usable text instead of NULL.
    Single-column strategies copy the column as is.

    Parameters:
    - dataset (DataFrame): Spark DataFrame containing the dataset to transform.
    - strategy (str): Text composition strategy to apply, as determined by `validate_text_composition`.

    Returns:
    - DataFrame: Spark DataFrame with the composed text in the 'best_text' column.

    Raises:
    - ValueError: If the strategy name is not one of the known strategies.
    """
    # Define the separator token for composition
    separator = " [SEP] "

    # Strategy name -> (joiner, source columns in output order).
    # The three "Without ..." ablations compose the same text as an earlier strategy.
    recipes = {
        "Only Text": (None, ["plain_text"]),
        "Only Title": (None, ["title"]),
        "Title + Text": (" ", ["title", "plain_text"]),
        "Title + [SEP] + Text": (separator, ["title", "plain_text"]),
        "Publisher + Title + Text": (separator, ["publisher", "title", "plain_text"]),
        "Title + Text + Publisher": (separator, ["title", "plain_text", "publisher"]),
        "Without Title": (None, ["plain_text"]),
        "Without Publisher": (separator, ["title", "plain_text"]),
        "Without Title & Publisher": (None, ["plain_text"]),
    }

    if strategy not in recipes:
        raise ValueError(f"Unknown strategy: {strategy}. Please provide a valid text composition strategy.")

    joiner, column_names = recipes[strategy]
    if len(column_names) == 1:
        best_text = F.col(column_names[0])
    else:
        best_text = F.concat_ws(joiner, *[F.col(name) for name in column_names])

    return dataset.withColumn("best_text", best_text)


Encode Categories - Converts categorical labels into numerical format for model compatibility.

In [None]:
def encode_categories_labels(categories: List[str]) -> Tuple[List[int], Dict[str, int]]:
    """
    Assigns an integer label to every category name and encodes the input with it.

    Labels are assigned in sorted order of the distinct category names, starting
    at 0, so the mapping does not depend on the order of the input.

    Parameters:
    - categories (List[str]): Category names to encode (one per record).

    Returns:
    - Tuple[List[int], Dict[str, int]]: The encoded labels, aligned with the input,
      and the dictionary mapping category names to integer labels.
    """
    # Sorted distinct names -> consecutive ids
    category_mapping = {}
    for position, name in enumerate(sorted(set(categories))):
        category_mapping[name] = position

    # Look every input entry up in the mapping
    encoded_labels = list(map(category_mapping.__getitem__, categories))

    return encoded_labels, category_mapping

Split Data - Segregates data into training, validation, and test sets, ensuring balanced distribution across categories.

In [None]:
def split_data(dataset: DataFrame, ratios: List[float] = [0.7, 0.15, 0.15], stratify_column: str = "categories") -> Tuple[DataFrame, DataFrame, DataFrame]:
    """
    Splits the dataset into training, validation, and test sets according to specified ratios.

    Rows are numbered inside each stratification group and assigned by their relative
    position within that group, so every group contributes to the three sets in
    (approximately) the requested proportions. Without a stratification column the
    whole dataset is treated as a single group.

    Parameters:
    - dataset (DataFrame): Spark DataFrame to split.
    - ratios (List[float]): Train, validation and test ratios. Default is [0.7, 0.15, 0.15].
    - stratify_column (str): Column name to use for stratification, default is "categories".
      Pass an empty string or None to split without stratification.

    Returns:
    - Tuple[DataFrame, DataFrame, DataFrame]: A tuple of DataFrames for training, validation, and test sets.

    Raises:
    - ValueError: If ratios does not contain three values or they do not sum to 1.0.
    """
    if len(ratios) != 3:
        raise ValueError("Ratios must contain exactly three values (train, validation, test)")

    # Compare with a tolerance: sums such as 0.6 + 0.3 + 0.1 are not exactly 1.0 in floating point
    if not math.isclose(sum(ratios), 1.0, abs_tol=1e-9):
        raise ValueError("Ratios must sum to 1.0")

    train_ratio, val_ratio, _ = ratios
    val_upper = train_ratio + val_ratio

    # Step 1: Give every row an id that fixes the ordering inside its group.
    # NOTE(review): the three result sets are evaluated lazily and independently; this
    # relies on the ids being reproduced identically on each evaluation — consider
    # persisting the input if its partitioning is not stable.
    indexed = dataset.withColumn("stratify_id", F.monotonically_increasing_id())

    # Step 2: Relative position of each row inside its group, in [0, 1)
    if stratify_column:
        group_window = Window.partitionBy(stratify_column)
    else:
        # A constant partition key puts all rows into one group (and one Spark partition)
        group_window = Window.partitionBy(F.lit(0))
    position = F.row_number().over(group_window.orderBy("stratify_id"))
    group_size = F.count(F.lit(1)).over(group_window)
    ranked = indexed.withColumn("stratify_fraction", (position - 1) / group_size)

    # Step 3: Cut every group at the cumulative ratio boundaries
    fraction = F.col("stratify_fraction")
    helper_columns = ("stratify_fraction", "stratify_id")
    train_set = ranked.filter(fraction < train_ratio).drop(*helper_columns)
    val_set = ranked.filter((fraction >= train_ratio) & (fraction < val_upper)).drop(*helper_columns)
    test_set = ranked.filter(fraction >= val_upper).drop(*helper_columns)

    return train_set, val_set, test_set


Tokenization - Tokenizes each text for compatibility with language models like RoBERTa.

In [None]:
def create_tokenized_dataset(texts: List[str], labels: List[int]) -> List[Tuple[List[int], int]]:
    """
    Converts every text into a fixed-length list of token IDs and pairs it with its label.

    Each text is truncated or padded to 512 tokens by the module-level tokenizer.

    Parameters:
    - texts (List[str]): Texts to tokenize.
    - labels (List[int]): Label of each text, in the same order as `texts`.

    Returns:
    - List[Tuple[List[int], int]]: One (token IDs, label) tuple per input text.

    Raises:
    - ValueError: If texts and labels differ in length.
    """
    if len(texts) != len(labels):
        raise ValueError("The length of texts and labels must be the same.")

    def to_token_ids(text):
        # Encode as a PyTorch tensor, then drop the batch dimension and convert to a plain list
        encoded = tokenizer(
            text,
            truncation=True,
            padding="max_length",
            max_length=512,
            return_tensors="pt",
        )
        return encoded["input_ids"].squeeze().tolist()

    return [(to_token_ids(text), label) for text, label in zip(texts, labels)]


In [None]:
def prepare_data(cleaned_dataset: DataFrame, best_strategy: str) -> Tuple[List[Tuple[List[int], int]], List[Tuple[List[int], int]], List[Tuple[List[int], int]], Dict[str, int]]:
    """
    Prepares the dataset for model training by applying the best text composition strategy,
    encoding categories, splitting the data, and tokenizing it.

    Parameters:
    - cleaned_dataset (DataFrame): Spark DataFrame with cleaned and standardized data. Assumes a
      'categories' column holding one category name per row, plus the columns required by the
      chosen strategy.
    - best_strategy (str): The best-performing text composition strategy determined through validation.

    Returns:
    - Tuple[List[Tuple[List[int], int]], List[Tuple[List[int], int]], List[Tuple[List[int], int]], Dict[str, int]]:
      A tuple containing the tokenized training, validation, and test datasets, along with the
      category mapping dictionary.

    Raises:
    - ValueError: If the dataset contains no non-null category.
    """

    # Step 1: Apply the best text composition strategy to the dataset
    dataset_with_best_strategy = apply_best_text_composition(cleaned_dataset, best_strategy)

    # Rows without a category cannot be labelled, so they are left out
    labelled = dataset_with_best_strategy.where(F.col("categories").isNotNull())

    # Step 2: Build the category -> id mapping from the DISTINCT categories only
    distinct_categories = [row[0] for row in labelled.select("categories").distinct().collect()]
    if not distinct_categories:
        raise ValueError("The dataset contains no categories to encode.")
    _, category_mapping = encode_categories_labels(distinct_categories)

    # Attach each row's own label through a literal map lookup on its category
    mapping_expr = F.create_map(*[F.lit(item) for item in chain.from_iterable(category_mapping.items())])
    labelled = labelled.withColumn("label", mapping_expr[F.col("categories")])

    # Step 3: Split the dataset into training, validation, and test sets
    train_set, val_set, test_set = split_data(labelled, ratios=[0.7, 0.15, 0.15], stratify_column="categories")

    def _tokenize_split(split: DataFrame) -> List[Tuple[List[int], int]]:
        """Collects text and label of a split in one pass (keeps them aligned) and tokenizes it."""
        rows = split.select("best_text", "label").collect()
        return create_tokenized_dataset(
            [row["best_text"] for row in rows],
            [row["label"] for row in rows],
        )

    # Steps 4 and 5: Collect and tokenize each split
    train_dataset = _tokenize_split(train_set)
    val_dataset = _tokenize_split(val_set)
    test_dataset = _tokenize_split(test_set)

    return train_dataset, val_dataset, test_dataset, category_mapping


# 4. MODEL INITIALIZATION
Loads a pre-trained RoBERTa model with a classification layer, tailored for multi-class classification.

In [None]:
def initialize_model(num_categories: int) -> nn.Module:
    """
    Builds the classifier: pre-trained "roberta-large" weights with a sequence
    classification head sized for the given number of categories.

    Parameters:
    - num_categories (int): Number of output labels of the classification head.

    Returns:
    - nn.Module: The RoBERTa sequence classification model.
    """
    pretrained_name = "roberta-large"
    head_config = {"num_labels": num_categories}  # one output per category

    return RobertaForSequenceClassification.from_pretrained(pretrained_name, **head_config)

## 4.1 Hyperparameter Tuning
Fine-tunes model hyperparameters using Bayesian Optimization via Optuna to achieve optimal training configuration.

In [None]:
def tune_hyperparameters(train_dataset: Dataset, validation_dataset: Dataset, num_categories: int) -> Dict[str, Any]:
    """
    Tunes hyperparameters for the RoBERTa model using Bayesian Optimization with Optuna.

    Each trial samples a configuration, trains a fresh "roberta-large" classifier with it
    through the Hugging Face Trainer, and is scored by the loss on the validation dataset.

    Parameters:
    - train_dataset (Dataset): The training dataset to use for hyperparameter tuning.
    - validation_dataset (Dataset): The validation dataset to use for evaluating performance.
    - num_categories (int): The number of unique categories for classification.

    Returns:
    - Dict[str, Any]: The best hyperparameters found, keyed by "learning_rate",
      "batch_size", "epochs", "warmup_steps" and "dropout_rate".
    """

    def objective(trial) -> float:
        """
        Objective function for Bayesian Optimization, which defines the search space and
        evaluates each hyperparameter combination.

        Parameters:
        - trial: An Optuna trial object that suggests hyperparameter values.

        Returns:
        - float: Validation loss obtained after training with the trial's hyperparameters.
        """

        # Define the hyperparameter search space
        learning_rate = trial.suggest_float("learning_rate", 1e-5, 5e-5, log=True)
        batch_size = trial.suggest_categorical("batch_size", [16, 32])
        epochs = trial.suggest_int("epochs", 3, 10)
        warmup_steps = trial.suggest_int("warmup_steps", 0, 1000)
        dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5)

        def model_init() -> nn.Module:
            """
            Creates a fresh RoBERTa classifier for this trial.

            Returns:
            - nn.Module: A model instance whose dropout probabilities are set to the
              trial's dropout_rate.
            """
            # Extra keyword arguments to from_pretrained override the matching config
            # attributes, so the sampled dropout rate really is applied as dropout
            # (it used to be passed to the optimizer as weight decay instead).
            return RobertaForSequenceClassification.from_pretrained(
                "roberta-large",
                num_labels=num_categories,
                hidden_dropout_prob=dropout_rate,
                attention_probs_dropout_prob=dropout_rate
            )

        # Set up training arguments with the current trial's hyperparameters
        training_args = TrainingArguments(
            output_dir="./results",
            evaluation_strategy="epoch",
            learning_rate=learning_rate,
            per_device_train_batch_size=batch_size,
            num_train_epochs=epochs,
            warmup_steps=warmup_steps,
            logging_dir='./logs',
            logging_steps=10
        )

        # Initialize the Trainer with current hyperparameters
        trainer = Trainer(
            model_init=model_init,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=validation_dataset
        )

        # The model must actually be trained before it is scored; evaluating right
        # after construction would measure the untrained model and make every
        # trial independent of its hyperparameters.
        trainer.train()
        result = trainer.evaluate()
        return result["eval_loss"]

    # Create an Optuna study to perform Bayesian Optimization
    study = optuna.create_study(direction="minimize")
    study.optimize(objective, n_trials=20)  # Run for a fixed number of trials (20 here)

    # Extract the best hyperparameters
    best_hyperparameters = study.best_params
    print("Best Hyperparameters:", best_hyperparameters)

    return best_hyperparameters

# 5. TRAINING MODEL
Conducts the model training, including early stopping to prevent overfitting and achieve efficient convergence.

In [None]:
def _run_epoch(model, loader, device, optimizer=None):
    """
    Runs one pass over `loader` and returns (average batch loss, accuracy).

    When an optimizer is given the model is put in training mode and updated after
    every batch; otherwise the model is put in evaluation mode and gradients are disabled.
    """
    is_training = optimizer is not None
    model.train(is_training)

    total_loss = 0.0
    correct = 0
    seen = 0

    with torch.set_grad_enabled(is_training):
        for batch in loader:
            # Each batch is indexed positionally: element 0 holds the input ids,
            # element 1 the labels.
            input_ids = batch[0].to(device)
            labels = batch[1].to(device)

            outputs = model(input_ids=input_ids, labels=labels)
            loss = outputs.loss

            if is_training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            total_loss += loss.item()
            correct += (outputs.logits.argmax(dim=1) == labels).sum().item()
            seen += labels.size(0)

    return total_loss / len(loader), correct / seen


def train_model(
    model: nn.Module,
    train_dataset: List[Tuple[List[int], int]],
    validation_dataset: List[Tuple[List[int], int]],
    max_epochs: int,
    batch_size: int,
    learning_rate: float,
    patience: int = 3,
    checkpoint_path: str = "best_model.pt"
) -> nn.Module:
    """
    Trains the model on the provided training dataset and validates on the validation dataset,
    implementing early stopping with checkpointing based on validation loss.

    Parameters:
    - model (nn.Module): The PyTorch model to be trained. It is called as
      model(input_ids=..., labels=...) and must return an object exposing .loss and .logits.
    - train_dataset (List[Tuple[List[int], int]]): Tokenized training dataset with labels.
      NOTE(review): batches are indexed as batch[0] / batch[1] and moved with .to(device),
      so items presumably need to collate into tensors; no attention mask is passed to the
      model. Confirm against the dataset builder.
    - validation_dataset (List[Tuple[List[int], int]]): Tokenized validation dataset with labels.
    - max_epochs (int): Maximum number of training epochs.
    - batch_size (int): Batch size for training and validation.
    - learning_rate (float): Learning rate for optimization.
    - patience (int): Number of consecutive epochs without validation-loss improvement
      after which training stops. Default is 3.
    - checkpoint_path (str): Where the best weights are saved. Default is "best_model.pt".

    Returns:
    - nn.Module: The same model object, loaded with the weights of the best epoch when a
      checkpoint was written during this call, otherwise left as it was after the last epoch.

    Raises:
    - ValueError: If either dataset is empty.
    """
    if len(train_dataset) == 0 or len(validation_dataset) == 0:
        raise ValueError("train_dataset and validation_dataset must both be non-empty.")

    # Prepare DataLoader for training and validation datasets
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(validation_dataset, batch_size=batch_size)

    # torch.optim.AdamW replaces the deprecated transformers.AdamW; weight_decay=0.0
    # keeps that optimizer's default of applying no weight decay.
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.0)

    # Training and early stopping parameters
    best_val_loss = float('inf')
    patience_counter = 0
    checkpoint_saved = False

    # Move model to device (GPU if available)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    for epoch in range(max_epochs):
        # Training phase
        avg_train_loss, train_accuracy = _run_epoch(model, train_loader, device, optimizer)
        print(f"Epoch {epoch + 1}/{max_epochs}, Training Loss: {avg_train_loss:.4f}, Training Accuracy: {train_accuracy:.4f}")

        # Validation phase
        avg_val_loss, val_accuracy = _run_epoch(model, val_loader, device)
        print(f"Epoch {epoch + 1}/{max_epochs}, Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

        # Checkpointing and early stopping check
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            patience_counter = 0
            torch.save(model.state_dict(), checkpoint_path)  # Save the best model checkpoint
            checkpoint_saved = True
            print("Validation loss improved, saving model checkpoint.")
        else:
            patience_counter += 1
            print(f"No improvement in validation loss. Patience counter: {patience_counter}")

        # Stop training if patience limit is reached
        if patience_counter >= patience:
            print("Early stopping triggered.")
            break

    # Only reload when this call wrote the checkpoint; otherwise the file may be
    # missing or left over from an earlier, unrelated run.
    if checkpoint_saved:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    return model


# 6. EVALUATING MODEL
Assesses model performance on the test dataset, generating accuracy, a confusion matrix, precision/recall/F1 metrics, and k-fold cross-validation accuracy statistics.

In [None]:
def _collect_predictions(model, loader, device):
    """Runs the model in evaluation mode over `loader` and returns (true labels, predicted labels) as lists."""
    model.eval()
    gold, predicted = [], []

    # No gradient calculation needed during evaluation
    with torch.no_grad():
        for batch in loader:
            input_ids = batch[0].to(device)
            logits = model(input_ids=input_ids).logits

            gold.extend(batch[1].tolist())
            predicted.extend(torch.argmax(logits, dim=1).tolist())

    return gold, predicted


def evaluate_model(
    model: nn.Module,
    test_dataset: List[Tuple[List[int], int]],
    num_categories: int
) -> Tuple[Dict[str, float], List[List[int]], float, Dict[str, float]]:
    """
    Evaluates the trained model on the test dataset, calculating predictions,
    confusion matrix, accuracy, precision, recall, F1 scores, and performs k-fold cross-validation.

    The k-fold step briefly fine-tunes the model on folds of the test dataset. Every fold
    starts from the weights the model had on entry, and those weights are restored before
    returning, so the caller's model is not altered by the evaluation.

    Parameters:
    - model (nn.Module): The trained PyTorch model to be evaluated.
    - test_dataset (List[Tuple[List[int], int]]): Tokenized test dataset with labels.
    - num_categories (int): The number of unique categories for classification; labels are
      taken to be the integers 0 .. num_categories - 1.

    Returns:
    - Tuple[Dict[str, float], List[List[int]], float, Dict[str, float]]:
        Dictionary with support-weighted precision, recall, F1 and the accuracy;
        confusion matrix with one row/column per category; overall accuracy;
        and the mean/std of the k-fold cross-validation accuracies.
    """

    # DataLoader for test dataset
    test_loader = DataLoader(test_dataset, batch_size=32)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    all_labels, all_preds = _collect_predictions(model, test_loader, device)

    # Fixing the label set keeps the confusion matrix num_categories x num_categories
    # even when some category is absent from the test data.
    class_ids = list(range(num_categories))

    # Calculate metrics
    confusion = confusion_matrix(all_labels, all_preds, labels=class_ids)
    accuracy = float(accuracy_score(all_labels, all_preds))
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, average="weighted", zero_division=0
    )

    per_class_metrics = {
        "precision": float(precision),
        "recall": float(recall),
        "f1": float(f1),
        "accuracy": accuracy
    }

    # Print per-category metrics. average=None yields one score per entry of `labels`;
    # average="binary" is rejected by scikit-learn for multiclass targets.
    class_precision, class_recall, class_f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, labels=class_ids, average=None, zero_division=0
    )
    for category, (cat_precision, cat_recall, cat_f1) in enumerate(zip(class_precision, class_recall, class_f1)):
        print(f"Category {category} - Precision: {cat_precision:.4f}, Recall: {cat_recall:.4f}, F1: {cat_f1:.4f}")

    # K-Fold Cross-Validation
    kfold_metrics = {}
    kf = KFold(n_splits=5, shuffle=True, random_state=42)

    # Snapshot of the incoming weights (kept on CPU) so that each fold starts from the
    # same model and the caller's model can be restored afterwards.
    initial_state = {name: tensor.detach().cpu().clone() for name, tensor in model.state_dict().items()}

    fold_accuracies = []
    try:
        for train_idx, val_idx in kf.split(np.arange(len(test_dataset))):
            # Reset so this fold's validation samples were not trained on in an earlier fold
            model.load_state_dict(initial_state)

            train_subset = [test_dataset[i] for i in train_idx]
            val_subset = [test_dataset[i] for i in val_idx]

            train_loader = DataLoader(train_subset, batch_size=32)
            val_loader = DataLoader(val_subset, batch_size=32)

            model.train()
            optimizer = torch.optim.AdamW(model.parameters(), lr=0.00002)

            for epoch in range(1):  # Training for one epoch
                for batch in train_loader:
                    input_ids = batch[0].to(device)
                    labels = batch[1].to(device)

                    optimizer.zero_grad()
                    outputs = model(input_ids=input_ids, labels=labels)
                    loss = outputs.loss
                    loss.backward()
                    optimizer.step()

            # Evaluation on validation set for the fold
            val_labels, val_preds = _collect_predictions(model, val_loader, device)
            fold_accuracies.append(accuracy_score(val_labels, val_preds))
    finally:
        # Hand the model back exactly as it was received, in evaluation mode
        model.load_state_dict(initial_state)
        model.eval()

    kfold_metrics["kfold_accuracy_mean"] = float(np.mean(fold_accuracies))
    kfold_metrics["kfold_accuracy_std"] = float(np.std(fold_accuracies))

    return per_class_metrics, confusion.tolist(), accuracy, kfold_metrics


# SAVE RESULTS
Persistently stores the trained model, evaluation metrics, and category mappings, ensuring reproducibility.

In [None]:
def save_model(model: nn.Module, file_path: str) -> None:
    """
    Writes the model's weights (its state dictionary) to disk with torch.save.

    Parameters:
    - model (nn.Module): Model whose parameters are persisted. As a side effect it is
      switched to evaluation mode.
    - file_path (str): Destination file (should end with .pth or .pt).
    """
    # Switch to evaluation mode first, then capture the weights
    model.eval()
    weights = model.state_dict()

    # Only the state dictionary is stored, not the full module object
    torch.save(weights, file_path)

    print(f"Model saved successfully at {file_path}")


In [None]:
def save_metrics(metrics: Dict[str, Any], file_path: str) -> None:
    """
    Serializes evaluation metrics to a JSON file, indented by four spaces.

    Parameters:
    - metrics (Dict[str, Any]): Evaluation metrics to write; must be JSON-serializable.
    - file_path (str): Destination file (should end with .json). An existing file is overwritten.
    """
    with open(file_path, mode='w') as handle:
        json.dump(metrics, handle, indent=4)

    print(f"Metrics saved successfully at {file_path}")

In [None]:
def save_category_mapping(mapping: Dict[str, int], file_path: str) -> None:
    """
    Serializes the category-to-label mapping to a JSON file, indented by four spaces.

    Parameters:
    - mapping (Dict[str, int]): Category names mapped to their integer labels.
    - file_path (str): Destination file (should end with .json). An existing file is overwritten.
    """
    with open(file_path, mode='w') as handle:
        json.dump(mapping, handle, indent=4)

    print(f"Category mapping saved successfully at {file_path}")


# MAIN EXECUTION FLOW
The main procedure orchestrates the entire workflow, from data loading and preprocessing to model training and evaluation.

In [None]:
def main() -> None:
    """
    Main execution flow for news classification, including data loading, preprocessing,
    model initialization, hyperparameter tuning, training, evaluation, and saving results.

    Reads "data.parquet" and writes "final_model.pth", "evaluation_metrics.json" and
    "category_mapping.json".
    """
    # Step 1: Load and preprocess
    raw_data = load_dataset("data.parquet")
    cleaned_data = preprocess_data(raw_data)

    # Step 2: Validate text composition
    best_strategy = validate_text_composition(cleaned_data)

    # Step 3: Prepare data (these are tokenized datasets, not DataLoaders)
    train_dataset, validation_dataset, test_dataset, category_mapping = prepare_data(cleaned_data, best_strategy)
    num_categories = len(category_mapping)

    # Step 4: Hyperparameter tuning
    best_hyperparameters = tune_hyperparameters(
        train_dataset=train_dataset,
        validation_dataset=validation_dataset,
        num_categories=num_categories
    )

    # Step 5: Initialize model with the correct number of categories
    model = initialize_model(num_categories=num_categories)

    # Step 6: Train with tuned hyperparameters
    # NOTE(review): the tuned "warmup_steps" and "dropout_rate" are not consumed here
    # because train_model/initialize_model expose no parameters for them.
    trained_model = train_model(
        model=model,
        train_dataset=train_dataset,
        validation_dataset=validation_dataset,
        max_epochs=best_hyperparameters["epochs"],
        batch_size=best_hyperparameters["batch_size"],
        learning_rate=best_hyperparameters["learning_rate"],
        patience=3  # Optional: Adjust if needed based on results
    )

    # Step 7: Evaluate on the test set
    summary_metrics, confusion, accuracy, kfold_metrics = evaluate_model(
        trained_model, test_dataset, num_categories=num_categories
    )

    # save_metrics takes a dictionary, so label each part of the evaluation result
    # instead of dumping the raw tuple as an anonymous JSON array.
    metrics = {
        "summary": summary_metrics,
        "confusion_matrix": confusion,
        "accuracy": accuracy,
        "kfold": kfold_metrics
    }

    # Step 8: Save results
    save_model(trained_model, "final_model.pth")
    save_metrics(metrics, "evaluation_metrics.json")
    save_category_mapping(category_mapping, "category_mapping.json")
