<img src="https://raw.githubusercontent.com/andre-marcos-perez/ebac-course-utils/main/media/logo/newebac_logo_black_half.png" alt="ebac-logo">

---

# **Módulo** | Pipeline de Dados do Telegram II
Caderno de **Exercícios**<br> 
Professor [André Perez](https://www.linkedin.com/in/andremarcosperez/) <br>
Aluno [Jean Matheus](https://www.linkedin.com/in/jean-matheus-sm/)

---

# **Tópicos**

<ol type="1">
  <li>Ingestão;</li>
  <li>ETL;</li>
  <li>Apresentação;</li>
  <li>Storytelling.</li>
</ol>

---

# **Exercícios**

## 1\. Ingestão

1.1. Crie um `bucket` no `AWS S3` para o armazenamento de dados crus, não se esqueça de adicionar o sufixo `-raw`.

1.2. Crie uma função no `AWS Lambda` para recebimento das mensagens e armazenamento no formato JSON no `bucket` de dados crus. Não se esqueça de configurar as variáveis de ambiente e de adicionar as permissão de interação com `AWS S3` no `AWS IAM`.

> **Nota**: Para testar a função com evento do próprio `AWS Lambda`, substitua o código `message = json.loads(event["body"])` por `message = event`. Lembre-se que o primeiro só faz sentido na integração com o `AWS API Gateway`.

In [2]:
from getpass import getpass

token = getpass()

In [3]:
import json

base_url = f'https://api.telegram.org/bot{token}'

In [4]:
import requests

response = requests.get(url=f'{base_url}/getMe')

print(json.dumps(json.loads(response.text), indent=2))

{
  "ok": true,
  "result": {
    "id": 6090177567,
    "is_bot": true,
    "first_name": "jm_ebac_bot",
    "username": "jm_ebac_bot",
    "can_join_groups": false,
    "can_read_all_group_messages": false,
    "supports_inline_queries": false
  }
}


In [1]:
import os
import json
import logging
from datetime import datetime, timezone, timedelta

import boto3

def lambda_handler(event: dict, context: dict) -> dict:


    # variavéis de ambiente

    BUCKET = os.environ['AWS_S3_BUCKET']
    TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])

    # variáveis lógicas

    tzinfo = timezone(offset=timedelta(hours=-3))
    date = datetime.now(tzinfo).strftime('%Y-%m-%d')
    timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

    filename = f'{timestamp}.json'

    # código principal

    client = boto3.client('s3')

    try:

        message = json.loads(event['body'])
        chat_id = message['message']['chat']['id']

        if chat_id == TELEGRAM_CHAT_ID:

            with open(f'/tmp/{filename}', mode='w', encoding='utf8') as fp:
                json.dump(message,fp)

            client.upload_file(f'/tmp/{filename}', BUCKET, f'telegram/context_date={date}/{filename}')

    except Exception as exc:
        logging.error(msg=exc)
        return dict(statusCode="500")

    else:
        return dict(statusCode="200")

1.3. Crie uma API no `AWS API Gateway` a conecte a função do `AWS Lambda`, conforme apresentado na aula.

> **Nota**: não disponibilize o endereço da API gerada.

In [5]:
aws_api_gateway_url = getpass()

1.4. Configura o *webhook* do *bot* através do método `setWebhook` da API de *bots* do **Telegram**. utilize o endereço da API criada no `AWS API Gateway`. Utilize o método `getWebhookInfo` para consultar a integração.

> **Nota**: não disponibilize o *token* de acesso ao seu *bot* da API de *bots* do **Telegram**.

In [6]:
response = requests.get(url=f'{base_url}/setWebhook?url={aws_api_gateway_url}')

print(json.dumps(json.loads(response.text), indent=2))

{
  "ok": true,
  "result": true,
  "description": "Webhook was set"
}


In [8]:
response = requests.get(url=f'{base_url}/getWebhookInfo')

print(json.dumps(json.loads(response.text), indent=2))

{
  "ok": true,
  "result": {
    "url": "https://7q4f6p210e.execute-api.sa-east-1.amazonaws.com/dev",
    "has_custom_certificate": false,
    "pending_update_count": 0,
    "last_error_date": 1685386028,
    "last_error_message": "Wrong response from the webhook: 500 Internal Server Error",
    "max_connections": 40,
    "ip_address": "18.231.22.79"
  }
}


## 2\. ETL

2.1. Crie um `bucket` no `AWS S3` para o armazenamento de dados enriquecidos, não se esqueça de adicionar o sufixo `-enriched`.

2.2. Cria uma função no `AWS Lambda` para processar as mensagens JSON de uma única partição do dia anterior (D-1), armazenadas no *bucket* de dados crus. Salve o resultado em um único arquivo PARQUET, também particionado por dia. Não se esqueça de configurar as variáveis de ambiente, de adicionar as permissão de interação com `AWS S3` no `AWS IAM`, de configurar o *timeout* e de adicionar a *layer* com o código do pacote Python PyArrow.

