Latest commit 5112558 Jul 19, 2018


Kubernetes Kafka

This project contains a Docker image meant to facilitate the deployment of Apache Kafka on Kubernetes using StatefulSets.

Limitations

  1. Persistent Volumes must be used. Using emptyDir volumes will likely result in data loss.
  2. Storage media I/O isolation is not generally possible at this time. Consider using Pod Anti-Affinity rules to place noisy neighbors on separate Nodes.

Docker Image

The Docker image contained in this repository is composed of a base Ubuntu 16.04 image, the latest release of the OpenJDK JRE based on the 1.8 JVM (JDK 8u111), and the latest stable release of Kafka (0.10.2.0) built for Scala 2.11. Ubuntu is a much larger image than BusyBox or Alpine, but those images use musl or uClibc rather than glibc, which requires a custom build of OpenJDK against that libc runtime. While there are smaller Kafka images based on Alpine and BusyBox, the interactions between Kafka, the JVM, and glibc are better understood and easier to debug.

The image is built such that the Kafka JVM process runs as a non-root user. By default, this user is kafka with UID 1000 and GID 1000. The Kafka package is installed into the /opt/kafka directory; all configuration is installed into /opt/kafka/config and all executables are in /opt/kafka/bin. Due to the implementation of the scripts in /opt/kafka/bin, it is not feasible to symbolically link them into the /usr/bin directory. As such, the /opt/kafka/bin directory is added to the PATH environment variable.

ZooKeeper

Kafka requires an installation of Apache ZooKeeper for broker configuration storage and coordination. An example of how to deploy a ZooKeeper ensemble on Kubernetes can be found here. For testing purposes, an ensemble of 1-3 servers is sufficient. For production use, you should consider deploying at least 5 servers so that you can tolerate the loss of one server during the planned maintenance of another. If you are running ZooKeeper on Kubernetes, it is best to use a separate ensemble for each Kafka cluster.

For production use, you should ensure that each ZooKeeper server has at least 2 GiB of heap and that at least 4 GiB of memory is reserved for the Pod. As ZooKeeper is not particularly CPU intensive, 2 CPUs per server should be sufficient for most use cases. If you are running Kubernetes on a cloud provider (e.g. GCP, Azure, or AWS), you should provision a fast storage class for the ZooKeeper PVs. As the PVs are backed by network attached storage, there is little to be gained from isolating the write-ahead log from the snapshot directory.
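ZooKeeper stays available only while a strict majority (a quorum) of the ensemble is up, so an ensemble of n servers tolerates floor((n - 1) / 2) simultaneous failures. The helper below is a hypothetical sketch, not part of this image, that makes the sizing advice concrete: 3 servers tolerate one failure, while 5 tolerate two, which covers one server down for planned maintenance plus one unplanned loss.

```shell
#!/usr/bin/env bash
# Hypothetical helper: how many servers can a ZooKeeper ensemble of size n
# lose while a strict majority (quorum) remains available?
zk_tolerated_failures() {
  local n=$1
  echo $(( (n - 1) / 2 ))
}

# Print quorum size and failure tolerance for common ensemble sizes.
for n in 1 3 5 7; do
  echo "ensemble=$n quorum=$(( n / 2 + 1 )) tolerated_failures=$(zk_tolerated_failures "$n")"
done
```

Even-sized ensembles add a server without adding tolerance (4 servers still tolerate only one failure), which is why odd sizes are the usual choice.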

Headless Service

The Kafka StatefulSet requires a Headless Service to control the network domain for the Kafka brokers. The YAML below creates a Headless Service that allows brokers to be discovered and exposes port 9093 for client connections.

apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka
spec:
  ports:
  - port: 9093
    name: server
  clusterIP: None
  selector:
    app: kafka
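With a Headless Service, each StatefulSet Pod gets a stable DNS name of the form <pod>.<service>.<namespace>.svc.<cluster-domain>, the same pattern used for the ZooKeeper servers in the broker configuration (zk-0.zk-svc.default.svc.cluster.local). The sketch below builds a comma separated host:port list for the brokers, for example as a client bootstrap list. It assumes the default namespace and the cluster.local cluster domain unless you pass others; the function name is hypothetical.

```shell
#!/usr/bin/env bash
# Build a comma separated host:port list for the Pods of a StatefulSet
# governed by a Headless Service. Namespace defaults to "default" and the
# cluster domain to "cluster.local" (both assumptions; override as needed).
stateful_endpoints() {
  local set_name=$1 svc=$2 replicas=$3 port=$4
  local ns=${5:-default} domain=${6:-cluster.local}
  local i list=""
  for (( i = 0; i < replicas; i++ )); do
    # Pod ordinals run from 0 to replicas-1, e.g. kafka-0, kafka-1, ...
    list+="${list:+,}${set_name}-${i}.${svc}.${ns}.svc.${domain}:${port}"
  done
  echo "$list"
}

stateful_endpoints kafka kafka-svc 3 9093
```

For the three-replica StatefulSet and kafka-svc service used in this document, this prints kafka-0.kafka-svc.default.svc.cluster.local:9093 followed by the kafka-1 and kafka-2 entries.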

StatefulSet

The Kafka StatefulSet deploys a configurable number of replicas on the Kubernetes cluster. The StatefulSet's serviceName must match the name of the Headless Service, and replicas specifies the desired number of brokers.

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-svc
  replicas: 3
  ...

Configuration

This section details the configuration of the Kafka cluster.

Broker Configuration

The configuration for each broker is generated by overriding the default configuration with command line flags. The command below explicitly sets the high and medium importance configuration parameters from the Kafka documentation.

kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9093 \
          --override zookeeper.connect=zk-0.zk-svc.default.svc.cluster.local:2181,zk-1.zk-svc.default.svc.cluster.local:2181,zk-2.zk-svc.default.svc.cluster.local:2181 \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=0.10.2-IV0 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 
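  • Note that the broker.id is extracted from the ordinal index of the StatefulSet's Pods: ${HOSTNAME##*-} strips everything up to and including the last hyphen of the Pod's hostname (e.g. kafka-2 yields 2).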
  • The listeners configuration must specify the port indicated by the headless service (9093 in this case).
  • The zookeeper.connect string is a comma separated list of the host:port pairs of the ZooKeeper servers in the ensemble.
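The sketch below mirrors the ${HOSTNAME##*-} expansion used in the command above and adds two checks that are assumptions of this sketch, not part of the image's startup script: that the hostname ends in a numeric ordinal, and that the resulting id does not exceed reserved.broker.max.id (1000 above), which Kafka requires of manually assigned ids when broker.id.generation.enable is true. It also rebuilds the zookeeper.connect string for an ensemble named zk-0 through zk-(n-1) behind zk-svc in the default namespace, as in the example; the function names are hypothetical.

```shell
#!/usr/bin/env bash
# Derive broker.id from a StatefulSet Pod hostname such as "kafka-2".
# ${hostname##*-} removes everything up to and including the last "-".
broker_id_from_hostname() {
  local hostname=$1
  local id=${hostname##*-}
  # Reject hostnames that do not end in a numeric ordinal.
  case $id in
    ''|*[!0-9]*) echo "no ordinal in hostname: $hostname" >&2; return 1 ;;
  esac
  # With broker.id.generation.enable=true, manually assigned ids must not
  # exceed reserved.broker.max.id (1000 in the command above).
  # 10# forces base-10 so an ordinal like "08" is not read as octal.
  if (( 10#$id > 1000 )); then
    echo "broker.id $id exceeds reserved.broker.max.id" >&2
    return 1
  fi
  echo "$id"
}

# Build zookeeper.connect for n servers zk-0..zk-(n-1) behind zk-svc
# in the default namespace (names taken from the example above).
zk_connect() {
  local n=$1 port=${2:-2181} i list=""
  for (( i = 0; i < n; i++ )); do
    list+="${list:+,}zk-${i}.zk-svc.default.svc.cluster.local:${port}"
  done
  echo "$list"
}
```

Running zk_connect 3 reproduces the zookeeper.connect value used in the broker command.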

OS Image tuning

For production use, it is important to configure the base OS image to allow for a sufficient number of file descriptors for your workload.

  • For each broker, (number of partitions) * (partition_size / segment_size) approximates the number of log segment files the broker will have open at any given time. You must ensure that this will not result in the broker process dying because it has exhausted its allowable number of file descriptors.
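As a rough worked example of the estimate above, the sketch below multiplies the number of partitions on a broker by the number of segments per partition (partition size divided by segment size, rounded up) and compares the result with the soft file descriptor limit reported by ulimit -n. The partition count and partition size are hypothetical inputs. The broker also holds descriptors for client and replica sockets, so treat the result as an estimate and leave generous headroom.

```shell
#!/usr/bin/env bash
# Estimate open log segment files on one broker:
#   partitions * ceil(partition_size / segment_size)
# All sizes are in bytes.
estimate_segment_files() {
  local partitions=$1 partition_bytes=$2 segment_bytes=$3
  local segments=$(( (partition_bytes + segment_bytes - 1) / segment_bytes ))
  echo $(( partitions * segments ))
}

# Hypothetical example: 1000 partitions of 50 GiB each, using the
# log.segment.bytes=1073741824 (1 GiB) setting from the broker command.
needed=$(estimate_segment_files 1000 $(( 50 * 1024**3 )) 1073741824)
limit=$(ulimit -n)
echo "estimated segment files: $needed, fd soft limit: $limit"
if [ "$limit" != "unlimited" ] && [ "$needed" -ge "$limit" ]; then
  echo "warning: raise the file descriptor limit for the broker" >&2
fi
```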

CPU

Typical production Kafka broker deployments run on dual processor Xeons with multiple hardware threads per core. However, CPU is unlikely to be your bottleneck, and an 8 CPU deployment should be more than sufficient for good performance. You should start by simulating your workload with 2-4 CPUs and scaling up from there.

Memory

Kafka utilizes the OS page cache heavily to buffer data. To fully understand the interaction of Kafka and Linux containers, you should read this and this. In particular, it is important to understand the accounting and isolation that the memory cgroup provides for the page cache. If your primary concern is isolation and performance, you should do the following.

  • Determine the number of seconds of data you want to buffer, t (time).
  • Determine the total write throughput of the deployment, tp (storage/time).
  • tp * t gives the amount of memory you should reserve. Set this as the memory request for the container.
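The steps above amount to a single multiplication. The sketch below assumes tp is measured in bytes per second and t in seconds; the 50 MiB/s and 30 second figures are hypothetical, and the result is rounded up to a Kubernetes memory quantity in Mi.

```shell
#!/usr/bin/env bash
# Page cache reservation: tp * t, where tp is write throughput in bytes/s
# and t is the number of seconds of data to buffer.
page_cache_bytes() {
  local tp_bytes_per_sec=$1 seconds=$2
  echo $(( tp_bytes_per_sec * seconds ))
}

# Hypothetical example: 50 MiB/s of writes, buffering 30 seconds of data.
bytes=$(page_cache_bytes $(( 50 * 1024 * 1024 )) 30)
mib=$(( 1024 * 1024 ))
# Round up to whole MiB for use as a container memory request.
echo "memory request: $(( (bytes + mib - 1) / mib ))Mi"
```

With these inputs it prints memory request: 1500Mi.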

Disk

Disk throughput is the most common bottleneck that users encounter with Kafka. Given that Persistent Volumes are backed by network attached storage, the throughput is, in most cases, capped on a per Node basis regardless of the number of Persistent Volumes that are attached to the Node. For instance, if you are deploying Kafka onto a GKE or GCP based Kubernetes cluster and you use the standard PD type, your maximum sustained per instance throughput is 120 MB/s (write) and 180 MB/s (read). If you have multiple applications, each with a Persistent Volume mounted, these numbers represent the total achievable throughput. If you find that you have contention, you should consider using Pod Anti-Affinity rules to ensure that noisy neighbors are not collocated on the same Node.
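Because the cap applies per Node, each additional disk-heavy Pod on that Node shrinks every Pod's share. The sketch below divides the standard PD figures quoted above evenly across the volumes on one Node; even sharing is a simplifying assumption, since real contention is rarely uniform, and the function name is hypothetical.

```shell
#!/usr/bin/env bash
# Even per-volume share of a per-Node throughput cap (MB/s) when several
# Pods with Persistent Volumes share one Node.
per_volume_share() {
  local node_cap_mbps=$1 volumes=$2
  if (( volumes < 1 )); then
    echo "volumes must be >= 1" >&2
    return 1
  fi
  echo $(( node_cap_mbps / volumes ))
}

# Example: a Kafka broker and two other applications on one Node.
echo "write MB/s per volume: $(per_volume_share 120 3)"
echo "read MB/s per volume:  $(per_volume_share 180 3)"
```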

Pod Affinity

The Kafka Pod in the StatefulSet's PodTemplateSpec contains a Pod Anti-Affinity rule and a Pod Affinity rule.

    affinity:
      podAntiAffinity:
        requiredDuringSchedulingIgnoredDuringExecution:
        - labelSelector:
            matchExpressions:
            - key: "app"
              operator: In
              values:
              - kafka
          topologyKey: "kubernetes.io/hostname"
      podAffinity:
        preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 1
          podAffinityTerm:
            labelSelector:
              matchExpressions:
              - key: "app"
                operator: In
                values:
                - zk
            topologyKey: "kubernetes.io/hostname"

The Pod Anti-Affinity rule ensures that two Kafka brokers will never be launched on the same Node. This isn't strictly necessary, but it provides stronger availability guarantees in the presence of Node failure, and it helps alleviate disk throughput bottlenecks. The Pod Affinity rule attempts to collocate Kafka and ZooKeeper on the same Nodes. You will likely have more Kafka brokers than ZooKeeper servers, but the Kubernetes scheduler will attempt, where possible, to collocate Kafka brokers and ZooKeeper servers while respecting the hard spreading enforced by the Pod Anti-Affinity rule. This optimization attempts to minimize the amount of network I/O between the ZooKeeper ensemble and the Kafka cluster. However, if disk contention becomes an issue, it is equally valid to express a Pod Anti-Affinity rule to ensure that ZooKeeper servers and Kafka brokers are not scheduled onto the same Node.
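To confirm that the hard anti-affinity rule is being honored, you can list each Kafka Pod with the Node it was scheduled on, for example with kubectl get pods -l app=kafka -o custom-columns=POD:.metadata.name,NODE:.spec.nodeName --no-headers, and look for repeated Node names. The helper below is a hypothetical sketch that reads such two-column output on stdin and reports any Node hosting more than one broker.

```shell
#!/usr/bin/env bash
# Read "POD NODE" lines on stdin and print "node: pod pod ..." for every
# Node that hosts more than one Pod. Exits non-zero if any Node is shared.
# Pending Pods report "<none>" as their Node and are skipped.
find_shared_nodes() {
  awk '
    $2 != "<none>" { count[$2]++; pods[$2] = pods[$2] " " $1 }
    END {
      shared = 0
      for (node in count) {
        if (count[node] > 1) { print node ":" pods[node]; shared = 1 }
      }
      exit shared
    }'
}

# Usage against a live cluster:
#   kubectl get pods -l app=kafka \
#     -o custom-columns=POD:.metadata.name,NODE:.spec.nodeName --no-headers \
#     | find_shared_nodes
```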

Testing

The easiest way to test your deployment is to create a topic and use the console producer and consumer. Use kubectl exec to execute a bash shell on one of the brokers.

> kubectl exec -ti kafka-0 -- bash

From the command line, create a topic using kafka-topics.sh.

> kafka-topics.sh --create \
--topic test \
--zookeeper zk-0.zk-svc.default.svc.cluster.local:2181,zk-1.zk-svc.default.svc.cluster.local:2181,zk-2.zk-svc.default.svc.cluster.local:2181 \
--partitions 3 \
--replication-factor 2

Run the console consumer as below.

> kafka-console-consumer.sh --topic test --bootstrap-server localhost:9093

Use kubectl exec to execute a bash shell on another one of the brokers. You can use the same broker, but using a different broker will demonstrate that the system is working across multiple Nodes.

> kubectl exec -ti kafka-1 -- bash

Run the console producer and generate a few messages by typing into stdin. Every time you press Enter you will flush a message to the consumer.

> kafka-console-producer.sh --topic test --broker-list localhost:9093
hello
I like kafka
goodbye

You will see the messages on the console in which the console consumer is running.

> kafka-console-consumer.sh --topic test --bootstrap-server localhost:9093
hello
I like kafka
goodbye

Horizontal Scaling

You can use kubectl scale to horizontally scale your cluster. The command below scales the number of brokers to two.

> kubectl scale statefulset kafka --replicas=2

Note that when you scale a Kafka cluster up or down, you will have to use kafka-reassign-partitions.sh to ensure that your data is correctly replicated and assigned after scaling.
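In its --generate mode, kafka-reassign-partitions.sh proposes a new assignment from a JSON file listing the topics to move plus a target broker list; you then apply a reviewed plan with --execute and check progress with --verify. The sketch below writes the topics-to-move file. The function name, file path, $ZK variable, and broker ids 0,1 (the two remaining ordinals after scaling to two) are illustrative assumptions; review the proposed plan before executing it.

```shell
#!/usr/bin/env bash
# Write the topics-to-move JSON consumed by
# kafka-reassign-partitions.sh --topics-to-move-json-file.
topics_to_move_json() {
  local json='{"version":1,"topics":[' sep='' t
  for t in "$@"; do
    json+="${sep}{\"topic\":\"${t}\"}"
    sep=','
  done
  echo "${json}]}"
}

topics_to_move_json test > /tmp/topics-to-move.json

# Then, from a broker shell ($ZK is a hypothetical variable holding the
# zookeeper.connect string used elsewhere in this document):
#   kafka-reassign-partitions.sh --zookeeper "$ZK" \
#     --topics-to-move-json-file /tmp/topics-to-move.json \
#     --broker-list "0,1" --generate
```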