Skip to content

Using The Kafka Command Line ⌨️

Lyes Sefiane edited this page Jan 18, 2023 · 1 revision

Table Of Contents

Kafka Client Scripts

Kafka Single Node

  • Docker Compose YML File
version: '3.9'

services:
  zookeeper:
    image: bitnami/zookeeper:latest
    container_name: zookeeper
    restart: unless-stopped
    ports:
      - ${ZOOKEEPER_PORT}:${ZOOKEEPER_PORT}
    environment:
      - ALLOW_ANONYMOUS_LOGIN=${ALLOW_ANONYMOUS_LOGIN}

  kafka:
    image: bitnami/kafka:2.7.0
    container_name: kafka-broker
    restart: unless-stopped
    ports:
      - ${KAFKA_PORT_1}:${KAFKA_PORT_1}
      - ${KAFKA_PORT_2}:${KAFKA_PORT_2}
    depends_on:
      - zookeeper      
    environment:
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP_INTERNAL},EXTERNAL:${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP_EXTERNAL}
      - KAFKA_CFG_LISTENERS=INTERNAL:${KAFKA_CFG_LISTENERS_INTERNAL},EXTERNAL:${KAFKA_CFG_LISTENERS_EXTERNAL}
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL:${KAFKA_CFG_ADVERTISED_LISTENERS_INTERNAL},EXTERNAL:${KAFKA_CFG_ADVERTISED_LISTENERS_EXTERNAL}
      - KAFKA_CFG_ZOOKEEPER_CONNECT=${KAFKA_CFG_ZOOKEEPER_CONNECT}
      - KAFKA_INTER_BROKER_LISTENER_NAME=${KAFKA_INTER_BROKER_LISTENER_NAME}
      - ALLOW_PLAINTEXT_LISTENER=${ALLOW_PLAINTEXT_LISTENER}
  • Environment Variables
KAFKA_HOST=kafka
ZOOKEEPER_HOST=zookeeper
ZOOKEEPER_PORT=2181
KAFKA_PORT_1=9092
KAFKA_PORT_2=29092
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP_INTERNAL=PLAINTEXT
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP_EXTERNAL=PLAINTEXT
KAFKA_CFG_LISTENERS_INTERNAL=//:29092
KAFKA_CFG_LISTENERS_EXTERNAL=//:9092
KAFKA_CFG_ADVERTISED_LISTENERS_INTERNAL=//${HOST}:${KAFKA_PORT_2}
KAFKA_CFG_ADVERTISED_LISTENERS_EXTERNAL=//localhost:${KAFKA_PORT_1}
KAFKA_CFG_ZOOKEEPER_CONNECT=${ZOOKEEPER_HOST}:${ZOOKEEPER_PORT}
KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
ALLOW_ANONYMOUS_LOGIN=yes
ALLOW_PLAINTEXT_LISTENER=yes
  • Docker Compose UP
lyes-s ( β—₯β—£_β—’β—€ ) ~/Documents/Kafka $ docker-compose --env-file .env up -d
Creating network "kafka_default" with the default driver
Pulling zookeeper (bitnami/zookeeper:latest)...
latest: Pulling from bitnami/zookeeper
Digest: sha256:792ef04ed5dcda699ffa367df42438b35268a3cf282246e77940628dd5faec5e
Status: Downloaded newer image for bitnami/zookeeper:latest
Pulling kafka (bitnami/kafka:2.7.0)...
2.7.0: Pulling from bitnami/kafka
Digest: sha256:f2b0570a5c14687e05997df4f051a78378bfb1299ac824eee6a1ad6dc209d4d7
Status: Downloaded newer image for bitnami/kafka:latest
Creating zookeeper ...
Creating zookeeper ... done
Creating kafka-broker ...
Creating kafka-broker ... done
  • Docker Container List
