Skip to content

Commit

Permalink
Refactor riemann.transport.tcp
Browse files Browse the repository at this point in the history
Tries to clean up the coupling between the TCP and graphite servers by
decoupling the netty handler into a generic wrapper and a
server-specific handler function. Ugh, this lifecycle stuff is getting
out of hand.
  • Loading branch information
aphyr committed Jan 2, 2013
1 parent b5f2ff3 commit 95c7caf
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 30 deletions.
12 changes: 11 additions & 1 deletion src/riemann/transport.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
here since netty is the preferred method of providing transports"
(:use [slingshot.slingshot :only [try+]]
[riemann.common :only [decode-msg]]
[riemann.codec :only [encode-pb-msg]]
[riemann.index :only [search]]
clojure.tools.logging)
(:require [riemann.query :as query])
(:import
(com.aphyr.riemann Proto$Msg)
(org.jboss.netty.channel ChannelPipelineFactory ChannelPipeline)
(org.jboss.netty.buffer ChannelBufferInputStream)
(org.jboss.netty.handler.codec.oneone OneToOneDecoder)
(org.jboss.netty.handler.codec.oneone OneToOneDecoder
OneToOneEncoder)
(org.jboss.netty.handler.codec.protobuf ProtobufDecoder
ProtobufEncoder)
(org.jboss.netty.handler.execution ExecutionHandler
Expand Down Expand Up @@ -59,6 +61,14 @@
(decode [context channel message]
(decode-msg message))))

(defn msg-encoder
"Netty encoder for maps -> Msg protobuf objects"
[]
(proxy [OneToOneEncoder] []
(encode [context channel message]
(encode-pb-msg message))))


(defn handle
"Handles a msg with the given core."
[core msg]
Expand Down
20 changes: 12 additions & 8 deletions src/riemann/transport/graphite.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@
(when-let [[service metric timestamp] (split line #" ")]
(when (not= metric "nan") ;; discard nan values
(try
{:ok true
:states []
:events [(let [res {:service service
:metric (Float. metric)
:time (Long. timestamp)}]
(if parser-fn (merge res (parser-fn res)) res))]}
(let [res {:service service
:metric (Float. metric)
:time (Long. timestamp)}]
(if parser-fn (merge res (parser-fn res)) res))
(catch Exception e {:ok :true :service "exception"})))))

(defn graphite-frame-decoder
Expand All @@ -44,6 +42,12 @@
(decode [context channel message]
(decode-graphite-line message parser-fn)))))

(defn graphite-handler
"Given a core and a MessageEvent, applies the message to core."
[core e]
(doseq [stream (:streams core)]
(stream (.getMsg e))))

(defn graphite-server
"Start a graphite-server, some bits could be factored with tcp-server.
Only the default option map and the bootstrap change."
Expand All @@ -63,6 +67,6 @@
(:parser-fn opts)))))]
(tcp-server (merge {:host "127.0.0.1"
:port 2003
:write-back false
:pipeline-factory pipeline-factory}
:pipeline-factory pipeline-factory
:handler graphite-handler}
opts)))))
43 changes: 22 additions & 21 deletions src/riemann/transport/tcp.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
protobuf-decoder
protobuf-encoder
msg-decoder
msg-encoder
channel-pipeline-factory]]
[riemann.codec :only [encode-pb-msg]]
[riemann.service :only [Service]]
[clojure.tools.logging :only [info warn]]
[riemann.transport :only [handle]]))
Expand All @@ -40,34 +40,35 @@
[]
(LengthFieldPrepender. 4))

(defn tcp-handler
"Returns a TCP handler around the given atom pointing to a core"
[core ^ChannelGroup channel-group encode-fn]
(defn gen-tcp-handler
"Wraps netty boilerplate for common TCP server handlers. Given a reference to
a core, a channel group, and a handler fn, returns a SimpleChannelHandler
which calls (handler core message-event) with each received message."
[core ^ChannelGroup channel-group handler]
(proxy [SimpleChannelHandler] []
(channelOpen [context ^ChannelStateEvent state-event]
(.add channel-group (.getChannel state-event)))

(messageReceived [^ChannelHandlerContext context
^MessageEvent message-event]
(let [channel (.getChannel message-event)
msg (.getMessage message-event)]
(try
(let [out (handle @core msg)]
(when encode-fn
(.write channel (encode-fn out))))
(handler @core message-event)
(catch java.nio.channels.ClosedChannelException e
(warn "channel closed"))
(catch com.google.protobuf.InvalidProtocolBufferException e
(warn "invalid message, closing")
(.close channel)))))
(warn "channel closed"))))

(exceptionCaught [context ^ExceptionEvent exception-event]
(let [cause (.getCause exception-event)]
(when-not (= ClosedChannelException (class cause))
(when-not (instance? ClosedChannelException cause)
(warn (.getCause exception-event) "TCP handler caught")
(.close (.getChannel exception-event)))))))

(defrecord TCPServer [host port pipeline-factory core killer write-back encode-fn]
(defn tcp-handler
"Given a core and a MessageEvent, applies the message to core."
[core ^MessageEvent e]
(.write (.getChannel e)
(handle core (.getMessage e))))

(defrecord TCPServer [host port pipeline-factory handler core killer]
; core is a reference to a core
; killer is a reference to a function which shuts down the server.

Expand All @@ -92,8 +93,7 @@
(str "tcp-server " host ":" port))
cpf (channel-pipeline-factory
pipeline-factory
(tcp-handler core all-channels
(if write-back encode-fn)))]
(gen-tcp-handler core all-channels handler))]

; Configure bootstrap
(doto bootstrap
Expand Down Expand Up @@ -142,12 +142,13 @@
(.addLast "protobuf-encoder"
(protobuf-encoder))
(.addLast "msg-decoder"
(msg-decoder)))]
(msg-decoder))
(.addLast "msg-encoder"
(msg-encoder)))]
(TCPServer.
(get opts :host "127.0.0.1")
(get opts :port 5555)
(get opts :pipeline-factory pipeline-factory)
(get opts :handler tcp-handler)
(atom nil)
(atom nil)
(get opts :write-back true)
(get opts :encode-fn encode-pb-msg)))))
(atom nil)))))

0 comments on commit 95c7caf

Please sign in to comment.