
# **PIPELINE DE DADOS - BOT TELEGRAM**
***Notebook que tem como finalidade ser entregue para a atividade final do curso de Análise de Dados, da EBAC (Escola Britânica de artes Criativas).***

Repositório do Github com as informações / imagens que usei para o notebook:
- https://github.com/matheus10-2022/pipeline_dados-telegram

Meu Linkedin:
- https://www.linkedin.com/in/matheus-henrique-dlm/

## Contexto:
(Fictício para fins acadêmicos)

Simularei uma situação de uma Análise de Dados através da captação de mensagens de um **chatbot** criado no **Telegram**, junto ao **Pipeline de Dados**. 

A situação será de uma micro-empresa que fornece serviços a uma pequena rede de clientes, e que está com dúvidas das mensagens recebidas pelos clientes em um determinado dia. Mostrarei o exemplo na parte final, apartir dos Analytics.

#### Chatbot?

Um chatbot é um tipo de software que interage com usuários através de conversas automatizadas em plataformas de mensagens. Uma aplicação comum de chatbots é o seu uso no atendimento ao cliente, onde, de maneira geral, ajudam clientes a resolver problemas ou esclarecer dúvidas recorrentes antes mesmo que um atendente humano seja acionado.

#### Telegram?

Telegram é uma plataforma de mensagens instantâneas freeware (distribuído gratuitamente) e, em sua maioria, open source. É muito popular entre desenvolvedores por ser pioneiro na implantação da funcionalidade de criação de chatbots, que, por sua vez, permitem a criação de diversas automações.

#### Arquitetura?

Uma atividade analítica de interesse é a de realizar a análise exploratória de dados enviadas a um chatbot para responder perguntas como:
- Qual o horário que os usuários mais acionam o bot?
- Qual o problema ou dúvida mais frequente?
- O bot está conseguindo resolver os problemas ou esclarecer as dúvidas?
- Entre outras.

#### Pipeline de Dados?

Um pipeline de dados é uma série de etapas de processamento para preparar dados corporativos para análise. As organizações têm um grande volume de dados de várias fontes, como aplicativos, dispositivos de Internet das Coisas (IoT) e outros canais digitais. No entanto, os dados brutos são inúteis; eles devem ser movidos, classificados, filtrados, reformatados e analisados para business intelligence. Um pipeline de dados inclui várias tecnologias para verificar, resumir e encontrar padrões nos dados para informar as decisões de negócios. Pipelines de dados bem organizados oferecem suporte a vários projetos de big data, como visualizações de dados, análises exploratórias de dados e tarefas de machine learning. (Fonte: AMAZON AWS - https://aws.amazon.com/pt/what-is/data-pipeline/)


- *Irei construir um pipeline de dados que ingira, processe, armazene e exponha mensagens de um grupo do Telegram para que profissionais de dados possam realizar análises. A arquitetura proposta é dividida em duas: **transacional**, no Telegram, onde os dados são produzidos, e **analítica**, na Amazon Web Services (AWS), onde os dados são analisados.*

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/contexto%20projeto_final.png?raw=true)

- O Telegram representa a fonte de dados transacionais. Mensagens enviadas por usuários em um grupo são capturadas por um bot e redirecionadas via webhook do backend do aplicativo para um endpoint (endereço web que aceita requisições HTTP) exposto pelo AWS API Gateway. As mensagens trafegam no corpo ou payload da requisição.

#### AWS | Ingestão
Uma requisição HTTP com o conteúdo da mensagem em seu payload é recebia pelo AWS API Gateway que, por sua vez, as redireciona para o AWS Lambda, servindo assim como seu gatilho. Já o AWS Lambda recebe o payload da requisição em seu parâmetro event, salva o conteúdo em um arquivo no formato JSON (original, mesmo que o payload) e o armazena no AWS S3 particionado por dia.

#### AWS | ETL
Uma vez ao dia, o AWS Event Bridge aciona o AWS Lambda que processa todas as mensagens do dia anterior (atraso de um dia ou D-1), denormaliza o dado semi-estruturado típico de arquivos no formato JSON, salva o conteúdo processado em um arquivo no formato Apache Parquet e o armazena no AWS S3 particionado por dia.

