In [25]:
import asyncio
from playwright.async_api import async_playwright
import qrcode
from PIL import Image
import io
import base64
from pyzbar import pyzbar
import os

In [26]:
playwright = await async_playwright().start()
browser = await playwright.chromium.launch(headless=False)
context = await browser.new_context()
page = await context.new_page()

In [27]:
await page.goto("https://web.whatsapp.com")

<Response url='https://web.whatsapp.com/' request=<Request url='https://web.whatsapp.com/' method='GET'>>

In [28]:
async def extract_message_details(message):
    name_element = await message.query_selector("div._aou8._aj_h > span")
    name = await name_element.inner_text() if name_element else "N/A"

    time_element = await message.query_selector("div._ak8i")
    time = await time_element.inner_text() if time_element else "N/A"

    message_element = await message.query_selector("div._ak8k > span > span")
    message = await message_element.inner_text() if message_element else "N/A"

    return {"name": name, "time": time, "message": message}

In [30]:
async def fetch_latest_message():
    selector = 'div[role="listitem"]'
    # or selector = '//*[@id="pane-side"]/div[2]/div/div/child::div'

    await page.wait_for_selector(selector, timeout=1000)
    list_items = await page.query_selector_all(selector)

    for item in list_items:
        transform = await item.evaluate(
            "element => window.getComputedStyle(element).transform"
        )

        if transform:
            if "matrix" in transform:
                parts = transform.replace("matrix(", "").replace(")", "").split(", ")
                if len(parts) == 6:
                    translate_y = float(parts[5])
                    if translate_y == 0:
                        return await extract_message_details(item)
            elif "translateY(0px)" in transform:
                print("Div with role='listitem' and translateY=72px found.")
                return await extract_message_details(item)

    print("Desired div not found.")
    return None


await fetch_latest_message()

{'name': 'Kausthubh', 'time': '12:19 PM', 'message': "heyy! what's up?"}

In [31]:
sent_messages = []


async def send_message(name: str, message: str):
    search_input_selector = (
        "div[contenteditable='true'][role='textbox'][data-lexical-editor='true']"
    )
    await page.wait_for_selector(search_input_selector, timeout=60000)
    search_input = await page.query_selector(search_input_selector)

    if search_input:
        await search_input.click()

        await page.keyboard.down("Control")
        await page.keyboard.press("A")
        await page.keyboard.up("Control")
        await page.keyboard.press("Backspace")

        await search_input.type(name, delay=20)
        await page.keyboard.press("Enter")

        await page.keyboard.down("Control")
        await page.keyboard.press("A")
        await page.keyboard.up("Control")
        await page.keyboard.press("Backspace")

        await page.keyboard.type(message, delay=20)
        await page.keyboard.press("Enter")
        await page.wait_for_timeout(100)
        await page.keyboard.press("Enter")

        await page.wait_for_timeout(200)

        if len(sent_messages) >= 20:
            sent_messages.pop(0)
        sent_messages.append(message)
    else:
        print("Search input not found.")

In [32]:
async def watch_new_messages():
    """
    Continuously watches for new messages and yields them as they arrive,
    excluding messages sent by this script.
    """
    new_message = await fetch_latest_message()
    assert new_message is not None

    name = new_message.get("name", None)
    message = new_message.get("message", None)
    time = new_message.get("time", None)

    await asyncio.sleep(1)

    while True:
        try:
            new_message = await fetch_latest_message()
            if new_message:
                # Check if the message has been sent by this script
                if not (
                    (
                        new_message.get("message") in sent_messages
                        or new_message.get("message") == "N/A"
                    )
                    and new_message.get("name", None) == name
                    # and new_message.get("time", None) == time
                ):
                    name = new_message.get("name", None)
                    message = new_message.get("message", None)
                    time = new_message.get("time", None)
                    yield new_message
        except Exception as e:
            print(f"Error while fetching latest message: {e}")
        await asyncio.sleep(1)

In [35]:
from gradio_client import Client

client = Client("http://127.0.0.1:7860/")

Loaded as API: http://127.0.0.1:7860/ ✔


In [37]:
async for message in watch_new_messages():
    print("New message received:")
    print(message)
    result = client.predict(
        user_input=str(message["message"]),
        recipient=message["name"],
        chat_history="",
        timestamp=message["time"],
        api_name="/predict",
    )
    await send_message(message["name"], result)

New message received:
{'name': 'Kausthubh', 'time': '12:19 PM', 'message': "heyy! what's up?"}
New message received:
{'name': 'Kausthubh', 'time': '12:27 PM', 'message': 'i like to eat food'}
New message received:
{'name': 'Kausthubh', 'time': '12:30 PM', 'message': 'what is the purpose of life'}
