# **Text Classification for Topic Modeling**

# Installation and Imports

In [None]:
# Install necessary dependencies
!pip install transformers torch datasets imbalanced-learn

In [None]:
# Core libraries
import os
import re
import numpy as np
import pandas as pd

# Text processing
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
from html.parser import HTMLParser

# Plotting
import matplotlib.pyplot as plt

# Sklearn utilities
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, classification_report, precision_score, recall_score, f1_score

# Classical ML models
from sklearn.naive_bayes import MultinomialNB
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier

# Resampling
from imblearn.over_sampling import SMOTE

# Hugging Face Transformers
import torch
from datasets import Dataset, Value
from transformers import (
    DistilBertTokenizerFast,
    DistilBertForSequenceClassification,
    Trainer,
    TrainingArguments,
    EarlyStoppingCallback,
    TextClassificationPipeline
)

In [None]:
# Download NLTK resources
nltk.download('punkt_tab')  # For word tokenization
nltk.download('stopwords')  # For stopwords list

# Load Data

https://kdd.ics.uci.edu/databases/reuters21578/reuters21578.html

In [None]:
# Mount Google Drive to access dataset
# `drive` is provided by the Colab runtime and is not imported in the notebook's
# import cell, so bring it into scope here before using it.
from google.colab import drive

drive.mount('/content/drive')

In [None]:
# Set Path to Dataset in Google Drive
data_dir = '/content/drive/My Drive/reuters21578'  # Update this to your folder path
print(f"Dataset path: {data_dir}")

In [None]:
# Helper function adapted from https://github.com/marius92mc/document-classification-reuters21578/blob/master/classification/reuters_parser.py

# Define ReutersParser class (adapted from SGML parser)
class ReutersParser(HTMLParser):
    """Streaming parser for Reuters-21578 style SGML built on ``HTMLParser``.

    Feed the file chunk by chunk through :meth:`parse`; every closed
    ``<REUTERS>`` element produces one ``(topics, body, cgisplit)`` tuple:

    * ``topics``   -- list of the ``<D>`` values found inside ``<TOPICS>``
    * ``body``     -- text of ``<BODY>`` with whitespace runs collapsed to one space
    * ``cgisplit`` -- lower-cased ``CGISPLIT`` attribute of ``<REUTERS>`` as ``str``
      (empty string when the attribute is absent)
    """

    def __init__(self, encoding='latin-1'):
        HTMLParser.__init__(self)
        self._reset()
        # Initialised here so the handlers also work if feed() is called directly.
        self.docs = []
        # Stored for callers; the parser itself works on already-decoded text.
        self.encoding = encoding

    def _reset(self):
        """Clear the per-document state before the next ``<REUTERS>`` element."""
        self.in_body = False
        self.in_topics = False
        self.in_topic_d = False
        self.in_reuters = False
        self.body = ""
        self.topics = []
        self.topic_d = ""
        self.reuters = ""
        self.cgisplit = ""

    def _drain_docs(self):
        """Return the documents completed so far and empty the buffer."""
        finished, self.docs = self.docs, []
        return finished

    def parse(self, fd):
        """Yield one ``(topics, body, cgisplit)`` tuple per document in *fd*.

        Args:
            fd: Iterable of text chunks (for example an open text file).
        """
        self.docs = []
        for chunk in fd:
            self.feed(chunk)
            yield from self._drain_docs()
        self.close()
        # close() flushes buffered input; emit anything it completed.
        yield from self._drain_docs()

    def handle_starttag(self, tag, attrs):
        if tag == "reuters":
            self.in_reuters = True
            for name, value in attrs:
                if name == "cgisplit":
                    # Keep this as text; value is None for a bare attribute.
                    self.cgisplit = (value or "").lower()
                    break
        elif tag == "body":
            self.in_body = True
        elif tag == "topics":
            self.in_topics = True
        elif tag == "d" and self.in_topics:
            # <D> is also used under other list elements (e.g. <PLACES>);
            # only the ones inside <TOPICS> are topic labels.
            self.in_topic_d = True

    def handle_endtag(self, tag):
        if tag == "reuters":
            self.body = re.sub(r'\s+', r' ', self.body)
            self.in_reuters = False
            self.docs.append((self.topics, self.body, self.cgisplit))
            self._reset()
        elif tag == "body":
            self.in_body = False
        elif tag == "topics":
            self.in_topics = False
            self.in_topic_d = False
        elif tag == "d" and self.in_topic_d:
            self.in_topic_d = False
            self.topics.append(self.topic_d)
            self.topic_d = ""

    def handle_data(self, data):
        if self.in_body:
            self.body += data
        elif self.in_topic_d:
            self.topic_d += data

