# Pipeline e Análise de Dados de Mensagens do Telegram

[Rafael Barbosa](https://www.linkedin.com/in/barbosa89/)
<br>
[Marcelo Barbosa](https://www.linkedin.com/in/marcelogomesbarbosa)
<br>
[André Perez](https://www.linkedin.com/in/andremarcosperez/)
<br>
[Mariane Neiva](https://www.linkedin.com/in/mariane-neiva/)


---

A seguir um modelo de pipeline com guia de configuração utilizando Amazon Web Services, Python e SQL para ingerir, extrair, transformar e carregar dados de mensagens de um chat do Telegram, seguindo o modelo da imagem a seguir.

![](https://github.com/rafie-b/Data-Warehouse-AWS-Pipeline-Chat-API/blob/main/repo-Data-Warehouse-AWS-Pipeline-Chat-API/Telegram%20Data%20Pipeline%20Schema.png?raw=true)

Insights importantes são inferidos a partir dos dados de mensagens de chats, identificando gargalos mais comuns e áreas para melhoria, guiando o desempenho de agentes, promovendo ofertas personalizadas, gerando leads de forma eficiente, identificando tendências e entendendo necessidades dos clientes, avaliando a satisfação do cliente, e claro melhorando a comunicação e colaboração entre equipes.

Este passo a passo processa dados transacionais (o que acontece), em dados analíticos (por que e como acontece) e deve ter seus parâmetros configurados.


## 0/3\. Fonte de Dados

Dados de mensagens captadas por um [**Telegram bot**](https://core.telegram.org/bots/api) via `API token` disponibilizado no `BotFather` do Telegram. [Crie e configure um Telegram bot API token aqui.](https://github.com/rafie-b/Profession-Data-Analyst/blob/main/Pipeline-Data-API-Telegram-bot-Profissao_Analista_de_dados_M42_Exercicio_pt1_Rafael_Barbosa.ipynb) Guarde o "token" em "base_url" usando a célula de código Python abaixo.

In [1]:
### Salva o API token em url_base
# A `url` base é comum a todos os métodos da API
from getpass import getpass
import requests

print("insert Telegram bot API Token")
token = getpass()

base_url = f'https://api.telegram.org/bot{token}'
print("base_url stored")

insert Telegram bot API Token


 ··············································


base_url stored


In [6]:
### `getMe` retorna informações sobre o bot
import json
import requests

response = requests.get(url=f'{base_url}/getMe')
print(json.dumps(json.loads(response.text), indent=2))

{
  "ok": true,
  "result": {
    "id": 7084060512,
    "is_bot": true,
    "first_name": "ebac_m42_bot",
    "username": "ebacm42_bot",
    "can_join_groups": false,
    "can_read_all_group_messages": false,
    "supports_inline_queries": false,
    "can_connect_to_business": false
  }
}


In [8]:
### `getUpdates` revela a estrutura de dados de mensagens captadas pelo bot
import json
import requests

response = requests.get(url=f'{base_url}/getUpdates')
print(json.dumps(json.loads(response.text), indent=2))

{
  "ok": true,
  "result": [
    {
      "update_id": 827865390,
      "message": {
        "message_id": 68,
        "from": {
          "id": 7162819601,
          "is_bot": false,
          "first_name": "Rafie",
          "language_code": "en"
        },
        "chat": {
          "id": -1001993072220,
          "title": "EBAC M42 BOT",
          "type": "supergroup"
        },
        "date": 1717976196,
        "text": "up up"
      }
    }
  ]
}


0.1. Guarde o número encontrado no 'id' do item 'chat' do `getUpdates` para configurar a variável de ambiente do `AWS Lambda`.

# 1/3. Ingestão

**1.1.** `AWS S3 bucket` com sufixo `-raw` para **dados crus**;

**1.2.** `AWS Lambda` **função Python** que recebe e armazena dados crus no formato JSON, no `AWS S3 bucket` sufixo `-raw`;

1.2.1. variáveis de ambiente para **nome do bucket de dados crus** (AWS_S3_BUCKET) e para **id do chat do Telegram** (TELEGRAM_CHAT_ID) em "Configuração > Variáveis de Ambiente > Editar" no painel da função;

1.2.2. permissão para escrita no bucket de dados crus em "Configuração > Permissôes > Nome da Função [<Função>]" no painel da função e no `AWS IAM` "Adicionar permissões > Anexar políticas" para o `AWS S3 bucket` sufixo `-raw`;


```python
### código Python da função `AWS Lambda` criada para dados crus:

# persistir as mensagens captadas pelo bot do Telegram em um bucket do AWS S3
# Recebe a mensagem no parâmetro event;
# Verifica se a mensagem tem origem no grupo do Telegram correto;
# Persiste a mensagem no formato JSON no bucket do AWS S3;
# Retorna uma mensagem de sucesso (código de retorno HTTP igual a 200) a API de bots do Telegram.

import os
import json
import logging
from datetime import datetime, timezone, timedelta

import boto3


def lambda_handler(event: dict, context: dict) -> dict:

  '''
  Recebe uma mensagens do Telegram via AWS API Gateway, verifica no
  seu conteúdo se foi produzida em um determinado grupo e a escreve, 
  em seu formato original JSON, em um bucket do AWS S3.
  '''

  # vars de ambiente

  BUCKET = os.environ['AWS_S3_BUCKET']
  TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])

  # vars lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = datetime.now(tzinfo).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  filename = f'{timestamp}.json'

  # código principal

  client = boto3.client('s3')

  try:

    message = json.loads(event["body"])
    chat_id = message["message"]["chat"]["id"]

    if chat_id == TELEGRAM_CHAT_ID:

      with open(f"/tmp/{filename}", mode='w', encoding='utf8') as fp:
        json.dump(message, fp)

      client.upload_file(f'/tmp/{filename}', BUCKET, f'telegram/context_date={date}/{filename}')

  except Exception as exc:
      logging.error(msg=exc)
      return dict(statusCode="500")

  else:
      return dict(statusCode="200")
```

**1.3.** `AWS API Gateway` **API REST** com sufixo `-api` para acionar a função `AWS Lambda` criada para receber e armazenar dados crus do `Telegram bot API` no `AWS S3 bucket` sufixo `-raw`;

1.3.1. Criar método tipo "POST", integração "Função do Lambda", "Integração do proxy do Lambda" ativa, escolher a função do `AWS Lambda` criada;

1.3.2. Implantar API, Estágio "Novo estágio", Nome do estágio "dev", guardar "Invocar URL" em "aws_api_gateway_url" usando a célula de código Python abaixo;

> Nota: não disponibilize o endereço da API gerada.

**1.4.** `setWebhook` via `Telegram bot API` para redirecionar as informações de mensagens captadas pelo bot para o endereço "aws_api_gateway_url" usando a célula de código Python abaixo;

> Nota: não disponibilize o token da API de bots do Telegram.

In [4]:
### guarda "Invocar URL" em "aws_api_gateway_url"

from getpass import getpass

print("Insert AWS API Gateway Invoke URL")
aws_api_gateway_url = getpass()
print("aws_api_gateway_url stored")

Insert AWS API Gateway Invoke URL


 ··························································


aws_api_gateway_url stored


In [9]:
### configura o webhook para redirecionar as mensagens para a url do AWS API Gateway

import json
import requests

response = requests.get(url=f'{base_url}/setWebhook?url={aws_api_gateway_url}')
print(json.dumps(json.loads(response.text), indent=2))

{
  "ok": true,
  "result": true,
  "description": "Webhook was set"
}


In [None]:
# ### este trecho apaga e redefine o Webhook
# import json
# import requests

# requests.get(url=f'{base_url}/deleteWebhook')
# print("Webhook reseted")

In [10]:
### getWebhookInfo retorna as informações sobre o webhook configurado

import json
import requests

response = requests.get(url=f'{base_url}/getWebhookInfo')
print(json.dumps(json.loads(response.text), indent=2))

{
  "ok": true,
  "result": {
    "url": "https://hppmtvbty7.execute-api.sa-east-1.amazonaws.com/dev",
    "has_custom_certificate": false,
    "pending_update_count": 0,
    "max_connections": 40,
    "ip_address": "54.233.185.14"
  }
}


## 2/3\. ETL

**2.1.** `AWS S3 bucket` com o sufixo `-enriched` para armazenar **dados enriquecidos**;

**2.2.** `AWS Lambda` **função Python** que transforma o JSON do dia anterior (D-1) do bucket sufixo `-raw` e salva em um arquivo PARQUET particionado por dia no bucket sufixo `-enriched`;

2.2.1. variáveis de ambiente para **nome do bucket de dados crus** (AWS_S3_BUCKET) e **nome do bucket de dados enriquecidos** (AWS_S3_ENRICHED) em "Configuração > Variáveis de Ambiente > Editar" no painel da função;

2.2.2. permissão para escrita no bucket de dados enriquecidos em "Configuração > Permissôes > Nome da Função [<Função>]" no painel da função e no `AWS IAM` "Adicionar permissões > Anexar políticas" para o `AWS S3 bucket` sufixo `-enriched`;

2.2.3. tempo limite para 5 minutos em "Configuração > Configuração geral > Editar" no painel da função;

2.2.4. camada Python PyArrow em "Código > Camadas > Adicionar uma camada > criar uma nova camada > Fazer upload de um arquivo do Amazon S3 ([carregar arquivo para Python compatível](https://github.com/awslabs/aws-data-wrangler/releases) em um `AWS S3 bucket` exclusivo) > inserir Link do URL do Amazon S3 > Tempos de execução compatíveis > Camadas personalizadas > Escolher > Versão 1 > Adicionar" no painel da função;

```python
### código Python da função `AWS Lambda` criada para dados enriquecidos:

# Lista todos os arquivos JSON de uma única participação da camada crua de um bucket do AWS S3;
# Para cada arquivo listado:
# Faz o download do arquivo e carrega o conteúdo da mensagem;
# Executa uma função de data wrangling;
# Cria uma tabela do PyArrow e a contatena com as demais.
# Persiste a tabela no formato Parquet na camada enriquecida em um bucket do AWS S3

import os
import json
import logging
from datetime import datetime, timedelta, timezone

import boto3
import pyarrow as pa
import pyarrow.parquet as pq


def lambda_handler(event: dict, context: dict) -> bool:

  '''
  Diariamente é executado para compactar as diversas mensagensm, no formato
  JSON, do dia anterior, armazenadas no bucket de dados cru, em um único 
  arquivo no formato PARQUET, armazenando-o no bucket de dados enriquecidos
  '''

  # vars de ambiente

  RAW_BUCKET = os.environ['AWS_S3_BUCKET']
  ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']

  # vars lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  # código principal

  table = None
  client = boto3.client('s3')

  try:

      response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=f'telegram/context_date={date}')

      for content in response['Contents']:

        key = content['Key']
        client.download_file(RAW_BUCKET, key, f"/tmp/{key.split('/')[-1]}")

        with open(f"/tmp/{key.split('/')[-1]}", mode='r', encoding='utf8') as fp:

          data = json.load(fp)
          data = data["message"]

        parsed_data = parse_data(data=data)
        iter_table = pa.Table.from_pydict(mapping=parsed_data)

        if table:

          table = pa.concat_tables([table, iter_table])

        else:

          table = iter_table
          iter_table = None
          
      pq.write_table(table=table, where=f'/tmp/{timestamp}.parquet')
      client.upload_file(f"/tmp/{timestamp}.parquet", ENRICHED_BUCKET, f"telegram/context_date={date}/{timestamp}.parquet")

      return True
  
  except Exception as exc:
      logging.error(msg=exc)
      return False
    

# data wrangling

def parse_data(data: dict) -> dict:

  date = datetime.now().strftime('%Y-%m-%d')
  timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

  parsed_data = dict()

  for key, value in data.items():

      if key == 'from':
          for k, v in data[key].items():
              if k in ['id', 'is_bot', 'first_name']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key == 'chat':
          for k, v in data[key].items():
              if k in ['id', 'type']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key in ['message_id', 'date', 'text']:
          parsed_data[key] = [value]

  if not 'text' in parsed_data.keys():
    parsed_data['text'] = [None]

  return parsed_data
```

**2.3.** `AWS EventBridge` para acionar a função `AWS Lambda` criada para transformar o JSON do dia anterior e armazenar o PARQUET no `AWS S3 bucket` sufixo `-enriched` todos os dias 00h (GMT-3) em "Criar regra > Nome, Programação > Continuar a criar regra > cron(0 3 * * ? *) > Próximo > Serviço AWS, Função do Lambda, Função (sufixo `-enriched`) > Próximo (x2) > Criar regra";

## 3/3\. Apresentação

**3.1.** Tabela com os dados enriquecidos utilizando SQL no `AWS Athena`;

```sql
# Consulta SQL para criar tabela com os dados enriqeucidos particionados por dia:

CREATE EXTERNAL TABLE `telegram`(
  `message_id` bigint, 
  `user_id` bigint, 
  `user_is_bot` boolean, 
  `user_first_name` string, 
  `chat_id` bigint, 
  `chat_type` string, 
  `text` string, 
  `date` bigint)
PARTITIONED BY ( 
  `context_date` date)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<bucket-enriquecido>/<pasta-particionada>'
```

![](https://github.com/rafie-b/Data-Warehouse-AWS-Pipeline-Chat-API/blob/main/repo-Data-Warehouse-AWS-Pipeline-Chat-API/athena-create-table.png?raw=true)

**3.2.** Carrega as partições na tabela criada;

```sql
MSCK REPAIR TABLE `telegram`;
```

![](https://github.com/rafie-b/Data-Warehouse-AWS-Pipeline-Chat-API/blob/main/repo-Data-Warehouse-AWS-Pipeline-Chat-API/athena-repair-table.png?raw=true)

3.2.1. Para atualizar a tabela também é possível utilizar o comando a seguir para reduzir possíveis custos de consulta;
 ```sql
 ALTER TABLE <nome-tabela> ADD PARTITION <coluna-partição> = <valor-partição>
  ```

**3.3.** Dados analíticos;

- Quantidade de mensagens por dia;

```sql
SELECT 
  context_date, 
  count(1) AS "message_amount" 
FROM "telegram" 
GROUP BY context_date 
ORDER BY context_date DESC
```

![](https://github.com/rafie-b/Data-Warehouse-AWS-Pipeline-Chat-API/blob/main/repo-Data-Warehouse-AWS-Pipeline-Chat-API/athena-select-message_amount.png?raw=true)
![](https://github.com/rafie-b/Data-Warehouse-AWS-Pipeline-Chat-API/blob/main/repo-Data-Warehouse-AWS-Pipeline-Chat-API/graph-select-message_amount.png?raw=true)

- Quantidade de mensagens por usuário por dia.

```sql
SELECT 
  user_id, 
  user_first_name, 
  context_date, 
  count(1) AS "message_amount" 
FROM "telegram" 
GROUP BY 
  user_id, 
  user_first_name, 
  context_date 
ORDER BY context_date DESC
```

![](https://github.com/rafie-b/Data-Warehouse-AWS-Pipeline-Chat-API/blob/main/repo-Data-Warehouse-AWS-Pipeline-Chat-API/athena-select-user-message_amount.png?raw=true)

- Média do tamanho das mensagens por usuário por dia.

```sql
SELECT 
  user_id, 
  user_first_name, 
  context_date,
  CAST(AVG(length(text)) AS INT) AS "average_message_length" 
FROM "telegram" 
GROUP BY 
  user_id, 
  user_first_name, 
  context_date 
ORDER BY context_date DESC
```

![](https://github.com/rafie-b/Data-Warehouse-AWS-Pipeline-Chat-API/blob/main/repo-Data-Warehouse-AWS-Pipeline-Chat-API/athena-select-user-message_lenght.png?raw=true)
![](https://github.com/rafie-b/Data-Warehouse-AWS-Pipeline-Chat-API/blob/main/repo-Data-Warehouse-AWS-Pipeline-Chat-API/graph-select-user-message_lenght.png?raw=true)

- Quantidade de mensagens por hora por dia da semana por número da semana.

```sql
WITH 
parsed_date_cte AS (
    SELECT 
        *, 
        CAST(date_format(from_unixtime("date"),'%Y-%m-%d %H:%i:%s') AS timestamp) AS parsed_date
    FROM "telegram" 
),
hour_week_cte AS (
    SELECT
        *,
        EXTRACT(hour FROM parsed_date) AS parsed_date_hour,
        EXTRACT(dow FROM parsed_date) AS parsed_date_weekday,
        EXTRACT(week FROM parsed_date) AS parsed_date_weeknum
    FROM parsed_date_cte
)
SELECT
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_weeknum,
    count(1) AS "message_amount" 
FROM hour_week_cte
GROUP BY
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_weeknum
ORDER BY
    parsed_date_weeknum,
    parsed_date_weekday
```

![](https://github.com/rafie-b/Data-Warehouse-AWS-Pipeline-Chat-API/blob/main/repo-Data-Warehouse-AWS-Pipeline-Chat-API/athena-select-hour-week-message_amount.png?raw=true)

-  Total de vezes que cada palavra única aparece em todas as mensagens

```sql
WITH words AS (
  SELECT word
  FROM "telegram",
  UNNEST(split("text", ' ')) AS t(word)
)
SELECT word, COUNT(*) as count
FROM words
GROUP BY word
ORDER BY count DESC;
```

![](https://github.com/rafie-b/Data-Warehouse-AWS-Pipeline-Chat-API/blob/main/repo-Data-Warehouse-AWS-Pipeline-Chat-API/athena-select-count-unnest-words.png?raw=true)
![](https://github.com/rafie-b/Data-Warehouse-AWS-Pipeline-Chat-API/blob/main/repo-Data-Warehouse-AWS-Pipeline-Chat-API/graph-select-count-unnest-words.png?raw=true)