In [3]:
from selenium.webdriver.common.keys import Keys 
from selenium.webdriver.common.by import By
from selenium.webdriver import ActionChains
from time import sleep
from datetime import datetime
import pytz
import json

In [None]:
import os
import urllib.request
from pathlib import Path


def setup_tesseract_local():
    """Point Tesseract at a tessdata directory stored inside the virtual environment.

    The directory is ``$VIRTUAL_ENV/tessdata``; when ``VIRTUAL_ENV`` is not set
    it falls back to ``.venv/tessdata`` relative to the current working
    directory. The Portuguese model (``por.traineddata``) is downloaded the
    first time it is missing, and ``TESSDATA_PREFIX`` is set for this process.

    Returns:
        pathlib.Path: The tessdata directory.

    Raises:
        OSError: If the directory cannot be created or the download fails
            (``urllib.error.URLError`` is a subclass of ``OSError``).
    """
    venv_path = Path(os.environ.get('VIRTUAL_ENV', '.venv'))
    tessdata_path = venv_path / 'tessdata'

    # parents=True: the venv directory itself (e.g. the '.venv' fallback) may
    # not exist yet, and a plain mkdir() would raise FileNotFoundError.
    tessdata_path.mkdir(parents=True, exist_ok=True)

    por_traineddata = tessdata_path / 'por.traineddata'
    if not por_traineddata.exists():
        print("Downloading Portuguese language data...")
        url = "https://github.com/tesseract-ocr/tessdata/raw/main/por.traineddata"
        # Download to a sibling file and rename afterwards, so an interrupted
        # download never leaves a truncated model that passes the exists()
        # check on the next run.
        partial_path = tessdata_path / 'por.traineddata.part'
        try:
            urllib.request.urlretrieve(url, partial_path)
            os.replace(partial_path, por_traineddata)
        finally:
            if partial_path.exists():
                partial_path.unlink()

    # Tesseract reads this variable to locate its language files.
    os.environ['TESSDATA_PREFIX'] = str(tessdata_path)

    print(f"Tesseract data directory set to: {tessdata_path}")
    return tessdata_path


# Call the function at the start of the script so TESSDATA_PREFIX is set
# before any OCR call is made.
setup_tesseract_local()

In [None]:
from driver import Bot
from colorama import Fore, Style
from selenium.webdriver.common.keys import Keys 

# Create the shared Bot instance that the remaining cells drive.
bot = Bot()

# Announce the start of the manual test run (green text, then reset colours).
print(f"{Fore.GREEN}Starting WhatsApp Web test...{Style.RESET_ALL}")
# bot.login_and_count_chats()

# The session is left open on purpose for manual testing.
# Once testing is finished, the driver can be closed with:
# bot.quit_driver()

In [None]:
# Run the login flow implemented by Bot.login() in the local `driver` module.
# NOTE(review): presumably opens WhatsApp Web and waits for the QR-code scan —
# confirm against driver.py.
bot.login()

In [13]:
import re
def is_receipt_by_keywords(text):
    """Heuristically decide whether ``text`` looks like a payment receipt.

    A text is treated as a receipt when it contains at least one receipt
    keyword (matched as a whole word, case-insensitively) and at least one
    number shaped like a monetary amount (digits, ``,`` or ``.``, two digits).

    Args:
        text: Text extracted from an image or document. ``None`` or an empty
            string is accepted and never counts as a receipt.

    Returns:
        bool: True when both a keyword and a currency-like amount are found.
    """
    if not text:
        return False

    keywords = [
        'comprovante',
        'pagamento',
        'transferência',
        'pix',
        'valor',
        'data da transação',
        'beneficiário',
        'ted',
        'doc',
        'recibo',
        'autenticação',
        'instituição'
    ]

    # Match whole words only: plain substring checks made short keywords fire
    # inside unrelated words ('pix' in 'megapixels', 'doc' in 'documento').
    # \b is Unicode-aware for str patterns, so accented letters count as word
    # characters.
    keyword_pattern = r'\b(?:' + '|'.join(re.escape(k) for k in keywords) + r')\b'
    has_keyword = re.search(keyword_pattern, text.lower()) is not None

    currency_pattern = r'R?\$?\s*\d+[,.]\d{2}'
    has_currency = bool(re.search(currency_pattern, text))

    return has_currency and has_keyword

In [8]:
from PyPDF2 import PdfReader
import requests
import tempfile
import os
import random
import shutil

