# Projeto de Pipeline de Dados do Telegram

Este projeto visa relacionar o mundo de chatbots em aplicativos como o Telegram com computação em nuvem. No caso, é realizada uma **etapa transacional** para capturar dados da API do Telegram de bots pelo AWS API Gateway, e a partir disto uma **etapa analítica** realizada em nuvem na AWS com três etapas: ingestão, ETL e apresentação. Por fim, no AWS Athena são feitas consultas para estimar quantidade de mensagens por dia no grupo do Telegram, quantidade de mensagens por hora e dia da semana, e média de tamanho das mensagens.

# 1. Contexto

Cada vez mais aplicativos e sites costumam contar com a presença de *chatbots*. Exemplos são Telegram, WhatsApp, Facebook Messenger, Discord e Microsoft Teams, para nomear alguns. Suas funções são amplas, como atendimento ao cliente, ajuda em compras, agendamentos, entretenimento. 

<img src='https://github.com/mateus-miguel/projeto-pipeline-telegram/blob/main/imagens/chatbot_img.jpg?raw=true'/>

Nesse sentido, torna-se útil armazenar mensagens desses chats em base de dados para serem analisadas. Por exemplo, analisar quais mensagens ou dúvidas são mais frequentes, quais respostas são mais esperadas, qual a média de tamanho das mensagens, quais usuários costumam mandar mais mensagens, em quais grupos, e assim por diante.

Em geral, as mensagens são fornecidas por API's para uso externo, e envolvem formatos de dados semi-estruturados como JSON. Para a seleção da parte útil desses dados para consultas de forma estruturada, é preciso realizar um *data wrangling* para colocá-los em formato tabular, além de outros processos para reduzir o custo de escaneamento. Isto pode ser feito por uma arquitetura em nuvem, como na AWS (Amazon Web Services), que permite realizar consultas por SQL ao fim do processo.

# 2. Arquitetura

O projeto se divide em duas partes como citado. A primeira é a **parte transacional**, que lida com o grupo do Telegram e a API de bots, já a segunda é a **parte analítica**, que contém as etapas de ingestão, ETL e apresentação.

<img src='https://raw.githubusercontent.com/mateus-miguel/projeto-pipeline-telegram/main/imagens/aws_telegram_pipeline_v2.png' width='700'/>

A primeira etapa consiste na criação do *bot* do Telegram e sua adição no grupo específico. Nesta etapa, o *bot* vai coletar todas as mensagens mandadas pelos usuários ou si mesmo para a Telegram Bot API, que pode ser 

Na segunda etapa começamos pela **ingestão**, onde é criado um *webhook* pelo AWS API Gateway de forma a conectar com a Telegram Bot API. Desta forma, toda nova mensagem mandada no grupo é automaticamente mandada para o ambiente em nuvem da AWS. Então as mensagens passam por uma função AWS Lambda que armazena os arquivos em formato JSON original num bucket AWS S3 cru (*raw*).

Em seguida, temos a etapa de **ETL** que cria um gatilho no AWS Event Bridge às 00:00 BRT para manipular os dados do *datalake* de arquivos JSON. Aqui, é feito um *data wrangling* para extrair apenas as chaves úteis dos arquivos JSON, e armazenar o conjunto de todos as mensagens num só arquivo diário no formato Apache Parquet. O armazenamento é feito de forma **particionada** por dia, assim como feito na etapa de ingestão, mas agora num Bucket S3 enriquecido (*enriched*). Este formato é **orientado a colunas**, então reduz bastante o custo de escaneamento dos dados.

Por fim, é feita a etapa de **apresentação**, na qual outro gatilho do AWS Event Bridge é disparado às 02:00 BRT de forma a reparar a tabela de dados *telegram* com novas partições do *datalake* enriquecido com os arquivos Apache Parquet. Com isto, são feitas consultas SQL de interesse no AWS Athena na tabela particionada, analisando métricas sobre o grupo, usuários e mensagens enviadas.

# 3. Análise Exploratória de Dados

A Telegram Bot API permite a comunicação de informações de mensagens, usuários e *bots* de um grupo com códigos externos. Existem alguns métodos que podem ser usados no Python para recuperar essas informações em formato JSON. Um deles é o método **getMe** que retorna informações sobre o *bot* do grupo. Já o método **getUpdates** retorna todas as informações com listas das mensagens mandadas em um grupo, disponíveis por até 24 horas. Para o acesso, é necessário um API token relacionado ao *bot*. Isso envolve usar linhas de código do tipo:

