Skip to content

luszczynski/kafka-cheat-sheet

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

35 Commits
 
 

Repository files navigation

Kafka Cheat Sheet

Here you have some useful commands for kafka.

Tested on Kafka 2.5

Goals

  • ✓ Add most useful commands for kafka

  • ✓ Add kafkacat commands

  • ❏ Add commands output

  • ✓ Add commands for Kafka on Kubernetes

Table of Contents

Pre-req

First, set some kafka environment vars.

# For Kafka running on top of VMs/Bare Metal
KAFKA_BIN=/opt/kafka/bin
ZOOKEEPER_HOST=zookeeper-host:2181
BROKER_HOST=broker-host:9092

# For Kafka running on top of Kubernetes (Using strimzi)
KAFKA_NAMESPACE=kafka-demo
ZOOKEEPER_HOST=localhost:2181
BROKER_HOST=localhost:9092
ZOOKEEPER_POD=$(kubectl -n $KAFKA_NAMESPACE get pods -l app.kubernetes.io/name=zookeeper -o=jsonpath='{.items[0].metadata.name}')
KAFKA_BROKER_POD=$(kubectl -n $KAFKA_NAMESPACE get pods -l app.kubernetes.io/name=kafka -o=jsonpath='{.items[0].metadata.name}')

Zookeeper Operations

You need to whitelist all the commands below.

zookeeper.properties
4lw.commands.whitelist=stat,ruok,reqs,envi,dump,conf,cons,srvr,wchs,wchc,dirs,wchp,mntr,isro
  • If using Zookeeper Auth (SASL)

# Zookeeper Auth
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/jaas.conf"
jaas.conf
Client {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="test"
       password="test";
};
  • If using SSL/TLS on Zookeeper + SASL

export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/jaas.conf -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -Dzookeeper.client.secure=true -Dzookeeper.ssl.trustStore.location=/tmp/kafka.server.truststore -Dzookeeper.ssl.trustStore.password=mypass -Dzookeeper.ssl.trustStore.type=PKCS12"
Note
Remember to change your zookeeper port on the ZOOKEEPER_HOST if necessary

Get runtime conf

# For VMs
echo conf | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo conf | curl telnet://localhost:2181"

Get runtime environments

# For VMs
echo envi | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo envi | curl telnet://localhost:2181"

Health Check

# For VMs
echo stat | curl telnet://$ZOOKEEPER_HOST
echo ruok | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo stat | curl telnet://localhost:2181"
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo ruok | curl telnet://localhost:2181"

Connections

# For VMs
echo reqs | curl telnet://$ZOOKEEPER_HOST
echo cons | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo reqs | curl telnet://localhost:2181"
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo cons | curl telnet://localhost:2181"

Details of the server

# For VMs
echo srvr | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo srvr | curl telnet://localhost:2181"

Brief info about watches

# For VMs
echo wchs | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo wchs | curl telnet://localhost:2181"

Details about watches

# For VMs
echo wchc | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo wchc | curl telnet://localhost:2181"

Snapshots info

# For VMs
echo dirs | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo dirs | curl telnet://localhost:2181"

Monitoring vars

# For VMs
echo mntr | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo mntr | curl telnet://localhost:2181"

Get read-only or read-write mode

# For VMs
echo isro | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo isro | curl telnet://localhost:2181"

Get Process

# For VMs
jps | grep QuorumPeerMain

# For kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $ZOOKEEPER_POD -- bash -c "ps aux | grep QuorumPeerMain"

Broker Operations

List active brokers

# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST ls /brokers/ids

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
kafkacat -b $BROKER_HOST -L

List Broker Controller

# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST get /controller

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 get /controller

List broker details

# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST get /brokers/ids/{id}

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/{id}
kafkacat -b $BROKER_HOST -L

List topics

# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST ls /brokers/topics

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 ls /brokers/topics
kafkacat -b $BROKER_HOST -L -t <my-topic>

Change Broker Config

Change log cleaner threads.

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker id> \
    --alter \
    --add-config log.cleaner.threads=2

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker id> \
    --alter \
    --add-config log.cleaner.threads=2