def process_pdf_content(download_url, timeout=30):
    """Download a PDF and return the text of all its pages.

    Args:
        download_url: URL fetched with ``requests.get``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        str | None: The concatenated text of every page, or ``None`` when the
        download or the parsing fails (the error is printed).
    """
    tmp_path = None
    try:
        print(f"Downloading PDF from {download_url}")
        # Without a timeout a stalled server would block the notebook forever.
        response = requests.get(download_url, timeout=timeout)
        # Fail early on 4xx/5xx instead of parsing an error page as a PDF.
        response.raise_for_status()

        # delete=False: the file is reopened by path below, after the handle
        # has been closed.
        with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp_file:
            tmp_path = tmp_file.name
            tmp_file.write(response.content)

        reader = PdfReader(tmp_path)
        # extract_text() may yield None for pages without a text layer.
        return "".join(page.extract_text() or "" for page in reader.pages)

    except Exception as e:
        print(f"Erro ao processar conteúdo do PDF: {e}")
        return None
    finally:
        # Remove the temporary file on success and on failure alike.
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
    

def process_pdf_attachment(pdf_element):
    """Click a PDF attachment, wait for the download and return its text.

    Args:
        pdf_element: Clickable element that triggers the browser download into
            ``bot.TEMP_DIR``.

    Returns:
        str | None: Concatenated text of all pages, or ``None`` when the file
        does not show up within 30 seconds or any step fails (the error is
        printed). The downloaded file is removed afterwards in both cases.
    """
    downloaded_file = None
    try:
        # Snapshot the directory BEFORE clicking so a PDF left over from an
        # earlier run is never mistaken for this download.
        existing_files = set(os.listdir(bot.TEMP_DIR))

        pdf_element.click()
        sleep(2)  # give the download a moment to start

        max_wait = 30  # seconds
        start_time = datetime.now()
        print(f"Waiting for PDF to download in {bot.TEMP_DIR}")
        while (datetime.now() - start_time).total_seconds() < max_wait:
            # In-progress downloads end with '.crdownload', so the '.pdf'
            # suffix check alone already excludes them.
            new_pdfs = sorted(
                name for name in os.listdir(bot.TEMP_DIR)
                if name.endswith('.pdf') and name not in existing_files
            )
            if new_pdfs:
                downloaded_file = os.path.join(bot.TEMP_DIR, new_pdfs[0])
                break
            sleep(1)

        if not downloaded_file:
            print("Timeout: PDF não foi baixado no tempo esperado")
            return None

        reader = PdfReader(downloaded_file)
        # extract_text() may yield None for pages without a text layer.
        return "".join(page.extract_text() or "" for page in reader.pages)

    except Exception as e:
        print(f"Erro ao processar PDF: {e}")
        return None
    finally:
        # Always clean up, otherwise a file that failed to parse would linger
        # in TEMP_DIR.
        if downloaded_file and os.path.exists(downloaded_file):
            try:
                os.remove(downloaded_file)
            except OSError as cleanup_error:
                print(f"Erro ao processar PDF: {cleanup_error}")

In [14]:
from PIL import Image
import io
import pytesseract  # for OCR
import re

def _image_failure(error, size=None):
    """Build the failure payload, using the same keys as a successful result."""
    return {
        "type": "image",
        "size": size,
        "file_path": None,
        "content": None,
        "is_receipt": False,
        "error": error,
    }


