Skip to content

Latest commit

 

History

History
2218 lines (1837 loc) · 53.7 KB

README.adoc

File metadata and controls

2218 lines (1837 loc) · 53.7 KB

Kafka Cheat Sheet

Here you have some useful commands for kafka.

Tested on Kafka 2.5

Goals

  • ✓ Add most useful commands for kafka

  • ✓ Add kafkacat commands

  • ❏ Add commands output

  • ✓ Add commands for Kafka on Kubernetes

Table of Contents

Pre-req

First, set some kafka environment vars.

# For Kafka running on top of VMs/Bare Metal
KAFKA_BIN=/opt/kafka/bin
ZOOKEEPER_HOST=zookeeper-host:2181
BROKER_HOST=broker-host:9092

# For Kafka running on top of Kubernetes (Using strimzi)
KAFKA_NAMESPACE=kafka-demo
ZOOKEEPER_HOST=localhost:2181
BROKER_HOST=localhost:9092
ZOOKEEPER_POD=$(kubectl -n $KAFKA_NAMESPACE get pods -l app.kubernetes.io/name=zookeeper -o=jsonpath='{.items[0].metadata.name}')
KAFKA_BROKER_POD=$(kubectl -n $KAFKA_NAMESPACE get pods -l app.kubernetes.io/name=kafka -o=jsonpath='{.items[0].metadata.name}')

Zookeeper Operations

You need to whitelist all the commands below.

zookeeper.properties
4lw.commands.whitelist=stat,ruok,reqs,envi,dump,conf,cons,srvr,wchs,wchc,dirs,wchp,mntr,isro
  • If using Zookeeper Auth (SASL)

# Zookeeper Auth
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/jaas.conf"
jaas.conf
Client {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="test"
       password="test";
};
  • If using SSL/TLS on Zookeeper + SASL

export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/jaas.conf -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -Dzookeeper.client.secure=true -Dzookeeper.ssl.trustStore.location=/tmp/kafka.server.truststore -Dzookeeper.ssl.trustStore.password=mypass -Dzookeeper.ssl.trustStore.type=PKCS12"
Note
Remember to change the ZooKeeper port in ZOOKEEPER_HOST if necessary

Get runtime conf

# For VMs
echo conf | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo conf | curl telnet://localhost:2181"

Get runtime environments

# For VMs
echo envi | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo envi | curl telnet://localhost:2181"

Health Check

# For VMs
echo stat | curl telnet://$ZOOKEEPER_HOST
echo ruok | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo stat | curl telnet://localhost:2181"
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo ruok | curl telnet://localhost:2181"

Connections

# For VMs
echo reqs | curl telnet://$ZOOKEEPER_HOST
echo cons | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo reqs | curl telnet://localhost:2181"
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo cons | curl telnet://localhost:2181"

Details of the server

# For VMs
echo srvr | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo srvr | curl telnet://localhost:2181"

Brief info about watches

# For VMs
echo wchs | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo wchs | curl telnet://localhost:2181"

Details about watches

# For VMs
echo wchc | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo wchc | curl telnet://localhost:2181"

Snapshots info

# For VMs
echo dirs | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo dirs | curl telnet://localhost:2181"

Monitoring vars

# For VMs
echo mntr | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo mntr | curl telnet://localhost:2181"

Get read-only or read-write mode

# For VMs
echo isro | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo isro | curl telnet://localhost:2181"

Get Process

# For VMs
jps | grep QuorumPeerMain

# For kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $ZOOKEEPER_POD -- bash -c "ps aux | grep QuorumPeerMain"

Broker Operations

List active brokers

# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST ls /brokers/ids

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
kafkacat -b $BROKER_HOST -L

List Broker Controller

# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST get /controller

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 get /controller

List broker details

# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST ls /brokers/ids/{id}

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids/{id}
kafkacat -b $BROKER_HOST -L

List topics

# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST ls /brokers/topics

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 ls /brokers/topics
kafkacat -b $BROKER_HOST -L -t <my-topic>

Change Broker Config

