In [1]:
import functools
import os
from datetime import datetime, timedelta
from pathlib import Path

import numpy as np
import openai
import pandas as pd
from bson import ObjectId
from dotenv import load_dotenv
from pymongo import MongoClient, errors
from pymongo.database import Database
from tqdm import tqdm

# bertopic components
from bertopic import BERTopic
from bertopic.representation import OpenAI
from bertopic.vectorizers import ClassTfidfTransformer
from hdbscan import HDBSCAN
from sklearn.feature_extraction.text import CountVectorizer
from umap import UMAP

# Load environment variables (MONGO_HOST and OPENAI_API_KEY are read via os.getenv below)
# from a .env file.
# NOTE(review): the path is relative, so it is presumably resolved against the kernel's
# current working directory — this assumes the notebook is started from its own folder.
load_dotenv("../../.env")

True

# Data collection and preprocessing

In [2]:
# --- MongoDB layout -------------------------------------------------------
DATABASE = "insightfinder-dev"

# articles; their _id is shared with the matching embedding record
DOCS_COLLECTION = "documents"
# per-article embeddings stored under the same _id as the article
DOC_EMBEDDINGS_COLLECTION = "embeddings"
# one record per (document, weekly topic) assignment
TOPICS_COLLECTION = "topics"
# topic embeddings keyed by the topic's ObjectId
TOPIC_EMBEDDINGS_COLLECTION = "topic_embeddings"

# connection string handed to MongoClient; None when the variable is not set
MONGO_HOST = os.environ.get("MONGO_HOST")

In [3]:
def mongo_transaction(func=None, *, reraise: bool = False):
    """
    Decorator that opens a MongoDB connection for a single call and injects the database.

    The decorated function receives ``MongoClient(MONGO_HOST)[DATABASE]`` as its first
    argument, so callers invoke it without ``db``. A new client is created for every call
    and closed by the ``with`` block when the call finishes.

    Despite the name, no MongoDB multi-document transaction is started; only the client
    connection is scoped to the call.

    Args:
        func: the function to wrap (supplied automatically when used as ``@mongo_transaction``).
        reraise: when False (default), errors are printed and the call returns ``None``
            (best-effort). When True (``@mongo_transaction(reraise=True)``), the error is
            printed and then re-raised, so failures are not mistaken for empty results.

    Returns:
        The wrapped function, whose ``__name__``/``__doc__`` are those of ``func``.
    """
    if func is None:
        # used with arguments: @mongo_transaction(reraise=...)
        return functools.partial(mongo_transaction, reraise=reraise)

    @functools.wraps(func)  # keep the wrapped function's name and docstring for help()/tracebacks
    def wrapper(*args, **kwargs):
        try:
            with MongoClient(MONGO_HOST) as mongo_client:
                db = mongo_client[DATABASE]
                return func(db, *args, **kwargs)
        except errors.PyMongoError as e:
            print(f"MongoDB error: {e}")
            if reraise:
                raise
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            if reraise:
                raise
        # best-effort mode: signal failure with None
        return None

    return wrapper
        

@mongo_transaction
def get_data(db: Database, start_date: str, end_date: str, limit: int = None):
    """
    Retrieves the content and embedding data for articles published in a date range.

    Only articles that have an embedding are returned. The date filter is applied
    first, and only the embeddings of in-range documents are fetched, instead of
    loading every embedding vector in the collection.

    Args:
        db: database handle injected by ``mongo_transaction`` (callers omit it).
        start_date: inclusive lower bound on ``parsed_date``.
        end_date: inclusive upper bound on ``parsed_date``.
        limit: optional maximum number of documents to return; ``None``/0 means no limit.
            No sort is applied, so the kept subset follows MongoDB's return order.

    Returns:
        ``(documents, embeddings)``: two lists of equal length with
        ``documents[i]["_id"] == embeddings[i]["_id"]``.
    """
    docs_collection = db[DOCS_COLLECTION]
    embeddings_collection = db[DOC_EMBEDDINGS_COLLECTION]

    # NOTE(review): the bounds are compared directly against parsed_date; this assumes
    # parsed_date is stored as a sortable "YYYY-MM-DD" string — confirm. A value with a
    # time suffix (e.g. "2024-08-04T10:00") sorts after "2024-08-04" and would be
    # excluded by $lte.
    candidates = list(
        docs_collection.find(
            filter={"parsed_date": {"$gte": start_date, "$lte": end_date}},
            projection={
                "_id": 1, "url": 1, "parsed_date": 1,
                "title": 1, "description": 1, "paragraphs": 1
            },
        )
    )
    if not candidates:
        return [], []

    # fetch only the embeddings that belong to the in-range documents
    embedding_by_id = {
        emb["_id"]: emb
        for emb in embeddings_collection.find(
            filter={
                "_id": {"$in": [doc["_id"] for doc in candidates]},
                "embedding": {"$exists": True},
            },
        )
    }

    # keep documents that have an embedding, then apply the optional limit
    documents = [doc for doc in candidates if doc["_id"] in embedding_by_id]
    if limit:
        documents = documents[:limit]

    # embeddings in the same order as documents
    embeddings = [embedding_by_id[doc["_id"]] for doc in documents]
    return documents, embeddings


