##**PARTE 2 - PROJETO API/TELEGRAM**##

Imagine que você deseja monitorar mensagens enviadas a um grupo específico no Telegram e armazená-las de forma estruturada na nuvem. Para isso, é necessário capturar os dados em tempo real, organizá-los e processá-los de maneira eficiente. Este código implementa uma solução automatizada utilizando AWS Lambda, S3 e PyArrow para coletar, transformar e armazenar mensagens do Telegram em formato Parquet , garantindo escalabilidade e otimização no armazenamento e análise dos dados.

#**Organização do Código**#
O código está estruturado em três partes principais:

- Configuração e Autenticação no Telegram

Obtenha o token de acesso via getpass().
Consulte informações do bot e configure o Webhook para receber mensagens.
Processamento e Armazenamento no AWS S3

- Uma função Lambda( lambda_handler) é responsável por receber mensagens do Telegram via AWS API Gateway .
O código verifica se a mensagem pertence ao grupo correto e salva os dados brutos em um bucket S3 no formato JSON.
Transformação e Escrita dos Dados em Parquet

- Outra função Lambda processa os arquivos JSON armazenados.
Utilize PyArrow para converter os dados para Parquet , melhorando a eficiência de consulta.

* Os dados estruturados são enviados de volta ao S3 , organizados por dados.
Criação da Tabela no AWS Athena

Define uma tabela externa de chamada telegram, armazenada no formato Parquet .
Permite o particionamento por dados , otimizando consultas.

In [None]:
from getpass import getpass

token=getpass()

··········


In [None]:
import json
import requests

url_base = f'https://api.telegram.org/bot{token}'

In [None]:
response = requests.get(url=f'{url_base}/getMe')
print(f'{url_base}/getMe')

print(json.dumps(json.loads(response.text), indent=2))

https://api.telegram.org/bot7802121490:AAHmon5meHJunCwGTlZm3qIqXFx4E5fYStQ/getMe
{
  "ok": true,
  "result": {
    "id": 7802121490,
    "is_bot": true,
    "first_name": "M44",
    "username": "m44_ebac_bot",
    "can_join_groups": true,
    "can_read_all_group_messages": false,
    "supports_inline_queries": false,
    "can_connect_to_business": false,
    "has_main_web_app": false
  }
}


In [None]:
response=requests.get(url=f'{url_base}/getUpdates')

print(json.dumps(json.loads(response.text), indent=2))

{
  "ok": false,
  "error_code": 409,
  "description": "Conflict: can't use getUpdates method while webhook is active; use deleteWebhook to delete the webhook first"
}


***INGESTÃO***

In [None]:
import os
import json
import logging
from datetime import datetime, timezone, timedelta

import boto3

def lambda_handler(event: dict, context: dict) -> dict:
 """
 Recebe uma mensagem do Telegram via AWS API Gateway, verifica no seu conteúdo se foi produzida em um determinado grupo e a escreve, em seu formato original JSON, em um bucket do AWS S3.
 """

 # Vars de ambiente
 BUCKET = os.environ['AWS_S3_BUCKET']
 TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])

# Vars lógicas
 tzinfo = timezone(offset=timedelta(hours=-3))
 date = (datetime.now(tzinfo) - timedelta(days=0)).strftime('%Y-%m-%d')
 timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

 filename = f'{timestamp}.json'

# Código principal
 client = boto3.client('s3')


 try:
  message = json.loads(event["body"])
  chat_id = message["message"]["chat"]["id"]

  if chat_id == TELEGRAM_CHAT_ID:

    with open(f'/tmp/{filename}', mode='w', encoding='utf8') as fp:
      json.dump(message, fp)

    client.upload_file(f'/tmp/{filename}', BUCKET, f'telegram/context_date={date}/{filename}')

 except Exception as exc:
    logging.error(msg=exc)
    return dict(statusCode="500")

 else:
    return dict(statusCode="200")


ModuleNotFoundError: No module named 'boto3'

In [None]:
aws_api_gateway_url = getpass()

··········


* setWebhook

In [None]:
response = requests.get(url=f'{url_base}/setWebhook?url={aws_api_gateway_url}')
print(f'{url_base}/setWebhook?url={aws_api_gateway_url}')