#### AWS | Apresentação
Por fim, uma tabela do AWS Athena é apontada para o bucket do AWS S3 que armazena o dado processado: denormalizado, particionado e orientado a coluna. Profissionais de dados podem então executar consultas analíticas (agregações, ordenações, etc.) na tabela utilizando o SQL para a extração de insights.

## Criação do Bot:

Com a conta criada no Telegram, abri o chat com o **Botfather** para a criação do BOT (\start):
- Digitei **\newbot**
- Digitei o **nome do bot**, e logo após o **nome de usuário (precisa terminar com o sufixo _bot)**
- OBS: É preciso salvar o token de acesso a API HTTP em um **local seguro**.

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/passos-criar_bot-2.PNG?raw=true)

Para ativar o BOT:
- Iniciei o chat com o BOT, e digitei **\start**.

### Criação do grupo com o Bot
Com o grupo criado, adicionei o BOT como administrador para receber todas as mensagens do grupo. Uma outra opção seria desabilitar o seu modo de privacidade.

Abra o chat com o BotFather:
- Digite **/mybots**;
- Selecione o bot pelo seu nome de usuário;
- Selecione **Bot Settings**;
- Selecione **Allow Groups?**;
- Selecione **Turn groups off**.

![](https://raw.githubusercontent.com/matheus10-2022/pipeline_dados-telegram/1dbe257493689abc2e859adc9062c1b4178adb6d/permissoes-bot1.PNG)

![](https://raw.githubusercontent.com/matheus10-2022/pipeline_dados-telegram/1dbe257493689abc2e859adc9062c1b4178adb6d/permissoes-bot2.PNG)

> **OBS:** Na imagem já fiz os passos, após todos, essa será a tela final.

### Bot API:
As mensagens captadas por um bot podem ser acessadas via API. **A única informação necessária é o token de acesso fornecido pelo BotFather na criação do bot.**
> **OBS:** A documentação completa da API pode ser encontrada neste [link](https://core.telegram.org/bots/api).

Usarei um código para pegar a URL em formato oculto, como uma senha. Para isso irei usar a lib **GETPASS**:


In [None]:
from getpass import getpass

token = getpass()

- A url base é comum a todos os métodos da API:

In [None]:
import json

import requests

base_url = f'https://api.telegram.org/bot{token}'

**Método getMe**
- Retorna informações sobre o bot:



In [None]:
response = requests.get(url=f'{base_url}/getMe')
print(f'{base_url}/getMe')

print(json.dumps(json.loads(response.text), indent=2))

**Método getUpdates**
- Retorna as mensagens captadas pelo bot:

In [None]:
response = requests.get(url=f'{base_url}/getUpdates')

print(json.dumps(json.loads(response.text), indent=2))

## Dados

- ***Antes de avançar para etapa analítica, vamos trabalhar na manipulação dos dados de mensagens do Telegram.***

### Mensagem:
Uma mensagem recuperada via API é um dado semi-estruturado no formato JSON com algumas chaves mandatórias e diversas chaves opcionais, estas últimas presentes (ou não) dependendo do tipo da mensagem. Por exemplo, mensagens de texto apresentam a chave **text** enquanto mensagens de áudio apresentam a chave **audio**. Neste projeto vamos focar em mensagens do tipo texto, ou seja, vamos ingerir as chaves mandatórias e a chave **text**.

> ***OBS***: A lista completa das chaves disponíveis pode ser encontrada na documentação neste [link](https://core.telegram.org/bots/api#message).

In [None]:
# Exemplo:
%%writefile telegram.json
{
    "update_id": 123,
    "message": {
        "message_id": 1,
        "from": {
            "id": 321,
            "is_bot": false,
            "first_name": "Andre"
        },
        "chat": {
            "id": -789,
            "type": "group"
        },
        "date": 1640995200,
        "text": "Ola, mundo!"
    }
}

- **Descrição:**

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/descricao-texto_json.PNG?raw=true)


## Ingestão

A etapa de ingestão é responsável, como seu o próprio nome diz, **pela ingestão dos dados transacionais em ambientes analíticos**. De maneira geral, o dado ingerido é persistido no formato mais próximo do original, ou seja, nenhuma transformação é realizada em seu conteúdo ou estrutura (schema). Como exemplo, dados de uma API web que segue o formato REST (representational state transfer) são entregues, logo, persistidos, no formato JSON.

> ***OBS***: Persistir os dados em seu formato original traz muitas vantagens, como a possibilidade de reprocessamento.

Ela pode ser conduzida de duas formas:

- ***Batch***: blocos de dados são ingeridos em uma frequência bem definida, geralmente na escala de horas ou dias;
- ***Streaming***: dados são ingeridos conforme são produzidos e disponibilizados.

No projeto, as mensagens capturadas pelo bot podem ser ingeridas através da API web de bots do Telegram, portanto são fornecidos no formato JSON. Como o Telegram retem mensagens por apenas 24h em seus servidores, a ingestão via streaming é a mais indicada. Para que seja possível esse tipo de ingestão seja possível, vamos utilizar um webhook (gancho web), ou seja, vamos redirecionar as mensagens automaticamente para outra API web.

Sendo assim, precisamos de um serviço da AWS que forneça um API web para receber os dados redirecionados, o AWS API Gateway (documentação neste [link](https://docs.aws.amazon.com/pt_br/apigateway/latest/developerguide/welcome.html)). Dentre suas diversas funcionalidades, o AWS API Gateway permite o redirecionamento do dado recebido para outros serviços da AWS. Logo, vamos conecta-lo ao AWS Lambda, que pode sua vez, irá armazenar o dado em seu formato original (JSON) em um bucket do AWS S3.

Portanto, irei:
- Criar um bucket no ***AWS S3***, uma função no ***AWS lambda*** e uma API web no ***AWS API gateway***, além de configurar o webhook da API de BOTS do ***Telegram***.

### AWS S3:
Criei um bucket com o nome ***modulofinal-ebac-datalake-raw***. Como padrão, irei deixar o sufixo ***-raw*** em seu nome.

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/bucket-aws-datalake.PNG?raw=true)

> ***OBS:*** um data lake é o nome dado a um repositório de um grande volume dados. É organizado em zonas que armazenam replicadas dos dados em diferentes níveis de processamento. A nomenclatura das zonas variam, contudo, as mais comuns são: ***raw e enriched ou bronze, silver e gold***.

### AWS lambda:
Aqui o ***AWS Lambda*** tem a função de ativamente persistir as mensagens captadas pelo bot do ***Telegram*** em um ***bucket do AWS S3***.

* Código da função:


In [None]:
import os
import json
import logging
from datetime import datetime, timezone, timedelta

import boto3


def lambda_handler(event: dict, context: dict) -> dict:

  '''
  Recebe uma mensagens do Telegram via AWS API Gateway, verifica no
  seu conteúdo se foi produzida em um determinado grupo e a escreve, 
  em seu formato original JSON, em um bucket do AWS S3.
  '''

  # vars de ambiente

  BUCKET = os.environ['AWS_S3_BUCKET']
  TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])

  # vars lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = datetime.now(tzinfo).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  filename = f'{timestamp}.json'

  # código principal

  client = boto3.client('s3')
  
  try:

    message = json.loads(event["body"])
    chat_id = message["message"]["chat"]["id"]

    if chat_id == TELEGRAM_CHAT_ID:

      with open(f"/tmp/{filename}", mode='w', encoding='utf8') as fp:
        json.dump(message, fp)

      client.upload_file(f'/tmp/{filename}', BUCKET, f'telegram/context_date={date}/{filename}')

  except Exception as exc:
      logging.error(msg=exc)
      return dict(statusCode="500")

  else:
      return dict(statusCode="200")

> ***OBS:*** Precisamos ainda fazer dois ajustes: 

- ***Variáveis de ambiente:*** 

Note que o código exige a configuração de ***duas variáveis de ambiente***: ***AWS_S3_BUCKET*** com o nome do bucket do AWS S3 e ***TELEGRAM_CHAT_ID*** com o id do chat do grupo do Telegram. Fiz esse ajuste nas configurações do lambda:

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/variaveis_ambiente_lambda.PNG?raw=true)

- ***Permissões***

Por fim, é preciso adicionar a permissão de escrita no bucket do AWS S3 para a função do AWS Lambda no AWS IAM.


### AWS API Gateway

Aqui o ***AWS API Gateway*** tem a função de receber as mensagens captadas pelo bot do Telegram, enviadas via webhook, e iniciar uma função do AWS Lambda, passando o conteúdo da mensagem no seu parâmetro event. Para tanto eu criei uma API e configurei como gatilho da função do AWS Lambda.

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/api-gateway.PNG?raw=true)


