# Topic Modelling Using Transformers

In [None]:
import numpy as np
import pandas as pd
from sklearn.datasets import fetch_20newsgroups
from sklearn.model_selection import train_test_split
from bertopic import BERTopic
from sentence_transformers import SentenceTransformer
from umap import UMAP
from hdbscan import HDBSCAN
from sklearn.feature_extraction.text import CountVectorizer
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AdamW, get_linear_schedule_with_warmup
import pytorch_lightning as pl

# Run on CUDA when torch reports a usable GPU; otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

### Load and preprocess the dataset:

In [None]:
# Fetch both the train and test portions of 20 Newsgroups, asking sklearn
# to remove headers, footers and quotes from every post
newsgroups = fetch_20newsgroups(
    remove=('headers', 'footers', 'quotes'),
    subset='all',
)

# One row per post: the text and its integer newsgroup label
df = pd.DataFrame(dict(text=newsgroups.data, target=newsgroups.target))

# Hold out 20% of the rows; the fixed seed keeps the split reproducible
train_df, test_df = train_test_split(df, random_state=42, test_size=0.2)

print(f"Train set size: {train_df.shape[0]}")
print(f"Test set size: {test_df.shape[0]}")

### Define a custom dataset and data module for fine-tuning:

In [None]:
class NewsGroupsDataset(Dataset):
    """Map-style dataset that pairs each document with its label.

    Args:
        texts: Indexable collection of documents.
        targets: Indexable collection of labels, aligned with ``texts``.
    """

    def __init__(self, texts, targets):
        self.texts, self.targets = texts, targets

    def __len__(self):
        """Return the number of documents held."""
        # The text collection alone determines the dataset size.
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the ``(text, target)`` pair stored at position ``idx``."""
        text = self.texts[idx]
        target = self.targets[idx]
        return text, target


class NewsGroupsDataModule(pl.LightningDataModule):
    """Lightning data module serving the train and test DataFrames.

    Args:
        train_df: DataFrame with ``'text'`` and ``'target'`` columns used for training.
        test_df: DataFrame with the same columns, served by ``val_dataloader``.
        batch_size: Number of samples per batch for both loaders.
    """

    def __init__(self, train_df, test_df, batch_size=32):
        super().__init__()
        self.batch_size = batch_size
        self.train_df, self.test_df = train_df, test_df

    @staticmethod
    def _as_dataset(frame):
        # Convert the two columns to plain lists before wrapping them.
        return NewsGroupsDataset(frame['text'].tolist(), frame['target'].tolist())

    def setup(self, stage=None):
        """Build the train and test datasets from the stored DataFrames."""
        self.train_dataset = self._as_dataset(self.train_df)
        self.test_dataset = self._as_dataset(self.test_df)

    def train_dataloader(self):
        """Return a shuffling loader over the training dataset."""
        return DataLoader(
            self.train_dataset,
            shuffle=True,
            batch_size=self.batch_size,
        )

    def val_dataloader(self):
        """Return a non-shuffling loader over the test dataset."""
        return DataLoader(
            self.test_dataset,
            batch_size=self.batch_size,
        )

### Fine-tune the SentenceTransformer model:

In [None]:
import torch
import pytorch_lightning as pl
from sentence_transformers import SentenceTransformer
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup


class SentenceTransformerFinetuner(pl.LightningModule):
    """Lightning module that fine-tunes a SentenceTransformer on raw text batches.

    Args:
        model_name: Model name or path passed to ``SentenceTransformer``.
        lr: Learning rate for the AdamW optimizer.
    """

    def __init__(self, model_name='all-MiniLM-L6-v2', lr=2e-5):
        super().__init__()
        self.model = SentenceTransformer(model_name)
        self.lr = lr

    def forward(self, batch):
        """Embed a batch of strings while keeping the autograd graph.

        ``SentenceTransformer.encode`` is an inference helper that disables
        gradient tracking, so its output cannot train the model. Tokenizing
        and calling the module directly keeps the embeddings connected to
        the model parameters.

        Args:
            batch: Iterable of strings.

        Returns:
            Tensor of sentence embeddings, one row per input string.
        """
        features = self.model.tokenize(list(batch))
        # The tokenizer returns CPU tensors; move them to the module's device.
        features = {
            key: value.to(self.device) if isinstance(value, torch.Tensor) else value
            for key, value in features.items()
        }
        return self.model(features)['sentence_embedding']

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss for one ``(texts, labels)`` batch."""
        texts, _labels = batch
        embeddings = self(texts)

        # Pairwise Euclidean distances between all embeddings in the batch
        distances = torch.cdist(embeddings, embeddings)

        # Minimizing this maximizes the mean pairwise distance.
        # NOTE(review): the labels are not used, so the objective is purely
        # unsupervised -- confirm this is intended rather than a label-aware
        # contrastive loss.
        loss = self.model.get_sentence_embedding_dimension() - distances.mean()

        self.log('train_loss', loss)
        return loss

    def configure_optimizers(self):
        """Return AdamW with a linear-decay schedule that is stepped every batch."""
        # Ensure all parameters require gradients
        for param in self.model.parameters():
            param.requires_grad = True

        optimizer = AdamW(self.model.parameters(), lr=self.lr)
        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=0,
            num_training_steps=self.trainer.estimated_stepping_batches
        )
        # The schedule length is counted in optimizer steps, so the scheduler
        # must advance per step; Lightning's default interval is per epoch.
        return {
            'optimizer': optimizer,
            'lr_scheduler': {'scheduler': scheduler, 'interval': 'step'},
        }