In [None]:
# Parse the SGML files from Google Drive folder
def parse_sgm(data_dir):
    """Parse every ``.sgm`` file in *data_dir* into a list of article dicts.

    Args:
        data_dir (str): Folder containing the Reuters ``.sgm`` files.

    Returns:
        list[dict]: One ``{"text": body, "categories": topics}`` entry per
        parsed document, in sorted-filename order.

    A file that fails to parse is reported with a printed message and skipped;
    articles collected from it before the failure are kept.
    """
    articles = []
    # os.listdir gives no ordering guarantee; sort so the article order (and
    # every seeded split made from it later) is the same on every run.
    for filename in sorted(os.listdir(data_dir)):
        if not filename.endswith('.sgm'):
            continue
        try:
            file_path = os.path.join(data_dir, filename)
            print(f"Parsing file: {file_path}")

            parser = ReutersParser(encoding='latin-1')
            with open(file_path, 'r', encoding='latin-1') as file:
                for topics, body, _cgisplit in parser.parse(file):
                    articles.append({"text": body, "categories": topics})
        except Exception as e:
            # Best effort: keep going with the remaining files.
            print(f"Error parsing file {filename}: {e}")
    return articles

In [None]:
articles = parse_sgm(data_dir)

In [None]:
# Convert to a Pandas DataFrame for easier handling
df = pd.DataFrame(articles)

# Preprocess and explore data

In [None]:
# Preprocess the Text Data (Tokenization, Stopwords Removal, Stemming)
# Built once and shared by every preprocess_text() call.
_PREPROCESS_RESOURCES = {}


def _get_preprocess_resources():
    """Return the shared ``(stop_words, stemmer)`` pair, creating it on first use."""
    if not _PREPROCESS_RESOURCES:
        _PREPROCESS_RESOURCES["stop_words"] = frozenset(stopwords.words('english'))
        _PREPROCESS_RESOURCES["stemmer"] = PorterStemmer()
    return _PREPROCESS_RESOURCES["stop_words"], _PREPROCESS_RESOURCES["stemmer"]


def preprocess_text(text):
    """Lower-case, tokenize, filter and stem *text*.

    Tokens are kept only if they are purely alphabetic and not an English
    stop word; the survivors are Porter-stemmed and joined with single spaces.

    Args:
        text (str): Raw article text.

    Returns:
        str: Space-separated stemmed tokens.
    """
    # The stop-word list and stemmer do not depend on the input, so they are
    # fetched from a shared cache instead of being rebuilt for every article.
    stop_words, stemmer = _get_preprocess_resources()

    return " ".join(
        stemmer.stem(word)
        for word in word_tokenize(text.lower())
        if word.isalpha() and word not in stop_words
    )


In [None]:
# Apply preprocessing to all articles
df['processed_text'] = df['text'].apply(preprocess_text)

### Convert Multilabel to Multiclass

In [None]:
# Ensure unique_categories is defined
# Sorted so the list order is identical on every run; a plain set has no stable
# order for strings, which would make seeded random draws from it irreproducible.
unique_categories = sorted({category for sublist in df['categories'] for category in sublist})

