# Crawler de Contatos dos Grupos WhatsApp

Este notebook busca todos os contatos dos grupos do WhatsApp usando a Evolution API e salva em Excel e CSV.

## Estrat√©gia Otimizada (v2)

A vers√£o anterior usava `fetchAllGroups` que sincroniza com o WhatsApp e causa timeout com muitos grupos.

**Nova estrat√©gia:**
1. Usa `/chat/findChats` para obter lista de chats do **cache local** (muito mais r√°pido)
2. Filtra apenas grupos (JIDs terminando em `@g.us`)
3. Busca participantes de cada grupo individualmente com **delays anti-ban**

**Configura√ß√µes anti-ban:**
- Delay de 2-5 segundos entre requisi√ß√µes de participantes
- Timeout de 30s por grupo (fallback gracioso)

In [6]:
import os
import asyncio
import httpx
import pandas as pd
from pathlib import Path
from typing import List, Dict
from datetime import datetime

# Verificar e instalar openpyxl se necess√°rio (para salvar Excel)
try:
    import openpyxl
except ImportError:
    print("üì¶ Instalando openpyxl...")
    import subprocess
    subprocess.check_call(["pip", "install", "openpyxl"])
    import openpyxl
    print("‚úÖ openpyxl instalado")

# Configura√ß√µes da Evolution API
EVOLUTION_URL = os.getenv("EVOLUTION_API_URL", "http://localhost:8080")
EVOLUTION_API_KEY = os.getenv("EVOLUTION_API_KEY")
INSTANCE = os.getenv("EVOLUTION_INSTANCE", "Revoluna")

if not EVOLUTION_API_KEY:
    raise ValueError("EVOLUTION_API_KEY n√£o configurada. Configure no .env ou vari√°veis de ambiente.")

print(f"‚úÖ Evolution API URL: {EVOLUTION_URL}")
print(f"‚úÖ Instance: {INSTANCE}")
print(f"‚úÖ API Key: {EVOLUTION_API_KEY[:10]}..." if len(EVOLUTION_API_KEY) > 10 else "‚úÖ API Key configurada")


‚úÖ Evolution API URL: http://localhost:8080
‚úÖ Instance: Revoluna
‚úÖ API Key: c24ecc525f...


