In [None]:
from src import WhatsappClient

In [None]:
# Build the WhatsappClient with DEBUG enabled, then await its Playwright setup coroutine.
# Top-level await is used throughout; these cells rely on the notebook's running event loop.
client = WhatsappClient(DEBUG=True)
await client.initialize_playwright()

In [None]:
# Await the client's login coroutine before using any of the chat helpers below.
await client.login()

In [None]:
# Search for the chat "Prathwik" through the client.
# The commented-out calls below are other client helpers, presumably kept for manual experiments.
await client.find_user("Prathwik")
# await client.extract_chat_details_from_side_pane()
# await client.search_pane_scroll_down()

In [None]:
# Await the client's fetch_latest_message coroutine; the notebook displays whatever it returns.
await client.fetch_latest_message()

In [None]:
# Register a callback that prints whatever single argument on_new_message passes to it.
await client.on_new_message(lambda x: print(x))

In [None]:
# Scroll the chat pane up; the search-pane variant is left commented out as an alternative.
# await client.search_pane_scroll_down()
await client.chat_pane_scroll_up()

In [None]:
# Open the chat panel for "Prathwik" so the message-extraction cell has a chat to read.
await client.open_chat_panel("Prathwik")

In [None]:
from tabulate import tabulate

# Pull the messages via the client and render them as a grid table.
messages = await client.extract_messages()

# One row per message; each message is indexed by the keys
# "sender", "time", "message" and "attachment" (the attachment is stringified for display).
table_data = [
    [msg["sender"], msg["time"], msg["message"], str(msg["attachment"])]
    for msg in messages
]
# Column titles, in the same order as the row fields above.
headers = ["Sender", "Time Sent", "Message", "Attachment Details"]

print("Number of previous messages: ", len(messages))
print(tabulate(table_data, headers=headers, tablefmt="grid"))

In [None]:
## LEAVE THIS AS IT IS GOOD FOR REFACTORING STUFF
import asyncio
from playwright.async_api import async_playwright
import os

BASE_URL = "https://web.whatsapp.com"


async def initialize_playwright(browser_instance, user_data_dir, headless):
    """
    Start Playwright and open a new page inside a persistent browser context.

    Args:
        browser_instance: Key used to look up the browser type on the
            Playwright object (the notebook passes "chromium").
        user_data_dir: Passed to ``launch_persistent_context`` as the profile
            directory.
        headless: Forwarded to ``launch_persistent_context`` as ``headless``.

    Returns:
        A ``(playwright, browser, page_instance)`` tuple. ``browser`` is the
        object returned by ``launch_persistent_context`` (a persistent
        context), and ``page_instance`` is the page opened in it.

    Raises:
        Exception: Anything raised while launching the context or opening the
            page is re-raised after the already-started resources are shut
            down.
    """
    # TODO: Perform browser level optimizations and other stuff
    playwright = await async_playwright().start()

    try:
        browser = await playwright[browser_instance].launch_persistent_context(
            user_data_dir, headless=headless
        )
        try:
            page_instance = await browser.new_page()
        except Exception:
            # The context is already open at this point; close it before giving up.
            await browser.close()
            raise
    except Exception:
        # Otherwise the Playwright driver started above would be left running
        # with no handle for the caller to stop it.
        await playwright.stop()
        raise

    # await page_instance.set_viewport_size({"width": 1920, "height": 1080})
    return playwright, browser, page_instance


async def login(page, user_data_dir, timeout=600000):
    """
    Open WhatsApp Web on ``page`` and block until the chat list has rendered.

    Args:
        page: Playwright page to navigate.
        user_data_dir: Currently unused; kept so existing callers keep working.
        timeout: Maximum time in milliseconds to wait for the first chat row
            to appear. Defaults to 600000 (ten minutes), the value that was
            previously hard-coded.

    Raises:
        Whatever ``page.wait_for_selector`` raises when the chat list does not
        appear within ``timeout``.
    """
    # TODO: QR code & phone number login via script

    await page.goto(BASE_URL)
    await page.bring_to_front()

    print("Waiting for WhatsApp chats to load...")
    # The first child div under the side pane only exists once chats are listed,
    # so its presence is used as the "logged in and loaded" signal.
    await page.wait_for_selector(
        '//*[@id="pane-side"]/div[2]/div/div/child::div', timeout=timeout
    )
    print("WhatsApp chats loaded.")