# Build the fine-tuning module and the data module that feeds it
model = SentenceTransformerFinetuner()
data_module = NewsGroupsDataModule(train_df, test_df)

# Three epochs, letting Lightning choose the accelerator
trainer = pl.Trainer(accelerator='auto', max_epochs=3)
trainer.fit(model, datamodule=data_module)

# Persist the wrapped SentenceTransformer so later cells can reload it from disk
model.model.save(path='fine_tuned_sentence_transformer')

### Create and train the BERTopic model:

In [None]:
# Reload the embedding model written to disk by the fine-tuning step
sentence_model = SentenceTransformer('fine_tuned_sentence_transformer')

# Dimensionality reduction: project the embeddings down to 5 components
umap_model = UMAP(
    n_neighbors=15,
    n_components=5,
    min_dist=0.0,
    metric='cosine',
)

# Clustering on the reduced vectors; prediction_data=True is requested so
# the fitted clusterer keeps the data it needs for later predictions
hdbscan_model = HDBSCAN(
    min_cluster_size=15,
    metric='euclidean',
    cluster_selection_method='eom',
    prediction_data=True,
)

# Bag-of-words vectorizer with English stop words removed
vectorizer_model = CountVectorizer(stop_words="english")

# Assemble the pipeline from the components above
topic_model = BERTopic(
    verbose=True,
    embedding_model=sentence_model,
    umap_model=umap_model,
    hdbscan_model=hdbscan_model,
    vectorizer_model=vectorizer_model,
)

# Fit on the training texts; only the topic assignments are kept
topics, _ = topic_model.fit_transform(train_df['text'].to_list())

print("BERTopic model training completed.")

### Evaluate the model:

In [None]:
def evaluate_topic_model(topic_model, test_df):
    """Score a fitted topic model on held-out documents.

    Args:
        topic_model: Fitted model exposing ``transform``, ``get_topic_info``
            and ``get_topics`` (BERTopic's interface).
        test_df: DataFrame with a ``'text'`` column of documents and a
            ``'target'`` column of ground-truth labels.

    Returns:
        Dict with keys ``'coherence_score'``, ``'diversity_score'``,
        ``'adjusted_rand_index'`` and ``'normalized_mutual_info'``.
        ``'coherence_score'`` is NaN when the topic info table carries no
        ``'Coherence'`` column.
    """
    # Get predictions on test data
    test_topics, _ = topic_model.transform(test_df['text'].tolist())

    # BERTopic's topic info table does not include coherence by default, so
    # only average the column when something has added it.
    topic_info = topic_model.get_topic_info()
    if 'Coherence' in topic_info.columns:
        coherence_score = topic_info['Coherence'].mean()
    else:
        coherence_score = float('nan')

    # Diversity: share of unique words among the top-N words of each topic.
    # Topic -1 is BERTopic's outlier bucket, not a real topic, so it is skipped.
    N = 20
    topic_words = [
        word
        for topic_id, words in topic_model.get_topics().items()
        if topic_id != -1
        for word, _ in words[:N]
    ]
    # Divide by the number of words actually collected: a topic may hold
    # fewer than N words, which would otherwise deflate the score.
    diversity_score = len(set(topic_words)) / len(topic_words) if topic_words else 0.0

    # Agreement between predicted topics and the ground-truth labels
    true_labels = test_df['target'].tolist()
    ari = adjusted_rand_score(true_labels, test_topics)
    nmi = normalized_mutual_info_score(true_labels, test_topics)

    return {
        'coherence_score': coherence_score,
        'diversity_score': diversity_score,
        'adjusted_rand_index': ari,
        'normalized_mutual_info': nmi
    }


# Score the fitted model on the held-out split and print one metric per line
evaluation_results = evaluate_topic_model(topic_model, test_df)
print("Evaluation Results:")
for metric in evaluation_results:
    value = evaluation_results[metric]
    print(f"{metric}: {value}")

### Visualize the topics:

In [None]:
# Write the topic visualization to a standalone HTML file
topic_model.visualize_topics().write_html("topic_visualization.html")
print("Topic visualization saved as 'topic_visualization.html'")

# Bar chart of the 'Count' column against 'Topic' from the topic info table
topic_distr = topic_model.get_topic_info()
plt.figure(figsize=(12, 6))
sns.barplot(data=topic_distr, x='Topic', y='Count')
plt.xlabel('Topic')
plt.ylabel('Number of Documents')
plt.title('Topic Distribution')
plt.xticks(rotation=90)  # rotate the x tick labels by 90 degrees
plt.tight_layout()
plt.savefig('topic_distribution.png')
plt.close()
print("Topic distribution plot saved as 'topic_distribution.png'")

