# Diplomatura en Ciencia de Datos, Aprendizaje Automático y sus Aplicaciones
## Despliegue de Sistemas Predictivos

### Facultad de Matemática Astronomía Física y Computación
## Universidad Nacional de Córdoba

<img src="http://program.ar/wp-content/uploads/2018/07/logo-UNC-FAMAF.png" alt="Drawing" style="width:80%;"/>

### Set up your Kafka cluster with ZooKeeper

See `kafka/docker-compose.yml`.

## Install dependencies for the Python project

In [1]:
!pip install kafka-python


## Library documentation

https://kafka-python.readthedocs.io/en/master/usage.html

## Consumer / Producer with Python
Constants used by the admin client, the consumer, and the producer.

In [1]:
import logging

In [2]:
# Port used to communicate with Kafka
KAFKA_PORT = '9092'

KAFKA_DNS = 'localhost'
KAFKA_SERVER_URI = f'{KAFKA_DNS}:{KAFKA_PORT}'
TOPIC = 'dummy_topic'
PARTITIONS = 3
REPLICATION_FACTOR = 1

In [3]:
# Create a topic with an admin client
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin_client = KafkaAdminClient(
    bootstrap_servers=KAFKA_SERVER_URI, 
    client_id='adminZero'
)

topic_list = [
    NewTopic(name=TOPIC, num_partitions=PARTITIONS, replication_factor=REPLICATION_FACTOR)
]
try:
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
except TopicAlreadyExistsError:
    logging.warning(f"Topic {TOPIC} already exists. Creation omitted.")
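
To see why the number of partitions matters, here is a minimal sketch of how a keyed record could be assigned to one of the topic's partitions. It is an illustration only: `pick_partition` is a hypothetical helper and `b"user-1"` is a made-up key. It uses `zlib.crc32` as a stand-in hash, whereas Kafka's default partitioner uses a murmur2 hash, so the partition numbers computed here will not match what a real broker reports. The point is only that a fixed key always maps to the same partition, which is what keeps the records for that key in order.

```python
import zlib

PARTITIONS = 3  # same value as the constant defined earlier in the notebook


def pick_partition(key: bytes, num_partitions: int) -> int:
    """Map a record key to a partition index in [0, num_partitions).

    Stand-in hash: zlib.crc32 (an assumption for illustration; Kafka's
    default partitioner uses murmur2, so real partition numbers differ).
    """
    return zlib.crc32(key) % num_partitions


# The same key always lands on the same partition.
first = pick_partition(b"user-1", PARTITIONS)
second = pick_partition(b"user-1", PARTITIONS)
print(first == second)
```

Records without a key have no such guarantee, so their relative order across partitions should not be relied on.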



In [4]:
# List all topics and find the 'dummy_topic'

from kafka import KafkaConsumer
consumer = KafkaConsumer(group_id=None, bootstrap_servers=[KAFKA_SERVER_URI])
consumer.topics()



{'__confluent.support.metrics', 'dummy_topic'}

### Produce a message

In [5]:
# Produce a message

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_URI])

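message = input("Message to be sent: ")
# With the default partitioner, records that share a key go to the same partition,
# which preserves their order for readers.
FROM = 'uniqueID'
# send() is asynchronous by default and returns a future.
future = producer.send(TOPIC, key=bytes(FROM, 'utf-8'), value=bytes(message, 'utf-8'))
# Block until all buffered records have been sent.
producer.flush()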

Message to be sent:  hello
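
Because `send` is asynchronous, the cell above does not show whether the record was acknowledged. The following is a minimal sketch of waiting for the result. `send_and_wait` is a hypothetical helper, and `FakeProducer`, `FakeFuture`, and `FakeMetadata` are stand-ins (not part of kafka-python) so the example runs without a broker. It assumes the producer behaves like `KafkaProducer`: `send` returns a future, and `future.get(timeout=...)` returns metadata exposing `topic`, `partition`, and `offset`.

```python
from collections import namedtuple

# Stand-ins so the sketch runs without a broker (hypothetical, not kafka-python classes).
FakeMetadata = namedtuple("FakeMetadata", ["topic", "partition", "offset"])


class FakeFuture:
    def __init__(self, metadata):
        self._metadata = metadata

    def get(self, timeout=None):
        # A real future would block here until the broker answers or the timeout expires.
        return self._metadata


class FakeProducer:
    def __init__(self):
        self._next_offset = 0

    def send(self, topic, key=None, value=None):
        metadata = FakeMetadata(topic, 0, self._next_offset)
        self._next_offset += 1
        return FakeFuture(metadata)


def send_and_wait(producer, topic, key, value, timeout=10):
    """Send one record and block until its metadata is available."""
    future = producer.send(topic, key=key, value=value)
    metadata = future.get(timeout=timeout)  # with a real producer, raises if the send failed
    return metadata.topic, metadata.partition, metadata.offset


print(send_and_wait(FakeProducer(), "dummy_topic", b"uniqueID", b"hello"))
```

With the real `producer` from the cell above, the call would be `send_and_wait(producer, TOPIC, bytes(FROM, 'utf-8'), bytes(message, 'utf-8'))`. Blocking on every record trades throughput for certainty, so it suits demos and low-volume producers better than high-volume ones.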


In [6]:
# Consume just one message
from kafka import KafkaConsumer

def do_some_stuff(message):
    print(f"{message.topic.upper()} Partition: {message.partition} Offset:{message.offset} key={message.key} value={message.value}")


# Open consumer connection
consumer = KafkaConsumer(TOPIC,
                         group_id="group team",
                         auto_offset_reset='latest',
                         enable_auto_commit=False, # We are going to commit manually after doing some stuff.
                         bootstrap_servers=[KAFKA_SERVER_URI])
    
# Read from consumer
message = next(consumer)
# Operate with message 
do_some_stuff(message)
# Commit the offset
consumer.commit()
# Close the consumer connection
consumer.close()

DUMMY_TOPIC Partition: 2 Offset:2 key=None value=b'heelp'


ERROR:kafka.conn:<BrokerConnection node_id=1 host=127.0.0.1:9092 <connected> [IPv4 ('127.0.0.1', 9092)]>: socket disconnected
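
The consumer cell disables auto-commit and commits only after `do_some_stuff` has run. A minimal sketch of that process-then-commit pattern for more than one message follows. `process_then_commit` is a hypothetical helper and `FakeConsumer` is a stand-in (not part of kafka-python) so the example runs without a broker. It assumes only that the consumer is iterable and has a `commit()` method, as `KafkaConsumer` does.

```python
from itertools import islice


class FakeConsumer:
    """Stand-in consumer: yields canned messages and counts commits."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.commits = 0

    def __iter__(self):
        return iter(self._messages)

    def commit(self):
        self.commits += 1


def process_then_commit(consumer, handler, max_messages=1):
    """Handle up to max_messages records, committing after each one."""
    handled = 0
    for message in islice(consumer, max_messages):
        handler(message)   # do the work first
        consumer.commit()  # only then mark the record as consumed
        handled += 1
    return handled


consumer = FakeConsumer(["first", "second", "third"])
seen = []
print(process_then_commit(consumer, seen.append, max_messages=2), seen, consumer.commits)
```

If the handler raises, the commit is skipped, so the record would be delivered again the next time a consumer in the same group reads the partition. That gives at-least-once processing, which means handlers should tolerate seeing a record twice.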