def _ocr_expanded_image(image_container, file_size):
    """Open the image viewer, crop the expanded image from a screenshot, OCR and save it."""
    try:
        # Re-query on purpose: the <img> may only exist after a download click.
        thumbnails = image_container.find_elements(
            By.CSS_SELECTOR, "img[src^='blob:'], img[src^='http']")
        print("Tentando screenshot")

        if not thumbnails:
            print("Não há imagens para processar")
            return _image_failure("Download failed", file_size)

        thumbnails[0].click()
        sleep(2)  # wait for the viewer modal to open

        # The expanded image lives inside the element with class "overlay".
        modal_img = bot.driver.find_element(
            By.CSS_SELECTOR, "div.overlay img[src^='blob:']")
        location = modal_img.location
        size = modal_img.size

        # Full-page screenshot; the image region is cropped out of it below.
        screenshot = bot.driver.get_screenshot_as_png()
        image = Image.open(io.BytesIO(screenshot))

        # Element coordinates are scaled by the device pixel ratio before
        # cropping, to account for high-resolution displays.
        dpr = bot.driver.execute_script('return window.devicePixelRatio')
        left = location['x']
        top = location['y']
        right = left + size['width']
        bottom = top + size['height']
        image = image.crop((left * dpr, top * dpr, right * dpr, bottom * dpr))

        text = pytesseract.image_to_string(image, lang='por')
        is_receipt = is_receipt_by_keywords(text)

        file_path = os.path.join(
            bot.TEMP_DIR, f"image_{datetime.now().timestamp()}.png")
        image.save(file_path)
        print(f"Image saved to {file_path}")

        # Close the viewer modal.
        ActionChains(bot.driver).send_keys(Keys.ESCAPE).perform()
        sleep(1)

        return {
            "type": "image",
            "size": file_size,
            "file_path": file_path,
            "content": text,
            "is_receipt": is_receipt
        }
    except Exception as e:
        print(f"Error processing image: {e}")
        # Best effort: make sure the modal does not stay open.
        try:
            ActionChains(bot.driver).send_keys(Keys.ESCAPE).perform()
        except Exception as close_error:
            print(f"Error closing image modal: {close_error}")
        return _image_failure(str(e), file_size)


def process_image_attachment(image_element):
    """Extract text from an image message via a screenshot and OCR.

    Args:
        image_element: The thumbnail element found inside the message.

    Returns:
        dict: Always a dict with ``type``, ``size``, ``file_path``, ``content``
        and ``is_receipt``. Failures carry an additional ``error`` message with
        ``file_path``/``content`` set to ``None`` (earlier versions returned
        tuples on failure, which made the stored JSON inconsistent).
    """
    try:
        # The ancestor container holds the download button and the size label.
        image_container = image_element.find_element(
            By.XPATH, "./ancestor::div[@role='button']")

        # The size label is optional: missing or failing lookups leave it None.
        file_size = None
        try:
            size_labels = image_container.find_elements(
                By.CSS_SELECTOR, "button[class*='x6s0dn4'] span:last-child")
            if size_labels:
                file_size = size_labels[0].text
        except Exception as e:
            print(f"Não foi possível encontrar o tamanho da imagem: {e}")

        try:
            img_elements = image_container.find_elements(
                By.CSS_SELECTOR, "img[src^='blob:'], img[src^='http']")
            if img_elements:
                return _ocr_expanded_image(image_container, file_size)

            # No image loaded yet: only click when a download button is shown.
            download_button = image_container.find_elements(
                By.CSS_SELECTOR, "button span[data-icon='media-download']")
            if not download_button:
                print("No download button found - skipping click method")
                return _image_failure("No download button found", file_size)

            print("Trying click download method...")
            image_container.click()
            sleep(2)
            return _ocr_expanded_image(image_container, file_size)

        except Exception as e:
            print(f"Error getting image source: {e}")
            return _image_failure(str(e), file_size)

    except Exception as e:
        print(f"Erro ao processar imagem: {e}")
        return _image_failure(str(e))

In [16]:
def decode_latin(text):
    """Return ``text`` after the first decoding strategy that does not raise.

    The strategies are tried in order:
      1. encode with the default codec and decode back as UTF-8;
      2. encode as UTF-8 and decode with ``unicode_escape``;
      3. encode with ``raw_unicode_escape`` and decode as UTF-8.

    When all three raise, a warning listing the three errors is printed and
    the input is returned unchanged.
    """
    strategies = (
        lambda raw: raw.encode().decode('utf-8'),
        lambda raw: bytes(raw, 'utf-8').decode('unicode_escape'),
        lambda raw: raw.encode('raw_unicode_escape').decode('utf-8'),
    )

    failures = []
    for attempt in strategies:
        try:
            return attempt(text)
        except Exception as failure:
            failures.append(failure)

    first_error, second_error, third_error = failures
    print(f"Aviso: Não foi possível decodificar texto: '{text}'")
    print(f"Erros: {first_error}, {second_error}, {third_error}")
    return text
            