> **Nota**: Para testar a função, substitua o código `date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')` por `date = (datetime.now(tzinfo) - timedelta(days=0)).strftime('%Y-%m-%d')`, permitindo assim o processamento de mensagens de um mesmo dia.

2.3. Crie uma regra no `AWS Event Bridge` para executar a função do `AWS Lambda` todo dia a meia noite no horário de Brasília (GMT-3).

In [None]:
import os
import json
import logging
from datetime import datetime, timedelta, timezone

import boto3
import pyarrow as pa
import pyarrow.parquet as pq

def lambda_handler(event: dict, context: dict) -> bool:

    # variável de ambiente
    RAW_BUCKET = os.environ['AWS_S3_BUCKET']
    ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']

    # variável lógica
    tzinfo = timezone(offset=timedelta(hours=-3))
    date = (datetime.now(tzinfo)- timedelta(days=0)).strftime('%Y-%m-%d')  #day  = (1: dia anterior, 0: dia atual)
    timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

    # código principal
    table = None
    client = boto3.client('s3')

    try:

        response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=f'telegram/context_date={date}')

        for content in response['Contents']:

            key = content['Key']
            client.download_file(RAW_BUCKET, key, f"/tmp/{key.split('/')[-1]}")

            with open(f"/tmp/{key.split('/')[-1]}", mode='r', encoding='utf8') as fp:

                data = json.load(fp)
                data = data['message']

            parsed_data = parse_data(data=data)
            iter_table = pa.Table.from_pydict(mapping=parsed_data)

            if table:

                table = pa.concat_tables([table, iter_table])

            else:

                table = iter_table
                iter_table = None

        pq.write_table(table=table, where=f'/tmp/{timestamp}.parquet')
        client.upload_file(f'/tmp/{timestamp}.parquet', ENRICHED_BUCKET, f'telegram/context_date={date}/{timestamp}.parquet')

        return True
    
    except Exception as exc:
        logging.error(msg=exc)

        return False
        

In [None]:
def parse_data(data: dict) -> dict:
    date = datetime.now().strftime('%Y-%m-%d')
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    parsed_data = dict()

    for key, value in data.items():

        if key == 'from':
            for k, v in data[key].items():
                if k in ['id', 'is_bot','first_name']:
                    parsed_data[f'{key if key == "chat" else "user"}_{k}'] = [v]

        elif key == 'chat':
            for k, v in data[key].items():
                if k in ['id', 'type']:
                    parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

        elif key in ['message_id', 'date', 'text']:
          parsed_data[key] = [value]

    if not 'text' in parsed_data.keys():
        #parsed_data['text'] = [None] Não funciona
        parsed_data['text'] = ['']  #funcionou

    return parsed_data


## 3\. Apresentação

3.1. Cria a tabela no `AWS Athena` que aponte para os dados armazenados no bucket enriquecido do `AWS S3`.

3.2. Execute o comando `MSCK REPAIR TABLE <nome-tabela>` para carregar as partições.

3.3. Execute as consultas SQL para explorar os dados.

>Sugestão para onde pode ser usado o chatbot e os dados armazenados

### Consultas SQL para exploração dos dados extraidos

* Criando tabela no AWS Athena
```sql
CREATE EXTERNAL TABLE `telegram`(
  `message_id` bigint, 
  `user_id` bigint, 
  `user_is_bot` boolean, 
  `user_first_name` string, 
  `chat_id` bigint, 
  `chat_type` string, 
  `text` string, 
  `date` bigint)
PARTITIONED BY ( 
  `context_date` date)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://ebac-jean-m44-datalake-enriched/telegram/'

```

* Adicionando uma partição (de uma nova data a tabela original)
```sql
MSCK REPAIR TABLE `telegram`;
```

