Skip to content

Commit

Permalink
Improve riemann.kafka resilience to empty messages (#1016)
Browse files Browse the repository at this point in the history
  • Loading branch information
szrumi committed Jun 5, 2022
1 parent 1ec011a commit 0f81590
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions src/riemann/kafka.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
(:use [riemann.common :only [event]]
[riemann.core :only [stream!]]
[riemann.service :only [Service ServiceEquiv ServiceStatus]]
[clojure.tools.logging :only [info error]]))
[clojure.tools.logging :only [debug info error]]))

(defn kafka
"Returns a function that is invoked with a topic name and an optional message key and returns a stream. That stream is a function which takes an event or a sequence of events and sends them to Kafka.
Expand Down Expand Up @@ -82,8 +82,11 @@
msgs-by-topic (get msgs :by-topic)]
(doseq [records msgs-by-topic
record (last records)]
(let [event (event (get record :value))]
(stream! @core event)))))
(try
(let [event (event (get record :value))]
(stream! @core event))
(catch java.lang.NullPointerException e
(debug (str "Invalid message: " record)))))))
(catch Exception e
(do
(reset! running? false)
Expand Down

0 comments on commit 0f81590

Please sign in to comment.