In [None]:
# Convert Multi-Label to Multi-Class Based on the Most Frequent Label
def get_most_frequent_label(categories, unique_categories):
    """Collapse a document's label list into a single label.

    Args:
        categories (list): Labels attached to one document (may be empty).
        unique_categories (list): All labels known for the corpus.

    Returns:
        The label occurring most often in ``categories``. Ties are broken with
        ``np.random.choice``; when ``categories`` is empty a label is drawn
        with ``np.random.choice`` from ``unique_categories`` instead.
    """
    # Nothing to count: fall back to a random known label.
    if not categories:
        return np.random.choice(unique_categories)

    tally = {}
    for label in set(categories):
        tally[label] = categories.count(label)

    best = max(tally.values())
    # Every label that reaches the top count is a candidate.
    candidates = [label for label in tally if tally[label] == best]
    return np.random.choice(candidates)

In [None]:
df['most_frequent_label'] = df['categories'].apply(lambda x: get_most_frequent_label(x, unique_categories))

In [None]:
# Checking the updated DataFrame
print(df.head())

### Check for Class Imbalance

In [None]:
# Number of unique classes
num_unique_classes = df['most_frequent_label'].nunique()
print(f"Number of unique classes: {num_unique_classes}")

In [None]:
# Count the frequency of each label in the 'most_frequent_label' column
# Counter is not part of the notebook's import cell, so import it here.
from collections import Counter

label_counts = Counter(df['most_frequent_label'])

In [None]:
# Convert the label counts to a DataFrame for easier handling
label_df = pd.DataFrame(label_counts.items(), columns=['Category', 'Count'])
label_df = label_df.sort_values(by='Count', ascending=False)

In [None]:
# Plot the distribution of categories
# DataFrame.plot opens a figure of its own unless it is given an Axes, which
# left the 12x6 figure empty and the bar chart at the default size. Create the
# Axes explicitly and draw on it.
fig, ax = plt.subplots(figsize=(12, 6))
label_df.head(20).plot(kind='bar', x='Category', y='Count', legend=False, ax=ax)
ax.set_title("Top 20 Categories in Reuters-21578 Dataset (Converted to Single Class)")
ax.set_xlabel("Category")
ax.set_ylabel("Frequency")
ax.tick_params(axis='x', rotation=90)
fig.tight_layout()
plt.show()

In [None]:
# Print out the top 20 categories and their counts
print("Top 20 categories by frequency:\n", label_df.head(20))

In [None]:
# Check the class distribution in percentage
label_df['Percentage'] = label_df['Count'] / label_df['Count'].sum() * 100
print("Class distribution (in percentage):\n", label_df[['Category', 'Percentage']])

### Vectorisation

In [None]:
# Convert Processed Text into TF-IDF Features
# The vectorizer is capped at 1000 features and produces the matrix X that the
# classical models below are trained on.
# NOTE(review): it is fitted on every row of df before any train/test split is
# made, so rows that later land in a test set contribute to the fitted
# vocabulary/weights. Consider fitting on the training portion only.
vectorizer = TfidfVectorizer(max_features=1000)
X = vectorizer.fit_transform(df['processed_text'])

# Training

### Train Test Split

In [None]:
# Split Data into Train and Test
X_train, X_test, y_train, y_test = train_test_split(X, df['most_frequent_label'], test_size=0.3, random_state=42)

## Train Naive Bayes Model

In [None]:
# Initialize Naive Bayes model
nb_model = MultinomialNB()

In [None]:
# Train the model
nb_model.fit(X_train, y_train)
y_pred = nb_model.predict(X_test)

In [None]:
# Evaluate the model
print("Naive Bayes Model without SMOTE:")
print(f"Accuracy: {accuracy_score(y_test, y_pred)}")
print(classification_report(y_test, y_pred))

## Train Random Forest

In [None]:
# Initialize Random Forest classifier with class weights
rf_model = RandomForestClassifier(n_estimators=100, class_weight='balanced', random_state=42)

In [None]:
# Train the Random Forest model
rf_model.fit(X_train, y_train)

In [None]:
# Make predictions on the test set
y_pred_rf = rf_model.predict(X_test)

In [None]:
# Evaluate the Random Forest model
print("Random Forest Model Evaluation:")
print(f'Accuracy: {accuracy_score(y_test, y_pred_rf)}')
print(classification_report(y_test, y_pred_rf))

## Random Forest: Hyperparameter Tuning with RandomizedSearchCV