https://api.telegram.org/bot7802121490:AAHmon5meHJunCwGTlZm3qIqXFx4E5fYStQ/setWebhook?url=arn:aws:lambda:sa-east-1:977099027448:function:modulo-44-ebac-raw


* getWebhookinfo

In [None]:
response = requests.get(url=f'{url_base}/getWebhookInfo')
print(f'{url_base}/getWebhookInfo')
print(json.dumps(json.loads(response.text), indent=2))

https://api.telegram.org/bot7802121490:AAHmon5meHJunCwGTlZm3qIqXFx4E5fYStQ/getWebhookInfo
{
  "ok": true,
  "result": {
    "url": "",
    "has_custom_certificate": false,
    "pending_update_count": 4
  }
}


In [None]:
import os
import json
import logging
from datetime import datetime, timezone, timedelta
import boto3
import pyarrow as pa
import pyarrow.parquet as pq

def lambda_handler(event: dict, context: dict) -> bool:
 """
 Recebe uma mensagem do Telegram via AWS API Gateway, verifica no seu conteúdo se foi produzida em um determinado grupo,
 processa os dados e os escreve em formato Parquet em um bucket do AWS S3.
 """

# Variáveis de ambiente
 RAW_BUCKET = os.environ['AWS_S3_BUCKET']
 TELEGRAM_CHAT_ID = int(os.environ['AWS_S3_ENRICHED'])

# Variáveis lógicas
 tzinfo = timezone(offset=timedelta(hours=-3))
 date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')
 timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

# Código principal
 client = boto3.client('s3')
 table = None

try:
# Lista objetos no bucket S3
  response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=f'telegram/context_date={date}')

  for content in response['Contents']:

    key = content['key']
    client.download_file(RAW_BUCKET, key, f"/tmp/{file_name}")

            # Abre o arquivo JSON e carrega os dados
    with open(f"/tmp/{file_name}", mode='r', encoding='utf8') as fp:

      data = json.load(fp)
      data = data["message"]

# Parseia os dados
    parsed_data = parse_data(data=data)
    iter_table = pa.Table.from_pydict(mapping=parsed_data)

# Concatena as tabelas
    if table:

     table = pa.concat_tables([table, iter_table])

    else:

      table = iter_table
      iter_table = None

  pq.write_table(table=table, where=parquet_file)
  client.upload_file(parquet_file, RAW_BUCKET, f'telegram/context_date={date}/{timestamp}.parquet')


In [None]:
def parse_data(data: dict) -> dict:

 date = datetime.now().strftime('%Y-%m-%d')
 timestamp = datetime.now().strftime('%Y%m%d%H%M%S%f')


 parsed_data = dict()

 for key, value in data.items():

   if key == 'from':
      for k, v in data[key].items():
          if k in ['is_bot', 'first_name']:
            parsed_data[f"user_{k}"] = v

   elif key == 'chat':
      for k, v in data[key].items():
          if k in ['id', 'type']:
            parsed_data[f"chat_{k}"] = v

   elif key in ['message_id', 'date', 'text']:
      parsed_data[key] = [value]

 if 'text' not in parsed_data.keys():
   parsed_data['text'] = [None]

 return parsed_data

* AWS Athena

In [None]:
CREATE EXTERNAL TABLE `telegram` (
    `message_id` BIGINT,
    `user_id` BIGINT,
    `user_is_bot` BOOLEAN,
    `user_first_name` STRING,
    `chat_id` BIGINT,
    `chat_type` STRING,
    `text` STRING,
    `date` BIGINT
)
PARTITIONED BY (`context_date` DATE)
STORED AS PARQUET
LOCATION 's3://modulo-44-ebac-raw/telegram/'
TBLPROPERTIES (
    'parquet.compression'='SNAPPY'
);


In [None]:
MSCK REPAIR TABLE `telegram`;

##**INSIGHTS**##

- Este código cria um pipeline eficiente para capturar mensagens do Telegram, armazená-las na nuvem e disponibilizá-las para análise no AWS Athena . Ele demonstra boas práticas de ETL (Extract, Transform, Load) , utilizando ferramentas modernas para manipulação de dados de forma escalável. Essa abordagem pode ser aplicada a diversos casos, como monitoramento de grupos, análise de padrões e integração com sistemas de BI.