Describe broker dynamic config

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker id> \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker id> \
    --describe

Delete broker config

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker id> \
    --alter \
    --delete-config log.cleaner.threads

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker id> \
    --alter \
    --delete-config log.cleaner.threads

Change cluster-wide dynamic config

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-default \
    --alter \
    --add-config log.cleaner.threads=2

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-default \
    --alter \
    --add-config log.cleaner.threads=2

Describe cluster-wide dynamic config

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-default \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-default \
    --describe

Disable hostname verification

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker-id> \
    --alter \
    --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker-id> \
    --alter \
    --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

Topic Operations

List topics using kafka-topics.sh

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --list \
    --zookeeper $ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --list \
    --zookeeper $ZOOKEEPER_HOST
# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --list

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --list \
    --bootstrap-server $BROKER_HOST

Describe topic

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --topic <topic_name> \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --topic <topic_name> \
    --describe
kafkacat -b $BROKER_HOST -L -t <topic_name>

Describe topic configs

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --describe

Delete topic config

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --alter \
    --delete-config <config>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --alter \
    --delete-config <config>

Copy topic to another topic in the same cluster

kafkacat -C -b $BROKER_HOST -t <topic_name> -e | kafkacat -P -b $BROKER_HOST -t <topic-name2>

Copy topic to another topic in another cluster

kafkacat -C -b $BROKER_HOST -t <topic_name> -e | kafkacat -P -b $BROKER_HOST2 -t <topic-name>

Move topic to another broker

Create json necessary

topics-to-move.json
{"topics": [{"topic": "topic1"},
            {"topic": "topic2"}],
"version":1
}

Generate plan to move to brokers

generate plan to move to broker 5 and 6
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --topics-to-move-json-file topics-to-move.json \
    --broker-list "5,6" \
    --generate

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --topics-to-move-json-file topics-to-move.json \
    --broker-list "5,6" \
    --generate
Note
save the results from the command above to cluster-reassignment.json

Move to broker 5 and 6

move to broker 5 and 6
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file cluster-reassignment.json \
    --execute

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file cluster-reassignment.json \
    --execute

Verify status

verify status
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file cluster-reassignment.json \
    --verify

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file cluster-reassignment.json \
    --verify

Create topic

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --create \
    --zookeeper $ZOOKEEPER_HOST \
    --replication-factor 1 \
    --partitions 1 \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --create \
    --zookeeper $ZOOKEEPER_HOST \
    --replication-factor 1 \
    --partitions 1 \
    --topic <topic_name>

Create topic with config

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --create \
    --topic <topic_name> \
    --partitions 1 \
    --replication-factor 1 \
    --config max.message.bytes=64000 \
    --config flush.messages=1

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --create \
    --topic <topic_name> \
    --partitions 1 \
    --replication-factor 1 \
    --config max.message.bytes=64000 \
    --config flush.messages=1

Increase replication factor of __consumer_offsets

Create replication plan