lyes-s ( β—₯β—£_β—’β—€ ) ~/Documents/Kafka $ docker container ls -a
CONTAINER ID   IMAGE                      COMMAND                  CREATED          STATUS          PORTS                                                  NAMES
9cabe77834ff   bitnami/kafka:2.7.0        "/opt/bitnami/script…"   36 minutes ago   Up 36 minutes   0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp       kafka-broker
73efedc1be7f   bitnami/zookeeper:latest   "/opt/bitnami/script…"   36 minutes ago   Up 36 minutes   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper
  • Kafka is installed under opt/bitnami/kafka
lyes-s ( β—₯β—£_β—’β—€ ) ~/Documents/Kafka $ winpty docker exec -it kafka-broker bash

I have no name!@201c324bb18f:/$ cd opt/bitnami/kafka/
  • Let's explore the content
I have no name!@201c324bb18f:/opt/bitnami/kafka$ ls
LICENSE  NOTICE  bin  config  libs  licenses  logs  site-docs
  • The config directory contains configuration that can be modified to suit the specific setup. Any configuration changes would required a restart of Kafka (use host volumes)
I have no name!@201c324bb18f:/opt/bitnami/kafka$ ls config/
connect-console-sink.properties    connect-file-sink.properties    connect-mirror-maker.properties  kraft                server.properties       zookeeper.properties
connect-console-source.properties  connect-file-source.properties  connect-standalone.properties    log4j.properties     tools-log4j.properties
connect-distributed.properties     connect-log4j.properties        consumer.properties              producer.properties  trogdor.conf
  • The Logs directory contains log files generated by Kafka and are useful for troubleshooting. In my case, the directory is empty
I have no name!@201c324bb18f:/opt/bitnami/kafka$ ls logs
  • The bin directory contains a number of shell scripts for Kafka management, publishing and subscribing.
I have no name!@201c324bb18f:/opt/bitnami/kafka$ ls bin/
connect-distributed.sh        kafka-console-consumer.sh    kafka-features.sh            kafka-reassign-partitions.sh        kafka-topics.sh                  zookeeper-server-start.sh
connect-mirror-maker.sh       kafka-console-producer.sh    kafka-get-offsets.sh         kafka-replica-verification.sh       kafka-transactions.sh            zookeeper-server-stop.sh
connect-standalone.sh         kafka-consumer-groups.sh     kafka-leader-election.sh     kafka-run-class.sh                  kafka-verifiable-consumer.sh     zookeeper-shell.sh
kafka-acls.sh                 kafka-consumer-perf-test.sh  kafka-log-dirs.sh            kafka-server-start.sh               kafka-verifiable-producer.sh
kafka-broker-api-versions.sh  kafka-delegation-tokens.sh   kafka-metadata-shell.sh      kafka-server-stop.sh                trogdor.sh
kafka-cluster.sh              kafka-delete-records.sh      kafka-mirror-maker.sh        kafka-storage.sh                    windows
kafka-configs.sh              kafka-dump-log.sh            kafka-producer-perf-test.sh  kafka-streams-application-reset.sh  zookeeper-security-migration.sh

Creating a Topic

  1. Logging into the Kafka Container
lyes-s ( β—₯β—£_β—’β—€ ) ~/Documents/Kafka $ docker exec -it kafka-broker bash
I have no name!@1359069276ee:/$
  1. Navigate to the Kafka Scripts directory
