<a href="https://colab.research.google.com/github/victorescosta/Kafka-Python/blob/main/Kafka_Basics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install kafka-python
#install pyspark and related

In [None]:
# You need a Java JDK and its dependencies installed to run Kafka properly.

In [None]:
# Start ZooKeeper with its default config in an isolated terminal.
# Run this from Kafka's bin/ directory: the script is then ./<script>.sh and the
# config lives one level up in ../config/.
./zookeeper-server-start.sh ../config/zookeeper.properties

In [None]:
# Start the Kafka broker in its own isolated terminal, using the default server config.
./kafka-server-start.sh ../config/server.properties

In [None]:
# Create a Kafka topic named Hello-World on the local broker.
# kafka-topics.sh takes the action (--create) and the topic name (--topic) as
# two separate options.
./kafka-topics.sh --create --topic Hello-World --bootstrap-server localhost:9092

In [None]:
# Write messages to a topic as a console producer (each line typed becomes one message).
# NOTE(review): the original note says you can also create a new topic this way —
# presumably by producing to a topic name that does not exist yet; confirm that
# topic auto-creation is enabled on the broker.
./kafka-console-producer.sh --topic Hello-World --bootstrap-server localhost:9092

In [None]:
# Read the messages of a topic as a console consumer, starting from the beginning of the topic.
./kafka-console-consumer.sh --topic Hello-World --from-beginning --bootstrap-server localhost:9092

In [None]:
# List the topics that exist on the broker.
./kafka-topics.sh --bootstrap-server localhost:9092 --list 

# Describe a single topic (Hello-World).
./kafka-topics.sh --describe --topic Hello-World --bootstrap-server localhost:9092

In [None]:
# Create a topic named newtopic with 3 partitions and a replication factor of 1.
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic newtopic --create --partitions 3 --replication-factor 1

In [None]:
# Alter the number of partitions of newtopic (here: raise it to 4).
# If no error message shows, it means that it worked.
./kafka-topics.sh --topic newtopic --bootstrap-server localhost:9092 --alter --partitions 4

In [None]:
# Delete the Hello-World topic.
./kafka-topics.sh --delete --topic Hello-World --bootstrap-server localhost:9092

In [None]:
# Consume specific messages by partition and offset:
# read at most 1 message from partition 0 of mymessages, starting at offset 2.
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mymessages --partition 0 --offset 2 --max-messages 1

In [None]:
# Start a console consumer that belongs to the consumer group "consumidores".
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mymessages --group consumidores

# List the existing consumer groups.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Describe one of them (here: consumidores).
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group consumidores

In [None]:
# Pick up the messages a consumer group has not read yet.
# To try this, first produce some messages while no consumer of the group is running.
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mymessages --group consumidores --from-beginning

In [None]:
# Reset the group's offsets on mymessages to the earliest position, then consume again.
# You need to close the consumers and producers first.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group consumidores --topic mymessages --reset-offsets --to-earliest --execute
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mymessages --group consumidores

In [None]:
#=======>>> creating a kafka-cluster

# First stop any Kafka instance that is already running, and then the ZooKeeper service.
./kafka-server-stop.sh
./zookeeper-server-stop.sh

# Delete the kafka-logs and zookeeper logs. Run these from the tmp dir.
rm -r kafka*
rm -r zookeeper/

# Copy the config file once per broker. Run these inside the kafka/config dir.
# Three copies are needed because three brokers (0, 1 and 2) are started later.
cp server.properties server.properties0
cp server.properties server.properties1
cp server.properties server.properties2

# After that, edit each copy so the brokers do not clash with each other
# (line numbers refer to kafka_2.13-3.1.0):
#   - broker.id (line 21): a unique id per broker, e.g. 0 for server.properties0,
#     1 for server.properties1 and 2 for server.properties2
#   - port (uncomment line 31): a different port per broker, e.g. 9093 for server.properties1
#   - log directory (line 60): a separate directory per broker, e.g. log0 for server.properties0
gedit server.properties0
gedit server.properties1
gedit server.properties2


In [None]:
# Restart the ZooKeeper server. Run from Kafka's bin/ directory, so the script is
# ./<script>.sh and the config is in ../config/.
./zookeeper-server-start.sh ../config/zookeeper.properties

# Start the Kafka instances, each one in its own separate terminal.
./kafka-server-start.sh ../config/server.properties0
./kafka-server-start.sh ../config/server.properties1
./kafka-server-start.sh ../config/server.properties2

# How to check that it worked? Ask ZooKeeper.
./zookeeper-shell.sh localhost:2181
# Inside the ZooKeeper shell: list the registered broker ids, then inspect broker 0.
ls /brokers/ids
get /brokers/ids/0

In [None]:
# Read the Kafka logs by dumping a log segment file.
# You must first enter the kafka-log directory (under tmp) that contains the .log file.
/kafka/bin/kafka-dump-log.sh --files 00000000000000000000.log

In [None]:
# Compress data at the topic level: create a topic whose compression.type is gzip.
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic compress --create --partitions 3 --config compression.type=gzip

# Compress data at the producer level: write messages using the gzip compression codec.
./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic compress --compression-codec gzip


In [None]:
# Understanding the Kafka API. You need pip, Jupyter and the kafka-python library
# installed on your desktop/laptop.
sudo apt install python3-pip
sudo -H pip install jupyter
sudo pip install kafka-python