def _parse_message_header(date_text):
    """Split a ``data-pre-plain-text`` value of the form '[HH:MM, DD/MM/YYYY] Name: '.

    Returns:
        tuple: ``(time, date, sender, utc_datetime)``; any part that cannot be
        extracted is ``None``.
    """
    # partition() splits on the FIRST '] ' only, so sender names that contain
    # '] ' themselves (e.g. '[Work] Bob') are kept whole.
    header, _, sender_part = date_text.partition('] ')
    time_part, _, date_part = header.replace('[', '').partition(', ')

    sender = sender_part.strip()
    if sender.endswith(':'):
        sender = sender[:-1].rstrip()

    utc_dt = None
    try:
        # Timestamps are interpreted as São Paulo local time.
        local_tz = pytz.timezone('America/Sao_Paulo')
        local_dt = datetime.strptime(
            f"{date_part} {time_part}", "%d/%m/%Y %H:%M")
        utc_dt = local_tz.localize(local_dt).astimezone(pytz.UTC)
    except Exception as e:
        # A header in another format must not discard the whole message.
        print(f"Error parsing message timestamp '{date_text}': {e}")

    return time_part or None, date_part or None, sender or None, utc_dt


def _extract_quoted_message(message):
    """Return ``{'sender', 'text'}`` for a quoted message, or None when absent."""
    quoted_elements = message.find_elements(
        By.CSS_SELECTOR, "div[role='button'][aria-label='Quoted message']")
    if not quoted_elements:
        return None
    try:
        quoted_sender = quoted_elements[0].find_elements(
            By.CSS_SELECTOR, "span[dir='auto']._ao3e")
        quoted_text = quoted_elements[0].find_elements(
            By.CSS_SELECTOR, "span.quoted-mention._ao3e")
        if quoted_text:
            return {
                "sender": quoted_sender[0].text if quoted_sender else '',
                "text": quoted_text[0].text
            }
    except Exception as e:
        print(f"Erro ao processar mensagem citada: {e}")
    return None


def _extract_document_attachment(message):
    """Return document metadata for a downloadable file, or None when absent."""
    pdf_elements = message.find_elements(
        By.CSS_SELECTOR, "div[role='button'][title^='Download']")
    if not pdf_elements:
        return None

    # Reset for every message: a name read from an earlier message must never
    # be reported for this one.
    file_name = None
    try:
        file_name = pdf_elements[0].find_element(
            By.CSS_SELECTOR, "span.x13faqbe._ao3e").text
        file_size = pdf_elements[0].find_element(
            By.CSS_SELECTOR, "span[title$='kB']").text

        # Content extraction (process_pdf_attachment) is currently disabled.
        return {
            "type": "document",
            "name": file_name,
            "size": file_size,
            "file_type": "PDF" if file_name.lower().endswith('.pdf') else "unknown",
            "content": None
        }
    except Exception as e:
        print(f"Erro ao processar detalhes do PDF: {e}")
        return {
            "type": "document",
            "name": file_name if file_name is not None else "Unknown"
        }


def _wait_for_new_download(directory, suffixes, known_files, max_wait=30):
    """Poll ``directory`` until a file with one of ``suffixes`` that is not in ``known_files`` appears."""
    start_time = datetime.now()
    while (datetime.now() - start_time).total_seconds() < max_wait:
        fresh = sorted(
            name for name in os.listdir(directory)
            if name.endswith(suffixes) and name not in known_files
        )
        if fresh:
            return os.path.join(directory, fresh[0])
        sleep(1)
    return None


def _extract_audio_attachment(message):
    """Download a voice message and return its metadata (or an error payload)."""
    try:
        # Snapshot first: downloaded audio files are kept on disk, so without
        # it every later voice message would resolve to the first file.
        known_files = set(os.listdir(bot.TEMP_DIR))

        download_button = message.find_element(
            By.CSS_SELECTOR, "button[aria-label='Download voice message']")
        download_button.click()
        sleep(2)  # wait for the download to start

        duration = message.find_element(
            By.CSS_SELECTOR, "div._ak8w").text

        downloaded_file = _wait_for_new_download(
            bot.TEMP_DIR, ('.mp3', '.ogg', '.m4a'), known_files)

        if downloaded_file:
            # TODO: Add audio transcription here using your preferred service
            return {
                "type": "audio",
                "duration": duration,
                "file_path": downloaded_file,
            }

        print("Timeout: Audio file not downloaded")
        return {
            "type": "audio",
            "duration": duration,
            "error": "Download failed"
        }
    except Exception as e:
        print(f"Error processing audio message: {e}")
        return {
            "type": "audio",
            "error": str(e)
        }