In [None]:
class EvolutionContactsCrawler:
    """Cliente para buscar contatos dos grupos via Evolution API.
    
    Estrat√©gia otimizada:
    1. Usa /chat/findChats para obter lista de chats do cache local (r√°pido)
    2. Filtra apenas grupos (@g.us)
    3. Busca participantes por grupo individualmente com delays anti-ban
    """
    
    def __init__(self):
        self.base_url = EVOLUTION_URL.rstrip("/")
        self.api_key = EVOLUTION_API_KEY
        self.instance = INSTANCE
        self.headers = {"apikey": self.api_key}
    
    async def testar_conexao(self) -> bool:
        """Testa se a conex√£o com a Evolution API est√° funcionando."""
        url = f"{self.base_url}/instance/fetchInstances"
        
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(url, headers=self.headers, timeout=10.0)
                response.raise_for_status()
                print(f"‚úÖ Conex√£o com Evolution API OK (Status: {response.status_code})")
                return True
            except httpx.ConnectError as e:
                print(f"‚ùå Erro de conex√£o: N√£o foi poss√≠vel conectar em {self.base_url}")
                print(f"   Verifique se a Evolution API est√° rodando e a URL est√° correta")
                return False
            except httpx.HTTPStatusError as e:
                print(f"‚ùå Erro HTTP {e.response.status_code}: {e.response.text[:200]}")
                return False
            except Exception as e:
                print(f"‚ùå Erro ao testar conex√£o: {type(e).__name__}: {e}")
                return False
    
    async def buscar_chats_cached(self) -> List[Dict]:
        """
        Busca todos os chats do cache local via /chat/findChats.
        Muito mais r√°pido que fetchAllGroups pois n√£o sincroniza com WhatsApp.
        """
        url = f"{self.base_url}/chat/findChats/{self.instance}"
        
        print(f"üîó URL: {url}")
        
        async with httpx.AsyncClient(timeout=60.0) as client:
            try:
                response = await client.get(url, headers=self.headers)
                response.raise_for_status()
                chats = response.json()
                
                if isinstance(chats, list):
                    print(f"‚úÖ {len(chats)} chats encontrados no cache")
                    return chats
                elif isinstance(chats, dict):
                    # Pode retornar como {"chats": [...]} ou {"data": [...]}
                    chats_list = chats.get("chats") or chats.get("data") or []
                    print(f"‚úÖ {len(chats_list)} chats encontrados no cache")
                    return chats_list
                else:
                    print(f"‚ö†Ô∏è Formato inesperado: {type(chats)}")
                    return []
                    
            except Exception as e:
                print(f"‚ö†Ô∏è Erro ao buscar chats: {type(e).__name__}: {e}")
                return []
    
    async def filtrar_grupos(self, chats: List[Dict]) -> List[Dict]:
        """
        Filtra apenas os grupos (JID termina com @g.us).
        """
        grupos = []
        for chat in chats:
            # O chat pode ter 'id' ou 'remoteJid'
            jid = chat.get("id") or chat.get("remoteJid") or ""
            if jid.endswith("@g.us"):
                grupos.append({
                    "id": jid,
                    "nome": chat.get("name") or chat.get("subject") or chat.get("pushName") or "Sem nome",
                    "dados_originais": chat
                })
        
        print(f"üìã {len(grupos)} grupos encontrados (de {len(chats)} chats)")
        return grupos
    
    async def buscar_participantes_grupo(self, grupo_jid: str) -> List[Dict]:
        """
        Busca participantes de um grupo espec√≠fico.
        Usa POST /group/participants/{instance} com groupJid no body.
        """
        url = f"{self.base_url}/group/participants/{self.instance}"
        body = {"groupJid": grupo_jid}
        
        async with httpx.AsyncClient(timeout=30.0) as client:
            try:
                # Tentar POST primeiro (mais confi√°vel)
                response = await client.post(url, headers=self.headers, json=body)
                
                if response.status_code == 404 or response.status_code == 405:
                    # Fallback para GET com query param
                    url_get = f"{self.base_url}/group/participants/{self.instance}?groupJid={grupo_jid}"
                    response = await client.get(url_get, headers=self.headers)
                
                response.raise_for_status()
                data = response.json()
                
                # A resposta pode ser lista direta ou objeto com 'participants'
                if isinstance(data, list):
                    return data
                elif isinstance(data, dict):
                    return data.get("participants") or data.get("data") or []
                return []
                
            except Exception as e:
                print(f"      ‚ö†Ô∏è Erro: {type(e).__name__}")
                return []
    
    async def listar_grupos_fallback(self) -> List[Dict]:
        """
        Fallback: tenta fetchAllGroups com timeout muito longo.
        S√≥ usar se buscar_chats_cached n√£o funcionar.
        """
        url = f"{self.base_url}/group/fetchAllGroups/{self.instance}?getParticipants=false"
        
        print(f"‚ö†Ô∏è Usando fallback fetchAllGroups (pode demorar)...")
        print(f"üîó URL: {url}")
        
        # Timeout de 10 minutos - a API pode ser muito lenta
        async with httpx.AsyncClient(timeout=600.0) as client:
            try:
                response = await client.get(url, headers=self.headers)
                response.raise_for_status()
                grupos = response.json()
                
                if isinstance(grupos, list):
                    return grupos
                elif isinstance(grupos, dict):
                    return grupos.get("groups") or grupos.get("data") or []
                return []
                
            except httpx.ReadTimeout:
                print(f"‚ùå Timeout ap√≥s 10 minutos - API muito lenta")
                print(f"üí° Verifique se h√° muitos grupos ou se a Evolution API est√° sobrecarregada")
                return []
            except Exception as e:
                print(f"‚ùå Erro: {type(e).__name__}: {e}")
                return []

# Criar inst√¢ncia do crawler
crawler = EvolutionContactsCrawler()
print("‚úÖ Crawler inicializado")

# Testar conex√£o primeiro
print("\nüîç Testando conex√£o com Evolution API...")
conexao_ok = await crawler.testar_conexao()