In [None]:
# Define the hyperparameter grid for Random Forest
param_dist = {
    'n_estimators': np.arange(50, 500, 50),  # Number of trees in the forest
    'max_depth': [None, 10, 20, 30],  # Maximum depth of the trees
    'min_samples_split': [2, 5, 10],  # Minimum number of samples required to split an internal node
    'min_samples_leaf': [1, 2, 4],  # Minimum number of samples required to be at a leaf node
    # 'auto' was removed from the search space: current scikit-learn releases
    # reject it for RandomForestClassifier, so every candidate that sampled it
    # failed to fit. For classifiers it was an alias of 'sqrt'.
    'max_features': ['sqrt', 'log2'],  # Number of features to consider when looking for the best split
    'bootstrap': [True, False],  # Whether bootstrap samples are used when building trees
}

In [None]:
# Initialize the Random Forest Classifier
rf_model = RandomForestClassifier(class_weight='balanced', random_state=42)

In [None]:
# Setup RandomizedSearchCV to search over the parameter grid
random_search = RandomizedSearchCV(rf_model, param_distributions=param_dist,
                                   n_iter=10, cv=3, n_jobs=-1, random_state=42, verbose=2)

In [None]:
# Train the model with hyperparameter tuning
random_search.fit(X_train, y_train)

In [None]:
print(f"Best parameters found: {random_search.best_params_}")

In [None]:
# Evaluate the model on the test data using the best parameters
y_pred_rf = random_search.predict(X_test)  # Predict using the best model found by RandomizedSearchCV

In [None]:
# Display the classification report
print("Random Forest Model Evaluation with Hyperparameter Tuning:")
print(f"Accuracy: {accuracy_score(y_test, y_pred_rf)}")
print(classification_report(y_test, y_pred_rf))  # Shows precision, recall, f1-score for each class

## Train XGBoost

In [None]:
# Remove rare classes (fewer than 2 samples)
label_counts = df['most_frequent_label'].value_counts()
valid_labels = label_counts[label_counts >= 2].index
df_filtered = df[df['most_frequent_label'].isin(valid_labels)].copy()

In [None]:
# Encode labels
label_encoder = LabelEncoder()
df_filtered['label_encoded'] = label_encoder.fit_transform(df_filtered['most_frequent_label'])

In [None]:
# Train/test split
# Split the data only AFTER filtering rare classes and re-encoding labels.
# This ensures the label indices are consistent, and stratified split won't break due to classes with only 1 sample.
# The encoded targets are bound to y_train_encoded / y_test_encoded because the
# XGBoost cells below read those names.
X_train, X_test, y_train_encoded, y_test_encoded = train_test_split(
    X[df_filtered.index],  # Make sure X matches filtered rows
    df_filtered['label_encoded'],
    test_size=0.3,
    stratify=df_filtered['label_encoded'],
    random_state=42
)
# X_train / X_test were just replaced, so rebind the generic target names as
# well to keep features and labels aligned.
y_train, y_test = y_train_encoded, y_test_encoded

In [None]:
# Count unique classes in training set
n_classes = len(np.unique(y_train_encoded))
print("Number of unique classes in training set:", n_classes)

In [None]:
# Initialize XGBoost classifier
# Multi-class setup: the number of classes comes from n_classes, which the
# previous cell counts from the training labels only.
# NOTE(review): if the split ever leaves a class out of the training set,
# n_classes would be smaller than the range of encoded label ids — verify.
xgb_model = XGBClassifier(
    objective='multi:softmax',
    num_class=n_classes,
    eval_metric='mlogloss',
    use_label_encoder=False,
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1,
    subsample=0.8,
    colsample_bytree=0.8,
    random_state=42
)

In [None]:
# Train the model
xgb_model.fit(X_train, y_train_encoded)

In [None]:
# Predict and evaluate
y_pred_xgb = xgb_model.predict(X_test)

In [None]:
print("XGBoost Model Evaluation:")
print(f"Accuracy: {accuracy_score(y_test_encoded, y_pred_xgb):.4f}")
print("Classification Report:\n", classification_report(y_test_encoded, y_pred_xgb))

