# Python Kafka

The `kafka-python` package lets you communicate with a running Kafka cluster from Python. It can also be used to administer the cluster.

In [1]:
!pip install kafka-python

Collecting kafka-python
  Downloading https://files.pythonhosted.org/packages/75/68/dcb0db055309f680ab2931a3eeb22d865604b638acf8c914bedf4c1a0c8c/kafka_python-2.0.2-py2.py3-none-any.whl (246kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2
You are using pip version 9.0.1, however version 20.3.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.


In [2]:
params = {
    "bootstrap_servers": "pkc-4r297.europe-west1.gcp.confluent.cloud:9092",
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "PLAIN",
    "sasl_plain_username": "{KEY}",
    "sasl_plain_password": "{SECRET}"
}
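Rather than pasting the key and secret into the notebook, the same dictionary could be built from environment variables. The following is a minimal sketch; the variable names `KAFKA_BOOTSTRAP`, `KAFKA_API_KEY` and `KAFKA_API_SECRET` and the helper `build_params` are assumptions made for illustration and are not part of kafka-python.

```python
import os


def build_params(env=os.environ):
    """Build kafka-python connection kwargs from a mapping of settings.

    The three variable names below are hypothetical; pick whatever
    naming your deployment uses.
    """
    return {
        "bootstrap_servers": env["KAFKA_BOOTSTRAP"],
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": env["KAFKA_API_KEY"],
        "sasl_plain_password": env["KAFKA_API_SECRET"],
    }


# Example with an explicit mapping instead of the real environment.
example = build_params({
    "KAFKA_BOOTSTRAP": "broker.example.com:9092",
    "KAFKA_API_KEY": "my-key",
    "KAFKA_API_SECRET": "my-secret",
})
print(sorted(example))
```

Calling `build_params()` with no argument reads from the process environment and raises `KeyError` if a variable is missing, which surfaces a misconfiguration early.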

In [3]:
from kafka import KafkaAdminClient
from kafka.admin import NewTopic

MAIN_TOPIC = "main_topic"
admin_client = KafkaAdminClient(**params)
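# KafkaAdminClient exposes cluster administration, e.g. listing, creating, and deleting topics
# WARNING: the next line deletes every topic the client can see; run it only on a scratch cluster
admin_client.delete_topics(admin_client.list_topics())
# replication_factor is dictated by the cluster's configuration
# this example does not use topic partitioning,
# so the number of partitions is left at 1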
admin_client.create_topics([
    NewTopic(
        name=MAIN_TOPIC,
        num_partitions=1,
        replication_factor=3
    )
])

CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic=u'main_topic', error_code=0, error_message=None)])
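Deleting every listed topic is convenient on a throwaway cluster but risky anywhere else. As a hedged sketch, a small helper can restrict the deletion to topics the notebook itself owns. The helper name `topics_to_delete` and the sample topic list are made up for illustration.

```python
def topics_to_delete(existing, owned):
    """Return the topics from `owned` that actually exist, sorted by name."""
    return sorted(set(existing) & set(owned))


# Made-up topic list standing in for admin_client.list_topics().
existing = ["main_topic", "orders", "_confluent-metrics"]
print(topics_to_delete(existing, ["main_topic", "scratch"]))
```

With a live client this would be used as `admin_client.delete_topics(topics_to_delete(admin_client.list_topics(), [MAIN_TOPIC]))`, leaving all other topics untouched.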

# Writing Data to Kafka

Note that only `bytes` can be written to Kafka, not strings.

In [10]:
from kafka import KafkaProducer

print(admin_client.list_topics())
producer = KafkaProducer(**params)
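for i in range(30):
    # str(i).encode() gives b"0", b"1", ...; on Python 3, bytes(i) would instead give i zero bytes
    producer.send(MAIN_TOPIC, str(i).encode("utf-8"))
# send() is asynchronous; flush() blocks until the buffered messages are delivered
producer.flush()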
print(admin_client.list_topics())

[u'main_topic']
[u'main_topic']
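Because only `bytes` can be written, values have to be encoded before sending and decoded after reading. The sketch below shows one way to do that for integers and strings using UTF-8 text; the helper names `encode_value` and `decode_int` are illustrative, not part of kafka-python.

```python
def encode_value(value):
    """Encode an int or str as UTF-8 bytes, e.g. 7 becomes b"7"."""
    return str(value).encode("utf-8")


def decode_int(raw):
    """Inverse of encode_value for integer payloads."""
    return int(raw.decode("utf-8"))


print(encode_value(7))
# Pitfall on Python 3: bytes(3) is three zero bytes, not the text "3".
print(bytes(3) == b"\x00\x00\x00")
print(decode_int(encode_value(7)))
```

`KafkaProducer` also accepts a `value_serializer` callable, so a function such as `encode_value` could be passed there instead of being called on every `send`.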


# Reading Data from Kafka

Reading is done with a consumer:

In [35]:
from kafka import KafkaConsumer

consumer = KafkaConsumer(**params)

# Topics, Partitions, and Offsets

Messages in Kafka are organised into topics:

In [36]:
consumer.topics()

{u'main_topic'}

A topic can have several partitions, each with its own starting and ending offsets:

In [37]:
from kafka import TopicPartition

partitions = [
    TopicPartition(MAIN_TOPIC, partition)
    for partition in consumer.partitions_for_topic(MAIN_TOPIC)
]
print(consumer.beginning_offsets(partitions))
print(consumer.end_offsets(partitions))

{TopicPartition(topic=u'main_topic', partition=0): 0}
{TopicPartition(topic=u'main_topic', partition=0): 40}
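The two dictionaries can be combined to see how many offsets each partition currently spans. This is a sketch with a hypothetical helper, `offset_spans`, and plain tuples standing in for `TopicPartition` keys. The difference is an upper bound on the number of readable messages, since compaction or transaction markers can leave gaps in the offset range.

```python
def offset_spans(beginning, end):
    """Map each partition to end offset minus beginning offset."""
    return {tp: end[tp] - beginning[tp] for tp in end}


# Tuples stand in for TopicPartition objects; values mirror the output above.
beginning = {("main_topic", 0): 0}
end = {("main_topic", 0): 40}
print(offset_spans(beginning, end))
```

With a live consumer the call would be `offset_spans(consumer.beginning_offsets(partitions), consumer.end_offsets(partitions))`.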


# Reading from a given offset of a partition

Before reading from Kafka, the consumer must be assigned to a topic and partition:

In [38]:
consumer.assign(partitions)

`position` reports the offset of the next record to be fetched, and `seek` moves it, so one can read from any offset of the partition:

In [39]:
print(consumer.position(partitions[0]))
consumer.seek(partitions[0], 0)
print(consumer.position(partitions[0]))

40
0
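Seeking to an offset outside the partition's current range leaves the outcome to the consumer's offset reset policy. One hedged way to avoid that is to clamp the requested offset to the range reported by `beginning_offsets` and `end_offsets`. The helper `seek_clamped` is an illustrative assumption; it only relies on the consumer methods already used above.

```python
def seek_clamped(consumer, tp, offset):
    """Seek `tp` to `offset`, clamped to [beginning offset, end offset].

    `consumer` is expected to behave like a KafkaConsumer that already
    has `tp` assigned. Returns the offset that was actually used.
    """
    first = consumer.beginning_offsets([tp])[tp]
    last = consumer.end_offsets([tp])[tp]
    target = max(first, min(offset, last))
    consumer.seek(tp, target)
    return target
```

For the partition shown above, `seek_clamped(consumer, partitions[0], 1000)` would seek to 40, the end offset, rather than to a position that does not exist yet.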


Data can be read in batches of any desired size:

In [41]:
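for _ in range(10):
    # poll() returns {TopicPartition: [records]}; max_records=1 limits each batch to one record
    # if nothing arrives within timeout_ms, the dict is empty and the lookup raises KeyError
    data = consumer.poll(
        timeout_ms=10,
        max_records=1
    )[partitions[0]][0].value
    print(data)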
print(consumer.position(partitions[0]))

0
1
2
3
4
5
6
7
8
9
10
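The loop above indexes straight into the result of `poll`, which fails if a poll times out and returns an empty dictionary. A more defensive variant, sketched below, collects values until enough have arrived or several consecutive polls come back empty. The helper name `read_values` and its `max_empty_polls` parameter are assumptions made for this example.

```python
def read_values(consumer, tp, count, timeout_ms=1000, max_empty_polls=3):
    """Collect up to `count` record values from partition `tp`.

    Stops early once `max_empty_polls` consecutive polls return no
    records for `tp`, so the call cannot loop forever on a quiet topic.
    """
    values = []
    empty_polls = 0
    while len(values) < count and empty_polls < max_empty_polls:
        batch = consumer.poll(
            timeout_ms=timeout_ms,
            max_records=count - len(values),
        )
        records = batch.get(tp, [])
        if not records:
            empty_polls += 1
            continue
        empty_polls = 0
        values.extend(record.value for record in records)
    return values
```

With the consumer assigned as above, `read_values(consumer, partitions[0], 10)` would return a list of up to ten payloads as `bytes`.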


In [34]:
!spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4 /home/user/spark-and-kafka.py

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-624d5fa0-ea66-4164-8e60-0f3a0eb061b6;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.4 in central
	found org.apache.kafka#kafka-clients;0.10.0.1 in central
	found net.jpountz.lz4#lz4;1.3.0 in central
	found org.xerial.snappy#snappy-java;1.1.2.6 in central
	found org.slf4j#slf4j-api;1.7.16 in central
	found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 506ms :: artifacts dl 11ms
	:: modules in use:
	net.jpountz.lz4#lz4;1.3.0 from central in [default]
	org.apache.kafka#kafka-clients;0.10.0.1 from central in [default]
	org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.4 from central in [def

20/12/10 14:28:55 WARN org.apache.kafka.clients.NetworkClient: Bootstrap broker pkc-4r297.europe-west1.gcp.confluent.cloud:9092 disconnected
20/12/10 14:28:55 WARN org.apache.kafka.clients.NetworkClient: Bootstrap broker pkc-4r297.europe-west1.gcp.confluent.cloud:9092 disconnected
20/12/10 14:28:56 WARN org.apache.kafka.clients.NetworkClient: Bootstrap broker pkc-4r297.europe-west1.gcp.confluent.cloud:9092 disconnected
20/12/10 14:28:56 WARN org.apache.kafka.clients.NetworkClient: Bootstrap broker pkc-4r297.europe-west1.gcp.confluent.cloud:9092 disconnected
20/12/10 14:28:56 INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values: 
	metric.reporters = []
	metadata.max.age.ms = 300000
	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
	reconnect.backoff.ms = 50
	sasl.kerberos.ticket.renew.window.factor = 0.8
	max.partition.fetch.bytes = 1048576
	bootstrap.servers = [pkc-4r297.europe-west1.gcp.confluent.cloud:9092]
	ssl.keystore.type =