Change log cleaner threads.

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker id> \
    --alter \
    --add-config log.cleaner.threads=2

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker id> \
    --alter \
    --add-config log.cleaner.threads=2

Describe broker dynamic config

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker id> \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker id> \
    --describe

Delete broker config

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker id> \
    --alter \
    --delete-config log.cleaner.threads

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker id> \
    --alter \
    --delete-config log.cleaner.threads

Change cluster-wide dynamic config

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-default \
    --alter \
    --add-config log.cleaner.threads=2

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-default \
    --alter \
    --add-config log.cleaner.threads=2

Describe cluster-wide dynamic config

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-default \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-default \
    --describe

Disable hostname verification

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker-id> \
    --alter \
    --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --bootstrap-server $BROKER_HOST \
    --entity-type brokers \
    --entity-name <broker-id> \
    --alter \
    --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

Topic Operations

List topics using kafka-topics.sh

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --list \
    --zookeeper $ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --list \
    --zookeeper $ZOOKEEPER_HOST
# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --list

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --list \
    --bootstrap-server $BROKER_HOST

Describe topic

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --topic <topic_name> \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --topic <topic_name> \
    --describe
kafkacat -b $BROKER_HOST -L -t <topic_name>

Describe topic configs

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --describe

Delete topic config

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --delete-config <config>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --delete-config <config>

Copy topic to another topic in the same cluster

kafkacat -C -b $BROKER_HOST -t <topic_name> -e | kafkacat -P -b $BROKER_HOST -t <topic-name2>

Copy topic to another topic in another cluster

kafkacat -C -b $BROKER_HOST -t <topic_name> -e | kafkacat -P -b $BROKER_HOST2 -t <topic-name>

Move topic to another broker

Create json necessary

topics-to-move.json
{"topics": [{"topic": "topic1"},
            {"topic": "topic2"}],
"version":1
}

Generate plan to move to brokers

generate plan to move to broker 5 and 6
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --topics-to-move-json-file topics-to-move.json \
    --broker-list "5,6" \
    --generate

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --topics-to-move-json-file topics-to-move.json \
    --broker-list "5,6" \
    --generate
Note
save the results from the command above to cluster-reassignment.json

Move to broker 5 and 6

move to broker 5 and 6
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file cluster-reassignment.json \
    --execute

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file cluster-reassignment.json \
    --execute

Verify status

verify status
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file cluster-reassignment.json \
    --verify

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file cluster-reassignment.json \
    --verify

Create topic

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --create \
    --zookeeper $ZOOKEEPER_HOST \
    --replication-factor 1 \
    --partitions 1 \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --create \
    --zookeeper $ZOOKEEPER_HOST \
    --replication-factor 1 \
    --partitions 1 \
    --topic <topic_name>

Create topic with config

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --create \
    --topic <topic_name> \
    --partitions 1 \
    --replication-factor 1 \
    --config max.message.bytes=64000 \
    --config flush.messages=1

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --create \
    --topic <topic_name> \
    --partitions 1 \
    --replication-factor 1 \
    --config max.message.bytes=64000 \
    --config flush.messages=1

Increase replication factor of __consumer_offsets

Create replication plan