## Train DistilBERT Transformer Model

In [None]:
# Debugging Setup
# Enables clearer CUDA traceback on crash
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

In [None]:
# Filter Rare Classes
label_counts = df['most_frequent_label'].value_counts()
valid_labels = label_counts[label_counts >= 5].index  # Only keep classes with at least 5 examples
df_filtered = df[df['most_frequent_label'].isin(valid_labels)].copy()


In [None]:
# Encode Labels
label_encoder = LabelEncoder()
df_filtered['label'] = label_encoder.fit_transform(df_filtered['most_frequent_label'])
num_labels = df_filtered['label'].nunique()

print("Min label:", df_filtered['label'].min())
print("Max label:", df_filtered['label'].max())
print("Number of labels:", num_labels)

In [None]:
# Train/Test Split
# 70/30 stratified split of the filtered articles; texts and labels are passed
# as plain Python lists.
# NOTE(review): the texts come from 'processed_text' (lower-cased, stop words
# removed, stemmed), whereas the prediction cells further down pass raw
# sentences to the model. Confirm whether training on df_filtered['text']
# was intended.
train_texts, test_texts, train_labels, test_labels = train_test_split(
    df_filtered['processed_text'].tolist(),
    df_filtered['label'].tolist(),
    test_size=0.3,
    stratify=df_filtered['label'],
    random_state=42
)

# Sanity check: both splits should span the same encoded label range.
print("Train labels range:", min(train_labels), max(train_labels))
print("Test labels range:", min(test_labels), max(test_labels))


In [None]:
# Tokenization
tokenizer = DistilBertTokenizerFast.from_pretrained("distilbert-base-uncased")

def tokenize(batch):
    """Tokenize the "text" column of a batch, padding to max length and truncating."""
    # No explicit max_length is given here (the Twitter cell later uses 128).
    # NOTE(review): presumably this pads every example to the tokenizer's
    # model maximum — confirm, since it affects memory and speed.
    return tokenizer(batch["text"], padding="max_length", truncation=True)

# Wrap the split lists in Hugging Face Datasets and tokenize them in batches.
train_dataset = Dataset.from_dict({"text": train_texts, "label": train_labels}).map(tokenize, batched=True)
test_dataset = Dataset.from_dict({"text": test_texts, "label": test_labels}).map(tokenize, batched=True)

In [None]:
# Compute Metrics
def compute_metrics(pred):
    """Compute accuracy and weighted precision/recall/F1 for the Trainer.

    Args:
        pred: Prediction object exposing ``predictions`` (per-class scores,
            arg-maxed over axis 1) and ``label_ids`` (true class ids).

    Returns:
        dict: ``{"accuracy", "precision", "recall", "f1"}``.
    """
    labels = pred.label_ids
    preds = np.argmax(pred.predictions, axis=1)
    # precision_recall_fscore_support is not imported in this notebook; use
    # the individual scorers that are, as the Twitter compute_metrics does.
    return {
        "accuracy": accuracy_score(labels, preds),
        "precision": precision_score(labels, preds, average="weighted", zero_division=0),
        "recall": recall_score(labels, preds, average="weighted", zero_division=0),
        "f1": f1_score(labels, preds, average="weighted", zero_division=0),
    }


In [None]:
# Model Definition
# DistilBERT with a classification head sized to the number of encoded labels.
model = DistilBertForSequenceClassification.from_pretrained(
    "distilbert-base-uncased",
    num_labels=num_labels
)

# Training Arguments
# Evaluation and checkpointing both happen once per epoch; the checkpoint with
# the highest "f1" (as returned by compute_metrics) is reloaded at the end.
# NOTE(review): `evaluation_strategy` is, as far as I know, renamed to
# `eval_strategy` in newer transformers releases — confirm against the
# installed version, since the pip install above is unpinned.
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,     # Smaller batch size to avoid CUDA issues
    per_device_eval_batch_size=32,
    num_train_epochs=10,
    weight_decay=0.01,
    logging_dir="./logs",
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,
    seed=42
)

