KAFQA

Kafka quality analyser, measuring data loss, ops and latency.

Running

  • ensure Go modules are enabled (GO111MODULE=on) if the repo sits inside GOPATH and you are on an older Go version.
  • ensure the Kafka broker mentioned in the config is up.
source kafqa.env && go build && ./kafqa
  • run make to run tests and linting

Report

The tool generates a report containing the following information:

  • latency: average, min and max consumption latency (from produce time until the message is received)
  • Total messages sent, received and lost
  • App run time
+---+--------------------------------+--------------+
|   |          DESCRIPTION           |    VALUE     |
+---+--------------------------------+--------------+
| 1 | Messages Lost                  |        49995 |
| 2 | Messages Sent                  |        50000 |
| 3 | Messages Received              |            5 |
| 4 | Min Consumption Latency Millis |         7446 |
| 5 | Max Consumption Latency Millis |         7461 |
| 6 | App Run Time                   | 8.801455502s |
+---+--------------------------------+--------------+
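
For illustration, the headline numbers reduce to simple arithmetic; this is a sketch with made-up variable names, not the repo's code:

package main

import "fmt"

func main() {
	sent, received := 50000, 5
	lost := sent - received // matches the report above: 49995
	fmt.Printf("lost=%d (%.2f%% loss)\n", lost, float64(lost)/float64(sent)*100)
}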

Dashboard

Prometheus metrics can be viewed in Grafana by importing the dashboard in scripts/dashboard.

Data

Message format sent over Kafka:

message {
    sequence id
    id (unique UUID)
    timestamp
    random payload (size: s/m/l)
}
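
In Go terms, the payload roughly corresponds to the struct below; field names and types are illustrative, not the repo's actual code. The consumption latency in the report is then receive time minus the embedded timestamp (see "Compute lag" under Done):

package main

import (
	"fmt"
	"time"
)

// Message mirrors the format above: a sequence id, a unique UUID,
// the produce timestamp, and a random payload of size s/m/l.
type Message struct {
	Sequence  uint64
	ID        string // unique UUID
	Timestamp time.Time
	Random    []byte // s/m/l sized filler
}

// consumptionLatency is receive time minus produce time.
func consumptionLatency(m Message, receivedAt time.Time) time.Duration {
	return receivedAt.Sub(m.Timestamp)
}

func main() {
	m := Message{Sequence: 1, ID: "example-uuid", Timestamp: time.Now().Add(-7446 * time.Millisecond)}
	fmt.Println(consumptionLatency(m, time.Now()).Milliseconds()) // ~7446
}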

Running separate consumer and producer

  • CONSUMER_ENABLED and PRODUCER_ENABLED can be set to run only a specific component, as in the examples below.
  • setting PRODUCER_TOTAL_MESSAGES=-1 makes the producer produce messages indefinitely.
# run only consumer
CONSUMER_ENABLED="true"
PRODUCER_ENABLED="false"
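
The producer-only counterpart, combining the two flags above with indefinite production:

# run only producer, indefinitely
CONSUMER_ENABLED="false"
PRODUCER_ENABLED="true"
PRODUCER_TOTAL_MESSAGES=-1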
  • If you want to consume messages produced in proto format by a non-kafqa producer, set the proto parser config below.
  • Latency is then measured from the consume time back to the timestamp field inside the proto.
export PROTO_PARSER_ENABLED="true"
export PROTO_PARSER_MESSAGE_NAME="com.test.user.UserLocationLogMessage" # fully qualified message name
export PROTO_PARSER_FILE_PATH=/proto/test.proto                         # .proto file defining the message
export PROTO_PARSER_TIMESTAMP_INDEX=3                                   # proto field index holding the timestamp used for latency
  • This requires a redis store to track and ack messages:
STORE_TYPE="redis"
STORE_REDIS_HOST="127.0.0.1:6379"
STORE_RUN_ID="run-$CONSUMER_GROUP_ID"

SSL Setup

Producer and consumer support SSL; set the following env configuration (consumer vars shown, with an assumed producer mirror below).

CONSUMER_SECURITY_PROTOCOL="ssl"
CONSUMER_CA_LOCATION="/certs/ca/rootCA.crt" # Public root ca certificate
CONSUMER_CERTIFICATE_LOCATION="/certs/client/client.crt" # certificate signed by ICA / root CA
CONSUMER_KEY_LOCATION="/certs/client/client.key" # private key
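
The producer side presumably mirrors these settings; the variable names below are an assumption based on the consumer vars above, so verify them against the application config:

PRODUCER_SECURITY_PROTOCOL="ssl"                          # assumed name, mirroring CONSUMER_SECURITY_PROTOCOL
PRODUCER_CA_LOCATION="/certs/ca/rootCA.crt"               # public root CA certificate
PRODUCER_CERTIFICATE_LOCATION="/certs/client/client.crt"  # certificate signed by ICA / root CA
PRODUCER_KEY_LOCATION="/certs/client/client.key"          # private key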

Disable consumer Auto commit

If the consumer is restarted, some messages may go untracked, because offsets are committed before processing. To disable this and commit only after processing the messages (this increases the run time, though), set `CONSUMER_ENABLE_AUTO_COMMIT="false"`.

Application configuration is customisable via kafqa.env, e.g. to tweak the concurrency of producers/consumers.

Todo

  • Compute (now - kafka timestamp) and report it
  • Generate random consumer group and topic id (for development)
  • Add more metrics on messages which are lost (ID/Sequence/Duplicates)
  • Producer to handle high throughput (queue full issue)
  • Measure % of data loss and average latency

Done:

  • convert fmt to log
  • Add timestamp to kafka message
  • Makefile
  • Compute lag (receive t - produce t)
  • Consumer
    • listen to interrupt and kill consumer or stop with timeout
  • Add store to keep track of messages (producer) [interface]
  • Ack received messages in the store (consumer)
  • Generate produce & consume basic report
  • Prometheus exporter for metrics
  • CI (vet/lint/golangci) (travis)
  • Capture throughput metrics