# Verificar inst√¢ncias dispon√≠veis
if conexao_ok:
    print("\nüîç Verificando inst√¢ncias dispon√≠veis...")
    url = f"{crawler.base_url}/instance/fetchInstances"
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=crawler.headers, timeout=10.0)
            response.raise_for_status()
            instances = response.json()
            
            if isinstance(instances, list):
                instance_names = [inst.get('name', '') for inst in instances if isinstance(inst, dict)]
                print(f"   Inst√¢ncias dispon√≠veis: {instance_names}")
                
                if INSTANCE not in instance_names:
                    print(f"\n‚ö†Ô∏è ATEN√á√ÉO: A inst√¢ncia '{INSTANCE}' n√£o foi encontrada!")
                else:
                    instancia_info = next((inst for inst in instances if inst.get('name') == INSTANCE), None)
                    if instancia_info:
                        status = instancia_info.get('connectionStatus', 'unknown')
                        print(f"\n‚úÖ Inst√¢ncia '{INSTANCE}' encontrada!")
                        print(f"   Status da conex√£o: {status}")
                        if status != 'open':
                            print(f"   ‚ö†Ô∏è A inst√¢ncia n√£o est√° conectada (status: {status})")
        except Exception as e:
            print(f"‚ö†Ô∏è N√£o foi poss√≠vel verificar inst√¢ncias: {e}")

In [None]:
import random

# Configura√ß√µes anti-ban
DELAY_MIN = 2  # segundos entre requisi√ß√µes de participantes
DELAY_MAX = 5  # segundos entre requisi√ß√µes de participantes

async def extrair_contatos_todos_grupos() -> List[Dict]:
    """
    Extrai todos os contatos de todos os grupos.
    
    Estrat√©gia otimizada:
    1. Busca chats do cache local (r√°pido)
    2. Filtra grupos
    3. Busca participantes de cada grupo com delays anti-ban
    """
    
    # 1. Buscar chats do cache (muito mais r√°pido que fetchAllGroups)
    print("üîç Buscando chats do cache local...")
    chats = await crawler.buscar_chats_cached()
    
    # 2. Se n√£o encontrou chats, tentar fallback
    if not chats:
        print("\n‚ö†Ô∏è Cache vazio ou n√£o dispon√≠vel")
        print("üí° Tentando endpoint alternativo (findGroups)...")
        
        # Tentar endpoint de grupos direto
        grupos = await crawler.listar_grupos_fallback()
        
        if not grupos:
            print("\n‚ùå N√£o foi poss√≠vel obter lista de grupos")
            print("üí° Poss√≠veis causas:")
            print("   ‚Ä¢ A inst√¢ncia n√£o est√° conectada ao WhatsApp")
            print("   ‚Ä¢ A inst√¢ncia n√£o participa de nenhum grupo")
            print("   ‚Ä¢ A Evolution API est√° sobrecarregada")
            return []
        
        # Grupos do fallback j√° v√™m com dados b√°sicos
        grupos_filtrados = [
            {"id": g.get("id", ""), "nome": g.get("subject", "Sem nome")}
            for g in grupos
            if g.get("id", "").endswith("@g.us")
        ]
    else:
        # 3. Filtrar apenas grupos
        grupos_filtrados = await crawler.filtrar_grupos(chats)
    
    if not grupos_filtrados:
        print("\n‚ö†Ô∏è Nenhum grupo encontrado!")
        return []
    
    print(f"\n‚úÖ {len(grupos_filtrados)} grupos para processar")
    
    # Listar grupos encontrados
    print("\nüìã Grupos encontrados:")
    for i, grupo in enumerate(grupos_filtrados[:20], 1):  # Mostrar at√© 20
        print(f"  {i}. {grupo['nome'][:50]}")
    if len(grupos_filtrados) > 20:
        print(f"  ... e mais {len(grupos_filtrados) - 20} grupos")
    
    # 4. Buscar participantes de cada grupo com delays
    print("\nüîç Buscando participantes de cada grupo...")
    print(f"   ‚è±Ô∏è Delay entre requisi√ß√µes: {DELAY_MIN}-{DELAY_MAX}s (anti-ban)\n")
    
    todos_contatos = []
    grupos_com_erro = 0
    
    for idx, grupo in enumerate(grupos_filtrados, 1):
        grupo_jid = grupo["id"]
        grupo_nome = grupo["nome"]
        
        print(f"[{idx}/{len(grupos_filtrados)}] {grupo_nome[:40]}...", end=" ")
        
        participantes = await crawler.buscar_participantes_grupo(grupo_jid)
        
        if participantes:
            print(f"‚úÖ {len(participantes)} participantes")
            
            for participante in participantes:
                # Extrair informa√ß√µes do participante
                participante_id = participante.get("id", "")
                push_name = participante.get("pushName", "")
                name = participante.get("name", "")
                
                # Usar pushName se dispon√≠vel, sen√£o name
                nome_contato = push_name or name or ""
                
                # Extrair n√∫mero de telefone do JID
                telefone = ""
                if participante_id and "@s.whatsapp.net" in participante_id:
                    telefone = participante_id.split("@")[0]
                
                if telefone:  # S√≥ adicionar se tem telefone
                    todos_contatos.append({
                        "Nome": nome_contato,
                        "Numero_Telefone": telefone,
                        "Grupo_Origem": grupo_nome,
                        "Grupo_JID": grupo_jid
                    })
        else:
            print(f"‚ö†Ô∏è Sem participantes (ou erro)")
            grupos_com_erro += 1
        
        # Delay anti-ban (exceto no √∫ltimo)
        if idx < len(grupos_filtrados):
            delay = random.uniform(DELAY_MIN, DELAY_MAX)
            await asyncio.sleep(delay)
    
    # Resumo
    print(f"\n" + "="*50)
    print(f"üìä RESUMO")
    print(f"="*50)
    print(f"Grupos processados: {len(grupos_filtrados)}")
    print(f"Grupos com erro: {grupos_com_erro}")
    print(f"Total de contatos coletados: {len(todos_contatos)}")
    
    return todos_contatos