In [None]:
# Trainer Setup
# NOTE(review): eval_dataset is the held-out test split, so early stopping and
# best-checkpoint selection are driven by the same data the final metrics are
# reported on. A separate validation split would keep the test set untouched.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
)

# Train the Model
trainer.train()

In [None]:
# Evaluate the Model
metrics = trainer.evaluate()
print("\nFinal Evaluation Metrics:")
print(metrics)

## Save & Load BERT Model to/from Google Drive

In [None]:
# Mount Google Drive
# `drive` is provided by the Colab runtime and is not imported in the notebook's
# import cell, so bring it into scope here before using it.
from google.colab import drive

drive.mount('/content/drive')

In [None]:
# Define Save Path
save_path = "/content/drive/MyDrive/reuters_classifier"
os.makedirs(save_path, exist_ok=True)

In [None]:
# Save the trained BERT model and tokenizer for future use
trainer.save_model(save_path)
tokenizer.save_pretrained(save_path)

### Load the Saved Model for Predictions

In [None]:
# Load Model and Tokenizer from Google Drive without retraining
load_path = "/content/drive/MyDrive/reuters_classifier"
model = DistilBertForSequenceClassification.from_pretrained(load_path)
tokenizer = DistilBertTokenizerFast.from_pretrained(load_path)


In [None]:
def create_classification_pipeline(model, tokenizer, use_gpu=True, return_probs=False):
    """
    Build a TextClassificationPipeline around the given model and tokenizer.

    The pipeline is given device 0 when ``use_gpu`` is set and CUDA is
    available, and device -1 otherwise. ``return_probs`` is forwarded as the
    pipeline's ``return_all_scores`` flag.
    """
    cuda_requested_and_present = use_gpu and torch.cuda.is_available()
    if cuda_requested_and_present:
        device = 0
    else:
        device = -1

    pipeline_kwargs = {
        "model": model,
        "tokenizer": tokenizer,
        "device": device,
        "return_all_scores": return_probs,
    }
    return TextClassificationPipeline(**pipeline_kwargs)

def extract_predicted_class_ids(predictions):
    """
    Turn pipeline predictions into integer class IDs.

    Each prediction's 'label' is read and the text after its last underscore
    is converted to int (e.g. 'LABEL_3' -> 3).
    """
    class_ids = []
    for prediction in predictions:
        # rpartition keeps the whole string as the tail when there is no '_'.
        _, _, id_text = prediction['label'].rpartition('_')
        class_ids.append(int(id_text))
    return class_ids

def decode_predictions(class_ids, label_encoder):
    """
    Map integer class IDs back to label names.

    Delegates to ``label_encoder.inverse_transform`` and returns its result
    unchanged.
    """
    label_names = label_encoder.inverse_transform(class_ids)
    return label_names

def predict_texts(texts, model, tokenizer, label_encoder, use_gpu=True, return_probs=False):
    """
    End-to-end prediction helper.

    Builds a classification pipeline, runs it on ``texts`` and then either
    returns the raw pipeline output (``return_probs=True``) or the predicted
    labels decoded through ``label_encoder``.
    """
    classifier = create_classification_pipeline(model, tokenizer, use_gpu, return_probs)
    raw_outputs = classifier(texts)

    if not return_probs:
        # Default path: class ids -> original label names.
        predicted_ids = extract_predicted_class_ids(raw_outputs)
        return decode_predictions(predicted_ids, label_encoder)

    # Scores were requested: hand back the pipeline output untouched.
    return raw_outputs


# Model Generalization

# Predict on unseen data

In [None]:
#generated example texts
unseen_texts = [
    "The stock market responded positively to the company's earnings report.",
    "The agricultural sector saw a sharp drop in wheat exports.",
    "Major oil producers are discussing supply cuts."
]

predicted = predict_texts(unseen_texts, model, tokenizer, label_encoder)

for text, label in zip(unseen_texts, predicted):
    print(f"Text:\n{text}\n→ Predicted Label: {label}\n")


### Predict on Twitter Dataset