Após o deploy da API e com o seu endereço web em mãos, configurei o webhook para redirecionar as mensagens para a url do ***AWS API Gateway***. 

In [None]:
aws_api_gateway_url = getpass()  
# coloquei o código nessa variável, utilizando aquela lib de 
# deixar a variável oculta.

***SetWebhook***

- Configura o redirecionamento das mensagens captadas pelo bot para o endereço web do parâmetro ***url***.

> ***OBS:*** os métodos ***getUpdates*** e ***setWebhook*** são mutualmente exclusivos, ou seja, enquanto o webhook estiver ativo, o método getUpdates não funcionará. Para desativar o webhook, basta utilizar o método deleteWebhook.

In [None]:
response = requests.get(url=f'{base_url}/setWebhook?url={aws_api_gateway_url}')

print(json.dumps(json.loads(response.text), indent=2))

***getWebhookInfo***

- Retorna as informações sobre o webhook configurado.

In [None]:
response = requests.get(url=f'{base_url}/getWebhookInfo')

print(json.dumps(json.loads(response.text), indent=2))

## ETL (extração, transformação e carregamento)

No projeto, as mensagens de um único dia, persistidas na camada cru, serão compactas em um único arquivo, orientado a coluna e comprimido, que será persistido em uma camada enriquecida. Além disso, durante este processo, o dado também passará por etapas de data wrangling.