reassignment.json
{"version":1,
 "partitions":[
   {"topic":"__consumer_offsets", "partition":0,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":1,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":2,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":3,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":4,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":5,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":6,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":7,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":8,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":9,  "replicas":[106,101,102,105]},
   {"topic":"__consumer_offsets", "partition":10, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":11, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":12, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":13, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":14, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":15, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":16, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":17, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":18, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":19, "replicas":[101,102,103,105]},
   {"topic":"__consumer_offsets", "partition":20, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":21, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":22, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":23, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":24, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":25, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":26, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":27, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":28, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":29, "replicas":[102,103,104,105]},
   {"topic":"__consumer_offsets", "partition":30, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":31, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":32, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":33, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":34, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":35, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":36, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":37, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":38, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":39, "replicas":[103,104,106,105]},
   {"topic":"__consumer_offsets", "partition":40, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":41, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":42, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":43, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":44, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":45, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":46, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":47, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":48, "replicas":[104,106,101,105]},
   {"topic":"__consumer_offsets", "partition":49, "replicas":[104,106,101,105]}
 ]
}

Increase partition

# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file reassignment.json \
    --execute

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file reassignment.json \
    --execute

Verify reassignment

# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file reassignment.json \
    --verify

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file reassignment.json \
    --verify

Alter topic

Alter retention time

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --topic <topic_name>\
    --config retention.ms=1000

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --topic <topic_name>\
    --config retention.ms=1000

Alter min.insync.replicas

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --topic <topic_name> \
    --config min.insync.replicas=2

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --topic <topic_name> \
    --config min.insync.replicas=2

Alter max.message.bytes

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --alter \
    --add-config max.message.bytes=128000

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --alter \
    --add-config max.message.bytes=128000

Delete retention time

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --topic <topic_name> \
    --delete-config retention.ms

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --topic <topic_name> \
    --delete-config retention.ms
# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --alter \
    --delete-config retention.ms

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --entity-type topics \
    --entity-name <topic_name> \
    --alter \
    --delete-config retention.ms

List topics under-replicated

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --under-replicated-partitions

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --under-replicated-partitions

Delete topic

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --delete \
    --zookeeper $ZOOKEEPER_HOST \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --delete \
    --zookeeper $ZOOKEEPER_HOST \
    --topic <topic_name>
# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --delete \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --delete \
    --topic <topic_name>

Get earliest offset

# For VMs
$KAFKA_BIN/kafka-run-class.sh \
    kafka.tools.GetOffsetShell \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --time -2

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh \
    kafka.tools.GetOffsetShell \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --time -2

Get latest offset

# For VMs
$KAFKA_BIN/kafka-run-class.sh \
    kafka.tools.GetOffsetShell \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --time -1

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh \
    kafka.tools.GetOffsetShell \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --time -1

Partition Operations

Increase partition number

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --alter \
    --topic <topic_name> \
    --partitions 8

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --alter \
    --topic <topic_name> \
    --partitions 8

Increase replication factor

topics.json
{
    "topics": [
        {
            "topic": "test"
        }
    ],
    "version": 1
}
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --generate \
    --broker-list "401,402,601" \
    --topics-to-move-json-file topics.json

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --generate \
    --broker-list "401,402,601" \
    --topics-to-move-json-file topics.json
new-replication-factor.json
{"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[5,6,7]}]}
execute new replication factor
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file new-replication-factor.json \
    --execute

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file new-replication-factor.json \
    --execute
verify status of partition reassignment
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file new-replication-factor.json \
    --verify

$KAFKA_BIN/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file new-replication-factor.json \
    --verify

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --describe

Reassign partitions

Create plan

topics.json
{
    "topics": [
        {
            "topic": "test"
        }
    ],
    "version": 1
}
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --generate \
    --broker-list "401,402,601" \
    --topics-to-move-json-file topics.json

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --generate \
    --broker-list "401,402,601" \
    --topics-to-move-json-file topics.json

Save the result of the above command to a file named replicas.json

# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file replicas.json  \
    --execute

$KAFKA_BIN/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file replicas.json  \
    --verify

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file replicas.json  \
    --execute

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --reassignment-json-file replicas.json  \
    --verify

List unavailable partitions

# For VMs
$KAFKA_BIN/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --unavailable-partitions

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --unavailable-partitions

Force election on all partitions

# For VMs
$KAFKA_BIN/kafka-leader-election.sh \
    --election-type preferred \
    --bootstrap-server $BROKER_HOST \
    --all-topic-partitions

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-leader-election.sh \
    --election-type preferred \
    --bootstrap-server $BROKER_HOST \
    --all-topic-partitions

Force election on specific topic and partition

# For VMs
$KAFKA_BIN/kafka-leader-election.sh \
    --election-type preferred \
    --bootstrap-server $BROKER_HOST \
    --topic <topic name> \
    --partition <partition id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-leader-election.sh \
    --election-type preferred \
    --bootstrap-server $BROKER_HOST \
    --topic <topic name> \
    --partition <partition id>

Consumer

List consumer groups

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --list \
    --bootstrap-server $BROKER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --list \
    --bootstrap-server $BROKER_HOST

Describe consumer groups

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --describe \
    --group <group_id> \
    --bootstrap-server $BROKER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --describe \
    --group <group_id> \
    --bootstrap-server $BROKER_HOST

Describe all consumer groups

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --describe \
    --bootstrap-server $BROKER_HOST \
    --all-groups

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --describe \
    --bootstrap-server $BROKER_HOST \
    --all-groups

Delete consumer group

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --delete \
    --group <group-id-1> \
    --group <group-id-2>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --delete \
    --group <group-id-1> \
    --group <group-id-2>

Active member in a consumer group

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group-id> \
    --members

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group-id> \
    --members

Partition Assigned to each member

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group_id> \
    --members \
    --verbose

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group_id> \
    --members \
    --verbose

Consumer Group State

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group-id> \
    --state

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group-id> \
    --state

Consuming message

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name>
kafkacat -C -b $BROKER_HOST -t <topic_name>

Consuming message and formatting output

kafkacat -C -b $BROKER_HOST -t <topic_name> -q -f 'Topic %t using partition %p at offset %o has key = %k and value = %s'

Consuming message from the beginning

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --from-beginning

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --from-beginning

Consuming message from the end

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name>

Consuming message and show output in JSON

kafkacat -b $BROKER_HOST -t <topic_name> -J

Consuming and showing message key

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --property print.key=true \
    --property key.separator=,

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --property print.key=true \
    --property key.separator=,

Read one message

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --max-messages 1

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic <topic_name> \
    --max-messages 1

Read the last 2 messages from topic and then exit

kafkacat -C -b $BROKER_HOST -t <topic_name> -o -2 -e

Read the last 2 messages from partition 0

kafkacat -C -b $BROKER_HOST -t <topic_name> -o -2 -e -p 0

Read from __consumer_offsets

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic __consumer_offsets \
    --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter' \
    --max-messages 1

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic __consumer_offsets \
    --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter' \
    --max-messages 1

Describe __consumer_offsets

# For VMs
$KAFKA_BIN/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --bootstrap-server $BROKER_HOST \
    --group <group-id> \
    --new-consumer \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --bootstrap-server $BROKER_HOST \
    --group <group-id> \
    --describe

Read from __transaction_state

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" \
    --topic __transaction_state \
    --from-beginning

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --bootstrap-server $BROKER_HOST \
    --topic __transaction_state \
    --from-beginning \
    --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"

Consume using consumer group

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
    --topic <topic_name> \
    --bootstrap-server $BROKER_HOST \
    --group <group-id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
    --topic <topic_name> \
    --bootstrap-server $BROKER_HOST \
    --group <group-id>

Topics to which group is subscribed

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --describe

Reset offset

Reset to the latest offset

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --reset-offsets \
    --group <group-id> \
    --topic topic1 \
    --to-latest

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --reset-offsets \
    --group <group-id> \
    --topic topic1 \
    --to-latest

Reset offset for a consumer group in a topic

# For VMs
# There are many other resetting options
# --shift-by <positive_or_negative_integer> / --to-current / --to-latest / --to-offset <offset_integer>
# --to-datetime <datetime_string> --by-duration <duration_string>
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --topic <topic_name> \
    --reset-offsets \
    --to-earliest \
    --execute

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --topic <topic_name> \
    --reset-offsets \
    --to-earliest \
    --execute

Reset offset from all consumer groups

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --all-groups \
    --reset-offsets \
    --topic <topic_name> \
    --to-earliest

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --all-groups \
    --reset-offsets \
    --topic <topic_name> \
    --to-earliest

Forward by 2 for example

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --reset-offsets \
    --shift-by 2 \
    --execute \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --reset-offsets \
    --shift-by 2 \
    --execute \
    --topic <topic_name>

Backward by 2 for example

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --reset-offsets \
    --shift-by -2 \
    --execute \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --group <group_id> \
    --reset-offsets \
    --shift-by -2 \
    --execute \
    --topic <topic_name>

Describe consumer group

# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group_id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --group <group_id>

Check offset for consumer group

# For VMs
# NOTE: kafka-consumer-offset-checker.sh was removed in Kafka 1.0;
# on newer brokers use kafka-consumer-groups.sh --describe instead
$KAFKA_BIN/kafka-consumer-offset-checker.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --group <group_id> \
    --topic <topic_name>

# For Kubernetes (tool removed in Kafka 1.0; prefer kafka-consumer-groups.sh --describe)
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-offset-checker.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --group <group_id> \
    --topic <topic_name>

Producer

Send message using file

# For VMs
$KAFKA_BIN/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name> < messages.txt

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -i $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name> < messages.txt
kafkacat -P -l -b $BROKER_HOST -t <topic_name> messages.txt

Send message using standard input

# For VMs
$KAFKA_BIN/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name>
kafkacat -P -b $BROKER_HOST -t <topic_name>

Send message using snappy compression

kafkacat -P -b $BROKER_HOST -t <topic_name> -z snappy

Send 200 messages to a topic

seq 200 | kafkacat -P -b $BROKER_HOST -t <topic_name>

Send message using string

# For VMs
echo "My Message" | $KAFKA_BIN/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name>

# For Kubernetes
echo "My Message" | kubectl -n $KAFKA_NAMESPACE exec -i $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name>
echo "My Message" | kafkacat -b $BROKER_HOST -t <topic_name>

Send message using headers

echo "My Message" | kafkacat -b $BROKER_HOST -t <topic_name> -H "header1=value1"
echo "My Message" | kafkacat -b $BROKER_HOST -t <topic_name> -H "header1=value1" -H "header2=value2"

Send message using ack=all

# For VMs
$KAFKA_BIN/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --producer-property acks=all

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --producer-property acks=all

Send message with key

# For VMs
$KAFKA_BIN/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --property parse.key=true \
    --property key.separator=,

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh \
    --broker-list $BROKER_HOST \
    --topic <topic_name> \
    --property parse.key=true \
    --property key.separator=,
Note
Your message should be: <mykey>,<message>. For example: Gus,1000.

Quotas

Add quota for user and client-id

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-name <user> \
    --entity-type clients \
    --entity-name <client-id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-name <user> \
    --entity-type clients \
    --entity-name <client-id>

Add quota for user

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-name <user>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-name <user>

Add quota for client-id

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type clients \
    --entity-name <client-id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type clients \
    --entity-name <client-id>

Add default client-id quota for user

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-name <user> \
    --entity-type clients \
    --entity-default

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-name <user> \
    --entity-type clients \
    --entity-default

Add default quota for user

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-default

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type users \
    --entity-default

Add default quota for client-id

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type clients \
    --entity-default

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --alter \
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' \
    --entity-type clients \
    --entity-default

Describe quota for user and client-id

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --entity-type users \
    --entity-name <user> \
    --entity-type clients \
    --entity-name <client-id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --entity-type users \
    --entity-name <user> \
    --entity-type clients \
    --entity-name <client-id>

Describe quota for a user

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --entity-type users \
    --entity-name <user>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --entity-type users \
    --entity-name <user>

Describe quota for a client

# For VMs
$KAFKA_BIN/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --entity-type clients \
    --entity-name <client-id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
    --zookeeper $ZOOKEEPER_HOST \
    --describe \
    --entity-type clients \
    --entity-name <client-id>

ACLs

Allow <user1> and <user2> to read and write

# For VMs
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal User:<user1> \
    --allow-principal User:<user2> \
    --allow-host <ip-address1> \
    --allow-host <ip-address2> \
    --operation Read \
    --operation Write \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal User:<user1> \
    --allow-principal User:<user2> \
    --allow-host <ip-address1> \
    --allow-host <ip-address2> \
    --operation Read \
    --operation Write \
    --topic <topic_name>

Allow all read from topic but <user1>

# For VMs
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal 'User:*' \
    --allow-host '*' \
    --deny-principal User:<user1> \
    --deny-host <ip-address> \
    --operation Read \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal 'User:*' \
    --allow-host '*' \
    --deny-principal User:<user1> \
    --deny-host <ip-address> \
    --operation Read \
    --topic <topic_name>

Allow <user1> to produce on all topics

# For VMs
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal User:<user1> \
    --allow-host <ip-address> \
    --producer --topic '*'

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal User:<user1> \
    --allow-host <ip-address> \
    --producer --topic '*'

Allow <user1> to consume on all topics

# For VMs
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal User:<user1> \
    --allow-host <ip-address> \
    --consumer --group <group_id> --topic '*'

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --add \
    --allow-principal User:<user1> \
    --allow-host <ip-address> \
    --consumer --group <group_id> --topic '*'

Remove ACL for <user1> and <user2> to read and write

# For VMs
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --remove \
    --allow-principal User:<user1> \
    --allow-principal User:<user2> \
    --allow-host <ip-address1> \
    --allow-host <ip-address2> \
    --operation Read \
    --operation Write \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --remove \
    --allow-principal User:<user1> \
    --allow-principal User:<user2> \
    --allow-host <ip-address1> \
    --allow-host <ip-address2> \
    --operation Read \
    --operation Write \
    --topic <topic_name>

List ACLs on specific topic

# For VMs
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --list \
    --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --list \
    --topic <topic_name>

List ACLs on all topics

# For VMs
$KAFKA_BIN/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --list \
    --topic '*'

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST \
    --list \
    --topic '*'

Mirror Maker

Mirror topic

# For VMs
$KAFKA_BIN/kafka-mirror-maker.sh \
    --consumer.config consumer.properties \
    --producer.config producer.properties \
    --whitelist <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-mirror-maker.sh \
    --consumer.config consumer.properties \
    --producer.config producer.properties \
    --whitelist <topic_name>

Delegation Token

Create token

# For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --create \
    --max-life-time-period -1 \
    --command-config client.properties \
    --renewer-principal User:<user>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --create \
    --max-life-time-period -1 \
    --command-config client.properties \
    --renewer-principal User:<user>

Renew token

# For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --renew \
    --renew-time-period -1 \
    --command-config client.properties \
    --hmac ABCDEFGHIJK

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --renew \
    --renew-time-period -1 \
    --command-config client.properties \
    --hmac ABCDEFGHIJK

Expire token

# For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --expire \
    --expiry-time-period -1 \
    --command-config client.properties \
    --hmac ABCDEFGHIJK

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --expire \
    --expiry-time-period -1 \
    --command-config client.properties \
    --hmac ABCDEFGHIJK

Describe token

# For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --command-config client.properties \
    --owner-principal User:<user1>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh \
    --bootstrap-server $BROKER_HOST \
    --describe \
    --command-config client.properties \
    --owner-principal User:<user1>

Performance Test

Producer

# For VMs
$KAFKA_BIN/kafka-producer-perf-test.sh \
    --topic teste \
    --num-records 50000000 \
    --record-size 100 \
    --throughput -1 \
    --producer-props acks=all bootstrap.servers=$BROKER_HOST buffer.memory=67108864 batch.size=8196

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-producer-perf-test.sh \
    --topic teste \
    --num-records 50000000 \
    --record-size 100 \
    --throughput -1 \
    --producer-props acks=all bootstrap.servers=$BROKER_HOST buffer.memory=67108864 batch.size=8196

Consumer

# For VMs
$KAFKA_BIN/kafka-consumer-perf-test.sh \
    --group grupo \
    --print-metrics \
    --show-detailed-stats \
    --topic teste \
    --messages 600000 \
    --broker-list $BROKER_HOST \
    --timeout 1000000

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-perf-test.sh \
    --group grupo \
    --print-metrics \
    --show-detailed-stats \
    --topic teste \
    --messages 600000 \
    --broker-list $BROKER_HOST \
    --timeout 1000000