```
import json
from getpass import getpass

token = getpass()
base_url = f'https://api.telegram.org/bot{token}'
 
response = requests.get(url=f'{base_url}/getMe')
```

onde *token* é o API token do *bot* obtido do Telegram.


In [1]:
import os
import json

with open('/kaggle/input/data-getupdates/getMe.json', mode='r', encoding='utf8') as f:
    message = json.loads(f.read())
    print(json.dumps(
        message, indent=2
    ))

{
  "ok": true,
  "result": {
    "id": 6654125182,
    "is_bot": true,
    "first_name": "m42_ebac_bot",
    "username": "m42_pipeline_bot",
    "can_join_groups": false,
    "can_read_all_group_messages": false,
    "supports_inline_queries": false
  }
}


No método **getUpdates** temos informações mais relevantes das mensagens. Algumas chaves são **obrigatórias**, por exemplo first_name, is_bot, date, text. Mas, outras chaves são **opcionais** como last_name e username. Com um arquivo JSON de resposta do **getUpdates** armazenado como exemplo, podemos lê-lo para ver seu formato:

In [2]:
with open('/kaggle/input/data-getupdates/telegram.json', mode='r', encoding='utf8') as f:
    message = json.loads(f.read())
    print(json.dumps(
        message, indent=2
    ))