I have no name!@1359069276ee:/$ cd opt/bitnami/kafka/bin/
I have no name!@1359069276ee:/opt/bitnami/kafka/bin$ ls -la
total 172
drwxrwxr-x 1 root root  4096 May 20 16:34 .
drwxrwxr-x 1 root root  4096 Jun 24 16:59 ..
-rwxrwxr-x 1 root root  1423 May 20 13:34 connect-distributed.sh
-rwxrwxr-x 1 root root  1396 May 20 13:34 connect-mirror-maker.sh
-rwxrwxr-x 1 root root  1420 May 20 13:34 connect-standalone.sh
-rwxrwxr-x 1 root root   861 May 20 13:34 kafka-acls.sh
-rwxrwxr-x 1 root root   873 May 20 13:34 kafka-broker-api-versions.sh
-rwxrwxr-x 1 root root   860 May 20 13:34 kafka-cluster.sh
-rwxrwxr-x 1 root root   864 May 20 13:34 kafka-configs.sh
-rwxrwxr-x 1 root root   945 May 20 13:34 kafka-console-consumer.sh
-rwxrwxr-x 1 root root   944 May 20 13:34 kafka-console-producer.sh
-rwxrwxr-x 1 root root   871 May 20 13:34 kafka-consumer-groups.sh
-rwxrwxr-x 1 root root   948 May 20 13:34 kafka-consumer-perf-test.sh
-rwxrwxr-x 1 root root   871 May 20 13:34 kafka-delegation-tokens.sh
-rwxrwxr-x 1 root root   869 May 20 13:34 kafka-delete-records.sh
-rwxrwxr-x 1 root root   866 May 20 13:34 kafka-dump-log.sh
-rwxrwxr-x 1 root root   863 May 20 13:34 kafka-features.sh
-rwxrwxr-x 1 root root   865 May 20 13:34 kafka-get-offsets.sh
-rwxrwxr-x 1 root root   870 May 20 13:34 kafka-leader-election.sh
-rwxrwxr-x 1 root root   863 May 20 13:34 kafka-log-dirs.sh
-rwxrwxr-x 1 root root   873 May 20 13:34 kafka-metadata-shell.sh
-rwxrwxr-x 1 root root   862 May 20 13:34 kafka-mirror-maker.sh
-rwxrwxr-x 1 root root   959 May 20 13:34 kafka-producer-perf-test.sh
-rwxrwxr-x 1 root root   874 May 20 13:34 kafka-reassign-partitions.sh
-rwxrwxr-x 1 root root   874 May 20 13:34 kafka-replica-verification.sh
-rwxrwxr-x 1 root root 10600 May 20 13:34 kafka-run-class.sh
-rwxrwxr-x 1 root root  1370 Jun 24 16:59 kafka-server-start.sh
-rwxrwxr-x 1 root root  1361 May 20 13:34 kafka-server-stop.sh
-rwxrwxr-x 1 root root   860 May 20 13:34 kafka-storage.sh
-rwxrwxr-x 1 root root   945 May 20 13:34 kafka-streams-application-reset.sh
-rwxrwxr-x 1 root root   863 May 20 13:34 kafka-topics.sh
-rwxrwxr-x 1 root root   879 May 20 13:34 kafka-transactions.sh
-rwxrwxr-x 1 root root   958 May 20 13:34 kafka-verifiable-consumer.sh
-rwxrwxr-x 1 root root   958 May 20 13:34 kafka-verifiable-producer.sh
-rwxrwxr-x 1 root root  1714 May 20 13:34 trogdor.sh
drwxrwxr-x 1 root root  4096 May 20 16:34 windows
-rwxrwxr-x 1 root root   867 May 20 13:34 zookeeper-security-migration.sh
-rwxrwxr-x 1 root root  1393 May 20 13:34 zookeeper-server-start.sh
-rwxrwxr-x 1 root root  1366 May 20 13:34 zookeeper-server-stop.sh
-rwxrwxr-x 1 root root  1019 May 20 13:34 zookeeper-shell.sh
  1. Creating new Topics : tweets & alerts

Topic : tweets

I have no name!@1359069276ee:/opt/bitnami/kafka/bin$         
>        ./kafka-topics.sh \
>             --zookeeper zookeeper:2181 \
>             --create \
>             --topic kafka.learning.tweets \
>             --partitions 1 \
>             --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic kafka.learning.tweets.

Topic : alerts

I have no name!@1359069276ee:/opt/bitnami/kafka/bin$         
>         ./kafka-topics.sh \
>             --zookeeper zookeeper:2181 \
>             --create \
>             --topic kafka.learning.alerts \
>             --partitions 1 \
>             --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic kafka.learning.alerts.
  1. Listing Topics
I have no name!@9cabe77834ff:/opt/bitnami/kafka/bin$         
>         ./kafka-topics.sh \
>             --zookeeper zookeeper:2181 \
>             --list
kafka.learning.alerts
kafka.learning.tweets

