osskit/dafka-consumer

Dockerized Kafka consumer

Overview

Dafka-consumer is a Dockerized Kafka consumer that abstracts away the details of consuming messages from a Kafka topic.

With dafka-consumer, consuming a message is as simple as receiving a POST request in your service; the body of the request is the Kafka message.
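
As an illustration of the POST request described above, here is a minimal sketch of a target service written with Python's standard library. The `/consume` route, port 2000, the `handle_message` and `make_server` helpers, and the assumption that the message body is JSON are all hypothetical choices made for this example; they are not prescribed by dafka-consumer.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []  # messages handled so far, kept in memory for illustration only


def handle_message(message):
    """Business logic for one message (hypothetical: just record it)."""
    received.append(message)


class ConsumeHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Only the assumed /consume route is served.
        if self.path != "/consume":
            self.send_response(404)
            self.end_headers()
            return
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        try:
            message = json.loads(body)  # assumption: the message is JSON
        except ValueError:
            self.send_response(400)
            self.end_headers()
            return
        handle_message(message)
        self.send_response(200)
        self.end_headers()

    def log_message(self, format, *args):
        pass  # silence per-request logging


def make_server(port=2000):
    """Build the HTTP server; call serve_forever() on the result to run it."""
    return HTTPServer(("0.0.0.0", port), ConsumeHandler)
```

Running `make_server().serve_forever()` starts the service. Because the service only sees plain HTTP, `handle_message` can be exercised in tests without a Kafka broker.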

Motivation

Why use this over just a Kafka client?

  • Abstracts away the messaging layer, so it could be replaced with RabbitMQ or any other consumer.
  • Separates configuration: everything related to Kafka is encapsulated in Dafka and not in the service itself.
  • When testing your service you only test your service's logic and not the messaging layer implementation details.

Usage & Examples

docker-compose

version: '3.9'

services:
    consumer:
        image: osskit/dafka-consumer
        ports:
            - 4001:4001
        environment:
            - KAFKA_BROKER=kafka:9092
            - TARGET_BASE_URL=http://target:2000/
            - TOPICS_ROUTES=foo:consume,bar:consume,^([^.]+).bar:consume
            - DEAD_LETTER_TOPIC=dead-letter
            - GROUP_ID=consumer_1
            - MONITORING_SERVER_PORT=4001
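
To make the `TOPICS_ROUTES` and `TARGET_BASE_URL` values above more concrete, the following sketch shows one way a topic name could be resolved to a target URL. It is not dafka-consumer's actual implementation: the helper names `parse_topics_routes` and `resolve_url` are hypothetical, and the sketch assumes that each topic entry is treated as a regular expression that must match the whole topic name and that no entry contains a comma.

```python
import re


def parse_topics_routes(value):
    """Split 'topic:route,topic:route' into (topic_pattern, route) pairs."""
    pairs = []
    for entry in value.split(","):
        # Split on the last colon so the topic pattern may itself contain ':'.
        topic, route = entry.rsplit(":", 1)
        pairs.append((topic, route))
    return pairs


def resolve_url(base_url, pairs, topic):
    """Return the URL for the first pattern that fully matches the topic."""
    for pattern, route in pairs:
        if re.fullmatch(pattern, topic):
            return base_url.rstrip("/") + "/" + route.lstrip("/")
    return None  # no route configured for this topic


pairs = parse_topics_routes("foo:consume,bar:consume,^([^.]+).bar:consume")
url = resolve_url("http://target:2000/", pairs, "orders.bar")
```

Under these assumptions, a message on the hypothetical topic `orders.bar` matches the third entry and resolves to `http://target:2000/consume`.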

Together with dafka-producer:

version: '3.9'

services:
    consumer:
        image: osskit/dafka-consumer
        ports:
            - 4001:4001
        environment:
            - KAFKA_BROKER=kafka:9092
            - TARGET_BASE_URL=http://target:2000/
            - TOPICS_ROUTES=foo:consume,bar:consume,^([^.]+).bar:consume
            - DEAD_LETTER_TOPIC=dead-letter # optional
            - GROUP_ID=consumer_1

    producer:
        image: osskit/dafka-producer
        ports:
            - 6000:6000
        environment:
            - PORT=6000
            - KAFKA_BROKER=kafka:9092

Kubernetes

You can use the provided Helm Chart, which gives you a Deployment that is separate from your service's Pod.

It's also possible to use this as a Sidecar.

Parameters

Container images are configured using parameters passed at runtime.

| Parameter | Default Value | Description |
| --- | --- | --- |
| KAFKA_BROKER | required | URL for the Kafka Broker |
| TARGET_BASE_URL | required | The target's HTTP POST endpoint |
| GROUP_ID | required | A unique id for the consumer group |
| TOPICS_ROUTES | required | A map between topics and their endpoint routes (e.g., topic:/consume) |
| TARGET_TIMEOUT_MS | 298000 | Timeout for the target's response. Must be lower than RETRY_POLICY_MAX_DURATION_MS |
| RETRY_POLICY_MAX_DURATION_MS | 299000 | Maximum duration of all retry attempts. Must be lower than KAFKA_POLL_INTERVAL_MS |
| KAFKA_POLL_INTERVAL_MS | 300000 (5 min) | The maximum delay between invocations of poll() when using consumer group management. See max.poll.interval.ms for more details. |
| POLL_TIMEOUT | 1000 | Description of POLL_TIMEOUT |
| MAX_POLL_RECORDS | 50 | Number of records to process in a single batch |
| SESSION_TIMEOUT | 10000 | Description of SESSION_TIMEOUT |
| RETRY_PROCESS_WHEN_STATUS_CODE_MATCH | 5[0-9][0-9] | Retry processing the record if the returned status code matches the regex |
| PRODUCE_TO_DEAD_LETTER_TOPIC_WHEN_STATUS_CODE_MATCH | ^(?!2\\d\\d$)\\d{3}$ | Produce to the dead letter topic if the returned status code matches the regex |
| RETRY_POLICY_EXPONENTIAL_BACKOFF | 50,5000,10 | A list that represents the [delay, maxDelay, delayFactor] used when retrying message processing |
| DEAD_LETTER_TOPIC | null | Dead letter topic name |
| MONITORING_SERVER_PORT | 0 | Consumer monitoring and healthcheck service port |
| TARGET_HEALTHCHECK | null | Target's healthcheck endpoint, used to verify that it is alive |
| USE_SASL_AUTH | false | Use SASL authentication |
| SASL_USERNAME | required if USE_SASL_AUTH=true | SASL username to authenticate with |
| SASL_PASSWORD | required if USE_SASL_AUTH=true | SASL password to authenticate with |
| TRUSTSTORE_FILE_PATH | null | Truststore certificate file path |
| TRUSTSTORE_PASSWORD | required if TRUSTSTORE_FILE_PATH != null | Truststore's password |
| USE_PROMETHEUS | false | Export metrics to Prometheus |
| PROMETHEUS_BUCKETS | 0.003,0.03,0.1,0.3,1.5,10 | A list of Prometheus buckets to use |
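
The retry-related defaults above interact with each other, so a small sketch may help when choosing values. It is only an approximation of what the parameters describe, not dafka-consumer's code. It assumes that the backoff delay starts at `delay`, is multiplied by `delayFactor` after each attempt, and is capped at `maxDelay`; that the backoff values are in milliseconds; that a status code regex must match the whole code; and that the doubled backslashes in the dead-letter default are escaping, so the effective pattern is `^(?!2\d\d$)\d{3}$`. All function names are hypothetical.

```python
import re

RETRY_PATTERN = r"5[0-9][0-9]"              # RETRY_PROCESS_WHEN_STATUS_CODE_MATCH
DEAD_LETTER_PATTERN = r"^(?!2\d\d$)\d{3}$"  # PRODUCE_TO_DEAD_LETTER_TOPIC_WHEN_STATUS_CODE_MATCH


def backoff_delays(spec, attempts):
    """Delays for the first `attempts` retries of a 'delay,maxDelay,factor' spec."""
    delay, max_delay, factor = (float(x) for x in spec.split(","))
    delays = []
    for _ in range(attempts):
        delays.append(min(delay, max_delay))  # cap each delay at maxDelay
        delay *= factor
    return delays


def status_matches(pattern, status):
    """True if the whole status code matches the regex (assumed semantics)."""
    return re.fullmatch(pattern, str(status)) is not None


def timeouts_are_ordered(target_timeout_ms, retry_max_duration_ms, poll_interval_ms):
    """Check TARGET_TIMEOUT_MS < RETRY_POLICY_MAX_DURATION_MS < KAFKA_POLL_INTERVAL_MS."""
    return target_timeout_ms < retry_max_duration_ms < poll_interval_ms


delays = backoff_delays("50,5000,10", 4)
defaults_ok = timeouts_are_ordered(298000, 299000, 300000)
```

With the default spec `50,5000,10`, the first four delays under these assumptions are 50, 500, 5000, and 5000. A 503 response matches the retry pattern, a 404 matches only the dead-letter pattern, and a 200 matches neither.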

License

MIT License