Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a graphite listener. #23

Merged
merged 3 commits into from Oct 23, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/riemann/common.clj
Expand Up @@ -7,6 +7,7 @@
(:require clojure.set)
(:require [clj-json.core :as json])
(:require [clojure.java.io :as io])
(:use [clojure.string :only [split]])
(:use clojure.tools.logging)
(:use protobuf.core)
(:use gloss.core)
Expand Down Expand Up @@ -145,6 +146,30 @@
:query (:query msg)
:events (map post-load-event (:events msg))}))

(defn decode-graphite-line
"Decode a line coming from graphite.
Graphite uses a simple scheme where each metric is given as a CRLF delimited
line, space split with three items:

* The metric name
* The metric value (optionally NaN)
* The timestamp

By default, decode-graphite-line will yield a simple metric with just
a service metric and timestamp, a parser-fn can be given to it, which
will yield a map to merge onto the result. This can be used when
graphite metrics have known patterns that you wish to extract more
information (host, refined service name, tags) from"
[line parser-fn]
(when-let [[service metric timestamp] (split line #" ")]
(when (not= metric "nan") ;; discard nan values
{:ok true
:states []
:events [(let [res {:service service
:metric (Float. metric)
:time (Long. timestamp)}]
(if parser-fn (merge res (parser-fn res)) res))]})))

(defn ^"[B" encode
"Builds and dumps a protobuf message from a hash. Applies pre-dump-event to
events."
Expand Down
11 changes: 10 additions & 1 deletion src/riemann/config.clj
Expand Up @@ -11,20 +11,29 @@
(:use clojure.tools.logging)
(:use riemann.client)
(:require [riemann.pubsub :as pubsub])
(:require [riemann.graphite :as graphite])
(:use [riemann.streams :exclude [update-index delete-from-index]])
(:use riemann.email)
(:use riemann.graphite)
(:gen-class))

(def ^{:doc "A default core."} core (core/core))

(def graphite #'graphite/graphite)

(defn tcp-server
"Add a new TCP server with opts to the default core."
[& opts]
(dosync
(alter (core :servers) conj
(riemann.server/tcp-server core (apply hash-map opts)))))

(defn graphite-server
"Add a new Graphite TCP server with opts to the default core."
[& opts]
(dosync
(alter (core :servers) conj
(riemann.server/graphite-server core (apply hash-map opts)))))

(defn udp-server
"Add a new UDP server with opts to the default core."
[& opts]
Expand Down
129 changes: 127 additions & 2 deletions src/riemann/graphite.clj
Expand Up @@ -4,10 +4,44 @@
(:import [java.net Socket])
(:import [java.io Writer])
(:import [java.io OutputStreamWriter])
(:import [java.util.concurrent ArrayBlockingQueue])
(:import [java.util.concurrent ArrayBlockingQueue]
(java.net InetSocketAddress)
(java.util.concurrent Executors)
(org.jboss.netty.util CharsetUtil)
(org.jboss.netty.bootstrap ConnectionlessBootstrap
ServerBootstrap)
(org.jboss.netty.buffer ChannelBufferInputStream
ChannelBuffers)
(org.jboss.netty.channel ChannelHandler
ChannelHandlerContext
ChannelPipeline
ChannelPipelineFactory
ChannelStateEvent
Channels
ExceptionEvent
FixedReceiveBufferSizePredictorFactory
MessageEvent
SimpleChannelHandler
SimpleChannelUpstreamHandler)
(org.jboss.netty.channel.group ChannelGroup
DefaultChannelGroup)
(org.jboss.netty.channel.socket DatagramChannelFactory)
(org.jboss.netty.channel.socket.nio NioDatagramChannelFactory
NioServerSocketChannelFactory)
(org.jboss.netty.handler.codec.string StringDecoder StringEncoder)
(org.jboss.netty.handler.codec.frame LengthFieldBasedFrameDecoder
LengthFieldPrepender
DelimiterBasedFrameDecoder
Delimiters)
(org.jboss.netty.handler.codec.oneone OneToOneDecoder)
(org.jboss.netty.handler.execution
ExecutionHandler
OrderedMemoryAwareThreadPoolExecutor
MemoryAwareThreadPoolExecutor))
(:use [clojure.string :only [split join replace]])
(:use clojure.tools.logging)
(:use riemann.common))
(:use riemann.common)
(:require [riemann.server :as server]))

(defn graphite-path-basic
"Constructs a path for an event. Takes the hostname fqdn, reversed,
Expand Down Expand Up @@ -79,3 +113,94 @@
; Reconnect in 5 seconds
(Thread/sleep 5000)
(add-socket)))))))))

(defn graphite-frame-decoder
"A closure which yields a graphite frame-decoder. Taking an argument
which will be given to decode-graphite-line (hence the closure)"
[parser-fn]
(fn []
(proxy [OneToOneDecoder] []
(decode [context channel message]
(decode-graphite-line message parser-fn)))))

