# Apache Kafka dende Python

Estrictamente non debería considerarse unha base de datos, aínda que ten moitos elementos dunha.

1. Instalamos as librarías necesarias

In [None]:
! conda install -c conda-forge kafka-python python-confluent-kafka

## Produtor

Importamos e configuramos un produtor. **Ollo!** Revisa que se poidan resolver tanto os nomes de host que poñas no array de **bootstrap.servers** e os que recibas na primeira conexión onde se che enviará información do cluster de brokers.

In [11]:
from confluent_kafka import Producer
import socket

conf = {'bootstrap.servers': "kafka.jfsanchez.es:9092,nifi.jfsanchez.es:9092",
        'client.id': socket.gethostname()}

producer = Producer(conf)

### Escrituras asíncronas

Por defecto as escrituras son asíncronas

In [12]:
topic="metamorfosis"
default_key="Autor"
default_value="Franz Kafka"

producer.produce(topic, key=default_key, value=default_value)

### Escrituras síncronas

Tras a chamada ao método **produce()** debemos forzar o envío da mensaxe cun **flush()**

In [13]:
producer.flush()

0

**Problema**: Queremos confirmar a recepción do dato para evitar perdas no cluster. Exemplo: Cáese xusto despois da recepción e non confirmou o dato.

**Solución**: Esperar polas confirmacións (ACK)

⚠️ **LEMBRA**: Os produtores poden pedir recibir confirmacións das escrituras:

- **acks=0**. O produtor non espera e poderían perderse datos
- **acks=1**. O produtor espera pola confirmación do líder (pérdida esporádica de datos cando cae o líder e non hai copias confirmadas).
- **acks=todas**. O produtor espera pola confirmación do líder e das réplicas co que idealmente non habería posbilidade de perda de datos.

O seguinte código esperará ata un segundo pola confirmación (podería seguir coa execución e fallar cando os ack non sexan os esperados).

In [14]:
def confirmacions_acks(erro, mensaxe):
    if erro is not None:
        print(f"Fallou a entrega da mensaxe: {mensaxe}: {erro}")
    else:
        print(f"Mensaxe producida: {mensaxe}")

producer.produce(topic, key=default_key, value=default_value, callback=confirmacions_acks)

# Espera ata un segundo por eventos. De recibir ACKS chámase ao método confirmacion_acks
producer.poll(1)

Mensaxe producida: <cimpl.Message object at 0x7f8e51419dc0>


1

## Consumidor

**Ollo!** Revisa que se poidan resolver tanto os nomes de host que poñas no array de **bootstrap.servers** e os que recibas na primeira conexión onde se che enviará información do cluster de brokers.

### Inicialización

In [23]:
from confluent_kafka import Consumer

conf = {'bootstrap.servers': "kafka.jfsanchez.es:9092,nifi.jfsanchez.es:9092",
        'group.id': "consumer_group_name"}

consumer = Consumer(conf)

### Subscrición básica

In [19]:
try:
    consumer.subscribe(['metamorfosis', 'kafkiano'])
    while True:
        mensaxe = consumer.poll(timeout=1.0)
        if mensaxe is None: continue
        print(mensaxe.key(), mensaxe.value())
finally:
    consumer.close()

None b'Hola'
None b'Hola'


KeyboardInterrupt: 

As versión recentes do módulo de Konfluentic non envían xa o KafkaError._PARTITION_EOF cando estamos ao final da partición.

Se queremos este comportamento, deberemos engadir ao array de configruación (**conf**) o parámetro: **'enable.partition.eof': True**

### Commits síncronos

Confirmación alomenos do líder

In [22]:
def procesa_mensaxe(mensaxe):
    print(mensaxe)

# Confirma polo menos cada 10 mensaxes
MIN_COMMIT_COUNT = 10
try:
    consumer.subscribe(['metamorfosis', 'kafkiano'])
    count = 0
    while True:
        mensaxe = consumer.poll(timeout=1.0)
        if mensaxe is None: continue
        procesa_mensaxe(mensaxe)
        count += 1
        if count % MIN_COMMIT_COUNT == 0:
            consumer.commit(asynchronous=False)
finally:
    consumer.close()

<cimpl.Message object at 0x7f8e26c3cf40>
<cimpl.Message object at 0x7f8e26c3ccc0>
<cimpl.Message object at 0x7f8e26c3cf40>
<cimpl.Message object at 0x7f8e26c3ccc0>
<cimpl.Message object at 0x7f8e26dcbcc0>
<cimpl.Message object at 0x7f8e26c3ccc0>
<cimpl.Message object at 0x7f8e26dcbcc0>
<cimpl.Message object at 0x7f8e26c3ccc0>
<cimpl.Message object at 0x7f8e26dcbec0>
<cimpl.Message object at 0x7f8e26c3ccc0>
<cimpl.Message object at 0x7f8e26dcbcc0>


KeyboardInterrupt: 

### Commits asíncronos

In [24]:
# Confirma polo menos cada 10 mensaxes
MIN_COMMIT_COUNT = 10
try:
    consumer.subscribe(['metamorfosis', 'kafkiano'])
    count = 0
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None: continue
        procesa_mensaxe(msg)
        count += 1
        if count % MIN_COMMIT_COUNT == 0:
            consumer.commit(asynchronous=True)
finally:
    consumer.close()

<cimpl.Message object at 0x7f8e26c3e540>
<cimpl.Message object at 0x7f8e26c3e5c0>
<cimpl.Message object at 0x7f8e26c3e540>
<cimpl.Message object at 0x7f8e26c3e5c0>
<cimpl.Message object at 0x7f8e26c3e540>
<cimpl.Message object at 0x7f8e26dc9bc0>
<cimpl.Message object at 0x7f8e26dc9bc0>
<cimpl.Message object at 0x7f8e26c3e5c0>
<cimpl.Message object at 0x7f8e26dc9bc0>
<cimpl.Message object at 0x7f8e26c3e5c0>
<cimpl.Message object at 0x7f8e26c3e5c0>
<cimpl.Message object at 0x7f8e26c3e540>


KeyboardInterrupt: 

Baseado no código de:
- <https://github.com/javicacheiro/pyspark_course/blob/master/supplementary/kafka/unit_4_kafka_python.md>