@mongo_transaction
def check_topics_exist(db: Database, start_date: str, end_date: str):
    """
    Report whether topics are already stored for exactly this (start_date, end_date) window.

    Returns True when at least one record in the topics collection has matching
    ``topic_start_date`` and ``topic_end_date``; False otherwise.
    """
    window = {
        "topic_start_date": {"$eq": start_date},
        "topic_end_date": {"$eq": end_date},
    }
    first_match = db[TOPICS_COLLECTION].find_one(window)
    return first_match is not None


@mongo_transaction
def insert_topics(db: Database, topics_data: list):
    """
    Insert topic assignment records into the topics collection.

    Args:
        db: database handle injected by ``mongo_transaction`` (callers omit it).
        topics_data: records to insert as-is.

    An empty list is a no-op. ``insert_many`` rejects empty input, which would
    otherwise surface as an error printed by the decorator.
    """
    if not topics_data:
        return
    db[TOPICS_COLLECTION].insert_many(topics_data)


@mongo_transaction
def insert_topic_embeddings(db: Database, topics_to_embeddings: dict):
    """
    Insert topic embeddings into the topic embeddings collection.

    Args:
        db: database handle injected by ``mongo_transaction`` (callers omit it).
        topics_to_embeddings: mapping of topic id -> embedding; each pair is stored as
            ``{"_id": topic_id, "embedding": embedding}``.

    An empty mapping is a no-op, because ``insert_many`` rejects an empty list.
    """
    if not topics_to_embeddings:
        return
    payload = [
        {"_id": topic_id, "embedding": topic_embedding}
        for topic_id, topic_embedding in topics_to_embeddings.items()
    ]
    db[TOPIC_EMBEDDINGS_COLLECTION].insert_many(payload)


def extract_article_content(document: dict):
    """
    Build the text fed to the topic model for a single article.

    The stripped title, description and body are joined with newlines, and empty parts
    are skipped. When ``paragraphs`` is a list, its non-blank entries are stripped and
    joined with single spaces. In that case the description is also dropped if the body
    already starts with it, so the lead text is not counted twice.
    """
    def _clean(text):
        # strip only truthy values so None / "" pass through untouched
        return text.strip() if text else text

    title = _clean(document.get("title"))
    description = _clean(document.get("description"))
    body = document.get("paragraphs")

    if isinstance(body, list):
        stripped_chunks = (chunk.strip() for chunk in body)
        body = " ".join(chunk for chunk in stripped_chunks if chunk).strip()
        if description is not None and body.startswith(description):
            description = None

    parts = (title or "", description or "", body or "")
    return "\n".join(part for part in parts if part)


def build_topic_model(
    openai_model_key: str,
    use_openai_representation: bool = False,
    random_state=None,
) -> BERTopic:
    """
    Build an unfitted BERTopic pipeline (UMAP -> HDBSCAN -> CountVectorizer/c-TF-IDF).

    Args:
        openai_model_key: OpenAI chat model used to name topics when
            ``use_openai_representation`` is True; ignored otherwise.
        use_openai_representation: attach the GPT representation model. Defaults to
            False, which leaves BERTopic's default representation in place. The OpenAI
            client is only created when this is True, so OPENAI_API_KEY is not needed
            for the default pipeline.
        random_state: optional seed forwarded to UMAP for reproducible topic
            assignments (``None`` keeps UMAP's default).

    Returns:
        A configured but unfitted ``BERTopic`` instance.
    """
    representation_model = None
    if use_openai_representation:
        # GPT model provides a name for each topic based on a set of topic exemplar documents
        openai_client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        representation_model = OpenAI(
            openai_client,
            model=openai_model_key,
            chat=True,
            tokenizer="char",
            doc_length=1000,  # with tokenizer="char": each exemplar truncated to 1000 characters
            nr_docs=5,        # exemplar documents sent per topic
        )

    # components of the BERTopic algorithm
    umap_model = UMAP(
        n_neighbors=10,
        n_components=3,
        min_dist=0.0,
        metric="cosine",
        random_state=random_state,
    )
    hdbscan_model = HDBSCAN(min_cluster_size=10, min_samples=2)
    ctfidf_model = ClassTfidfTransformer(bm25_weighting=True, reduce_frequent_words=True)
    # min_df=10: an n-gram must appear in at least 10 documents to enter the vocabulary
    vectorizer_model = CountVectorizer(min_df=10, ngram_range=(1, 3))

    return BERTopic(
        umap_model=umap_model,
        hdbscan_model=hdbscan_model,
        ctfidf_model=ctfidf_model,
        vectorizer_model=vectorizer_model,
        representation_model=representation_model,
    )