Exploring Topics

  • Getting details about Topics
I have no name!@9cabe77834ff:/opt/bitnami/kafka/bin$         
>        ./kafka-topics.sh \
>             --zookeeper zookeeper:2181 \
>             --describe
Topic: kafka.learning.alerts    PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: kafka.learning.alerts    Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
Topic: kafka.learning.tweets    PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: kafka.learning.tweets    Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

Publishing Messages to Topics

I have no name!@9cabe77834ff:/opt/bitnami/kafka/bin$         
>       ./kafka-console-producer.sh \
              # We run this command from inside the docker
              # So we are using an internal host and port.
>             --bootstrap-server localhost:29092 \
>             --topic kafka.learning.tweets
# Prompt to write a message
>This is my first tweet !

Consuming Messages from Topics

I have no name!@9cabe77834ff:/opt/bitnami/kafka/bin$         
>      ./kafka-console-consumer.sh \
>             --bootstrap-server localhost:29092 \
>             --topic kafka.learning.tweets \
# If the consumer does not already have
# an established offset to consume
# from, start with the earliest
# message present in the log rather
# than the latest message.
>             --from-beginning
This is my first tweet !
^CProcessed a total of 1 messages

Topic Management

  • Deleting Topics
I have no name!@9cabe77834ff:/opt/bitnami/kafka/bin$         
>      ./kafka-topics.sh \
>             --zookeeper zookeeper:2181 \
>             --delete \
>             --topic kafka.learning.alerts
Topic kafka.learning.alerts is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Kafka Setup

Zookeeper

lyes-s ( β—₯β—£_β—’β—€ ) ~/Documents/Kafka $ docker exec -it zookeeper bash
I have no name!@73efedc1be7f:/$ cd opt/bitnami/zookeeper/bin/
I have no name!@73efedc1be7f:/opt/bitnami/zookeeper/bin$ ls -la
total 84
drwxr-xr-x 2 root root  4096 May  5 11:00 .
drwxr-xr-x 1 root root  4096 Jun 27 03:17 ..
-rwxr-xr-x 1 root root   232 May  5 10:52 README.txt
-rwxr-xr-x 1 root root  1978 May  5 10:52 zkCleanup.sh
-rwxr-xr-x 1 root root  1115 May  5 10:52 zkCli.cmd
-rwxr-xr-x 1 root root  1576 May  5 10:52 zkCli.sh
-rwxr-xr-x 1 root root  1810 May  5 10:52 zkEnv.cmd
-rwxr-xr-x 1 root root  3613 May  5 10:52 zkEnv.sh
-rwxr-xr-x 1 root root  4559 May  5 10:52 zkServer-initialize.sh
-rwxr-xr-x 1 root root  1243 May  5 10:52 zkServer.cmd
-rwxr-xr-x 1 root root 11591 May  5 10:52 zkServer.sh
-rwxr-xr-x 1 root root   988 May  5 10:52 zkSnapShotToolkit.cmd
-rwxr-xr-x 1 root root  1377 May  5 10:52 zkSnapShotToolkit.sh
-rwxr-xr-x 1 root root   987 May  5 10:52 zkSnapshotComparer.cmd
-rwxr-xr-x 1 root root  1374 May  5 10:52 zkSnapshotComparer.sh
-rwxr-xr-x 1 root root   996 May  5 10:52 zkTxnLogToolkit.cmd
-rwxr-xr-x 1 root root  1385 May  5 10:52 zkTxnLogToolkit.sh
  1. Execute Zookeeper Client