### Create a function to get topics for new text:

In [None]:
def get_topics_for_text(texts, topic_model):
    """Assign a topic to each text using a fitted topic model.

    Args:
        texts: Documents to pass to ``topic_model.transform``.
        topic_model: Object whose ``transform(texts)`` returns a
            ``(topics, probs)`` pair of aligned iterables.

    Returns:
        List of ``(topic, prob)`` tuples, one per input text.
    """
    assigned, confidences = topic_model.transform(texts)
    return [(topic, prob) for topic, prob in zip(assigned, confidences)]


# Example usage: classify two unseen sentences and show each topic's words
new_texts = [
    "This is a sample text about computer science and programming.",
    "The stock market experienced significant fluctuations today.",
]
new_topics = get_topics_for_text(new_texts, topic_model)
for text, (topic, prob) in zip(new_texts, new_topics):
    print(f"Text: {text}")
    print(f"Assigned Topic: {topic}")
    print(f"Probability: {prob}")
    # get_topic yields (word, score) pairs; only the words are printed
    print(
        f"Top words: {', '.join(entry[0] for entry in topic_model.get_topic(topic))}")
    print()

### Write tests for the topic modeling pipeline:

In [None]:
import unittest


class TestTopicModeling(unittest.TestCase):
    """Sanity checks for a trained BERTopic model loaded from disk."""

    @classmethod
    def setUpClass(cls):
        # Load the trained model and necessary data.
        # NOTE(review): both paths must already exist when this runs --
        # confirm the model is saved and test_data.csv is written beforehand.
        cls.topic_model = BERTopic.load("trained_bertopic_model")
        cls.test_df = pd.read_csv("test_data.csv")

    def test_model_output_format(self):
        """transform() yields one integer topic and one [0, 1] probability per document."""
        topics, probs = self.topic_model.transform(
            self.test_df['text'].tolist())
        self.assertEqual(len(topics), len(self.test_df))
        self.assertEqual(len(probs), len(self.test_df))
        # Results may be numpy scalars, which are not instances of the
        # built-in int, so accept the numpy scalar types as well.
        self.assertTrue(all(isinstance(topic, (int, np.integer))
                            for topic in topics))
        self.assertTrue(all(isinstance(prob, (float, np.floating)) and 0 <=
                        prob <= 1 for prob in probs))

    def test_topic_coherence(self):
        """Mean coherence exceeds the threshold when coherence is available."""
        topic_info = self.topic_model.get_topic_info()
        # BERTopic does not compute coherence itself; skip rather than fail
        # with a KeyError when the column has not been added.
        if 'Coherence' not in topic_info.columns:
            self.skipTest("topic info has no 'Coherence' column")
        coherence_score = topic_info['Coherence'].mean()
        self.assertGreater(coherence_score, 0.3)  # Adjust threshold as needed

    def test_topic_diversity(self):
        """The share of unique words across topics' top words is high enough."""
        N = 20
        # Skip -1, BERTopic's outlier bucket, which is not a real topic.
        topic_words = [word
                       for topic_id, words in self.topic_model.get_topics().items()
                       if topic_id != -1
                       for word, _ in words[:N]]
        self.assertTrue(topic_words, "model has no non-outlier topics")
        # Normalize by the words actually collected: topics may hold fewer
        # than N words, which would make the threshold unreachable.
        diversity_score = len(set(topic_words)) / len(topic_words)
        self.assertGreater(diversity_score, 0.5)  # Adjust threshold as needed

    def test_new_text_assignment(self):
        """A single unseen document gets an integer topic and a valid probability."""
        new_text = "This is a sample text about artificial intelligence and machine learning."
        # transform() returns (topics, probs); index each sequence separately.
        topics, probs = self.topic_model.transform([new_text])
        topic, prob = topics[0], probs[0]
        self.assertIsInstance(topic, (int, np.integer))
        self.assertIsInstance(prob, (float, np.floating))
        self.assertTrue(0 <= prob <= 1)

    def test_model_consistency(self):
        """Transforming the same text twice gives the same topic."""
        text = "This is a test text for consistency."
        topics1, _ = self.topic_model.transform([text])
        topics2, _ = self.topic_model.transform([text])
        # Compare as lists so numpy arrays do not yield an ambiguous result.
        self.assertEqual(list(topics1), list(topics2))


if __name__ == '__main__':
    # In a notebook, sys.argv holds kernel arguments and the default
    # sys.exit() would stop the kernel; pass a dummy argv and disable exit.
    unittest.main(argv=['first-arg-is-ignored'], exit=False)

### Save and load the model:

In [None]:
# Persist the fitted topic model to disk
topic_model.save(path="trained_bertopic_model")
print("Model saved as 'trained_bertopic_model'")

# Read it back to confirm the saved artifact can be restored
loaded_model = BERTopic.load(path="trained_bertopic_model")
print("Model loaded successfully")