# Start Jupyter Notebook.
jupyter notebook


In [None]:
#creating a consumer and a producer in python


**Producer** - SEPARATED NOTEBOOK

In [None]:
# Import the dependencies and create a producer connected to the local
# bootstrap server (127.0.0.1:9092).
# `random` is used by the message-generating cell that follows.
from kafka import KafkaProducer as kp
import random
producer = kp(bootstrap_servers="127.0.0.1:9092")

In [None]:
# Generate 10 test messages on the "mymessages" topic, each with a numbered key
# and a random float in the payload.
for x in range(10):
  n = random.random()
  producer.send("mymessages", key=b"Key %d" % x, value=b"TestMessage %f " % n)
# send() only queues the record for a background sender; flush() blocks until
# every queued message has actually been delivered to the broker.
producer.flush()

**Consumer** - SEPARATED NOTEBOOK

In [None]:
# Import the dependencies and create a consumer for the "mymessages" topic,
# connected to the local bootstrap server and belonging to the group "consumers".
# NOTE(review): consumer_timeout_ms=1000 presumably makes iteration stop after
# 1 second without new messages — confirm against the kafka-python docs.
from kafka import KafkaConsumer as kc
consumer = kc("mymessages", bootstrap_servers="127.0.0.1:9092",
              consumer_timeout_ms=1000, group_id="consumers")

In [None]:
# Print the metadata and payload of every record the consumer yields,
# followed by a separator line.
for record in consumer:
  fields = (
      ("Topic: ", record.topic),
      ("Partition: ", record.partition),
      ("Key: ", record.key),
      ("Offset: ", record.offset),
      ("Message: ", record.value),
  )
  for label, value in fields:
    print(label, value)
  print("-----------------------------")

In [None]:
# You can also test it from a Kafka shell: first open a Kafka console consumer,
# then send messages to the topic and check that they also show up in the Jupyter consumer.
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mymessages --group consumers


**Real App - You need to run a Kafka-Cluster first (first steps of this tutorial)**

In [None]:
# Install the Apache web server.
sudo apt install apache2

# Check that it is installed and running.
sudo systemctl status apache2

# Go to the Apache log directory and list its files.
cd /var/log/apache2/
ls

# Open this address in your web browser (mine is Mozilla) to generate access-log entries.
http://localhost

# Check the access log file.
cat access.log


In [None]:
# Create the apachelog topic in Kafka (run from the bin dir).
# A replication factor of 3 requires 3 running Kafka instances.
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic apachelog --create --partitions 3 --replication-factor 3 

In [None]:
# Monitor the apachelog topic with a Kafka console consumer.
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic apachelog

In [None]:
# Create an apacheconnector.py file and open it with the `code` editor command.
touch apacheconnector.py
code -r apacheconnector.py

# Write the following code into apacheconnector.py and save it.
import time
import re
import datetime
from kafka import KafkaProducer as kp

# Path of the Apache access log that is tailed and forwarded to Kafka.
LOG_PATH = r'/var/log/apache2/access.log'
# Placeholder: replace 'REGEX' with a pattern whose groups capture the fields
# you want to forward from each log line.
regexp = 'REGEX'
producer = kp(bootstrap_servers="127.0.0.1:9092")

# Follow the log forever: forward every new line that matches `regexp` to the
# "apachelog" topic, and sleep when there is nothing new to read.
# The with/finally pair releases the file handle and the producer when the
# script is interrupted (e.g. Ctrl-C) or fails.
try:
  with open(LOG_PATH, 'r') as file:
    while 1:
      line = file.readline()
      if not line:
        # End of file reached: wait for Apache to append more entries.
        time.sleep(5)
        continue
      match = re.match(regexp, line)
      if match is None:
        # Line does not fit the expected format; skip it instead of crashing
        # on `None.groups()`.
        continue
      x = match.groups()
      # UTF-8 yields the same bytes as ASCII for ASCII text, but does not raise
      # on non-ASCII characters that can appear in logged requests.
      msg = bytes(str(x), encoding='utf-8')
      producer.send("apachelog", msg)
      print("Message sent at ", datetime.datetime.now())
finally:
  producer.close()

# Save it and run it in your shell environment.
python3 apacheconnector.py

#refresh your localhost webpage
#have fun :)

In [None]:
import logging

from kafka import KafkaConsumer

# Consume messages from 'test-topic' as a member of 'my_group'.
# Offsets are auto-committed every 30 seconds; when the group has no committed
# offset yet, reading starts from the earliest available message.
consumer = KafkaConsumer('test-topic',
                         group_id='my_group',
                         bootstrap_servers=['localhost:9092'],
                         enable_auto_commit=True,
                         auto_commit_interval_ms=30 * 1000,
                         auto_offset_reset='earliest')

for message in consumer:
  # The payload arrives as raw bytes; decode it into a separate name so that
  # `message` keeps its record attributes (topic, partition, offset, key, value).
  text = message.value.decode('utf-8')
  logging.info(f"==>>: {text}")
  print("%s: %d: %d: key=%s value=%s" % (message.topic, message.partition,
                                        message.offset, message.key,
                                        message.value))
  

*********************
Work in Progress
*********************