Para isso, utilizei uma função do ***AWS Lambda*** como motor de processamento e um bucket do ***AWS S3*** como camada enriquecida (-enriched) para a persistência do dado processado. Para garantir a recorrência, configurei uma regra do ***AWS Event Bridge*** como gatilho diário da função.

### AWS S3:

Aqui o AWS S3 tem a função de passivamente armazenar as mensagens processadas de um dia em um único arquivo no formato Parquet. No caso criei o bucket ***modulofinal-ebac-datalake-enriched***, com o -enriched como padrão no fim.

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/bucket-aws-datalake-enriched.PNG?raw=true)


### AWS Lambda:

Aqui o AWS Lambda tem a função de ativamente processar as mensagens captadas pelo bot do Telegram, persistidas na camada cru no bucket do AWS S3, e persisti-las na camada enriquecida, também em um bucket do AWS S3.

**A função criada terá como objetivo:**
- Listar todos os arquivos JSON de uma única participação da camada crua de um bucket do AWS S3;
- Para cada arquivo listado:
- Faz o download do arquivo e carrega o conteúdo da mensagem;
- Executa uma função de data wrangling;
- Cria uma tabela do PyArrow e a contatena com as demais.
- Persiste a tabela no formato Parquet na camada enriquecida em um bucket do AWS S3.

In [None]:
import os
import json
import logging
from datetime import datetime, timedelta, timezone

import boto3
import pyarrow as pa
import pyarrow.parquet as pq