def get_all_message_info(messages_elements):
    """Convert message elements into plain dictionaries.

    Args:
        messages_elements: Iterable of elements, one per chat message.

    Returns:
        list[dict]: One dict per message that could be processed, with keys
        ``text``, ``time``, ``date``, ``sender``, ``quoted_message``,
        ``attachment_data`` and ``timestamp_utc`` (ISO-8601 string or None).
        A message whose processing raises is reported on stdout and skipped.
        When a message has several attachment kinds, the last one detected
        wins (image, then document, then audio).
    """
    messages = []
    for message in messages_elements:
        try:
            copyable_text = message.find_elements(
                By.CSS_SELECTOR, ".copyable-text")
            msg_time, msg_date, sender, utc_dt = None, None, None, None

            quoted_message = _extract_quoted_message(message)

            text = ''
            if copyable_text:
                main_text_element = message.find_elements(
                    By.CSS_SELECTOR, "span.selectable-text.copyable-text")
                if main_text_element:
                    try:
                        # Use the text of the first matching element.
                        text = decode_latin(main_text_element[0].text.strip())
                    except Exception as e:
                        print(f"Erro ao processar mensagem main: {e}")

                date_text = copyable_text[0].get_attribute(
                    "data-pre-plain-text")
                if date_text:
                    msg_time, msg_date, sender, utc_dt = _parse_message_header(
                        date_text)

            attachment_data = None
            # TODO: Check date and time of the message when is attachment
            image_elements = message.find_elements(
                By.CSS_SELECTOR, "img[data-testid='image-thumb'], img[class*='x15kfjtz']")
            if image_elements:
                attachment_data = process_image_attachment(
                    image_elements[0])

            document_data = _extract_document_attachment(message)
            if document_data is not None:
                attachment_data = document_data

            audio_elements = message.find_elements(
                By.CSS_SELECTOR, "audio[data-testid='audio-player']")
            if audio_elements:
                attachment_data = _extract_audio_attachment(message)

            message_data = {
                "text": text,
                "time": msg_time,
                "date": msg_date,
                "sender": sender,
                "quoted_message": quoted_message,
                "attachment_data": attachment_data,
                # TODO: change this if messages are saved to a database
                "timestamp_utc": utc_dt.isoformat() if utc_dt else None,
            }
            print(message_data)

            messages.append(message_data)
        except Exception as e:
            print(f"Erro ao processar uma mensagem: {e}")
    return messages

# PEGAR TEXTO DA MENSAGEM
def get_all_messages():
    """Load the full history of the open chat and return its parsed messages.

    The conversation pane is scrolled to the top repeatedly; the loop stops
    once a pass finds the same number of message elements as the previous
    one. The elements from the last pass are handed to
    ``get_all_message_info``.

    Returns:
        list: The parsed messages (also printed to stdout).
    """
    chat_pane = bot.driver.find_element(
        By.XPATH, '//*[@id="main"]/div[3]/div/div[2]')

    seen_count = 0
    loaded = chat_pane.find_elements(
        By.CSS_SELECTOR, ".message-in, .message-out")

    # Keep going while the last pass surfaced more messages than before.
    while len(loaded) != seen_count:
        seen_count = len(loaded)

        # TODO: check the Portuguese wording of this button
        # TODO: the error "Couldn't get older messages. Open WhatsApp on your
        #       phone and click here to try again." may show up here
        load_older = bot.driver.find_elements(
            By.XPATH, "//button[.//div[contains(text(), 'Click here to get older messages from your phone.')]]")
        if load_older:
            load_older[0].click()
            sleep(10)

        print('Attempting to scroll')
        # Jump to the top of the pane so older messages get loaded.
        bot.driver.execute_script(
            "arguments[0].scrollTop = 0;", chat_pane)
        sleep(2)

        loaded = chat_pane.find_elements(
            By.CSS_SELECTOR, ".message-in, .message-out")

    collected = get_all_message_info(loaded)
    print(collected)
    return list(collected)


# Coleta de mensagens
# all_messages = get_all_messages(conversation_container)

# Exibir mensagens coletadas
# for msg in all_messages:
#     print(msg)

In [17]:
def scroll_chats_list(step=1000):
    """Scroll the chat list pane down by ``step`` pixels and wait for it to load.

    Args:
        step: Pixels added to the pane's current ``scrollTop``.
    """
    chat_list_element = bot.driver.find_element(By.ID, "pane-side")
    # Advance relative to the current position. The previous expression
    # (-scrollTop + 1000) bounced between two offsets instead of moving
    # further down the list on each call.
    bot.driver.execute_script(
        "arguments[0].scrollTop = arguments[0].scrollTop + arguments[1];",
        chat_list_element, step)
    sleep(2)