I have no name!@73efedc1be7f:/opt/bitnami/zookeeper/bin$ zkCli.sh
/opt/bitnami/java/bin/java
Connecting to localhost:2181
2022-06-28 18:36:57,201 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:zookeeper.version=3.8.0-5a02a05eddb59aee6ac762f7ea82e92a68eb9c0f, built on 2022-02-25 08:49 UTC
2022-06-28 18:36:57,202 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:host.name=73efedc1be7f
2022-06-28 18:36:57,202 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:java.version=11.0.15
2022-06-28 18:36:57,203 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:java.vendor=BellSoft
2022-06-28 18:36:57,203 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:java.home=/opt/bitnami/java
2022-06-28 18:36:57,203 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:java.class.path=/opt/bitnami/zookeeper/bin/../zookeeper-server/target/classes:/opt/bitnami/zookeepe
r/bin/../build/classes:/opt/bitnami/zookeeper/bin/../zookeeper-server/target/lib/*.jar:/opt/bitnami/zookeeper/bin/../build/lib/*.jar:/opt/bitnami/zookeeper/bin/../lib/zookeeper-prometheus-
metrics-3.8.0.jar:/opt/bitnami/zookeeper/bin/../lib/zookeeper-jute-3.8.0.jar:/opt/bitnami/zookeeper/bin/../lib/zookeeper-3.8.0.jar:/opt/bitnami/zookeeper/bin/../lib/snappy-java-1.1.7.7.jar
:/opt/bitnami/zookeeper/bin/../lib/slf4j-api-1.7.30.jar:/opt/bitnami/zookeeper/bin/../lib/simpleclient_servlet-0.9.0.jar:/opt/bitnami/zookeeper/bin/../lib/simpleclient_hotspot-0.9.0.jar:/o
pt/bitnami/zookeeper/bin/../lib/simpleclient_common-0.9.0.jar:/opt/bitnami/zookeeper/bin/../lib/simpleclient-0.9.0.jar:/opt/bitnami/zookeeper/bin/../lib/netty-transport-native-unix-common-
4.1.73.Final.jar:/opt/bitnami/zookeeper/bin/../lib/netty-transport-native-epoll-4.1.73.Final.jar:/opt/bitnami/zookeeper/bin/../lib/netty-transport-classes-epoll-4.1.73.Final.jar:/opt/bitna
mi/zookeeper/bin/../lib/netty-transport-4.1.73.Final.jar:/opt/bitnami/zookeeper/bin/../lib/netty-tcnative-classes-2.0.48.Final.jar:/opt/bitnami/zookeeper/bin/../lib/netty-tcnative-2.0.48.F
inal.jar:/opt/bitnami/zookeeper/bin/../lib/netty-resolver-4.1.73.Final.jar:/opt/bitnami/zookeeper/bin/../lib/netty-handler-4.1.73.Final.jar:/opt/bitnami/zookeeper/bin/../lib/netty-common-4
.1.73.Final.jar:/opt/bitnami/zookeeper/bin/../lib/netty-codec-4.1.73.Final.jar:/opt/bitnami/zookeeper/bin/../lib/netty-buffer-4.1.73.Final.jar:/opt/bitnami/zookeeper/bin/../lib/metrics-cor
e-4.1.12.1.jar:/opt/bitnami/zookeeper/bin/../lib/logback-core-1.2.10.jar:/opt/bitnami/zookeeper/bin/../lib/logback-classic-1.2.10.jar:/opt/bitnami/zookeeper/bin/../lib/jline-2.14.6.jar:/op
t/bitnami/zookeeper/bin/../lib/jetty-util-ajax-9.4.43.v20210629.jar:/opt/bitnami/zookeeper/bin/../lib/jetty-util-9.4.43.v20210629.jar:/opt/bitnami/zookeeper/bin/../lib/jetty-servlet-9.4.43
.v20210629.jar:/opt/bitnami/zookeeper/bin/../lib/jetty-server-9.4.43.v20210629.jar:/opt/bitnami/zookeeper/bin/../lib/jetty-security-9.4.43.v20210629.jar:/opt/bitnami/zookeeper/bin/../lib/j
etty-io-9.4.43.v20210629.jar:/opt/bitnami/zookeeper/bin/../lib/jetty-http-9.4.43.v20210629.jar:/opt/bitnami/zookeeper/bin/../lib/javax.servlet-api-3.1.0.jar:/opt/bitnami/zookeeper/bin/../l
ib/jackson-databind-2.13.1.jar:/opt/bitnami/zookeeper/bin/../lib/jackson-core-2.13.1.jar:/opt/bitnami/zookeeper/bin/../lib/jackson-annotations-2.13.1.jar:/opt/bitnami/zookeeper/bin/../lib/
commons-io-2.11.0.jar:/opt/bitnami/zookeeper/bin/../lib/commons-cli-1.4.jar:/opt/bitnami/zookeeper/bin/../lib/audience-annotations-0.12.0.jar:/opt/bitnami/zookeeper/bin/../zookeeper-*.jar:
/opt/bitnami/zookeeper/bin/../zookeeper-server/src/main/resources/lib/*.jar:/opt/bitnami/zookeeper/bin/../conf:
2022-06-28 18:36:57,203 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:java.library.path=/usr/java/packages/lib:/usr/lib64:/lib64:/lib:/usr/lib
2022-06-28 18:36:57,203 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:java.io.tmpdir=/tmp
2022-06-28 18:36:57,203 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:java.compiler=<NA>
2022-06-28 18:36:57,204 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:os.name=Linux
2022-06-28 18:36:57,204 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:os.arch=amd64
2022-06-28 18:36:57,204 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:os.version=5.4.72-microsoft-standard-WSL2
2022-06-28 18:36:57,204 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:user.name=?
2022-06-28 18:36:57,204 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:user.home=?
2022-06-28 18:36:57,204 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:user.dir=/opt/bitnami/zookeeper/bin
2022-06-28 18:36:57,204 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:os.memory.free=1012MB
2022-06-28 18:36:57,205 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:os.memory.max=1024MB
2022-06-28 18:36:57,205 [myid:] - INFO  [main:o.a.z.Environment@98] - Client environment:os.memory.total=1024MB
2022-06-28 18:36:57,207 [myid:] - INFO  [main:o.a.z.ZooKeeper@637] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperM
ain$MyWatcher@2b662a77
2022-06-28 18:36:57,214 [myid:] - INFO  [main:o.a.z.c.X509Util@77] - Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
2022-06-28 18:36:57,218 [myid:] - INFO  [main:o.a.z.ClientCnxnSocket@239] - jute.maxbuffer value is 1048575 Bytes
2022-06-28 18:36:57,225 [myid:] - INFO  [main:o.a.z.ClientCnxn@1732] - zookeeper.request.timeout value is 0. feature enabled=false
Welcome to ZooKeeper!
2022-06-28 18:36:57,235 [myid:localhost:2181] - INFO  [main-SendThread(localhost:2181):o.a.z.ClientCnxn$SendThread@1171] - Opening socket connection to server localhost/127.0.0.1:2181.
2022-06-28 18:36:57,235 [myid:localhost:2181] - INFO  [main-SendThread(localhost:2181):o.a.z.ClientCnxn$SendThread@1173] - SASL config status: Will not attempt to authenticate using SASL (
unknown error)
2022-06-28 18:36:57,248 [myid:localhost:2181] - INFO  [main-SendThread(localhost:2181):o.a.z.ClientCnxn$SendThread@1005] - Socket connection established, initiating session, client: /127.0
.0.1:45848, server: localhost/127.0.0.1:2181
JLine support is enabled
2022-06-28 18:36:57,259 [myid:localhost:2181] - INFO  [main-SendThread(localhost:2181):o.a.z.ClientCnxn$SendThread@1444] - Session establishment complete on server localhost/127.0.0.1:2181
, session id = 0x10000488fb40007, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
# We are now connected to Zookeeper Client
[zk: localhost:2181(CONNECTED) 0]
  1. Available Nodes
[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
  1. Brokers Registered With Zookeeper
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
# 1 Kafka broker registered for now
[1001]
  1. Details about a Specific broker
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/1001

{
  "listener_security_protocol_map":{"INTERNAL":"PLAINTEXT","EXTERNAL":"PLAINTEXT"},
  "endpoints":["INTERNAL://kafka:29092","EXTERNAL://localhost:9092"],
  "jmx_port":-1,"features":{},
  "host":"kafka",
  "timestamp":"1656438552566",
  "port":29092,
  "version":5
}
  1. List Of Topics Currently Available
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[__consumer_offsets, kafka.learning.tweets]
  1. Details about a Specific Topic
[zk: localhost:2181(CONNECTED) 6] get /brokers/topics/kafka.learning.tweets
{
  "version":2,
  "partitions":{
    "0":[1001]
  },
  "adding_replicas":{},
  "removing_replicas":{}
}

Kafka Server Properties

lyes-s ( β—₯β—£_β—’β—€ ) ~/Documents/Kafka $ winpty docker exec -it kafka-broker bash
I have no name!@9cabe77834ff:/$ cat /opt/bitnami/kafka/config/server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=-1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=INTERNAL://:29092,EXTERNAL://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=INTERNAL://kafka:29092,EXTERNAL://localhost:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/bitnami/kafka/data

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=zookeeper:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application start
up.
group.initial.rebalance.delay.ms=0

auto.create.topics.enable=true

inter.broker.listener.name=INTERNAL

sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=
tls.client.auth=required
tls.type=JKS

Kafka Log Directories

I have no name!@9cabe77834ff:/$ cd /bitnami/kafka/data/
I have no name!@9cabe77834ff:/bitnami/kafka/data$ ls -la
total 236
drwxrwxr-x 1 root root 4096 Jun 28 19:04 .
drwxrwxr-x 1 root root 4096 Apr 20  2021 ..
-rw-r--r-- 1 1001 root    0 Jun 28 17:49 .lock
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-0
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-1
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-10
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-11
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-12
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-13
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-14
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-15
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-16
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-17
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-18
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-19
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-2
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-20
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-21
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-22
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-23
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-24
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-25
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-26
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-27
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-28
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-29
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-3
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-30
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-31
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-32
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-33
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-34
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-35
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-36
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-37
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-38
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-39
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-4
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-40
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-41
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-42
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-43
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-44
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-45
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-46
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-47
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-48
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-49
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-5
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-6
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-7
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-8
drwxr-xr-x 2 1001 root 4096 Jun 28 18:13 __consumer_offsets-9
-rw-r--r-- 1 1001 root    4 Jun 28 18:17 cleaner-offset-checkpoint
drwxr-xr-x 2 1001 root 4096 Jun 28 17:51 kafka.learning.tweets-0
-rw-r--r-- 1 1001 root    4 Jun 28 19:03 log-start-offset-checkpoint
-rw-r--r-- 1 1001 root   91 Jun 28 17:49 meta.properties
-rw-r--r-- 1 1001 root 1221 Jun 28 19:03 recovery-point-offset-checkpoint
-rw-r--r-- 1 1001 root 1221 Jun 28 19:04 replication-offset-checkpoint
  • There is a directory for each Topic and for each Partition

Example :

  • We have a data directory kafka.learning.tweets-0 where kafka.learning.tweets is the Topic and 0 is the partition id.
I have no name!@9cabe77834ff:/bitnami/kafka/data/kafka.learning.tweets-0$ ls -la
total 16
drwxr-xr-x 2 1001 root     4096 Jun 28 17:51 .
drwxrwxr-x 1 root root     4096 Jun 28 19:05 ..
-rw-r--r-- 1 1001 root 10485760 Jun 28 17:51 00000000000000000000.index
-rw-r--r-- 1 1001 root       92 Jun 28 18:07 00000000000000000000.log
-rw-r--r-- 1 1001 root 10485756 Jun 28 17:51 00000000000000000000.timeindex
-rw-r--r-- 1 1001 root        8 Jun 28 17:51 leader-epoch-checkpoint
  • Log file content
I have no name!@9cabe77834ff:/bitnami/kafka/data/kafka.learning.tweets-0$ cat 00000000000000000000.log
Pβ˜»β†’β–Ί7οΏ½β˜ΊοΏ½οΏ½βŒ‚Jqβ˜ΊοΏ½οΏ½βŒ‚Jq��������������☺<☺0This is my first tweet 
!I have no name!@9cabe77834ff:/bitnami/kafka/data/kafka.learning.tweets-0$
Clone this wiki locally