In [1]:
# Identifier passed as the second argument to get_conversation_topics during topic extraction.
# NOTE(review): this looks like a real phone number — consider loading it from the
# environment before sharing or committing the notebook.
my_number = "972545380874"
# WhatsApp chat export read by parse_chat; the parsed frame is also saved under the
# same path with a .csv suffix.
chat_file = "founders_chat.txt"
# Written to the "group" column of the parsed frame and later matched against Group.group_name.
group_name = "GenAI Founders"

In [2]:
import uuid
import warnings
from pathlib import Path

import dotenv
import pandas as pd
from whatstk import WhatsAppChat

from utils.importing_wa import (
    merge_contact_dfs,
    filter_messages,
    match_and_rename_users,
    split_chats,
)

# Load variables from a .env file (python-dotenv) before the rest of the notebook runs.
# NOTE(review): presumably Settings() in a later cell depends on these values — confirm.
dotenv.load_dotenv()


# Contact exports that are read (in this order) and merged to rename chat participants.
_DEFAULT_CONTACT_FILES = (
    "whatsmeow_contacts.csv",
    "whatsmeow_contacts2.csv",
    "whatsmeow_contacts3.csv",
    "whatsmeow_contacts4.csv",
    "whatsmeow_contacts_202502090741.csv",
)

# Characters deleted from phone-number usernames: parentheses, spaces, the ASCII
# hyphen and a Unicode hyphen variant.
_PHONE_PUNCTUATION = str.maketrans("", "", "() -‐")


def _normalize_username(name):
    """Strip formatting and the leading ``+`` from phone numbers; keep other names as-is."""
    if not name.startswith("+"):
        return name
    # "+" is not in the deletion table, so it is still first and [1:] drops it.
    return name.translate(_PHONE_PUNCTUATION)[1:]


def parse_chat(
    filename: str,
    group_name: str,
    *,
    contact_files=_DEFAULT_CONTACT_FILES,
) -> pd.DataFrame:
    """Parse a WhatsApp chat export into a filtered DataFrame and save it as CSV.

    Args:
        filename: Path of the chat export, read with ``WhatsAppChat.from_source``.
        group_name: Value written to the ``group`` column of every row.
        contact_files: Contact CSV paths, read in order and combined with
            ``merge_contact_dfs``. Defaults to the five exports that were
            previously hard-coded in this function.

    Returns:
        The frame returned by ``filter_messages``; the same frame is written to
        ``filename`` with its suffix replaced by ``.csv``.
    """
    wa_chat = WhatsAppChat.from_source(filepath=filename)

    # Warnings raised while reading and matching contacts are deliberately silenced.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")

        contacts_df = merge_contact_dfs(
            *(pd.read_csv(path) for path in contact_files)
        )
        wa_chat = match_and_rename_users(wa_chat, contacts_df)

    chat_df = wa_chat.df
    chat_df["username"] = chat_df["username"].apply(_normalize_username)

    chat_df["group"] = group_name
    # Every row gets a fresh random id, so ids differ from one run to the next.
    chat_df["id"] = [f"imported_{uuid.uuid4()}" for _ in range(len(chat_df))]
    chat_df = filter_messages(chat_df)

    # Export under the same path as the source file, with a .csv extension.
    chat_df.to_csv(Path(filename).with_suffix(".csv"))
    return chat_df

In [3]:
# Parse the export; as a side effect parse_chat also writes the parsed frame to a
# .csv file with the same base name as chat_file.
chat_df = parse_chat(chat_file, group_name)

In [4]:
from typing import Dict
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

from sqlmodel import select
from config import Settings
from models.group import Group
from voyageai.client_async import AsyncClient

# Application settings; the database URI and Voyage credentials below come from it.
settings = Settings()  # pyright: ignore [reportCallIssue]

# Pool and connection options for the async engine, grouped so they can be tuned in one place.
_ENGINE_OPTIONS = {
    "pool_size": 50,
    "max_overflow": 400,
    "pool_timeout": 90,
    "pool_pre_ping": True,
    "pool_recycle": 600,
    "future": True,
    "connect_args": {"timeout": 60},
}
engine = create_async_engine(settings.db_uri, **_ENGINE_OPTIONS)

# Session factory shared by the cells below (expire_on_commit is switched off).
async_session = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

# Async Voyage client handed to load_topics when topics are stored.
embedding_client = AsyncClient(
    max_retries=settings.voyage_max_retries,
    api_key=settings.voyage_api_key,
)


IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html