In [None]:
# Launch a non-headless Chromium persistent context backed by the "user_data" directory
# and bind the notebook-global `page` that the helper functions below rely on.
playwright, browser, page = await initialize_playwright("chromium", "user_data", False)
# Navigate to WhatsApp Web and wait for the chat list (second argument is the same profile dir).
await login(page, "user_data")

In [None]:
async def clear_text():
    """Press Control+A and then Backspace on the notebook-global ``page`` keyboard."""
    # Select-all followed by delete, sent one key press at a time and in this order.
    for shortcut in ("Control+A", "Backspace"):
        await page.keyboard.press(shortcut)


def get_search_box_locator():
    """Build a locator on the global ``page`` for the editable search textbox inside ``#side``."""
    search_box_selector = (
        "#side div[contenteditable='true'][role='textbox'][data-lexical-editor='true']"
    )
    return page.locator(search_box_selector)


def _matrix_translate_y(transform):
    """Return the Y offset of a computed ``matrix(a, b, c, d, tx, ty)`` transform, else None."""
    if not transform or "matrix" not in transform:
        return None
    parts = transform.replace("matrix(", "").replace(")", "").split(", ")
    # Anything that is not a plain 2D matrix (six components) is ignored.
    if len(parts) != 6:
        return None
    return float(parts[5])


async def find_user(username: str) -> str | bool:
    """
    Type ``username`` into the side-pane search box and inspect the results.

    Only a result row whose computed transform has a Y offset of exactly 72
    is considered (presumably the first result row - confirm against the
    current WhatsApp Web layout). Its name must start with ``username``,
    compared case-insensitively.

    Args:
        username: Prefix of the chat name to look for.

    Returns:
        str | bool: chat_name if the chat is found, False otherwise. False is
        also returned when the search results container is absent or a
        timeout occurs.

    TODO:
        - find the user without opening the chat panel
        - right now this function is same as open_chat_panel
    """
    # Playwright's TimeoutError does not derive from the builtin TimeoutError,
    # so the builtin alone would never catch a Playwright timeout.
    from playwright.async_api import TimeoutError as PlaywrightTimeoutError

    try:
        search_box = get_search_box_locator()

        await search_box.click()
        await clear_text()
        await search_box.type(username)

        # Fixed pause before reading the results list.
        await asyncio.sleep(0.5)

        chat_list_div = await page.query_selector('div[aria-label="Search results."]')
        if chat_list_div is None:
            # query_selector returns None when nothing matches; treat as "not found".
            print(f'Username with prefix "{username}" not found')
            return False

        children = await chat_list_div.query_selector_all('div[role="listitem"]')
        chat_name = None

        for chat in children:
            transform = await chat.evaluate(
                "element => window.getComputedStyle(element).transform"
            )
            if _matrix_translate_y(transform) != float(72):
                continue

            # Only read the name for a row that passed the position check.
            name_element = await chat.query_selector('span[dir="auto"]')
            chat_name = await name_element.inner_text() if name_element else "Unknown"

        if chat_name and chat_name.upper().startswith(username.upper()):
            print(f'Username with prefix "{username}" found as "{chat_name}"')
            return chat_name

        print(f'Username with prefix "{username}" not found')
        return False
    except (TimeoutError, PlaywrightTimeoutError):
        print(f'It was not possible to fetch chat "{username}"')
        return False

In [None]:
# Manual check of the notebook-local find_user (not the client method) using the chat "Prathwik".
await find_user("Prathwik")
## ADD TESTING STUFF HERE