reassignment.json
{"version":1,
 "partitions":[
   {"topic":"__consumer_offsets", "partition":0,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":1,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":2,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":3,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":4,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":5,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":6,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":7,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":8,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":9,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":10, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":11, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":12, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":13, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":14, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":15, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":16, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":17, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":18, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":19, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":20, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":21, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":22, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":23, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":24, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":25, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":26, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":27, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":28, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":29, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":30, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":31, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":32, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":33, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":34, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":35, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":36, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":37, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":38, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":39, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":40, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":41, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":42, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":43, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":44, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":45, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":46, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":47, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":48, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":49, "replicas":[104,106,101,105]}
 ]
}

Execute the replication plan

# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file reassignment.json \
    --execute

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file reassignment.json \
    --execute

Verify reassignment

# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file reassignment.json \
    --verify

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file reassignment.json \
    --verify

Alter topic

Alter retention time

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --topic <topic_name>\
    --config retention.ms=1000

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --topic <topic_name>\
    --config retention.ms=1000

Alter min.insync.replicas

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --topic <topic_name> \
    --config min.insync.replicas=2

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --topic <topic_name> \
    --config min.insync.replicas=2

Alter max.message.bytes

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --alter \
    --add-config max.message.bytes=128000

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --alter \
    --add-config max.message.bytes=128000

Delete retention time

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --topic <topic_name> \
    --delete-config retention.ms

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --topic <topic_name> \
    --delete-config retention.ms
# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --alter \
    --delete-config retention.ms

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --alter \
    --delete-config retention.ms

List topics under-replicated

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --under-replicated-partitions

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --under-replicated-partitions

Delete topic

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --delete \
    --zookeeper $ZOOKEEPER_HOST \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --delete \
    --zookeeper $ZOOKEEPER_HOST \
    --topic <topic_name>
# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --delete \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --delete \
    --topic <topic_name>

Get earliest offset

# For VMs
$KAFKA_BIN/kafka-run-class.sh \
    kafka.tools.GetOffsetShell \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --time -2

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh \
    kafka.tools.GetOffsetShell \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --time -2

Get latest offset

# For VMs
$KAFKA_BIN/kafka-run-class.sh \
    kafka.tools.GetOffsetShell \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --time -1

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh \
    kafka.tools.GetOffsetShell \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --time -1

Partition Operations

Increase partition number

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --alter \
    --topic <topic_name> \
    --partitions 8

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --alter \
    --topic <topic_name> \
    --partitions 8

Increase replication factor

topics.json
{
    "topics": [
        {
            "topic": "test"
        }
    ],
    "version": 1
}
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --generate \
    --broker-list "401,402,601" \
    --topics-to-move-json-file topics.json

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --generate \
    --broker-list "401,402,601" \
    --topics-to-move-json-file topics.json
new-replication-factor.json
{"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[5,6,7]}]}
execute new replication factor
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file new-replication-factor.json \
    --execute

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file new-replication-factor.json \
    --execute
verify status of partition reassignment
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file new-replication-factor.json \
    --verify

$KAFKA_BIN/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file new-replication-factor.json \
    --verify

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --describe

Reassign partitions

Create plan

topics.json
{
    "topics": [
        {
            "topic": "test"
        }
    ],
    "version": 1
}
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --generate \
    --broker-list "401,402,601" \
    --topics-to-move-json-file topics.json

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --generate \
    --broker-list "401,402,601" \
    --topics-to-move-json-file topics.json

Save the result of the above command to a file named replicas.json

# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file replicas.json  \
    --execute

$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file replicas.json  \
    --verify

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file replicas.json  \
    --execute

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file replicas.json  \
    --verify

List unavailable partitions

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --unavailable-partitions

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --unavailable-partitions

Force election on all partitions

# For VMs
$KAFKA_BIN/kafka-leader-election.sh \
    --election-type preferred \
    --bootstrap-server $BROKER_HOST \
    --all-topic-partitions

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-leader-election.sh \
    --election-type preferred \
    --bootstrap-server $BROKER_HOST \
    --all-topic-partitions

Force election on specific topic and partition

# For VMs
$KAFKA_BIN/kafka-leader-election.sh \
    --election-type preferred \
    --bootstrap-server $BROKER_HOST \
    --topic <topic name> \
    --partition <partition id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-leader-election.sh \
    --election-type preferred \
    --bootstrap-server $BROKER_HOST \
    --topic <topic name> \
    --partition <partition id>

Consumer

List consumer groups

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --list \
    --bootstrap-server $BROKER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --list \
    --bootstrap-server $BROKER_HOST

Describe consumer groups

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --describe \
    --group <group_id> \
    --bootstrap-server $BROKER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --describe \
    --group <group_id> \
    --bootstrap-server $BROKER_HOST

Describe all consumer groups

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --describe \
    --bootstrap-server $BROKER_HOST \
    --all-groups

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --describe \
    --bootstrap-server $BROKER_HOST \
    --all-groups

Delete consumer group

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --delete \
    --group <group-id-1> \
    --group <group-id-2>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --delete \
    --group <group-id-1> \
    --group <group-id-2>

Active member in a consumer group

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group-id> \
    --members

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group-id> \
    --members

Partition Assigned to each member

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group_id> \
    --members \
    --verbose

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group_id> \
    --members \
    --verbose

Consumer Group State

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group-id> \
    --state

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group-id> \
    --state

Consuming message

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name>
kafkacat -C -b $BROKER_HOST -t <topic_name>

Consuming message and formatting output

kafkacat -C -b $BROKER_HOST -t <topic_name> -q -f 'Topic %t using partition %p at offset %o has key = %k and value = %S'

Consuming message from the beginning

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --from-beginning

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --from-beginning

Consuming message from the end

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name>

Consuming message and show output in JSON

kafkacat -b $BROKER_HOST -t <topic_name> -J

Consuming and showing message key

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --property print.key=true \
    --property key.separator=,

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --property print.key=true \
    --property key.separator=,

Read one message

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --max-messages 1

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --max-messages 1

Read the last 2 messages from topic and then exit

kafkacat -C -b $BROKER_HOST -t <topic_name> -o -2 -e

Read the last 2 messages from partition 0

kafkacat -C -b $BROKER_HOST -t <topic_name> -o -2 -e -p 0

Read from __consumer_offsets

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic __consumer_offsets \
    --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter' \
    --max-messages 1

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic __consumer_offsets \
    --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter' \
    --max-messages 1

Describe __consumer_offsets

# For VMs
$KAFKA_BIN/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --bootstrap-server $BROKER_HOST \
    --group <group-id> \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --bootstrap-server $BROKER_HOST \
    --group <group-id> \
    --new-consumer \
    --describe

Read from __transaction_state

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" \
    --topic __transaction_state \
    --from-beginning

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic __transaction_state \
    --from-beginning \
    --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"

Consume using consumer group

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --topic <topic_name> \
    --bootstrap-server $BROKER_HOST \
    --group <group-id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --topic <topic_name> \
    --bootstrap-server $BROKER_HOST \
    --group <group-id>

Topics to which group is subscribed

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --describe

Reset offset

Reset to the latest offset

# For VMs
# Without --execute the tool only prints the planned offsets (dry run);
# drop --execute if you want to preview the result first.
# The consumer group must be inactive while its offsets are reset.
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --reset-offsets \
    --group <group-id> \
    --topic <topic_name> \
    --to-latest \
    --execute

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --reset-offsets \
    --group <group-id> \
    --topic <topic_name> \
    --to-latest \
    --execute

Reset offset for a consumer group in a topic

# For VMs
# There are many other resetting options
# --shift-by <positive_or_negative_integer> / --to-current / --to-latest / --to-offset <offset_integer>
# --to-datetime <datetime_string> --by-duration <duration_string>
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --topic <topic_name> \
    --reset-offsets \
    --to-earliest \
    --execute

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --topic <topic_name> \
    --reset-offsets \
    --to-earliest \
    --execute

Reset offset from all consumer groups

# For VMs
# Applies to every consumer group known to the cluster.
# Without --execute the tool only prints the planned offsets (dry run);
# drop --execute if you want to preview the result first.
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --all-groups \
    --reset-offsets \
    --topic <topic_name> \
    --to-earliest \
    --execute

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --all-groups \
    --reset-offsets \
    --topic <topic_name> \
    --to-earliest \
    --execute

Forward by 2 for example

# For VMs
# Moves the group's committed offset 2 records forward on every partition of the topic.
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --reset-offsets \
    --shift-by 2 \
    --execute \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --reset-offsets \
    --shift-by 2 \
    --execute \
    --topic <topic_name>

Backward by 2 for example

# For VMs
# Moves the group's committed offset 2 records backward on every partition of the topic.
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --reset-offsets \
    --shift-by -2 \
    --execute \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --reset-offsets \
    --shift-by -2 \
    --execute \
    --topic <topic_name>

Describe consumer group

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group_id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group_id>

Check offset for consumer group

# For VMs
# kafka-consumer-offset-checker.sh was removed in Kafka 1.0; kafka-consumer-groups.sh
# replaces it. --describe --offsets lists CURRENT-OFFSET, LOG-END-OFFSET and LAG for
# every topic partition the group has committed offsets for.
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --offsets \
    --group <group_id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --offsets \
    --group <group_id>

Producer

Send message using file

# For VMs
# Each line of messages.txt is sent as one record.
$KAFKA_BIN/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name> < messages.txt

# For Kubernetes
# Use -i without -t: stdin is redirected from a local file, so no TTY can be allocated.
kubectl -n $KAFKA_NAMESPACE exec -i $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name> < messages.txt

# kafkacat
# -l reads messages.txt line by line and sends each line as one message.
kafkacat -P -l -b $BROKER_HOST -t <topic_name> messages.txt

Send message using standard input

# For VMs
$KAFKA_BIN/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name>
kafkacat -P -b $BROKER_HOST -t <topic_name>

Send message using snappy compression

kafkacat -P -b $BROKER_HOST -t <topic_name> -z snappy

Send 200 messages to a topic

seq 200 | kafkacat -P -b $BROKER_HOST -t <topic_name>

Send message using string

# For VMs
echo "My Message" | $KAFKA_BIN/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name>

# For Kubernetes
# Pipe the message into kubectl; -i (without -t) forwards the piped stdin to the pod.
echo "My Message" | kubectl -n $KAFKA_NAMESPACE exec -i $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name>

# kafkacat
# -P selects producer mode explicitly instead of relying on tty auto-detection.
echo "My Message" | kafkacat -P -b $BROKER_HOST -t <topic_name>

Send message using headers

# kafkacat
# -H may be repeated; each occurrence adds one key=value header to the produced message.
# -t is required so kafkacat knows which topic to produce to.
echo "My Message" | kafkacat -P -b $BROKER_HOST -t <topic_name> -H "header1=value1" -H "header2=value2"

Send message using acks=all

# For VMs
$KAFKA_BIN/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --producer-property acks=all

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --producer-property acks=all

Send message with key

# For VMs
$KAFKA_BIN/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --property parse.key=true \
    --property key.separator=,

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --property parse.key=true \
    --property key.separator=,
Note
Your message should be: <mykey>,<message>. For example: Gus,1000.

Quotas

Add quota for user and client-id

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-name <user> \
    --entity-type clients \
    --entity-name <client-id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-name <user> \
    --entity-type clients \
    --entity-name <client-id>

Add quota for user

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-name <user>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-name <user>

Add quota for client-id

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type clients \
    --entity-name <client-id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type clients \
    --entity-name <client-id>

Add default client-id quota for user

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-name <user> \
    --entity-type clients \
    --entity-default

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-name <user> \
    --entity-type clients \
    --entity-default

Add default quota for user

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-default

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-default

Add default quota for client-id

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type clients \
    --entity-default

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type clients \
    --entity-default

Describe quota for user and client-id

# For VMs
# Shows the quota configured for the specific (user, client-id) pair.
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --entity-type users \
    --entity-name <user> \
    --entity-type clients \
    --entity-name <client-id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --entity-type users \
    --entity-name <user> \
    --entity-type clients \
    --entity-name <client-id>

Describe quota for a user

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --entity-type users \
    --entity-name <user>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --entity-type users \
    --entity-name <user>

Describe quota for a client

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --entity-type clients \
    --entity-name <client-id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --entity-type clients \
    --entity-name <client-id>

ACLs

Allow <user1> and <user2> to read and write

# For VMs
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal User:<user1> \
    --allow-principal User:<user2> \
    --allow-host <ip-address1> \
    --allow-host <ip-address2> \
    --operation Read \
    --operation Write \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal User:<user1> \
    --allow-principal User:<user2> \
    --allow-host <ip-address1> \
    --allow-host <ip-address2> \
    --operation Read \
    --operation Write \
    --topic <topic_name>

Allow everyone except <user1> to read from the topic

# For VMs
# The wildcards are quoted so the local shell passes a literal * to kafka-acls.sh
# instead of expanding it to the file names in the current directory.
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal 'User:*' \
    --allow-host '*' \
    --deny-principal User:<user1> \
    --deny-host <ip-address> \
    --operation Read \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal 'User:*' \
    --allow-host '*' \
    --deny-principal User:<user1> \
    --deny-host <ip-address> \
    --operation Read \
    --topic <topic_name>

Allow <user1> to produce on all topics

# For VMs
# --producer is a convenience flag that adds the ACLs a producer needs on the topic.
# '*' is quoted so the local shell does not expand it to file names.
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal User:<user1> \
    --allow-host <ip-address> \
    --producer --topic '*'

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal User:<user1> \
    --allow-host <ip-address> \
    --producer --topic '*'

Allow <user1> to consume on all topics

# For VMs
# --consumer is a convenience flag that adds the ACLs a consumer needs; it must be
# combined with both --topic and --group. Replace --group '*' with a specific
# group id to restrict the user to that consumer group.
# '*' is quoted so the local shell does not expand it to file names.
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal User:<user1> \
    --allow-host <ip-address> \
    --consumer --topic '*' --group '*'

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal User:<user1> \
    --allow-host <ip-address> \
    --consumer --topic '*' --group '*'

Remove ACL for <user1> and <user2> to read and write

# For VMs
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --remove \
    --allow-principal User:<user1> \
    --allow-principal User:<user2> \
    --allow-host <ip-address1> \
    --allow-host <ip-address2> \
    --operation Read \
    --operation Write \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --remove \
    --allow-principal User:<user1> \
    --allow-principal User:<user2> \
    --allow-host <ip-address1> \
    --allow-host <ip-address2> \
    --operation Read \
    --operation Write \
    --topic <topic_name>

List ACLs on specific topic

# For VMs
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --list \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --list \
    --topic <topic_name>

List ACLs on all topics

# For VMs
# '*' is quoted so the local shell passes a literal wildcard to kafka-acls.sh
# instead of expanding it to the file names in the current directory.
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --list \
    --topic '*'

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --list \
    --topic '*'

Mirror Maker

Mirror topic

# For VMs
$KAFKA_BIN/kafka-mirror-maker.sh \
    --consumer.config consumer.properties \
    --producer.config producer.properties \
    --whitelist <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-mirror-maker.sh \
    --consumer.config consumer.properties \
    --producer.config producer.properties \
    --whitelist <topic_name>

Delegation Token

Create token

# For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --create \
    --max-life-time-period -1 \
    --command-config client.properties \
    --renewer-principal User:<user>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --create \
    --max-life-time-period -1 \
    --command-config client.properties \
    --renewer-principal User:<user>

Renew token

# For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --renew \
    --renew-time-period -1 \
    --command-config client.properties \
    --hmac ABCDEFGHIJK

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --renew \
    --renew-time-period -1 \
    --command-config client.properties \
    --hmac ABCDEFGHIJK

Expire token

# For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --expire \
    --expiry-time-period -1 \
    --command-config client.properties \
    --hmac ABCDEFGHIJK

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --expire \
    --expiry-time-period -1 \
    --command-config client.properties \
    --hmac ABCDEFGHIJK

Describe token

# For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --command-config client.properties \
    --owner-principal User:<user1>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --command-config client.properties \
    --owner-principal User:<user1>

Performance Test

Producer

# For VMs
$KAFKA_BIN/kafka-producer-perf-test.sh \
    --topic teste \
    --num-records 50000000 \
    --record-size 100 \
    --throughput -1 \
    --producer-props acks=all bootstrap.servers=$BROKER_HOST buffer.memory=67108864 batch.size=8196

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-producer-perf-test.sh \
    --topic teste \
    --num-records 50000000 \
    --record-size 100 \
    --throughput -1 \
    --producer-props acks=all bootstrap.servers=$BROKER_HOST buffer.memory=67108864 batch.size=8196

Consumer

# For VMs
$KAFKA_BIN/kafka-consumer-perf-test.sh \
    --group grupo \
    --print-metrics \
    --show-detailed-stats \
    --topic teste \
    --messages 600000 \
    --broker-list $BROKER_HOST \
    --timeout 1000000

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-perf-test.sh \
    --group grupo \
    --print-metrics \
    --show-detailed-stats \
    --topic teste \
    --messages 600000 \
    --broker-list $BROKER_HOST \
    --timeout 1000000