Skip to content

Commit

Permalink
Test that the input plugin can recover from a consumer
Browse files Browse the repository at this point in the history
error. Passes any exception in the reader thread back to read-batch
which will throw an exception which can be handled via restart-pred-fn
or :lifecycle/handle-exception
  • Loading branch information
lbradstreet committed Jan 26, 2016
1 parent d2cbceb commit 211b9aa
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 3 deletions.
4 changes: 2 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
:username :env
:password :env
:sign-releases false}}
:dependencies [[org.clojure/clojure "1.7.0"]
:dependencies [[org.clojure/clojure "1.8.0"]
^{:voom {:repo "git@github.com:onyx-platform/onyx.git" :branch "master"}}
[org.onyxplatform/onyx "0.8.5"]
[clj-kafka "0.3.2" :exclusions [org.apache.zookeeper/zookeeper zookeeper-clj]]
[clj-kafka "0.3.4" :exclusions [org.apache.zookeeper/zookeeper zookeeper-clj]]
[com.stuartsierra/component "0.2.3"]
[cheshire "5.5.0"]
[zookeeper-clj "0.9.3" :exclusions [io.netty/netty org.apache.zookeeper/zookeeper]]]
Expand Down
14 changes: 13 additions & 1 deletion src/onyx/plugin/kafka.clj
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@
(catch InterruptedException e
(throw e))
(catch Throwable e
(fatal e))))
;; pass exception back to reader thread
(>!! ch (t/input (random-uuid) e)))))
(catch InterruptedException e
(throw e))))

Expand Down Expand Up @@ -180,6 +181,9 @@
result))))
(filter :message)))]
(doseq [m batch]
(when (instance? java.lang.Throwable (:message m))
(throw (:message m)))

(swap! pending-messages assoc (:id m) m))
(when (and (all-done? (vals @pending-messages))
(all-done? batch)
Expand Down Expand Up @@ -286,10 +290,18 @@
serializer-fn (kw->fn (:kafka/serializer-fn task-map))]
(->KafkaWriteMessages config topic producer serializer-fn)))

(defn read-handle-exception [event lifecycle lf-kw exception]
:restart)

(def read-messages-calls
{:lifecycle/before-task-start start-kafka-consumer
:lifecycle/handle-exception read-handle-exception
:lifecycle/after-task-stop close-read-messages})

(defn write-handle-exception [event lifecycle lf-kw exception]
:restart)

(def write-messages-calls
{:lifecycle/before-task-start inject-write-messages
:lifecycle/handle-exception write-handle-exception
:lifecycle/after-task-stop close-write-resources})
150 changes: 150 additions & 0 deletions test/onyx/plugin/input_broker_reboot_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
(ns onyx.plugin.input-broker-reboot-test
(:require [clojure.core.async :refer [chan >!! <!!]]
[onyx.plugin.core-async :refer [take-segments!]]
[onyx.plugin.kafka]
[com.stuartsierra.component :as component]
[clj-kafka.producer :as kp]
[onyx.kafka.embedded-server :as ke]
[onyx.api]
[midje.sweet :refer :all]))

(def id (java.util.UUID/randomUUID))

(def env-config
{:zookeeper/address "127.0.0.1:2188"
:zookeeper/server? true
:zookeeper.server/port 2188
:onyx/id id})

(def peer-config
{:zookeeper/address "127.0.0.1:2188"
:onyx.peer/job-scheduler :onyx.job-scheduler/greedy
:onyx.messaging/impl :aeron
:onyx.messaging/peer-port 40199
:onyx.messaging/bind-addr "localhost"
:onyx/id id})

(def env (onyx.api/start-env env-config))

(def kafka-server
(component/start
(ke/map->EmbeddedKafka {:hostname "127.0.0.1"
:port 9092
:broker-id 0
:log-dir "/tmp/embedded-kafka2"
:zookeeper-addr "127.0.0.1:2188"})))

