# Topic management with kafka-topics.sh. Replace <BrokerHost:port> and
# <topic_name> with real values before running.
# Create a topic with 2 partitions and a replication factor of 2.
bin/kafka-topics.sh --create --bootstrap-server <BrokerHost:port> --topic <topic_name> --partitions 2 --replication-factor 2
# Change the partition count of an existing topic to 101.
bin/kafka-topics.sh --bootstrap-server <BrokerHost:port> --alter --topic <topic_name> --partitions 101
# List the topics known to the cluster.
bin/kafka-topics.sh --bootstrap-server <BrokerHost:port> --list
# Describe a single topic.
bin/kafka-topics.sh --bootstrap-server <BrokerHost:port> --describe --topic <topic_name>
# Purge a topic by temporarily overriding its retention.
# kafka-topics.sh does not accept --alter together with --config or
# --delete-config when --bootstrap-server is used, so the topic-level
# override is added and removed with kafka-configs.sh instead.
# Step 1: set retention.ms to 1000 (1 second) on the topic.
bin/kafka-configs.sh --bootstrap-server <BrokerHost:port> --alter --entity-type topics --entity-name <topic_name> --add-config retention.ms=1000
# Step 2: give the brokers time to act on the shortened retention.
... wait for 5 minutes ...
# Step 3: remove the override again.
bin/kafka-configs.sh --bootstrap-server <BrokerHost:port> --alter --entity-type topics --entity-name <topic_name> --delete-config retention.ms
# Delete a topic.
bin/kafka-topics.sh --bootstrap-server <BrokerHost:port> --delete --topic <topic_name>
# Set retention.ms=1000 on a topic using kafka-configs.sh (entity type "topics").
bin/kafka-configs.sh --bootstrap-server <BrokerHost:port> --alter --entity-name <topic_name> --add-config 'retention.ms=1000' --entity-type topics
# Offset queries with GetOffsetShell.
# Query with --time -1 and let awk add up the third ":"-separated field of
# every output line, printing the total.
# NOTE(review): presumably each output line is topic:partition:offset, making
# the sum an approximate message count -- confirm against your Kafka version.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <kafkahost:port> --topic <topic_name> --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
# Query with --time -2 (per the GetOffsetShell usage text: earliest offset).
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <kafkahost:port> --topic <topic_name> --time -2
# Query with --time -1 (per the GetOffsetShell usage text: latest offset).
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <kafkahost:port> --topic <topic_name> --time -1
# Start a console producer for the topic.
bin/kafka-console-producer.sh --broker-list <kafkahost:port> --topic <topic_name>
# Start a console consumer that reads the topic from the beginning.
# NOTE(review): --new-consumer is only accepted by older Kafka releases -- confirm for your version.
bin/kafka-console-consumer.sh --new-consumer --bootstrap-server <kafkahost:port> --topic <topic_name> --from-beginning
# Check offsets for consumer group "my_consumer_group" on a topic via ZooKeeper.
# NOTE(review): this script appears to exist only in older Kafka releases -- confirm availability.
bin/kafka-consumer-offset-checker.sh --zookeeper=<zookeeperhost:port> --topic=<topic_name> --group=my_consumer_group
Add the following property to config/consumer.properties:
# Consumer property (goes into config/consumer.properties, not the shell).
exclude.internal.topics=false
# Read the internal __consumer_offsets topic from the beginning with the
# consumer config above. The "\$" keeps the shell from expanding
# "$OffsetsMessageFormatter" inside the double-quoted class name.
bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --from-beginning --topic __consumer_offsets --zookeeper <zookeeperhost:port> --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
# List consumer groups. The first form goes through ZooKeeper, the second
# through the brokers; the label under each command says which API it uses.
bin/kafka-consumer-groups.sh --zookeeper <zookeeperhost:port> --list
(old api)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server <kafkahost:port> --list
(new api)
Describe the consumer group (TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, CONSUMER-ID, HOST and CLIENT-ID)
# Describe a consumer group through the brokers.
./bin/kafka-consumer-groups.sh --bootstrap-server <kafkahost:port> --describe --group <group_name>
# Describe a consumer group through ZooKeeper.
bin/kafka-consumer-groups.sh --zookeeper <zookeeperhost:port> --describe --group <group_name>
# Reset a consumer group's offsets for one topic with --to-earliest.
# First form: no --execute flag.
bin/kafka-consumer-groups.sh --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest
This will print the expected result of the reset, but not actually run it.
# Second form: same command with --execute added.
bin/kafka-consumer-groups.sh --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute
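This will execute the reset, moving the consumer group's offsets for the specified topic back to the earliest available offset in each partition (this is 0 only if no records have been removed by retention).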
# Consume from partition 0 of the topic with kafkacat.
# NOTE(review): per kafkacat usage, -C is consumer mode, "-o -5" starts 5
# messages before the end and -e exits once the end is reached -- confirm
# against your kafkacat version.
kafkacat -C -b <kafkahost:port> -t <topic_name> -p 0 -o -5 -e
# Open a ZooKeeper shell against the given ZooKeeper host.
bin/zookeeper-shell.sh <zookeeperhost:port>