(defn graphite-handler
"Returns a Graphite handler for the given core"
[core ^ChannelGroup channel-group]
(proxy [SimpleChannelHandler] []
(channelOpen [context ^ChannelStateEvent state-event]
(.add channel-group (.getChannel state-event)))

(messageReceived [^ChannelHandlerContext context
^MessageEvent message-event]
(let [channel (.getChannel message-event)
msg (.getMessage message-event)]
(try
(let [response (server/handle core msg)
encoded (encode response)]
(.write channel (ChannelBuffers/wrappedBuffer encoded)))
(catch java.nio.channels.ClosedChannelException e
(warn "channel closed")))))
(exceptionCaught [context ^ExceptionEvent exception-event]
(warn (.getCause exception-event) "Graphite handler caught")
(.close (.getChannel exception-event)))))

(defn graphite-cpf
"Graphite Channel Pipeline Factory"
[core channel-group message-decoder]
(warn "graphite-cpf")
(proxy [ChannelPipelineFactory] []
(getPipeline []
(let [decoder (StringDecoder. CharsetUtil/UTF_8)
encoder (StringEncoder. CharsetUtil/UTF_8)
executor (ExecutionHandler.
(OrderedMemoryAwareThreadPoolExecutor.
16 1048576 1048576)) ;; Magic is the best!
handler (graphite-handler core channel-group)]
(doto (Channels/pipeline)
(.addLast "framer" (DelimiterBasedFrameDecoder.
1024 ;; Will the magic ever stop ?
(Delimiters/lineDelimiter)))
(.addLast "string-decoder" decoder)
(.addLast "string-encoder" encoder)
(.addLast "message-decoder" (message-decoder))
(.addLast "executor" executor)
(.addLast "handler" handler))))))

(defn graphite-server
"Start a graphite-server, some bits could be factored with tcp-server.
Only the default option map and the bootstrap change."
([core] (graphite-server core {}))
([core opts]
(let [opts (merge {:host "localhost"
:port 2003
:message-decoder graphite-frame-decoder}
opts)
bootstrap (ServerBootstrap.
(NioServerSocketChannelFactory.
(Executors/newCachedThreadPool)
(Executors/newCachedThreadPool)))
all-channels (DefaultChannelGroup. (str "graphite-server " opts))
cpf (graphite-cpf core all-channels
((:message-decoder opts) (:parser-fn opts)))]

; Configure bootstrap
(doto bootstrap
(.setPipelineFactory cpf)
(.setOption "readWriteFair" true)
(.setOption "tcpNoDelay" true)
(.setOption "reuseAddress" true)
(.setOption "child.tcpNoDelay" true)
(.setOption "child.reuseAddress" true)
(.setOption "child.keepAlive" true))

; Start bootstrap
(let [server-channel (.bind bootstrap
(InetSocketAddress. ^String (:host opts)
^Integer (:port opts)))]
(.add all-channels server-channel))
(info "Graphite server" opts " online")

; fn to close server
(fn []
(-> all-channels .close .awaitUninterruptibly)
(.releaseExternalResources bootstrap)))))
8 changes: 6 additions & 2 deletions src/riemann/server.clj
Expand Up @@ -3,6 +3,7 @@
incoming events to the core's streams, queries the core's index for states."
(:import (java.net InetSocketAddress)
(java.util.concurrent Executors)
(org.jboss.netty.util CharsetUtil)
(org.jboss.netty.bootstrap ConnectionlessBootstrap
ServerBootstrap)
(org.jboss.netty.buffer ChannelBufferInputStream
Expand All @@ -23,8 +24,11 @@
(org.jboss.netty.channel.socket DatagramChannelFactory)
(org.jboss.netty.channel.socket.nio NioDatagramChannelFactory
NioServerSocketChannelFactory)
(org.jboss.netty.handler.codec.string StringDecoder StringEncoder)
(org.jboss.netty.handler.codec.frame LengthFieldBasedFrameDecoder
LengthFieldPrepender)
LengthFieldPrepender
DelimiterBasedFrameDecoder
Delimiters)
(org.jboss.netty.handler.codec.oneone OneToOneDecoder)
(org.jboss.netty.handler.execution
ExecutionHandler
Expand Down Expand Up @@ -111,7 +115,6 @@
(warn (.getCause exception-event) "TCP handler caught")
(.close (.getChannel exception-event)))))


(defn udp-handler
"Returns a UDP handler for the given core."
[core ^ChannelGroup channel-group]
Expand Down Expand Up @@ -156,6 +159,7 @@
(.addLast "executor" executor)
(.addLast "handler" handler))))))


(defn tcp-server
"Create a new TCP server for a core. Starts immediately. Options:
:host The host to listen on (default localhost).
Expand Down