def lambda_handler(event: dict, context: dict) -> bool:

  '''
  Diariamente é executado para compactar as diversas mensagens, no formato
  JSON, do dia anterior, armazenadas no bucket de dados cru, em um único 
  arquivo no formato PARQUET, armazenando-o no bucket de dados enriquecidos
  '''

  # vars de ambiente

  RAW_BUCKET = os.environ['AWS_S3_BUCKET']
  ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']

  # vars lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = (datetime.now(tzinfo) - timedelta(days=0)).strftime('%Y-%m-%d')
  # No date posso mudar a quantidade de dias de 1 para 0. 
  # Nesse caso mudei pois não queria pegar as mensagens do dia anterior.
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  # código principal

  table = None
  client = boto3.client('s3')

  try:

      response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=f'telegram/context_date={date}')

      for content in response['Contents']:

        key = content['Key']
        client.download_file(RAW_BUCKET, key, f"/tmp/{key.split('/')[-1]}")

        with open(f"/tmp/{key.split('/')[-1]}", mode='r', encoding='utf8') as fp:

          data = json.load(fp)
          data = data["message"]

        parsed_data = parse_data(data=data)
        iter_table = pa.Table.from_pydict(mapping=parsed_data)

        if table:

          table = pa.concat_tables([table, iter_table])

        else:

          table = iter_table
          iter_table = None
          
      pq.write_table(table=table, where=f'/tmp/{timestamp}.parquet')
      client.upload_file(f"/tmp/{timestamp}.parquet", ENRICHED_BUCKET, f"telegram/context_date={date}/{timestamp}.parquet")

      return True
  
  except Exception as exc:
      logging.error(msg=exc)
      return False

- Também o código do ***Data Wrangling:***


In [None]:
def parse_data(data: dict) -> dict:

  date = datetime.now().strftime('%Y-%m-%d')
  timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

  parsed_data = dict()

  for key, value in data.items():

      if key == 'from':
          for k, v in data[key].items():
              if k in ['id', 'is_bot', 'first_name']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key == 'chat':
          for k, v in data[key].items():
              if k in ['id', 'type']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key in ['message_id', 'date', 'text']:
          parsed_data[key] = [value]

  if not 'text' in parsed_data.keys():
    parsed_data['text'] = [None]

  return parsed_data

Aqui também temos que fazer os mesmos ajustes das ***variáveis de ambiente*** e das ***permissões***, como foi feito no Lambda do arquivo -raw. Além disso também existe outras 2 configurações:

- Recursos
O timeout padrão de funcões do AWS Lambda é de 3 segundos. Para a função, vamos aumentar o tempo ***para 5 minutos***, principalmente para lidar com o IO (input/output) de arquivos do ***AWS S3***.

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/config-recursos-lambda.PNG?raw=true)

- Camadas
Por fim, note que o código da função utiliza o pacote Python PyArrow. Contudo, o ambiente padrão do AWS Lambda possui poucos pacotes externos instalado, como o pacote Python boto3, logo o PyArrow não será encontrado e a execução da função falhará. Existem algumas formas de adicionar pacotes externos no ambiente de execução do AWS Lambda, um deles é a criação de camadas ou layers, onde podemos fazer o upload dos pacotes Python direto na plataforma ou através de um bucket do AWS S3. Então acabei seguindo a última opção, onde:

Crio um bucket no ***AWS S3*** (com o final -layer);

Faço o upload do código do pacote Python do PyArrow (download neste [link](https://github.com/aws/aws-sdk-pandas/releases)) ***nome do arquivo que usei: awswrangler-layer-3.1.0-py3.8-arm64.zip***);

Crio o layer e conecto na função.


### AWS Event Bridge

Aqui o  o ***AWS Event Bridge*** tem a função de ativar diariamente a função de ETL do AWS Lambda, funcionando assim como um scheduler.

## Apresentação

- Na etapa de apresentação, o AWS Athena tem função de entregar o dados através de uma interface SQL para os usuários do sistema analítico. Para criar a interface, basta criar uma tabela externa sobre o dado armazenado na camada mais refinada da arquitetura, a camada enriquecida.

In [None]:
CREATE EXTERNAL TABLE `telegram`(
  `message_id` bigint, 
  `user_id` bigint, 
  `user_is_bot` boolean, 
  `user_first_name` string, 
  `chat_id` bigint, 
  `chat_type` string, 
  `text` string, 
  `date` bigint)
PARTITIONED BY ( 
  `context_date` date)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<bucket-enriquecido>/' # Aqui será o nome do bucket que é enriched

## Analytics

- Com isso, os dados já estão na tabela. Irei aqui simular situações no meu banco de dados atual no AWS ATHENA, que foram mensagens que eu digitei no Bot, nos dias 23 e 24/05/2023.

***Exemplo com as primeiras 10 linhas (separei em 2 fotos por causa do tamanho):***

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/query1-exemplo-part1.PNG?raw=true)

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/query1-exemplo-part2.PNG?raw=true)

