## Kafka

### 1. Create a topic from the console

In [None]:
!bash /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
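Note that the `--zookeeper` flag of `kafka-topics.sh` was removed in Kafka 3.0; newer releases expect `--bootstrap-server` pointing at a broker instead. The sketch below only builds the command line for either style, so it runs without a cluster. The helper name `build_create_topic_cmd` and its parameters are hypothetical; the path and addresses are the ones used in this notebook.

```python
def build_create_topic_cmd(topic, partitions=1, replication_factor=1,
                           use_zookeeper=True,
                           kafka_bin="/opt/kafka/bin",
                           zookeeper="localhost:2181",
                           broker="localhost:9092"):
    """Return the argv list for creating a topic with kafka-topics.sh."""
    # Older releases connect through ZooKeeper; newer ones talk to a broker.
    connection = (["--zookeeper", zookeeper] if use_zookeeper
                  else ["--bootstrap-server", broker])
    return (["bash", f"{kafka_bin}/kafka-topics.sh", "--create"]
            + connection
            + ["--replication-factor", str(replication_factor),
               "--partitions", str(partitions),
               "--topic", topic])


# Print both variants so they can be pasted into a shell cell.
print(" ".join(build_create_topic_cmd("test")))
print(" ".join(build_create_topic_cmd("test", use_zookeeper=False)))
```

The returned list can also be passed to `subprocess.run` if you prefer not to use the `!` shell escape.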

### 2. List topics from the console

In [None]:
!bash /opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

### 3. Describe a topic's configuration from the console

In [None]:
!bash /opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

### 4. Write data to a topic from the console

In [None]:
!echo "testing" | bash /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

### 5. Read the inserted record from the console

In [None]:
!bash /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

### 6. Write records to a topic from Python
KafkaProducer documentation: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

In [None]:
# We will use the kafka-python library.
# It must be installed in the Python distribution in use:
#!pip install kafka-python

# Import the producer class from the installed library
from kafka import KafkaProducer

In [None]:
# Create a producer
# and tell it which Kafka broker to write to.
# Send 10 records to the topic test2.

producer = KafkaProducer(bootstrap_servers='localhost:9092')
for _ in range(10):
    producer.send('test2', b'some_message_bytes')
# send() is asynchronous; flush() blocks until the buffered records are delivered
producer.flush()
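The loop above sends the same payload ten times, which makes the records hard to tell apart when they are read back. Here is a minimal sketch of sending distinct payloads. It assumes a hypothetical helper `send_numbered` and a stand-in `FakeProducer` class (neither is part of kafka-python) so that the cell runs without a broker; a real `KafkaProducer` offers the same `send(topic, value)` and `flush()` calls.

```python
class FakeProducer:
    """Stand-in for KafkaProducer that records calls instead of sending."""

    def __init__(self):
        self.sent = []
        self.flushed = False

    def send(self, topic, value=None):
        self.sent.append((topic, value))

    def flush(self):
        self.flushed = True


def send_numbered(producer, topic, count):
    """Send `count` distinct byte payloads, then wait for delivery."""
    for i in range(count):
        # Without a value_serializer, values must already be bytes.
        producer.send(topic, f"message-{i}".encode("utf-8"))
    # send() is asynchronous; flush() blocks until buffered records are sent.
    producer.flush()


fake = FakeProducer()
send_numbered(fake, "test2", 10)
print(len(fake.sent), fake.sent[0], fake.sent[-1])
```

Against a running broker, the same helper could be called as `send_numbered(producer, 'test2', 10)` with the `producer` created above.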

### 7. Read the 10 records inserted above from Python
KafkaConsumer documentation: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

In [None]:
from kafka import KafkaConsumer
consumer = KafkaConsumer('test2', 
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest', 
                         enable_auto_commit=True)
for msg in consumer:
    print(msg)

In [None]:
my_group_consumer = KafkaConsumer('test2', 
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest', 
                         enable_auto_commit=True,
                         group_id='my-group')
for msg in my_group_consumer:
    print(msg)
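The two consumers above differ only in `group_id`. In kafka-python, a consumer without a group does not commit offsets, so it always falls back to `auto_offset_reset`. A consumer with a group resumes from the group's last committed offset, and `auto_offset_reset` applies only when no committed offset exists. A simplified model of that decision for a single partition follows. The function `starting_offset` is hypothetical and is not kafka-python API.

```python
def starting_offset(committed, auto_offset_reset, log_start, log_end):
    """Return the offset a consumer would start reading from.

    committed: last committed offset for the group, or None if there is none.
    log_start / log_end: first available offset and next offset to be written.
    """
    if committed is not None:
        # A committed offset always wins over the reset policy.
        return committed
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")


# A partition holding offsets 0..9 (the next record would get offset 10).
print(starting_offset(None, "earliest", 0, 10))  # 0: read everything
print(starting_offset(None, "latest", 0, 10))    # 10: only new records
print(starting_offset(10, "earliest", 0, 10))    # 10: group already read all
```

This is why re-running the `my-group` cell may print nothing once its offsets have been committed, while the group-less consumer prints all records again.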

### 8. Produce messages in JSON format

In [None]:
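import json
# value_serializer turns each Python object into UTF-8 encoded JSON bytes.
# bootstrap_servers is omitted here, so kafka-python uses its default, localhost:9092.
json_producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
json_producer.send('jsontest', {'foo': 'bar'})
json_producer.flush()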


### 9. Consume messages in JSON format

In [None]:
json_consumer = KafkaConsumer('jsontest', 
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest', 
                         enable_auto_commit=True)
for msg in json_consumer:
    print(msg)
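The consumer above has no deserializer, so `msg.value` arrives as raw bytes. Below is a minimal sketch of a deserializer that mirrors the producer's serializer, shown as a round trip that needs no broker. The function names `serialize_json` and `deserialize_json` are hypothetical.

```python
import json


def serialize_json(value):
    """Python object -> UTF-8 encoded JSON bytes (what the producer sends)."""
    return json.dumps(value).encode("utf-8")


def deserialize_json(raw):
    """UTF-8 encoded JSON bytes -> Python object (what the consumer wants)."""
    return json.loads(raw.decode("utf-8"))


raw = serialize_json({"foo": "bar"})
print(raw)                    # b'{"foo": "bar"}'
print(deserialize_json(raw))  # {'foo': 'bar'}
```

To apply it, pass `value_deserializer=deserialize_json` when constructing the `KafkaConsumer`; `msg.value` is then a `dict` rather than bytes.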

### 10. Using the message key

In [None]:
# key_serializer converts the str key to bytes; the value is already bytes
key_producer = KafkaProducer(key_serializer=str.encode)
key_producer.send('keytest', key='ping', value=b'1234')
key_producer.flush()

In [None]:
key_consumer = KafkaConsumer('keytest', 
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest', 
                         enable_auto_commit=True)
for msg in key_consumer:
    print('Key:   ' + str(msg.key))
    print('Value: ' + str(msg.value))
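Besides travelling with the message, the key usually decides which partition a record lands in. The default partitioner hashes the key and takes the result modulo the partition count, so records with the same key stay in order within one partition. The sketch below illustrates that idea with `zlib.crc32` as a stand-in hash. Kafka's default partitioner uses murmur2, so the partition numbers printed here will not match a real cluster. `pick_partition` is a hypothetical name.

```python
import zlib


def pick_partition(key, num_partitions):
    """Map a bytes key to a partition index (stand-in hash, not murmur2)."""
    return zlib.crc32(key) % num_partitions


# The same key always maps to the same partition.
for key in [b"ping", b"pong", b"ping"]:
    print(key, "-> partition", pick_partition(key, 3))
```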

### 11. Delete topics from the console

In [None]:
!bash /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

In [None]:
!bash /opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

### 12. Compacting topics
<br><br>
![png](../images/kafka-log-compaction.png)
<br><br>
![png](../images/kafka-log-compaction-process.png) 
<br><br>

* min.cleanable.dirty.ratio -> https://docs.confluent.io/current/installation/configuration/topic-configs.html#min-cleanable-dirty-rati

* delete.retention.ms -> https://docs.confluent.io/current/installation/configuration/topic-configs.html#delete-retention-ms

* segment.ms -> https://docs.confluent.io/current/installation/configuration/topic-configs.html#segment-ms
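These settings influence when the log cleaner runs. As a rough mental model (a simplification, not Kafka's actual implementation), a log becomes eligible for cleaning once the share of not-yet-compacted ("dirty") bytes exceeds `min.cleanable.dirty.ratio`, whose default is 0.5. The function name and the byte counts below are hypothetical.

```python
def is_cleanable(clean_bytes, dirty_bytes, min_cleanable_dirty_ratio):
    """Return True if the dirty share of the log exceeds the threshold."""
    total = clean_bytes + dirty_bytes
    if total == 0:
        return False  # an empty log has nothing to clean
    return dirty_bytes / total > min_cleanable_dirty_ratio


# 10% of the log is dirty.
print(is_cleanable(900, 100, 0.5))   # False with the default ratio
print(is_cleanable(900, 100, 0.01))  # True with the ratio used in this notebook
```

The active segment is never compacted, which is why a very small `segment.ms` is useful in a demo: it lets segments roll quickly so that recent records become eligible for cleaning.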

In [None]:
# Create the compacted topic used by the cells below; the second command applies the same settings to an existing topic
!bash /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic compacted5 --replication-factor 1 --partitions 1 --config min.insync.replicas=1 --config cleanup.policy=compact --config delete.retention.ms=1 --config min.cleanable.dirty.ratio=0.01 --config segment.ms=1
!bash /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type=topics --entity-name=compacted5 --alter --add-config cleanup.policy=compact,delete.retention.ms=1,min.cleanable.dirty.ratio=0.01,segment.ms=1
    

In [None]:
!echo "key1:testing1" | bash /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic compacted5 --property "parse.key=true" --property "key.separator=:" 

In [None]:
!bash /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic compacted5 --from-beginning --property print.key=true --property key.separator=":"

In [None]:
!echo "key2:testing1" | bash /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic compacted5 --property "parse.key=true" --property "key.separator=:" 

In [None]:
!bash /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic compacted5 --from-beginning --property print.key=true --property key.separator=":"

In [None]:
!echo "key2:testing2" | bash /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic compacted5 --property "parse.key=true" --property "key.separator=:" 

In [None]:
!bash /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic compacted5 --from-beginning --property print.key=true --property key.separator=":"
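The cells above write `key1:testing1`, `key2:testing1` and `key2:testing2`. Once compaction has processed those records, only the latest value per key should remain. Here is a minimal in-memory sketch of that outcome. The function `compact` is hypothetical and ignores segments, tombstones and timing.

```python
def compact(records):
    """Keep only the latest record per key, preserving offset order."""
    latest_offset = {}
    for offset, (key, _value) in enumerate(records):
        latest_offset[key] = offset  # later offsets overwrite earlier ones
    return [(key, value)
            for offset, (key, value) in enumerate(records)
            if latest_offset[key] == offset]


log = [("key1", "testing1"), ("key2", "testing1"), ("key2", "testing2")]
print(compact(log))  # [('key1', 'testing1'), ('key2', 'testing2')]
```

On a real broker the most recent records may still sit in the active segment, which is not compacted, so the console consumer can keep showing the older `key2` value for a while.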