In [5]:
# Identify conversations: split_chats is given the parsed frame and the "date"
# column name; each returned item is later processed as its own conversation frame.
conversation_dfs = split_chats(chat_df, "date")
print(f"total conversations: {len(conversation_dfs)}")

total conversations: 6


In [6]:
import logging
from load_new_kbtopics import get_conversation_topics, load_topics
from models import Message
from tenacity import (
    retry,
    wait_random_exponential,
    stop_after_attempt,
    before_sleep_log,
)

logger = logging.getLogger(__name__)


@retry(
    wait=wait_random_exponential(min=10, max=90),
    stop=stop_after_attempt(6),
    before_sleep=before_sleep_log(logger, logging.DEBUG),
    reraise=True,
)
async def _process_conversation(df, group) -> None:
    """Extract topics from one conversation and store them for ``group``.

    The whole coroutine is wrapped in a tenacity retry: at most 6 attempts with
    randomised exponential waits (``min=10``, ``max=90``), re-raising the
    original error once the attempts are exhausted.

    Args:
        df: Conversation frame; the ``date``, ``message`` and ``username``
            columns are read from every row.
        group: Group whose ``group_jid`` is stamped on each message and which
            is passed on to ``load_topics``.

    Returns:
        None. An empty ``df`` is a no-op.
    """
    # Nothing to do for an empty conversation: skip topic extraction and the DB session.
    if len(df) == 0:
        return

    # NOTE(review): message_id is derived from the timestamp alone, so messages
    # sharing a timestamp get the same id — confirm this is acceptable downstream.
    messages = [
        Message(
            message_id=f"na-{row['date']}",
            timestamp=row["date"],
            chat_jid=group.group_jid,
            text=row["message"],
            sender_jid=row["username"],
            group_jid=group.group_jid,
        )
        for _, row in df.iterrows()
    ]

    topics = await get_conversation_topics(messages, my_number)

    async with async_session() as session:
        # Merge the group into this session before handing it to load_topics.
        group = await session.merge(group)
        await load_topics(
            session,
            group,
            embedding_client,
            topics,
            # Earliest message time in the conversation, as a plain datetime.
            df["date"].min().to_pydatetime(),
        )
        await session.commit()

In [7]:
async with async_session() as session:
    res = await session.exec(
        select(Group).where(Group.group_name == chat_df.iloc[0]["group"])
    )
    group = res.first()

In [8]:
import asyncio
import json
from tqdm.asyncio import tqdm_asyncio

# Upper bound on conversations being processed at the same time
MAX_CONCURRENT_TASKS = 30

# Checkpoint file (named after the group's JID) listing conversation indices already done
processed_file = f"{group.group_jid}_processed.json"

# Restore the checkpoint when one exists; otherwise start with an empty set
processed_indices = set()
if Path(processed_file).exists():
    with open(processed_file, "r") as f:
        processed_indices.update(json.load(f))

# Number of conversations found in the chat
total_conversations = len(conversation_dfs)

# Shared guards for the worker coroutine: one serialises checkpoint updates,
# the other caps how many conversations run concurrently
processed_lock = asyncio.Lock()
semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)


# Process one conversation, then record its index in the checkpoint file
async def process_conversation(df, group, idx):
    """Process one conversation under the concurrency cap, then checkpoint it.

    Args:
        df: Conversation frame forwarded to ``_process_conversation``.
        group: Group forwarded to ``_process_conversation``.
        idx: Index of this conversation; added to ``processed_indices`` and
            persisted only after processing finished without raising.
    """
    async with semaphore:
        await _process_conversation(df, group)
        # Update the processed set
        async with processed_lock:
            processed_indices.add(idx)
            # Write to a sibling temp file and swap it in, so an interrupted run
            # cannot leave a truncated (unreadable) checkpoint behind.
            tmp_path = Path(f"{processed_file}.tmp")
            with open(tmp_path, "w") as checkpoint:
                json.dump(sorted(processed_indices), checkpoint, indent=2)
            tmp_path.replace(processed_file)


# Filter out already processed conversations
tasks = [
    process_conversation(df, group, idx)
    for idx, df in enumerate(conversation_dfs)
    if idx not in processed_indices
]

# Create progress bar
with tqdm_asyncio(total=len(tasks), desc="Processing Conversations") as pbar:
    for f in asyncio.as_completed(tasks):
        await f
        pbar.update(1)



Processing Conversations: 100%|██████████| 6/6 [00:31<00:00,  5.31s/it]