# Executar extra√ß√£o
contatos = await extrair_contatos_todos_grupos()

In [None]:
# Criar DataFrame
df = pd.DataFrame(contatos)

if df.empty:
    print("‚ö†Ô∏è DataFrame vazio - nenhum contato foi coletado")
else:
    # Remover duplicatas (mesmo n√∫mero em grupos diferentes)
    print(f"üìä Total de registros: {len(df)}")
    print(f"üìä Contatos √∫nicos (por n√∫mero): {df['Numero_Telefone'].nunique()}")

    # Mostrar estat√≠sticas
    print("\nüìà Estat√≠sticas:")
    print(f"  ‚Ä¢ Grupos processados: {df['Grupo_Origem'].nunique()}")
    print(f"  ‚Ä¢ Contatos com nome: {df[df['Nome'] != '']['Nome'].count()}")
    print(f"  ‚Ä¢ Contatos sem nome: {df[df['Nome'] == '']['Nome'].count()}")

    # Mostrar preview
    print("\nüëÄ Preview dos dados:")
    print(df.head(10))

In [None]:
# Verificar se temos contatos antes de salvar
if not contatos or len(contatos) == 0:
    print("‚ö†Ô∏è Nenhum contato para salvar")
else:
    # Criar diret√≥rio de sa√≠da se n√£o existir
    output_dir = Path("../../data/contatos_grupos")
    output_dir.mkdir(parents=True, exist_ok=True)

    # Gerar nome do arquivo com timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_file = output_dir / f"contatos_grupos_{timestamp}.csv"
    excel_file = output_dir / f"contatos_grupos_{timestamp}.xlsx"

    # Salvar CSV
    df.to_csv(csv_file, index=False, encoding='utf-8-sig')
    print(f"‚úÖ CSV salvo: {csv_file}")

    # Salvar Excel
    df.to_excel(excel_file, index=False, engine='openpyxl')
    print(f"‚úÖ Excel salvo: {excel_file}")

    print(f"\nüìÅ Arquivos salvos em: {output_dir.absolute()}")
    print(f"   ‚Ä¢ {csv_file.name}")
    print(f"   ‚Ä¢ {excel_file.name}")

In [None]:
# An√°lise adicional (apenas se temos dados)
if not df.empty:
    print("üìä Contatos por grupo:")
    contatos_por_grupo = df.groupby('Grupo_Origem').size().sort_values(ascending=False)
    print(contatos_por_grupo.head(20))
    if len(contatos_por_grupo) > 20:
        print(f"... e mais {len(contatos_por_grupo) - 20} grupos")

    # Contatos que aparecem em m√∫ltiplos grupos
    print("\nüë• Contatos em m√∫ltiplos grupos:")
    contatos_multiplos = df.groupby('Numero_Telefone').agg({
        'Nome': 'first',
        'Grupo_Origem': lambda x: ', '.join(x.unique()),
        'Numero_Telefone': 'count'
    }).rename(columns={'Numero_Telefone': 'Qtd_Grupos'})
    contatos_multiplos = contatos_multiplos[contatos_multiplos['Qtd_Grupos'] > 1].sort_values('Qtd_Grupos', ascending=False)
    print(f"Total: {len(contatos_multiplos)} contatos aparecem em mais de um grupo")
    if not contatos_multiplos.empty:
        print(contatos_multiplos.head(20))
else:
    print("‚ö†Ô∏è Sem dados para an√°lise")