In [None]:
async def extract_chat_details_from_side_pane(self):
    """
    Get the list of chats in the side pane (name, recent message, time, unread messages).

    Reads every ``div[role="listitem"]`` under the ``Chat list`` container of
    ``self.page`` and builds one dict per row with the keys ``name``,
    ``recent_message``, ``time``, ``unread_messages`` and ``translate_y``.

    Returns:
        list[dict]: The rows ordered by ascending ``translate_y``. The list is
        empty when the chat list container is not present. If reading a row
        fails, the error is logged and the rows gathered so far are returned.
    """
    # Defined up front so the final return is valid on every path.
    all_chats = []

    chat_list_div = await self.page.query_selector('div[aria-label="Chat list"]')
    if chat_list_div is None:
        # query_selector returns None when nothing matches.
        logger.warning("Chat list div not found.")
        return all_chats
    children = await chat_list_div.query_selector_all('div[role="listitem"]')

    try:
        logger.info("Chat list div found.")
        for chat in children:
            name_element = await chat.query_selector('span[dir="auto"]')
            name = await name_element.inner_text() if name_element else "Unknown"

            # Sole purpose of sorting the chats based on latest message
            transform = await chat.evaluate(
                "element => window.getComputedStyle(element).transform"
            )

            # Always a float so the sort below never compares str with float.
            # Rows whose transform cannot be parsed (e.g. "none") are treated
            # as having no vertical offset.
            translate_y = 0.0
            if transform:
                if "matrix" in transform:
                    parts = (
                        transform.replace("matrix(", "").replace(")", "").split(", ")
                    )
                    if len(parts) == 6:
                        translate_y = float(parts[5])

                elif "translateY(0px)" in transform:
                    logger.info("Div with translateY(0px) found.")

            recent_message_element = await chat.query_selector(
                'div[class="_ak8k"]>span>span'
            )
            recent_message = (
                await recent_message_element.inner_text()
                if recent_message_element
                else "No recent message"
            )

            # NOTE(review): this is the same selector used for the name above,
            # so "time" currently carries the same text as "name". The selector
            # for the timestamp element needs to be confirmed against the page.
            time_element = await chat.query_selector('span[dir="auto"]')
            time_text = (
                await time_element.inner_text() if time_element else "No time available"
            )

            unread_messages_element = await chat.query_selector(
                'span[aria-label*="unread"]'
            )
            unread_messages = (
                await unread_messages_element.inner_text()
                if unread_messages_element
                else "0"
            )

            all_chats.append(
                {
                    "name": name,
                    "recent_message": recent_message,
                    "time": time_text,
                    "unread_messages": unread_messages,
                    "translate_y": translate_y,
                }
            )

    except Exception as e:
        logger.exception(f"Error extracting chat details from side pane: {e}")

    # Sort once, after collection, instead of re-sorting on every appended row.
    all_chats.sort(key=lambda chat: chat["translate_y"])

    # TODO: Additional overhead to remove the translate_y key, Ill leave it as it is for now
    # for chat in all_chats:
    #     chat.pop("translate_y", None)

    return all_chats

In [None]:
## ADD TESTING STUFF HERE
async def trigger_on_notification():
    """
    Print every browser notification created on the global ``page`` until cancelled.

    The page's ``Notification`` constructor is wrapped so that each
    construction writes a ``"New message received:"`` console line. A console
    listener on the Python side prints those lines. The coroutine then idles
    forever; interrupt or cancel it to stop listening.
    """
    # A Proxy (instead of a plain wrapper function) keeps static members such
    # as Notification.permission / requestPermission reachable on the wrapper.
    # The guard flag makes the patch safe to apply again on the same document.
    await page.evaluate(
        """
        () => {
            if (window.__notificationLoggerInstalled) {
                return;
            }
            const OriginalNotification = window.Notification;
            if (typeof OriginalNotification !== "function") {
                return;
            }
            window.Notification = new Proxy(OriginalNotification, {
                construct(target, args) {
                    console.log("New message received:", args[0], args[1]);
                    return Reflect.construct(target, args);
                },
            });
            window.__notificationLoggerInstalled = true;
        }
        """
    )

    def handle_console(msg):
        if "New message received:" in msg.text:
            print(f"Notification received: {msg.text}")
            # Add your notification handling logic here

    page.on("console", handle_console)

    print("Listening for notifications...")

    try:
        # Keep the function running
        while True:
            await asyncio.sleep(1)
    finally:
        # Detach on cancellation/interrupt so re-running does not stack
        # listeners and print every notification multiple times.
        page.remove_listener("console", handle_console)

In [None]:
# Start listening for notifications; this await does not return on its own
# (the coroutine loops forever), so interrupt the cell to stop it.
await trigger_on_notification()