{
  "ok": true,
  "result": [
    {
      "update_id": 187921657,
      "message": {
        "message_id": 3,
        "from": {
          "id": 479372888,
          "is_bot": false,
          "first_name": "Mateus",
          "last_name": "Miguel",
          "username": "mateusmmiguel"
        },
        "chat": {
          "id": -4055988830,
          "title": "M42 Ebac Group",
          "type": "group",
          "all_members_are_administrators": true
        },
        "date": 1697323306,
        "text": "Ol\u00e1, mundo!"
      }
    },
    {
      "update_id": 187921658,
      "message": {
        "message_id": 4,
        "from": {
          "id": 479372888,
          "is_bot": false,
          "first_name": "Mateus",
          "last_name": "Miguel",
          "username": "mateusmmiguel"
        },
        "chat": {
          "id": -4055988830,
          "title": "M42 Ebac Group",
          "type": "group",
          "all_members_are_administrators": true
        },
        "date"

Como o arquivo JSON é semi-estruturado, a ideia é fazer um *data wrangling* para extrair apenas as informações obrigatórias das mensagens e armazená-las em um formato estruturado, tabular. Com isso, vai ser possível usar o AWS Athena para consultas SQL padronizadas, sobre a tabela 'telegram'. A função *parse_data* é capaz de fazer isto, basicamente percorrendo as chaves e listas 

```
from datetime import datetime, timezone, timedelta

def parse_data(data: dict) -> dict:
    # Função que faz o data wrangling dos arquivos JSON
    parsed_data = dict()
    
    for key, value in data.items():
        if key == 'from':
            for k, v in data[key].items():
                if k in ['id', 'is_bot', 'first_name']:
                    parsed_data[f'user_{k}'] = [v]
        
        elif key == 'chat':
            for k, v in data[key].items():
                if k in ['id', 'type']:
                    parsed_data[f'chat_{k}'] = [v]
                    
        elif key in ['message_id', 'text']:
            parsed_data[key] = [value]
            
        elif key == 'date':
            tzinfo = timezone(offset=timedelta(hours=-3))
            parsed_data[key] = [value]
            parsed_data['timestamp'] = [datetime.fromtimestamp(value, tzinfo).strftime('%Y-%m-%d %H:%M:%S')]
            
    if not 'text' in parsed_data.keys():
        parsed_data['text'] = ['']
        
    return parsed_data
```

## 3.1 - Ingestão

### 3.1.1 - AWS API Gateway

O serviço em nuvem AWS API Gateway permite a criação de API's de forma escalável. No contexto deste projeto, é preciso conectar a Telegram Bot API com uma REST API criada neste serviço. No momento da implantação, é retornada a variável *aws_api_gateway_url* que contém a URL da REST API. Para conectá-la ao Telegram, é preciso usar o método **setWebhook**, já possuindo a variável *token* do *bot* do Telegram.

```
import json
import requests
from getpass import getpass

token = getpass()
base_url = f'https://api.telegram.org/bot{token}'
```

Então é usado o método 

`response = requests.get(url=f'{base_url}/setWebhook?url={aws_api_gateway_url}'`

Que cria um *webhook* entre as API's e retorna um JSON do formato:

In [3]:
with open('../input/data-getupdates/setwebhook.json', mode='r', encoding='utf8') as f:
    message = json.loads(f.read())
    print(json.dumps(
        message, indent=2
    ))

{
  "ok": true,
  "result": true,
  "description": "Webhook was set"
}


Por fim, o método **getWebhookInfo** permite obter informações sobre o *webhook* criado entre as duas API's. Em específico, retorna informações sobre status, URL e endereço de IP.

In [4]:
with open('../input/data-getupdates/webhookInfo.json', mode='r', encoding='utf8') as f:
    message = json.loads(f.read())
    print(json.dumps(
        message, indent=2
    ))

{
  "ok": true,
  "result": {
    "url": "https://biewk7y6uh.execute-api.sa-east-1.amazonaws.com/dev",
    "has_custom_certificate": false,
    "pending_update_count": 0,
    "max_connections": 40,
    "ip_address": "52.67.105.244"
  }
}


### 3.1.2 - AWS Lambda Raw

Com o *webhook* criado, agora os arquivos JSON das mensagens do Telegram são armazenados em um AWS S3 bucket armazenado na variável de ambiente AWS_S3_BUCKET cujo valor é o caminho do bucket respectivo. Outra variável de ambiente é TELEGRAM_CHAT_ID que armazena o id do chat da API, para conferir se os dados estão sendo fornecidos pelo chat correto.

Além disso, a biblioteca *datetime* permite colocar os horários no fuso horário local (BRT) pelo uso de *timezone* e *timedelta*, com diferença de -03:00 horas em relação ao UTC. Isso deixa a manipulação de datas adequada aos horários das mensagens como visualizado no grupo do Telegram.

Este processo é feito pela seguinte função AWS Lambda:

```
import os
import json
import logging
from datetime import datetime, timezone, timedelta

import boto3

def lambda_handler(event: dict, context: dict) -> dict:
  """
  Recebe uma mensagem do Telegram via AWS API Gateway, verifica
  se seu conteúdo foi produzido em determinado grupo e escreve
  em seu formato original JSON, em um bucket AWS S3
  """

  # variáveis de ambiente

  BUCKET = os.environ['AWS_S3_BUCKET']
  TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])

  # variáveis lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = datetime.now(tzinfo).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  filename = f'{timestamp}.json'

  # código principal

  client = boto3.client('s3')

  try:
    message = json.loads(event['body'])
    # message = event
    chat_id = message['message']['chat']['id']

    if chat_id == TELEGRAM_CHAT_ID:
      with open(f'/tmp/{filename}', mode='w', encoding='utf8') as f:
        json.dump(message, f)
      client.upload_file(f'/tmp/{filename}', BUCKET, f'telegram/context_date={date}/{filename}')
      
  except Exception as exc:
    logging.error(msg=exc)
    return dict(statusCode='500')
    
  else:
    return dict(statusCode='200')
```

## 3.2 - ETL

Nesta etapa, é programado um gatilho às 00:00 BRT (fuso horário local) no AWS EventBridge por meio da expressão cron (0 3 \* \* ? \*). O seu objetivo é disparar uma função AWS Lambda que realiza o *data wrangling* dos arquivos JSON do dia anterior inteiro para apenas um arquivo Apache Parquet. Portanto, envolve duas variáveis de ambiente AWS_S3_RAW e AWS_S3_ENRICHED dos dois buckets envolvidos, o primeiro com os arquivos JSON armazenados por partições diárias e o segundo com arquivos PARQUET particionados também diariamente. Por meio do AWS IAM são garantidas permissões de acesso como AmazonS3FullAccess para a interação da função com os buckets.

```
import os
import json
import logging
from datetime import datetime, timedelta, timezone

import boto3
import pyarrow as pa
import pyarrow.parquet as pq

def lambda_handler(event: dict, context: dict) -> bool:
    """
    Função que puxa os arquivos do dia anterior do raw bucket S3
    e realiza um data wrangling de todos os arquivos JSON para persistir
    como tabela do formato .parquet dentro do enriched bucket S3
    """
    
    # variáveis de ambiente
    
    RAW_BUCKET = os.environ['AWS_S3_RAW']
    ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']
    
    # variáveis lógicas
    
    tzinfo = timezone(offset=timedelta(hours=-3))
    date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d') # dia anterior, com offset de timedelta(days=1)
    timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')
    
    # código principal
    
    table = None
    client = boto3.client('s3')
    
    try:
        # listando arquivos JSON do Raw Bucket pela pasta do dia anterior
        response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=f'telegram/context_date={date}')
        
        for content in response['Contents']:
            key = content['Key']
            arquivo = key.split('/')[-1]
            client.download_file(RAW_BUCKET, key, f'/tmp/{arquivo}')
            
            with open(f'/tmp/{arquivo}', mode='r', encoding='utf8') as f:
                data = json.load(f)
                data = data['message']

            # É feito data wrangling para formato tabular, e então usado o PyArrow para criar uma table .parquet
            parsed_data = parse_data(data=data) # data wrangling
            iter_table = pa.Table.from_pydict(mapping=parsed_data)
            
            if table:
                table = pa.concat_tables([table, iter_table]) # concatenação dos arquivos JSON diários em forma tabular
            else:
                table = iter_table
                iter_table = None
                
        pq.write_table(table=table, where=f'/tmp/{timestamp}.parquet')
        client.upload_file(f'/tmp/{timestamp}.parquet', ENRICHED_BUCKET, f'context_date={date}/{timestamp}.parquet')
            
        return True
        
    except Exception as exc:
        logging.error(msg=exc)
        return False 
```

Na função AWS Lambda acima, além das permissões e variáveis de ambiente, é preciso instalar uma *Layer* que permita o uso de bibliotecas como PyArrow. Isto porque as funções AWS Lambda normalmente têm acesso a poucos pacotes, em geral nativos do Python. Porém, para criar os arquivos Apache Parquet é preciso o uso de pyarrow e pyarrow.parquet. Através do repositório GitHub [AWS SDK for pandas (awswrangler)](http://https://github.com/aws/aws-sdk-pandas/releases) podemos baixar o arquivo ZIP relacionado com a versão do Python escolhida para a função AWS Lambda, no caso do projeto Python 3.8. Como este arquivo é grande da ordem de megabytes (MB), é antes adicionado a um bucket do AWS S3 para então ser criada uma nova *Layer* apontando para ele.

Na etapa de criação da tabela Apache Parquet, é usada a função `pa.Table.from_pydict(mapping=parsed_data)` onde `parsed_data` é o dicionário Python retornado após o *data wrangling* extraindo e formatando o nome das colunas de interesse. É feito um laço de repetição sobre todos os arquivos acessados pela função `S3.Client.list_objects_v2()`, de forma a ir concatenando as tabelas PyArrow por `pa.concat_tables([table, iter_table])` até reunir todos os JSON de um mesmo dia. 

Através do pacote PyArrow podemos visualizar como fica um arquivo Apache Parquet das partições após executada a função AWS Lambda com o *data wrangling* adequado. O exemplo abaixo contém as variáveis `last_name` e `username` que são opcionais, e não são usadas no projeto final.

In [5]:
import pyarrow as pa
import pyarrow.parquet as pq

table = pq.read_table('../input/data-getupdates/20231016163208693232.parquet')

In [6]:
table

pyarrow.Table
message_id: int64
user_id: int64
user_is_bot: bool
user_first_name: string
user_last_name: string
user_username: string
chat_id: int64
chat_type: string
date: int64
timestamp: string
text: string
----
message_id: [[9,10,11]]
user_id: [[479372888,479372888,479372888]]
user_is_bot: [[false,false,false]]
user_first_name: [["Mateus","Mateus","Mateus"]]
user_last_name: [["Miguel","Miguel","Miguel"]]
user_username: [["mateusmmiguel","mateusmmiguel","mateusmmiguel"]]
chat_id: [[-4055988830,-4055988830,-4055988830]]
chat_type: [["group","group","group"]]
date: [[1697472788,1697473103,1697473104]]
timestamp: [["2023-10-16 13:13:08","2023-10-16 13:18:23","2023-10-16 13:18:24"]]
...

## 3.3 - Apresentação

