Skip to content

Commit

Permalink
Basic websockets pubsub server.
Browse files Browse the repository at this point in the history
You can set up a websocket server with

(ws-server)

or

(ws-server :host x :port 5556)

The server answers websocket connections to /pubsub/<topic>. Any query
can be applied as a filter:

/pubsub/topic/query=true /pubsub/topic/query=(service =~ "%cat" and
metric < 5.4) (query must be url-encoded, obviously)

The resulting websocket will receive a stream of UTF8 frames, each
containing a JSON representation of an event from that topic which
matched the query.

Initial testing shows excellent latency (no perceivable lag) but appears
to drop events in volume. Need to investigate queue depths. Be aware
that the correctness and safety of the query system are not guaranteed;
just like the protobufs server, assume this system runs *arbitrary* code
from the network.
  • Loading branch information
aphyr committed Jun 10, 2012
1 parent 18e6105 commit 01764b2
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 2 deletions.
1 change: 1 addition & 0 deletions riemann.config
Expand Up @@ -4,6 +4,7 @@

(tcp-server)
(udp-server)
(ws-server)

(periodically-expire 10)

Expand Down
6 changes: 6 additions & 0 deletions src/riemann/common.clj
Expand Up @@ -5,6 +5,7 @@
(:import [java.util Date])
(:require gloss.io)
(:require clojure.set)
(:require [clj-json.core :as json])
(:require [clojure.java.io :as io])
(:use clojure.tools.logging)
(:use protobuf.core)
Expand Down Expand Up @@ -108,6 +109,11 @@
:query (:query msg)
:events (map post-load-event (:events msg))}))

(defn event-to-json
"Convert an event to a JSON string."
[event]
(json/generate-string event))

(defn decode-inputstream
"Decode an InputStream to a message. Decodes the protobuf representation of
Msg and applies post-load-event to all events."
Expand Down
7 changes: 7 additions & 0 deletions src/riemann/config.clj
Expand Up @@ -32,6 +32,13 @@
(alter (core :servers) conj
(riemann.server/udp-server core (apply hash-map opts)))))

(defn ws-server
"Add a new websockets server with opts to the default core."
[& opts]
(dosync
(alter (core :servers) conj
(riemann.server/ws-server core (apply hash-map opts)))))

(defn streams
"Add any number of streams to the default core."
[& things]
Expand Down
64 changes: 62 additions & 2 deletions src/riemann/server.clj
Expand Up @@ -30,12 +30,17 @@
OrderedMemoryAwareThreadPoolExecutor))
(:require [riemann.query :as query])
(:require [riemann.index :as index])
(:use [riemann.core])
(:use [riemann.common])
(:use riemann.core)
(:use riemann.common)
(:use riemann.pubsub)
(:use clojure.tools.logging)
(:use [protobuf.core])
(:use [slingshot.slingshot :only [try+]])
(:use clojure.stacktrace)
(:use lamina.core)
(:use aleph.http)
(:use [clj-http.util :only [url-decode]])
(:use [clojure.string :only [split]])
(:require gloss.io))

(defn handle
Expand Down Expand Up @@ -224,3 +229,58 @@
(fn []
(-> all-channels .close .awaitUninterruptibly)
(.releaseExternalResources bootstrap)))))

(defn http-query-map
"Converts a URL query string into a map."
[string]
(apply hash-map
(map url-decode
(mapcat (fn [kv] (split kv #"=" 2))
(split string #"&")))))

;;; Websockets
(defn ws-pubsub-handler [core ch hs]
(let [topic (url-decode (last (split (:uri hs) #"/" 3)))
params (http-query-map (:query-string hs))
query (params "query")
pred (query/fun (query/ast query))
sub (subscribe (:pubsub core) topic
(fn [event]
(when (pred event)
(enqueue ch (event-to-json event)))))]
(info "New websocket subscription to" topic ":" query)
(receive-all ch (fn [msg]
(when-not msg
; Shut down channel
(info "Closing websocket "
(:remote-addr hs) topic query)
(close ch)
(unsubscribe (:pubsub core) sub))))))

(defn ws-handler [core]
(fn [ch handshake]
(info "Websocket connection from" (:remote-addr handshake)
(:uri handshake)
(:query-string handshake))
(condp re-matches (:uri handshake)
#"^/pubsub/[^/]+$" (ws-pubsub-handler core ch handshake)
:else (do
(info "Unknown URI " (:uri handshake) ", closing")
(close ch)))))

(defn ws-server
"Starts a new websocket server for a core. Starts immediately.
Options:
:host The address to listen on (default localhost)
:post The port to listen on (default 5556)"
([core] (udp-server core {}))
([core opts]
(let [opts (merge {:host "localhost"
:port 5556}
opts)
s (start-http-server (ws-handler core) {:host (:host opts)
:port (:port opts)
:websocket true})]
(info "Websockets server" opts "online")
s)))

0 comments on commit 01764b2

Please sign in to comment.