(def peer-group (onyx.api/start-peer-group peer-config))

(def topic (str "onyx-test-" (java.util.UUID/randomUUID)))

(def producer
(kp/producer
{"metadata.broker.list" "127.0.0.1:9092"
"serializer.class" "kafka.serializer.DefaultEncoder"
"partitioner.class" "kafka.producer.DefaultPartitioner"}))

(kp/send-message producer (kp/message topic (.getBytes (pr-str {:n 1}))))
(kp/send-message producer (kp/message topic (.getBytes (pr-str {:n 2}))))
(kp/send-message producer (kp/message topic (.getBytes (pr-str {:n 3}))))

(defn deserialize-message [bytes]
(read-string (String. bytes "UTF-8")))

(def workflow
[[:read-messages :identity]
[:identity :out]])

(defn restartable? [e]
true)

(def catalog
[{:onyx/name :read-messages
:onyx/plugin :onyx.plugin.kafka/read-messages
:onyx/type :input
:onyx/medium :kafka
:kafka/topic topic
:kafka/group-id "onyx-consumer"
:kafka/fetch-size 307200
:kafka/chan-capacity 1000
:kafka/zookeeper "127.0.0.1:2188"
:kafka/offset-reset :smallest
:kafka/force-reset? false
:kafka/empty-read-back-off 500
:kafka/commit-interval 500
:kafka/deserializer-fn ::deserialize-message
:onyx/restart-pred-fn ::restartable?
:onyx/max-peers 1
:onyx/batch-size 2
:onyx/doc "Reads messages from a Kafka topic"}

{:onyx/name :identity
:onyx/fn :clojure.core/identity
:onyx/type :function
:onyx/batch-size 2}

{:onyx/name :out
:onyx/plugin :onyx.plugin.core-async/output
:onyx/type :output
:onyx/medium :core.async
:onyx/max-peers 1
:onyx/batch-size 2
:onyx/doc "Writes segments to a core.async channel"}])

(def out-chan (chan 100))

(defn inject-out-ch [event lifecycle]
{:core.async/chan out-chan})

(def out-calls
{:lifecycle/before-task-start inject-out-ch})

(def batch-num (atom 0))

(def lifecycles
[{:lifecycle/task :read-messages
:lifecycle/calls :onyx.plugin.kafka/read-messages-calls}
{:lifecycle/task :out
:lifecycle/calls ::out-calls}
{:lifecycle/task :out
:lifecycle/calls :onyx.plugin.core-async/writer-calls}])

(def v-peers (onyx.api/start-peers 4 peer-group))

(onyx.api/submit-job
peer-config
{:catalog catalog :workflow workflow
:lifecycles lifecycles
:task-scheduler :onyx.task-scheduler/balanced})

(Thread/sleep 10000)

(def stopped-server (component/stop kafka-server))

(Thread/sleep 2000)

(def started-again (component/start stopped-server))

(def producer2
(kp/producer
{"metadata.broker.list" "127.0.0.1:9092"
"serializer.class" "kafka.serializer.DefaultEncoder"
"partitioner.class" "kafka.producer.DefaultPartitioner"}))

(kp/send-message producer2 (kp/message topic (.getBytes (pr-str {:n 4}))))
(kp/send-message producer2 (kp/message topic (.getBytes (pr-str {:n 5}))))
(kp/send-message producer2 (kp/message topic (.getBytes (pr-str {:n 6}))))
(kp/send-message producer2 (kp/message topic (.getBytes (pr-str :done))))

(def results (take-segments! out-chan))

(fact (set results) => #{{:n 1} {:n 2} {:n 3} {:n 4} {:n 5} {:n 6} :done})

(doseq [v-peer v-peers]
(onyx.api/shutdown-peer v-peer))

(onyx.api/shutdown-peer-group peer-group)

(onyx.api/shutdown-env env)

(component/stop kafka-server)

0 comments on commit 211b9aa

Please sign in to comment.