<a href="https://colab.research.google.com/github/jespimentel/zapy_notebook/blob/main/zapy_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **zapy**
José Eduardo de Souza Pimentel (2022)
- - - 

### Ferramenta forense para a análise de metadados do WhatsApp

### Introdução

O WhatsApp fornece por e-mail os metadados das comunicações realizadas pelo app, em atendimento à ordem judicial de afastamento do sigilo telemático.

O programa lê os e-mails que contêm tais metadados e organiza toda a informação numa planilha Excel e em gráficos para facilitar a análise pelo investigador. 

Também consulta a api IPAPI (https://ipapi.com) para a obtenção de informações adicionais relacionadas aos IPs coletados, tais como 'hostname', 'latitude', 'longitude', 'cidade' e 'região', podendo plotá-las no mapa (Folium).

### Uso

1. Mova para os arquivos 'eml' para a pasta de trabalho ('/content/').
    > Opcionalmente, com o uso do MS Outlook, exporte um lote de emails no formato 'txt'. O programa dará preferência à leitura desse tipo de arquivo.
    > Certifique-se de que o arquivo txt gerado ou os e-mails carregados dizem respeito ao mesmo alvo.
1. Rode a aplicação.
- O programa perguntará:
    - se você deseja restringir a consulta da API; e
    - se você deseja gerar gráficos para análise dos dados coletados.

### Observações

- As sugestões para o aprimoramento do programa são muito bem-vindas.
- Encontre a atualização deste "notebook" e a versão em script em: github/jespimentel

In [46]:
# Configurações do usuário

chave = 'XXXXXXXXXXXXXXXXXXXXXXXX' # Obtenha a sua chave pessoal em ipapi.com
path = '/content/'

In [47]:
# Importação das bibliotecas necessárias

import os, re
import requests, json

import pandas as pd
import matplotlib.pyplot as plt

from pathlib import Path

from email import policy
from email.parser import BytesParser

In [48]:
# Funções

def faz_saudacao(orgao):
  """Lê arquivo .env e saúda o usuário"""
  print (f'Bem vindo ao(à) {orgao}!\n')
  print("Selecione a pasta de e-mails (arquivos 'eml') ou arquivos texto ('txt')")
  return

In [None]:
# Incício do programa

faz_saudacao('Promotoria de Justiça de Piracicaba')
caminho_para_arquivos = path
path = Path(caminho_para_arquivos)
eml_files = list(path.glob('*.eml'))
txt_files = list(path.glob('*.txt'))
texts = []

# Prioriza arquivos "txt" se existentes. Não sendo o caso, lê os arquivos "eml"

if len(txt_files) !=0:
  print(f'Foram encontrados {len(txt_files)} arquivos "txt" na pasta escolhida.')
  print('Arquivos encontrados:')
  for arq in txt_files:
    print(arq)
  print('Processando os arquivos texto...')
  for file in txt_files:
    with open (file, 'r', encoding='ISO-8859-1') as f:
      text = f.read()
    texts.append(text)

elif len(eml_files) !=0:
  print(f'Foram encontrados {len(eml_files)} e-mails na pasta escolhida.')
  print('Arquivos encontrados:')
  for arq in eml_files:
    print(arq)
  print('Processando os arquivos de e-mail...')
  for file in eml_files:
    with open (file, 'rb') as fp:
      msg = BytesParser(policy=policy.default).parse(fp)
    text = msg.get_body(preferencelist=('plain')).get_content()
    texts.append(text)

else:
  print('Não encontrei arquivos para o processamento.')
  print('Até breve!')
  os.system('pause')
  exit()

texto = ''
for text in texts:
  texto = texto + text

# Tratamento do texto

texto = texto.replace('Message Id', 'Message_Id')
texto = texto.replace('Group Id', 'Group_Id')
texto = texto.replace('Sender Ip', 'Sender_Ip')
texto = texto.replace('Sender Port', 'Sender_Port')
texto = texto.replace('Sender Device', 'Sender_Device')
texto = texto.replace('Message Style', 'Message_Style')
texto = texto.replace('Message Size', 'Message_Size')

features = ['Timestamp', 'Message_Id', 'Sender', 'Recipients', 'Group_Id', 'Sender_Ip', 'Sender_Port', \
            'Sender_Device', 'Type', 'Message_Style', 'Message_Size']

# REGEX
# Cada conjunto de parenteses () define um grupo, gerando uma lista como resultado do "findall"
# O uso do "?:" anula essa funcionalidade, fazendo com que cada item da lista corresponda a uma mensagem
# RegEx podem ser testadas em https://regex101.com/

pattern = r"\sTimestamp.+(?:\n.+)+\s(?:Message_Size\s+[0-9]+)\n" # Padrão para cada uma das mensagens
mensagens = re.findall(pattern, texto, re.MULTILINE) # Gera a lista de mensagens

# Criação da lista de dicionários (um para cada mensagem individual)

lista_msg=[]
for mensagem in mensagens:
  item = {}
  for feature in features:
    padrao = fr'({feature}\s.+)' # f-string com raw
    res = re.findall(padrao, mensagem, re.MULTILINE)
    if res !=[]:
      elementos = res[0].split(' ', maxsplit=1)
      item[elementos[0].strip()] = elementos[1].strip() 
  lista_msg.append(item)

In [50]:
# Criação do dataframe com as mensagens extraídas

df = pd.DataFrame(lista_msg, columns=features)
print(f'Foram encontradas {len(df)} mensagens.')

Foram encontradas 392 mensagens.


In [None]:
# Exiba parte do dataframe

df.head(5)

In [None]:
# Identificação do alvo

criterio = (df.Message_Style == 'individual')&(df.Sender_Ip.notnull())
alvo = df[criterio]['Sender'].value_counts() 
alvo = alvo.to_string().split()[0]
alvo

In [None]:
# Cálculos e análises

# Mensagens invidivuais enviadas por tipo

criterio = (df.Message_Style == 'individual')
qtd_msg_env = df[criterio].groupby(['Recipients', 'Type'])['Type'].count()
qtd_msg_env

In [None]:
# Mensagens individuais recebidas por tipo

criterio = (df.Message_Style == 'individual')
qtd_msg_receb = df[criterio].groupby(['Sender', 'Type'])['Type'].count()
qtd_msg_receb

In [None]:
# Mensagens individuais enviadas por tipo após o 'unstack' e totalização

qtd_msg_env_desempilhada = qtd_msg_env.unstack(fill_value=0)
qtd_msg_env_desempilhada = qtd_msg_env_desempilhada.drop(qtd_msg_env_desempilhada.columns[0], axis=1)
qtd_msg_env_desempilhada['total'] = qtd_msg_env_desempilhada.sum(axis=1)
qtd_msg_env_desempilhada = qtd_msg_env_desempilhada.sort_values(by='total', ascending=False)
qtd_msg_env_desempilhada

In [None]:
# Mensagens individuais recebidas por tipo após o 'unstack' e totalização

qtd_msg_receb_desempilhada = qtd_msg_receb.unstack(fill_value=0)
qtd_msg_receb_desempilhada = qtd_msg_receb_desempilhada.drop(qtd_msg_receb_desempilhada.columns[0], axis=1)
qtd_msg_receb_desempilhada['total'] = qtd_msg_receb_desempilhada.sum(axis=1)
qtd_msg_receb_desempilhada = qtd_msg_receb_desempilhada.sort_values(by='total', ascending=False)
qtd_msg_receb_desempilhada

In [None]:
# Quantidade de mensagens em grupo

df_part_grupos = df['Group_Id'].value_counts().to_frame()
df_part_grupos

In [None]:
# Criação de Dataframe de participação nos grupos

criterio = df['Group_Id'].notna()
recipients_grupos = df[criterio].Recipients
cels_grupos = []
for i, cel in recipients_grupos.iteritems():
  cels = cel.split(',')
  for num in cels:
    cels_grupos.append(num)
cels_grupos_dict = {}
cels_unicos_grupos = set(cels_grupos)
for n in cels_unicos_grupos:
  cels_grupos_dict[n] = cels_grupos.count(n) 

df_grupos = pd.DataFrame.from_dict(cels_grupos_dict, orient='index', columns=['Ocorrências']).sort_values(by='Ocorrências', \
  ascending=False)

df_grupos

In [59]:
# DF com Sender_IP

df_com_ips = df[df['Sender_Ip'].notna()]
ips = df_com_ips.Sender_Ip.value_counts()
ips_lista = ips.index.to_list()

print(f'Foram encontrados {len(ips_lista)} IPs diversos.')
resposta = input('Deseja restringir a consulta à API? <s/n>')
if resposta.lower() == 's':
  cond = True
  while (cond):
    num = input ('Qtde. consultas: ')
    if num.isdigit() and int(num)>0 and int(num)<=len(ips_lista):
      cond = False
      num = int(num)
      ips_lista = ips_lista[:num]

Foram encontrados 3 IPs diversos.
Deseja restringir a consulta à API? <s/n>n


In [60]:
# Consulta à API da IPAPI
# Cria a lista com as informações de IP obtidas nas requisições
# Documentação da API: https://ipapi.com/quickstart 

operadoras = []
for ip in ips_lista:
  elemento = {}
  try:
    dados = requests.get (f'http://api.ipapi.com/api/{ip}?access_key={chave}&hostname=1')
    dados_json = json.loads(dados.content)
    elemento = {'ip': dados_json['ip'], 'hostname' : dados_json['hostname'], 'latitude': dados_json['latitude'], 
              'longitude': dados_json['longitude'],'city': dados_json['city'], 'region_name': dados_json['region_name']}
    operadoras.append(elemento)
  except:
    resposta = 'API s/ resp.'
    elemento = {'ip': ip, 'hostname' : resposta, 'latitude': resposta, 'longitude': resposta,'city': resposta, 'region_name': resposta}
    operadoras.append(elemento)


In [69]:
# Criação do dataframe de operadoras com os dados obtidos da API

df_operadoras = pd.DataFrame(operadoras)
df_operadoras = df_operadoras.set_index('ip')
df_operadoras

Unnamed: 0_level_0,hostname,latitude,longitude,city,region_name
ip,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
187.183.47.232,bbb72fe8.virtua.com.br,-22.413401,-47.569569,Rio Claro,São Paulo
2804:18:18c5:4eab:964:1b81:3021:3174,2804:18:18c5:4eab:964:1b81:3021:3174,-23.547499,-46.636108,São Paulo,São Paulo
2804:18:10c1:a66a:25ff:300e:506e:caa5,2804:18:10c1:a66a:25ff:300e:506e:caa5,-23.547499,-46.636108,São Paulo,São Paulo


In [None]:
# Merge de df com df_operadoras

merged = pd.merge(df, df_operadoras, how='outer', left_on = 'Sender_Ip', right_on = 'ip')
merged = merged.drop(columns='ip', axis=1)
merged = merged.set_index('Timestamp')
merged

In [64]:
# Criação de planilha Excel para resumir o trabalho

print('Gravando a planilha...')
with pd.ExcelWriter('resumo.xlsx') as writer:
    merged.to_excel(writer, sheet_name='Geral')
    qtd_msg_env_desempilhada.to_excel(writer, sheet_name='Msg enviadas')
    qtd_msg_receb_desempilhada.to_excel(writer, sheet_name='Msg recebidas')
    df_part_grupos.to_excel(writer, sheet_name='Part. grupos')
    df_grupos.to_excel(writer, sheet_name='Msg nos grupos')
    ips.to_excel(writer, sheet_name='IPs - n. de acessos')
    df_operadoras.to_excel(writer, sheet_name='Provedores')
print('Planilha gerada.')

Gravando a planilha...
Planilha gerada.


In [None]:
# Gráficos

resp = input('Deseja gerar os gráficos?(s/n)')
if resp != 's':
  exit()
else:
  # Gráfico de pizza: Tipos de mensagens enviadas pelo alvo
  conta_msg_graf = qtd_msg_env_desempilhada.drop(columns='total')
  criterio = (conta_msg_graf.index == alvo)
  conta_msg_serie = conta_msg_graf[criterio].loc[alvo]
  conta_msg_serie.plot(kind='pie', title= f'Tipos de mensagens enviadas pelo alvo {alvo}', legend=True, figsize =(10, 10), autopct='%1.0f%%')
  path_do_arquivo = os.path.join('tipos-msg-alvo.png' )
  plt.savefig(path_do_arquivo)
  plt.show()

  # Gráfico de barras: Quantidade e tipos de mensagens individuais
  criterio = (conta_msg_graf.index != alvo)
  if len(conta_msg_graf[criterio])>30:
    conta_msg_graf[criterio].head(30).plot.bar(stacked=True, title='Top 30 - qtde e tipos de msg individuais', figsize=(16,8))
  else:
    conta_msg_graf[criterio].plot.bar(stacked=True, title='Qtde e tipos de msg individuais', figsize=(16,8))
  path_do_arquivo = os.path.join('qtd-tipo-msg-ind.png')
  plt.savefig(path_do_arquivo)
  plt.show()

  # Gráfico de barras: Participação nos grupos (quantidades de mensagens)
  if len(df['Group_Id'].value_counts()) != 0: 
    df['Group_Id'].value_counts().plot(kind = 'bar', title='Participação nos Grupos (qtde. de msg)', figsize=(16,8),legend=True)
    path_do_arquivo = ('part-grupos.png')
    plt.savefig(path_do_arquivo)
    plt.show()
  else:
    print('Sem participação em grupos')

print('Programa concluído.')

In [66]:
# Cria o mapa para a plotagem da localização das operadoras

import folium
mapa = folium.Map([ -23.5489, -46.6388 ]) # Lat, Long de São Paulo/SP

In [67]:
# Cria os marcadores

for op in operadoras:
  mapa.add_child(
      folium.Marker(
          location=[op['latitude'], op['longitude']],
          popup = f"""
                  IP : {op['ip']}
                  Hostname: {op['hostname']}\n
                  Cidade: {op['city']}\n
                  Região: {op['region_name']}\n              
                  """ 
      )
  )

In [68]:
# Exibe o mapa com os marcadores
mapa