In [None]:
def load_and_clean_tweet_data(file_path):
    """
    Load and clean the tweet dataset.

    Rows whose label is missing, or empty once semicolons and surrounding
    whitespace are removed, are dropped.

    Args:
        file_path (str): Path to the CSV file.

    Returns:
        pd.DataFrame: Cleaned DataFrame with columns ['id', 'tweet', 'labels'].
    """
    df = pd.read_csv(
        file_path,
        header=None,
        usecols=[0, 1, 2],
        names=["id", "tweet", "labels"],
        encoding="utf-8",
        engine='python',
        on_bad_lines='skip',
    )

    # Drop missing labels first (astype(str) would turn them into "nan") and
    # copy so the assignment below does not write into a slice of the frame.
    df = df[df['labels'].notna()].copy()

    # Remove stray semicolons and clean whitespace. Converting to str before
    # using the .str accessor also covers a label column parsed as numeric.
    df['labels'] = df['labels'].astype(str).str.replace(";", "", regex=False).str.strip()

    # Test for emptiness after cleaning so labels such as ";" are dropped too.
    return df[df['labels'] != ""].copy()

In [None]:
twitter_df = load_and_clean_tweet_data("/content/drive/MyDrive/mLabel_tweets.csv")
twitter_df.head(10)

In [None]:
# Sample 10 random tweets from the dataset
# Draw from twitter_df: `df` still holds the Reuters articles at this point and
# has no 'tweet' column. The sample size is capped at the number of rows so a
# small file does not raise.
sampled_tweets = twitter_df.sample(n=min(10, len(twitter_df)), random_state=42)['tweet'].tolist()

In [None]:
# Format into a list of strings
unseen_texts = [str(tweet) for tweet in sampled_tweets]

In [None]:
# Print the result
print("unseen_texts = [")
for tweet in unseen_texts:
    print(f'    "{tweet}",')
print("]")


In [None]:
# Make predictions on Twitter dataset
predicted = predict_texts(unseen_texts, model, tokenizer, label_encoder)

for text, label in zip(unseen_texts, predicted):
    print(f"Text:\n{text}\n→ Predicted Label: {label}\n")


## Finetuning on Twitter Dataset

https://www.kaggle.com/datasets/prox37/twitter-multilabel-classification-dataset

In [None]:
# Set Device & Seed
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
torch.manual_seed(42)

In [None]:
# Load and Clean Data
def load_twitter_data(path):
    """Load the tweet CSV and return rows with a usable label.

    Args:
        path (str): Path to the CSV file (no header; the first three columns
            are read as id, tweet, labels).

    Returns:
        pd.DataFrame: Columns ['id', 'tweet', 'labels'], with semicolons and
        surrounding whitespace stripped from 'labels' and rows dropped when
        the label is missing or empty after cleaning.
    """
    df = pd.read_csv(path, header=None, names=["id", "tweet", "labels"], usecols=[0, 1, 2],
                     encoding="utf-8", engine="python", on_bad_lines="skip")
    # Drop missing labels first (astype(str) would turn them into "nan") and
    # copy so the assignment below does not write into a slice of the frame.
    df = df[df["labels"].notna()].copy()
    # Converting to str before using .str also covers a numeric label column.
    df["labels"] = df["labels"].astype(str).str.replace(";", "", regex=False).str.strip()
    # Test for emptiness after cleaning so labels such as ";" are dropped too.
    return df[df["labels"] != ""].copy()

# NOTE: from here on `df` and `label_encoder` refer to the Twitter data; the
# Reuters frame and encoder bound to these names earlier are replaced.
df = load_twitter_data("/content/drive/MyDrive/mLabel_tweets.csv")

# Encode labels first
label_encoder = LabelEncoder()
df["label_raw"] = df["labels"]
df["label"] = label_encoder.fit_transform(df["label_raw"])

# Remove classes that appear only once
# (the stratified split below needs at least two rows per class)
label_counts = df["label"].value_counts()
valid_labels = label_counts[label_counts > 1].index
df = df[df["label"].isin(valid_labels)].copy()