In [None]:
# bot.click_first_chat_and_scroll()
# GET ALL CONTACT NAMES FROM THE CHAT LIST
# For each chat not seen before: open it, collect its messages and write them
# to "<chat name>.json" in the current working directory.
all_chats = set()  # A set is used to avoid duplicates
last_seen_name = None  # Name of the last visible item in the list
# chat_list_element = bot.driver.find_element(By.ID, "pane-side")

while True:
    # Find the chats that are currently visible
    chat_items = bot.driver.find_elements(By.XPATH, "//div[@aria-label='Chat list']//div[@role='listitem']")
    new_last_seen_name = None

    # Names in this pass that were already collected; if every visible chat is
    # in this list the loop is meant to stop (see the check after the for loop).
    already_collected_chats = []
    # Test mode: only the first chat is processed.
    # NOTE(review): chat_items[0] raises IndexError when no chat is visible.
    chat_items = [chat_items[0]]
    for chat in chat_items:
        try:
            # Read the chat name from the first span with dir='auto'
            name = chat.find_element(By.XPATH, ".//span[@dir='auto']").text

            # Only process names that have not been collected yet
            if name not in all_chats:
                all_chats.add(f"{name}")
                chat.click()
                sleep(2)
                # Get all messages and save them to a JSON file named after the chat.
                # NOTE(review): the chat name is used verbatim as the file name.
                messages = get_all_messages()
                with open(f"{name}.json", "w", encoding='utf-8') as f:
                    json.dump(messages, f, ensure_ascii=False, indent=2)
                chat.click()
                sleep(2)
            else:
                already_collected_chats.append(name)

            # Update the last name seen at the end of the list
            # if i == len(chat_items) - 1:
            #     new_last_seen_name = name
            #     chat.click()
            #     sleep(2)
        except Exception as e:
            print(f"Erro ao processar um chat: {e}")
    # NOTE(review): this unconditional break ends the loop after the first
    # pass, so the stop check and scroll_chats_list() below never run.
    break
    if len(already_collected_chats) == len(chat_items):
        break

    scroll_chats_list()
    # Scroll to the last item
    # if chat_items:
    #     current_scroll = bot.driver.execute_script(
    #         "return arguments[0].scrollTop", chat_list_element)
    #     bot.driver.execute_script(
    #         "arguments[0].scrollTop = arguments[0].scrollTop + 1000", chat_list_element)
    #     sleep(2)  # Wait for new chats to load

    # Stop condition: the last name did not change
    # if new_last_seen_name == last_seen_name:
    #     break
    # last_seen_name = new_last_seen_name

# Show the collected chat names and how many there are.
print(all_chats)
print(len(all_chats))

In [5]:
# SEARCH THE CONTACT
# Type the name of one collected chat into a textbox and press ENTER.
# NOTE(review): the selector matches the first role='textbox' element whose
# class contains 'x1hx0egp'; despite the variable name this is presumably the
# search box — confirm against the page.
message_input = bot.driver.find_element(
    By.XPATH, "//div[contains(@class, 'x1hx0egp')][@role='textbox']")
message_input.click()
# Sets are unordered, so this picks an arbitrary element of all_chats
# (requires the chat collection cell to have been run first).
first_contact = next(iter(all_chats))
message_input.send_keys(first_contact)
message_input.send_keys(Keys.ENTER)
sleep(2)


In [None]:
# FIND CHAT ELEMENTS
# Locate the first element with class 'x1y332i5' and collect every descendant
# with class '_ak72'.
# NOTE(review): presumably the chat list container and its rows — these class
# names look generated, so confirm they still match the page.
chat_list_div = bot.driver.find_element(By.CLASS_NAME, "x1y332i5")
chat_elements = chat_list_div.find_elements(By.CLASS_NAME, "_ak72")
# Last expression of the cell: displays how many elements were found.
len(chat_elements)

In [8]:
# CHAT CLICK
# Click the first element found by the "FIND CHAT ELEMENTS" cell; raises
# IndexError when chat_elements is empty.
chat_elements[0].click()

In [9]:
# SMALL SCROLL ON CHATS LIST
# Move the element with id 'pane-side' down by 300 pixels.
chat_list_element = bot.driver.find_element(By.ID, "pane-side")
# NOTE(review): current_scroll is read but not used in the visible cells.
current_scroll = bot.driver.execute_script("return arguments[0].scrollTop", chat_list_element)
bot.driver.execute_script("arguments[0].scrollTop = arguments[0].scrollTop + 300", chat_list_element)