onyx-kafka

Onyx plugin providing read and write facilities for Kafka. This plugin automatically discovers broker locations from ZooKeeper and updates the consumers when there is a broker failover.

This plugin version is only compatible with Kafka 0.10+. Please use onyx-kafka-0.8 with Kafka 0.8.

Installation

In your project file:

[org.onyxplatform/onyx-kafka "0.14.5.0"]

In your peer boot-up namespace:

(:require [onyx.plugin.kafka])

Functions

read-messages

Reads segments from a Kafka topic. Peers are automatically assigned to the topic's partitions, balancing the partitions across the available peers. If :kafka/partition is supplied, only that one partition is read.
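To make the balancing idea concrete, here is a minimal sketch of a round-robin spread of partitions over peers. It is an illustration only: assign-partitions is a hypothetical helper, and the plugin's actual assignment strategy may differ.

```clojure
;; Hypothetical helper, not part of onyx-kafka: spreads partition ids
;; 0..(n-partitions - 1) over peer slots 0..(n-peers - 1) round-robin.
(defn assign-partitions
  [n-partitions n-peers]
  (group-by #(mod % n-peers) (range n-partitions)))

;; Six partitions over three peers: each peer slot gets two partitions.
(assign-partitions 6 3)
;; => {0 [0 3], 1 [1 4], 2 [2 5]}
```

With more peers than partitions, some peer slots receive nothing in this sketch, which is one way to see why :onyx/n-peers is capped at the number of partitions.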

Catalog entry:

{:onyx/name :read-messages
 :onyx/plugin :onyx.plugin.kafka/read-messages
 :onyx/type :input
 :onyx/medium :kafka
 :kafka/topic "my-topic"
 :kafka/receive-buffer-bytes 65536
 :kafka/zookeeper "127.0.0.1:2181"
 :kafka/offset-reset :earliest
 :kafka/deserializer-fn :my.ns/deserializer-fn
 :kafka/wrap-with-metadata? false
 ;; :kafka/start-offsets {p1 offset1, p2 offset2}
 ;; :kafka/target-offsets {p1 offset3, p2 offset4}
 :onyx/batch-timeout 50
 :onyx/n-peers << NUMBER OF PEERS TO READ PARTITIONS, UP TO N-PARTITION MAX >>
 :onyx/batch-size 100
 :onyx/doc "Reads messages from a Kafka topic"}

Lifecycle entry:

{:lifecycle/task :read-messages
 :lifecycle/calls :onyx.plugin.kafka/read-messages-calls}

Attributes

| key | type | default | description |
|-----|------|---------|-------------|
| :kafka/topic | string | | The topic name to connect to |
| :kafka/partition | string | | Optional: the partition to read from if auto-assignment is not used |
| :kafka/zookeeper | string | | The ZooKeeper connection string |
| :kafka/offset-reset | keyword | | Offset bound to seek to when no offset is found: :earliest or :latest |
| :kafka/receive-buffer-bytes | integer | 65536 | The size of the receive buffer in the Kafka consumer |
| :kafka/key-deserializer-fn | keyword | | A keyword naming a fully qualified, namespaced function that deserializes a record's key. Takes one argument: a byte array. Only used when :kafka/wrap-with-metadata? is true |
| :kafka/deserializer-fn | keyword | | A keyword naming a fully qualified, namespaced function that deserializes a record's value. Takes one argument: a byte array |
| :kafka/wrap-with-metadata? | boolean | false | Wraps each message in a map with the keys :key, :serialized-key-size, :serialized-value-size, :offset, :timestamp, :partition, :topic, and :message (the message itself) |
| :kafka/start-offsets | map | | Supplies the task with starting offsets for all partitions. Maps partition to offset, e.g. {0 50, 1 90} starts at offset 50 for partition 0 and offset 90 for partition 1 |
| :kafka/target-offsets | map | | Supplies the task with target offsets for all partitions. The consumer reads up to and including the target offset for each partition |
| :kafka/consumer-opts | map | | A map of arbitrary configuration to merge into the underlying Kafka consumer base configuration. Keys must be strings; values must be valid settings as described in the Kafka docs |

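As an illustration of the function that :kafka/deserializer-fn points at, the sketch below assumes record values are UTF-8 encoded EDN. That encoding is an assumption made for the example; the plugin only requires a function that takes one byte array.

```clojure
(require '[clojure.edn :as edn])

;; Assumed encoding: each record value is a UTF-8 EDN string.
;; Takes the raw byte array from Kafka and returns the decoded segment.
(defn deserializer-fn
  [^bytes bs]
  (edn/read-string (String. bs "UTF-8")))

;; Decoding a hand-built payload:
(deserializer-fn (.getBytes "{:n 1}" "UTF-8"))
;; => {:n 1}
```

If this function lived in the namespace my.ns, the catalog entry would reference it as :my.ns/deserializer-fn.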
write-messages

Writes segments to a Kafka topic using the Kafka "new" producer.

Catalog entry:

{:onyx/name :write-messages
 :onyx/plugin :onyx.plugin.kafka/write-messages
 :onyx/type :output
 :onyx/medium :kafka
 :kafka/topic "topic"
 :kafka/zookeeper "127.0.0.1:2181"
 :kafka/serializer-fn :my.ns/serializer-fn
 :kafka/request-size 307200
 :onyx/batch-size batch-size
 :onyx/doc "Writes messages to a Kafka topic"}

Lifecycle entry:

{:lifecycle/task :write-messages
 :lifecycle/calls :onyx.plugin.kafka/write-messages-calls}
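The catalog and lifecycle entries for both tasks can be combined into a single job map, sketched below as plain data. The job map keys (:workflow, :catalog, :lifecycles, :task-scheduler) follow general Onyx conventions rather than anything stated in this README, and the topic names, peer count, and direct input-to-output workflow are placeholders chosen for illustration.

```clojure
;; Sketch only: plain data, so it evaluates without Onyx on the classpath.
(def read-entry
  {:onyx/name :read-messages
   :onyx/plugin :onyx.plugin.kafka/read-messages
   :onyx/type :input
   :onyx/medium :kafka
   :kafka/topic "in-topic"              ; hypothetical topic name
   :kafka/zookeeper "127.0.0.1:2181"
   :kafka/offset-reset :earliest
   :kafka/deserializer-fn :my.ns/deserializer-fn
   :onyx/n-peers 1                      ; assumed value
   :onyx/batch-size 100})

(def write-entry
  {:onyx/name :write-messages
   :onyx/plugin :onyx.plugin.kafka/write-messages
   :onyx/type :output
   :onyx/medium :kafka
   :kafka/topic "out-topic"             ; hypothetical topic name
   :kafka/zookeeper "127.0.0.1:2181"
   :kafka/serializer-fn :my.ns/serializer-fn
   :onyx/batch-size 100})

(def job
  {:workflow [[:read-messages :write-messages]]
   :catalog [read-entry write-entry]
   :lifecycles [{:lifecycle/task :read-messages
                 :lifecycle/calls :onyx.plugin.kafka/read-messages-calls}
                {:lifecycle/task :write-messages
                 :lifecycle/calls :onyx.plugin.kafka/write-messages-calls}]
   :task-scheduler :onyx.task-scheduler/balanced})
```

Each :lifecycle/task must match the :onyx/name of a catalog entry, which is why the two names are repeated in both places.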

Segments supplied to a :onyx.plugin.kafka/write-messages task should be in the following form: {:message message-body}, with optional :key, :partition, and :topic values.

{:message message-body
 :key optional-key
 :partition optional-partition
 :topic optional-topic}
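A task upstream of write-messages would typically reshape its data into this form. The sketch below assumes a hypothetical event map with a :user-id field and uses that id as the record key; neither the event shape nor the choice of key comes from the plugin.

```clojure
;; Hypothetical upstream function: wraps an event in the segment shape
;; expected by write-messages, keyed by the (assumed) :user-id field.
(defn ->segment
  [{:keys [user-id] :as event}]
  {:message event
   :key (str user-id)})

(->segment {:user-id 7 :action "login"})
;; => {:message {:user-id 7, :action "login"}, :key "7"}
```

The :partition and :topic keys are left out here, so the segment relies on the defaults configured for the task.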

Attributes

| key | type | default | description |
|-----|------|---------|-------------|
| :kafka/topic | string | | The topic name to connect to |
| :kafka/zookeeper | string | | The ZooKeeper connection string |
| :kafka/key-serializer-fn | keyword | | A keyword naming a fully qualified, namespaced function that serializes a record's key. Takes one argument: the segment |
| :kafka/serializer-fn | keyword | | A keyword naming a fully qualified, namespaced function that serializes a record's value. Takes one argument: the segment |
| :kafka/request-size | number | 307200 | The maximum size of request messages. Maps to the max.request.size value of the internal Kafka producer |
| :kafka/no-seal? | boolean | false | Do not write :done to the topic when the task receives the sentinel signal (end of batch job) |
| :kafka/producer-opts | map | | A map of arbitrary configuration to merge into the underlying Kafka producer base configuration. Keys must be strings; values must be valid settings as described in the Kafka docs |
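As an illustration of the function that :kafka/serializer-fn points at, the sketch below writes its argument as a UTF-8 EDN string. The EDN encoding is an assumption made for the example; any function that returns a byte array would fit the description above.

```clojure
;; Assumed encoding: print the value as EDN and encode it as UTF-8 bytes.
(defn serializer-fn
  [segment]
  (.getBytes ^String (pr-str segment) "UTF-8"))

;; Round-tripping through a String shows what ends up on the topic:
(String. ^bytes (serializer-fn {:n 1}) "UTF-8")
;; => "{:n 1}"
```

If this function lived in the namespace my.ns, the catalog entry would reference it as :my.ns/serializer-fn.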

Test Utilities

A take-segments utility function is provided for testing the results of jobs with Kafka output tasks. take-segments reads from a topic until it reaches a :done, and then returns the results. Note that if a :done is never written to the topic, take-segments blocks forever, because it has no timeout.

(ns your-ns.a-test
  (:require [onyx.kafka.utils :as kpu]))

;; insert code to run a job here

;; retrieve the segments on the topic
(def results
  (kpu/take-segments (:zookeeper/address peer-config) "yourtopic" your-decompress-fn))

(last results)
; :done
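Because take-segments has no timeout, a test may want to bound the wait itself. One possible approach, sketched here with core Clojure only, is to run the call in a future and deref it with a timeout. with-timeout is a hypothetical helper, not part of onyx-kafka, and the thunks below are stand-ins for the real take-segments call.

```clojure
;; Hypothetical helper: runs the zero-argument function f on another thread
;; and returns its result, or :timed-out if it takes longer than timeout-ms.
(defn with-timeout
  [timeout-ms f]
  (let [fut (future (f))
        result (deref fut timeout-ms :timed-out)]
    (when (= result :timed-out)
      ;; Best effort: interrupt the thread that is still waiting.
      (future-cancel fut))
    result))

;; A call that finishes in time returns its value.
(with-timeout 1000 (fn [] [:a :b :done]))
;; => [:a :b :done]

;; A call that never finishes in time yields the sentinel.
(with-timeout 50 (fn [] (Thread/sleep 5000) :late))
;; => :timed-out
```

In a test, the thunk would wrap the kpu/take-segments call shown above. Whether cancelling the future actually stops the underlying consumer depends on how it responds to interruption, so treat the cancellation as best effort.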

Development

To benchmark, start a real ZooKeeper instance (at 127.0.0.1:2181) and Kafka instance, and run the following benchmarks.

Write perf, single peer writer:

TIMBRE_LOG_LEVEL=:info lein test onyx.plugin.output-bench-test :benchmark

Read perf, single peer reader:

TIMBRE_LOG_LEVEL=:info lein test onyx.plugin.input-benchmark-test :benchmark

Past results are maintained in dev-resources/benchmarking/results.txt.

Contributing

Pull requests into the master branch are welcome.

License

Copyright © 2018 Distributed Masonry

Distributed under the Eclipse Public License, the same as Clojure.