* Quantidades de mensagens por dia
```sql
SELECT context_date, count(1) AS "message_amount"
FROM "telegram"
GROUP BY context_date
ORDER BY context_date DESC
```
![sql1](https://github.com/jeanmatheuss/projeto-final-EBAC/blob/main/img/sql1.png?raw=true)

* Quantidade de mensagens por usuário por dia
```sql
SELECT user_id, user_first_name, context_date, count(1) AS "message_amount"
FROM "telegram"
GROUP BY user_id, user_first_name, context_date
ORDER BY context_date DESC
```
![sql2](https://github.com/jeanmatheuss/projeto-final-EBAC/blob/main/img/sql2.png?raw=true)

* Média do tamanho das mensagens por usuário
```sql
SELECT user_id, user_first_name, context_date, CAST(AVG(length(text)) AS INT) AS "avg_msg_length"
FROM "telegram"
GROUP BY user_id, user_first_name, context_date
ORDER BY context_date DESC
```
![sql3](https://github.com/jeanmatheuss/projeto-final-EBAC/blob/main/img/sql3.png?raw=true)

* Quantidade de mensagens por hora por dia da semana por número da semana
```sql
WITH 
parsed_date_cte AS (
        SELECT 
            *,
            CAST(date_format(from_unixtime("date"), '%Y-%m-%d %H:%i:%s') AS timestamp) AS parsed_date
        FROM "telegram"
),
hour_week_cte AS (
        SELECT
            *,
            EXTRACT(hour FROM parsed_date) AS parsed_date_hour,
            EXTRACT(dow FROM parsed_date) AS parsed_date_weekday,
            EXTRACT(week FROM parsed_date) AS parsed_date_weeknum
        FROM parsed_date_cte
)
SELECT parsed_date_hour, parsed_date_weekday, parsed_date_weeknum, count(1) AS "message_amount"
FROM hour_week_cte
GROUP BY parsed_date_hour, parsed_date_weekday, parsed_date_weeknum
ORDER BY  parsed_date_weekday, parsed_date_weeknum
```
![sql4](https://github.com/jeanmatheuss/projeto-final-EBAC/blob/main/img/sql4.png?raw=true)

## 4\. Storytelling

# Utilizando Chatbots do Telegram para Aperfeiçoar o Atendimento ao Cliente

## Introdução

Os chatbots, uma forma de inteligência artificial (IA) conversacional, estão desempenhando um papel crescente na interação entre empresas e clientes. Neste artigo, exploraremos como os chatbots podem ser uma ferramenta valiosa para aprimorar o atendimento ao cliente e fornecer uma visão detalhada de como aproveitar os chatbots do Telegram, juntamente com sua API, para criar um serviço eficaz.

### O Que é um Chatbot?

Antes de entrarmos em detalhes, é fundamental entender o que é um chatbot. Um chatbot é um programa de computador projetado para interagir com humanos por meio de conversas textuais ou faladas. Eles podem automatizar tarefas, responder a perguntas, coletar informações e até mesmo realizar transações. Os chatbots estão se tornando cada vez mais comuns em diversas indústrias, proporcionando eficiência e melhorias significativas na experiência do cliente.

### Por que usar um Chatbot?

Imagine uma empresa que deseja melhorar o atendimento ao cliente. Ela enfrenta desafios, como fornecer respostas rápidas a perguntas frequentes e gerenciar consultas de clientes em grande escala. É aqui que um chatbot entra em jogo. Os chatbots podem ser implementados em sites, aplicativos móveis ou plataformas de mensagens, como o Telegram, para fornecer assistência imediata e personalizada aos clientes. Isso economiza tempo, aumenta a satisfação do cliente e permite que os agentes humanos se concentrem em questões mais complexas.

## Etapas do Projeto

Para implementar com sucesso um chatbot no Telegram, é necessário seguir algumas etapas fundamentais:

Ingestão de Dados: As mensagens capturadas pelo bot são ingeridas por meio da API web de bots do Telegram, que fornece os dados no formato JSON.

ETL (Extração, Transformação e Carregamento): Os dados ingeridos são então submetidos a um processo de ETL para limpar, transformar e preparar os dados para análise.

Apresentação dos Dados: Após o ETL, os dados são armazenados em um formato adequado para análise. Aqui, podem ser usadas ferramentas de armazenamento de dados, como bancos de dados ou sistemas de armazenamento em nuvem.

## Exploração dos Dados

Nesta etapa, exploraremos as fontes e destinos dos dados:

Fonte: Os dados são provenientes da API de bots do Telegram, acessados por meio do método "getUpdates". Esses dados incluem mensagens de texto, informações do usuário, horários e muito mais.

Destino: Os dados são armazenados em um banco de dados ou sistema de armazenamento em nuvem (AWS S3). A análise dos dados é realizada por meio de consultas SQL (AWS Athena) e visualizações para extrair informações valiosas.

------

## Contexto
Neste projeto desenvolveremos um chatbot para o Telemgram que capta mensagens via API do Telegram para captar mensagens dos usuários, com isso em mente podemos utilizar esse bot para diversas funcionalidades, como por exemplo uma empresa que deseja monitorar as mensagens para obter alguns insights que possam ajudar a melhorar a experiência do usuário, monitorar sua satisfação com o serviço fornecido e gerar informações úteis para a empresa.

>ChatBot Telegram: é uma ferramenta usada para interagir com o usuário com regras pré progamadas para execução de tarefas simples.

![img](https://github.com/jeanmatheuss/projeto-final-EBAC/blob/main/img/img_arquitetura.png?raw=true)

#### Passos para Configuração

1.Criamos um bot no Telegram e obtviemos as credenciais necessárias.   

2.Configure um webhook para redirecionar as mensagens para a API web criada no AWS API Gateway. 

3.Configure o AWS API Gateway para receber os dados redirecionados e encaminhá-los ao AWS Lambda.  

4.Crie uma função no AWS Lambda para processar e armazenar as mensagens no formato JSON. 

5.Configure permissões apropriadas para que o AWS Lambda possa acessar o AWS S3.  

6.Crie um bucket no AWS S3 para armazenar as mensagens capturadas.  

7.Teste o fluxo completo: envie mensagens ao bot do Telegram e verifique se elas são capturadas, processadas e armazenadas corretamente no AWS S3. 



### Extração e consultas no SQL

Agora extraimos as mensagens obtidas e que foram armazenadas no AWS S3, usando o AWS Athena para essas consultas