# Re-encode after filtering
# Recover the label strings with the first encoder, then fit a fresh encoder so
# the ids are contiguous again after the rare classes were removed.
df["label_raw"] = label_encoder.inverse_transform(df["label"])
label_encoder = LabelEncoder()
df["label"] = label_encoder.fit_transform(df["label_raw"])
num_classes = len(label_encoder.classes_)

print("Number of unique classes after filtering:", num_classes)

In [None]:
# Train/Test Split
train_texts, test_texts, train_labels, test_labels = train_test_split(
    df["tweet"].tolist(),
    df["label"].tolist(),
    test_size=0.2,
    stratify=df["label"],
    random_state=42
)

In [None]:
# Tokenization
tokenizer = DistilBertTokenizerFast.from_pretrained("distilbert-base-uncased")

def tokenize(batch):
    """Tokenize the "text" column of a batch, padded/truncated to 128 tokens."""
    encoded = tokenizer(batch["text"], padding="max_length", truncation=True, max_length=128)
    return encoded

def _build_split(texts, labels):
    """Wrap one split in a Hugging Face Dataset, tokenize it and fix the label dtype."""
    split = Dataset.from_dict({"text": texts, "label": list(map(int, labels))})
    split = split.map(tokenize, batched=True)
    # Fix label dtype for CrossEntropyLoss
    return split.cast_column("label", Value("int64"))

# Create HuggingFace Datasets
train_dataset = _build_split(train_texts, train_labels)
test_dataset = _build_split(test_texts, test_labels)

In [None]:
# Load and Adapt Pretrained Base Model
from transformers import DistilBertConfig

# Load config and change num_labels
config = DistilBertConfig.from_pretrained(load_path)
config.num_labels = num_classes

# Load the model WITHOUT classification head weights
model = DistilBertForSequenceClassification.from_pretrained(load_path, config=config, ignore_mismatched_sizes=True)
model.to(device)


In [None]:
#  Metrics
def compute_metrics(pred):
    """Return accuracy plus weighted precision, recall and F1 for a prediction batch.

    ``pred.predictions`` holds per-class scores (arg-maxed over axis 1) and
    ``pred.label_ids`` the true class ids.
    """
    expected = pred.label_ids
    predicted = np.argmax(pred.predictions, axis=1)

    # Shared settings for the three weighted scorers.
    weighted = {"average": "weighted", "zero_division": 0}

    scores = {"accuracy": accuracy_score(expected, predicted)}
    scores["precision"] = precision_score(expected, predicted, **weighted)
    scores["recall"] = recall_score(expected, predicted, **weighted)
    scores["f1"] = f1_score(expected, predicted, **weighted)
    return scores

In [None]:
# Training Arguments
# Evaluate and checkpoint once per epoch, then reload the checkpoint with the
# highest "f1" from compute_metrics. Unlike the Reuters run, no seed and no
# early-stopping callback are set here.
# NOTE(review): `evaluation_strategy` is, as far as I know, renamed to
# `eval_strategy` in newer transformers releases — confirm against the
# installed version.
training_args = TrainingArguments(
    output_dir="./results_twitter_finetune",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    num_train_epochs=5,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,
    logging_dir="./logs_twitter_finetune",
)

# Trainer
# NOTE(review): eval_dataset is the test split, so best-checkpoint selection
# uses the same data the final metrics are reported on.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

# Train
trainer.train()

In [None]:
# Evaluate
metrics = trainer.evaluate()
print("Final Evaluation on Twitter Data:", metrics)

In [None]:
# Save Model
save_path = "/content/drive/MyDrive/twitter_finetuned_bert"
trainer.save_model(save_path)
tokenizer.save_pretrained(save_path)

### Make predictions

In [None]:
#generated example texts
unseen_texts = [
    "Vaccines have many side effects including blood clots.",
    "I don't trust pharmaceutical companies making money off this.",
    "It is a rushed and untested experiment.",
    "Vaccines are unnecessary because we have natural immunity."
]

predicted = predict_texts(unseen_texts, model, tokenizer, label_encoder)

for text, label in zip(unseen_texts, predicted):
    print(f"Text:\n{text}\n→ Predicted Label: {label}\n")