<!-- Projeto Desenvolvido na Data Science Academy - www.datascienceacademy.com.br -->
### <font color='blue'>Monitoramento de Criptomoedas em Tempo Real com Kafka, MongoDB e Streamlit</font>
### <font color='blue'>Criação do Producer Kafka</font>

In [None]:
# Imports
import json
import time 
import requests
import pandas as pd
from kafka import KafkaProducer
from datetime import datetime
from json import dumps
from configparser import RawConfigParser

In [None]:
# Carrega o arquivo de configuração
config_local = RawConfigParser()
config_local.read("config.conf")

Inicialização do Kafka Producer
<!-- Projeto Desenvolvido na Data Science Academy - www.datascienceacademy.com.br -->
- O KafkaProducer é inicializado com bootstrap_servers apontando para os brokers do Kafka e value_serializer é usado para serializar dados (objetos Python) no formato JSON e codificá-los em bytes antes de enviá-los ao Kafka.

In [None]:
# Definimos as configurações
server = config_local['Host']['ip']
port = config_local['Host']['port']
server = [f"{server}:{port}"]

Codificamos as mensagens em UTF-8 para que elas possam ser enviadas corretamente ao Kafka e qualquer consumidor que leia essas mensagens possa decodificá-las como UTF-8 JSON.

In [None]:
# Cria o producer
producer = KafkaProducer(bootstrap_servers = server, value_serializer = lambda x:dumps(x).encode('utf-8'))

In [None]:
type(producer)

In [None]:
# Define a função para obter dados de criptomoedas através da API
def cryptoApi():
    
    # Obtém a URL da API de criptomoedas a partir da configuração local
    #url = config_local['CryptoCoinAPI']['url']
    url = f"{config_local['CryptoCoinAPI']['url']}?apiKey={config_local['CryptoCoinAPI']['api_key']}"
    
    # Faz uma requisição GET para a URL da API
    response = requests.get(url)      
    
    # Converte a resposta da API para o formato JSON
    resultado = response.json()          
    
    # Captura o timestamp atual no formato "YYYY-MM-DD HH:MM:SS"
    current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    # Inicializa um dicionário para armazenar os dados do top 10
    top_10 = {}
    
    # Adiciona o timestamp atual ao dicionário
    top_10['timestamp'] = current_timestamp
    
    # Adiciona os dados das 10 principais criptomoedas retornadas pela API
    top_10['data'] = resultado['data'][:10]   
    
    # Envia os dados do top 10 para o tópico Kafka
    producer.send('crypto-topic', value = top_10)
    
    # Retorna os dados do top 10 como resultado da função
    return top_10

In [None]:
# Chama a função 25 vezes com um intervalo de 15 segundos entre cada chamada.
# Altere a condição para while(True) para buscar continuamente dados em tempo real.
# Fique atento aos limites da API.

counter = 0
#while(True):
while counter < 25:

    # Contador
    counter += 1
    
    print(f"Chamando a Função Para Extrair Dados em Tempo Real. Contador: {counter}")

    # Tratamento de erros
    try:
        cryptoApi()
    except Exception as e:
        print(f"Erro ao chamar cryptoApi: {e}")
    
    print("Aguardando 15 segundos...")
    time.sleep(15)

# Fim