In [7]:
# parameters

# Date window to process. Weeks run between consecutive Mondays inside this range,
# so end_date must reach the Monday *after* the last week that should be computed.
start_date, end_date = "2024-07-27", "2024-08-05"

# OpenAI model name passed to build_topic_model
openai_model_key = "gpt-4o-mini"

In [8]:
# every Monday in [start_date, end_date]; each consecutive pair delimits one weekly window
mondays = pd.date_range(start=start_date, end=end_date, freq="W-MON")
mondays

DatetimeIndex(['2024-07-29', '2024-08-05'], dtype='datetime64[ns]', freq='W-MON')

In [9]:


# Fit one topic model per Monday–Sunday week and store the results.
# Weeks that already have topics in the db are skipped, so the cell can be re-run safely.
pbar = tqdm(
    zip(mondays, mondays[1:]),
    total=max(len(mondays) - 1, 0),
    desc="Calculating topics",
    leave=True,
    position=0,
)

for monday, next_monday in pbar:
    sunday = next_monday - timedelta(days=1)
    monday_str = monday.strftime("%Y-%m-%d")
    sunday_str = sunday.strftime("%Y-%m-%d")

    # skip already ingested topics
    if check_topics_exist(monday_str, sunday_str):
        print(f"Skipping topics creation for {monday_str} - {sunday_str} ...")
        continue

    # extract documents and embeddings within the date range
    # (the mongo_transaction wrapper returns None after printing an error)
    data = get_data(monday_str, sunday_str)
    if data is None:
        print(f"Could not load data for {monday_str} - {sunday_str}, skipping ...")
        continue
    documents, embeddings = data
    if not documents:
        print(f"No documents with embeddings for {monday_str} - {sunday_str}, skipping ...")
        continue

    # align order of documents and embeddings (dict lookup instead of list.index per element)
    embedding_position = {emb["_id"]: pos for pos, emb in enumerate(embeddings)}
    documents = sorted(documents, key=lambda doc: embedding_position[doc["_id"]])
    assert [doc["_id"] for doc in documents] == [emb["_id"] for emb in embeddings]

    # extract the article contents and embedding vectors into parallel sequences
    articles = [extract_article_content(doc) for doc in documents]
    vectors = np.array([emb["embedding"] for emb in embeddings])

    # build a fresh topic model for this week
    topic_model = build_topic_model(openai_model_key)

    # calculate topic assignments and assignment probabilities (of document to topic)
    topics_assignment, assignment_probs = topic_model.fit_transform(
        documents=articles,
        embeddings=vectors,
    )
    topics_labels = topic_model.topic_labels_
    topic_embeddings = topic_model.topic_embeddings_

    # BERTopic orders the rows of topic_embeddings_ by sorted topic id, so the outlier
    # topic -1 (when present) is row 0 and topic t is row t + 1. Indexing by the topic id
    # directly would shift every embedding by one, and -1 would wrap to the last row.
    sorted_topic_indices = sorted(topics_labels)
    assert len(sorted_topic_indices) == len(topic_embeddings)
    topic_rows = {topic_idx: row for row, topic_idx in enumerate(sorted_topic_indices)}

    # generate unique identifiers for computed topics
    topic_ids = {topic_idx: ObjectId() for topic_idx in topics_labels}

    # one record per document with its topic assignment
    # (built-in int/float so BSON encoding never receives numpy scalars)
    topics_data = [
        {
            "document_id": document["_id"],
            "document_date": document["parsed_date"],
            "topic_index": int(topic_index),
            "topic_label": topics_labels[topic_index],
            "assignment_probability": float(probability),
            "topic_start_date": monday_str,
            "topic_end_date": sunday_str,
            "topic_id": topic_ids[topic_index],
        }
        for document, topic_index, probability in zip(documents, topics_assignment, assignment_probs)
    ]
    insert_topics(topics_data)

    # insert topic embeddings keyed by topic id; .tolist() yields plain Python floats
    topic_to_embedding = {
        topic_ids[topic_idx]: topic_embeddings[row].tolist()
        for topic_idx, row in topic_rows.items()
    }
    insert_topic_embeddings(topic_to_embedding)

Calculating topics: 100%|████████████████████████| 1/1 [04:13<00:00, 253.97s/it]


In [10]:
# Sanity check: re-running the weekly loop must not overwrite precalculated topics,
# so every processed week should now report True (i.e. it would be skipped).

mondays = pd.date_range(start_date, end_date, freq="W-MON")

for monday, next_monday in zip(mondays, mondays[1:]):
    sunday = next_monday - timedelta(days=1)
    monday_str, sunday_str = (day.strftime("%Y-%m-%d") for day in (monday, sunday))
    print(
        f"Skip topics creation for {monday_str} - {sunday_str}?",
        check_topics_exist(monday_str, sunday_str),
    )

Skip topics creation for 2024-07-29 - 2024-08-04? True
