# Consumer - ETL | Kafka

Este notebook tem por objetivo conectar no kafka, consumir a mensagem do ano selecionado e enviar os dados brutos e tratados para o Postgres.

In [None]:
# !pip install confluent-kafka psycopg2-binary

In [None]:
from confluent_kafka import Consumer, KafkaError, TopicPartition
import psycopg2
import pandas as pd
from sqlalchemy import create_engine

In [None]:
ano="2022"

In [None]:
def f_cria_tabela(tabela):
    # Configurações de conexão com o PostgreSQL
    tb = tabela
    db_conf = {
        'host': 'postgres',
        'database': 'f1',
        'user': 'admin',
        'password': 'admin'
    }
    conn = psycopg2.connect(**db_conf)
    cur = conn.cursor()
    # cria a tabela, se nao existir (f1_schema.laps_{ano})
    query_create = f"""CREATE TABLE IF NOT EXISTS {tb}
    (
        col_1 text COLLATE pg_catalog."default"
    )
    TABLESPACE pg_default;
    ALTER TABLE IF EXISTS {tb}
        OWNER to admin;
    TRUNCATE TABLE {tb};
    """
    cur.execute(query_create)
    conn.commit()
    cur.close()

def f_trunca_tabela(tabela):
    # Configurações de conexão com o PostgreSQL
    tb = tabela
    db_conf = {
        'host': 'postgres',
        'database': 'f1',
        'user': 'admin',
        'password': 'admin'
    }
    conn = psycopg2.connect(**db_conf)
    cur = conn.cursor()
    # cria a tabela, se nao existir (f1_schema.laps_{ano})
    query_trunc = f"TRUNCATE TABLE {tb};"
    cur.execute(query_trunc)
    conn.commit()
    cur.close()
    

In [None]:
# kafka configuration
consumer_config = {
    'bootstrap.servers': 'kafka2:9093',  # Endereço do(s) broker(s) Kafka
    'group.id': 'consumo-teste',        # Identificador do grupo de consumidores
    'auto.offset.reset': 'earliest',         # Lê todas as mensagens disponíveis no tópico
    'client.id': 'consumidor_do_kafka'            # nome do client conectado
}
consumer = Consumer(consumer_config)
topic = f'f1-{ano}'  # Substitua pelo nome do seu tópico Kafka
partition = 0
offset = 0  # colocando 0, vmaos consumir sempre desde o inicio

# Atribua a partição e o offset ao consumidor
consumer.assign([TopicPartition(topic, partition, offset)])
# consumer.subscribe([topic])

# Configurações de conexão com o PostgreSQL
db_config = {
    'host': 'postgres',
    'database': 'f1',
    'user': 'admin',
    'password': 'admin'
}
connection = psycopg2.connect(**db_config)
cursor = connection.cursor()
    
# cria a tabela, se nao existir
f_cria_tabela(f"f1_schema.laps_{ano}")
# limpa a tabela de destino
f_trunca_tabela(f"f1_schema.tb_laps_{ano}")

contador = 0
while (contador <= 10):
    msg = consumer.poll(1)
    if msg is None:
        contador = contador + 1
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print('Fim da Partição')
        else:
            print('Erro no Consumidor: {}'.format(msg.error()))
    else:
        print('Recebido: {}'.format(msg.value()))
        # Pega o valor da mensagem
        data = msg.value()
    
        # Inserir dados brutos no Postgres
        insert_query = f"INSERT INTO f1_schema.laps_{ano} (col_1) VALUES (%s);"
        valor = data.decode('utf-8')
        cursor.execute(insert_query, (valor, ))
        connection.commit()
        
        # Inserir dados tratados no Postgres
        registro=[]
        registro.append(valor.replace('\n','').split(';'))
        df = pd.DataFrame(registro, columns=['raceId', 'driverId', 'lap', 'position', 'time', 'milliseconds'])
        df[['raceId','driverId', 'lap', 'position', 'milliseconds']] = df[['raceId','driverId', 'lap', 'position', 'milliseconds']].apply(pd.to_numeric)
        engine = create_engine('postgresql://admin:admin@postgres:5432/f1') 
        df.to_sql(name=f'tb_laps_{ano}', con=engine, schema='f1_schema', if_exists='append',index=False)

        contador = 0
        print(contador)

consumer.close()