- Código usado:
> ***SELECT * FROM "telegram" ORDER BY "message_id" LIMIT 10;***





### Storytelling

Como você pode ter visto, fiz uma simulação e a ingestão de algumas mensagens para o bot, simulando um atendimento a clientes pelo Bot de um provedor de televisão, em um determinado dia. Vamos dar uma olhada inicial nas mensagens recebidas no dia 24: 

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/query2-mensagens-dia_24.PNG?raw=true)
 - Código usado:
> ***SELECT message_id, user_id, user_first_name, text FROM "telegram" WHERE DAY(context_date) = 24 LIMIT 20;***

> ***OBS:*** A simulação ocorreu apenas por mim, onde simulei mensagens de pessoas diferentes. A princípio iremos desconsiderar o campo "*user_first_name*".

Nessa semana, houve a entrada de novos canais no catálogo da empresa. Tal mudança ocorre de forma automática para todos os clientes, porém, havíamos recebido uma ligação isolada de um cliente, onde ele apontava se o canal **"PARAMOUNT+"**iria entrar no catálogo, sendo que já estava. Com isso, percebemos que podia estar havendo algumas dúvidas com outros clientes, mas pelo chatbot.

Podemos verificar nas mensagens do dia, se em alguma há o uso do nome desse canal:

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/query3-paramount+.PNG?raw=true)

- Código usado:
> ***SELECT message_id, user_id, text FROM "telegram" WHERE DAY(context_date) = 24 and text LIKE '%PARAMOUNT+%;***

>***OBS:*** Realizando o count nessa tabela, percebemos que 8 clientes mencionaram, onde em sua maioria tinha dúvidas a respeito do canal. Com isso podemos concluir que a inserção desse canal para os clientes causou uma certa dificuldade, deixando-os meio confusos. 

- Uma das soluções seria mover a posição desse canal dentro do catálogo, ou mesmo criar algum aviso geral aos clientes a respeito desses e outros canais novos que foram inseridos.

Vamos dar uma olhada para verificar na quantidade de mensagens recebidas no dia que estamos fazendo a análise:

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/query4-quant_mensagens.PNG?raw=true)

- Código usado:
> ***SELECT COUNT(*) AS quantidade_mensagens FROM "telegram" WHERE DAY(context_date) = 24;***

- Podemos observar que houve 17 mensagens no período analisado, ou seja, as reclamações com o motivo da entrada no novo canal representaram quase 50% das mensagens totais do dia!

Após uma reunião semana passada, decidimos que nosso chatbot devia receber uma média de tamanho de mensagens de ***no máximo 65***. Vamos verificar se a média do dia escolhido foi atingida:

![](https://github.com/matheus10-2022/pipeline_dados-telegram/blob/main/query5-tamanho-mensagens.PNG?raw=true)

- Código usado:
> ***SELECT 
  context_date,
  CAST(AVG(length(text)) AS INT) AS "media_tamanho_mensagens" 
  FROM "telegram" WHERE DAY(context_date) = 24
  GROUP BY 
  context_date 
  ORDER BY context_date DESC;***
  
  - Podemos concluir que o tamanho médio das mensagens recebidas pelo bot, está dentro da meta estabelecida.

## Conclusão

Com isso, podemos concluir que a criação de uma Pipeline de Dados junto ao Bot do Telegram é uma ferramenta muito rica e cheia de oportunidades! Várias análises podem ser feitas apartir de vários contextos, e principalmente automações podem ser reproduzidas, para maximizar o desempenho do obejtivo a ser alcançado.