Browse files

Add support for nREPL over HornetQ

  • Loading branch information...
1 parent af59364 commit 4df592f600c7213231ca56d6b014918f4cbc2ad3 @hugoduncan hugoduncan committed Oct 1, 2012
View
1 .gitignore
@@ -6,3 +6,4 @@ classes
marmalade
target
.lein-*
+data
View
4 README.md
@@ -15,6 +15,10 @@ environments and for debuggers.
<td>nREPL server.</td>
</tr>
<tr>
+ <td><a href="https://github.com/pallet/ritz/tree/develop/nrepl">ritz-nrepl-hornetq</a></td>
+ <td>nREPL server and client for running over HornetQ</td>
+ </tr>
+ <tr>
<td><a href="https://github.com/pallet/ritz/tree/develop/debugger">ritz-debugger</a></td>
<td>A library for using the JVM JPDA debugger in clojure.</td>
</tr>
View
124 lein-ritz/src/leiningen/ritz_hornetq.clj
@@ -0,0 +1,124 @@
+(ns leiningen.ritz-hornetq
+ "Start a nrepl session over HornetQ."
+ (:require
+ [clojure.string :as string]
+ [clojure.java.io :as io]
+ [leiningen.core.eval :as eval]
+ [leiningen.core.project :as project]
+ [clojure.tools.nrepl.server :as nrepl.server])
+ (:use
+ [clojure.tools.cli :only [cli]]
+ [leiningen.core.main :only [debug]]))
+
+(def nrepl-profile {:dependencies '[[org.clojure/tools.nrepl "0.2.0-beta9"
+ :exclusions [org.clojure/clojure]]]})
+
+(def ritz-profile {:dependencies '[[ritz/ritz-nrepl-hornetq "0.5.1-SNAPSHOT"
+ :exclusions [org.clojure/clojure]]]})
+
+(defn- start-nrepl-server
+ "Start the nrepl server."
+ [{{:keys [nrepl-middleware]} :repl-options :as project}
+ hornetq-server-form hornetq-opts
+ {:keys [log-level] :as opts}]
+ {:pre [project]}
+ (debug "start-nrepl-server hornetq-opts" hornetq-opts)
+ (let [project (project/merge-profiles project [ritz-profile])
+ form
+ `(do
+ ~hornetq-server-form
+ (ritz.nrepl-hornetq/start-server
+ ~(merge hornetq-opts opts))
+ @(promise))]
+ (eval/eval-in-project project form '(do (require 'ritz.nrepl-hornetq)))))
+
+(defn hornetq-server-form
+ [hornetq-server stomp]
+ (let [opts {:netty hornetq-server :stomp stomp :in-vm true}]
+ `(do
+ (require '~'hornetq-clj.server)
+ (let [server# (@(ns-resolve
+ '~'hornetq-clj.server
+ '~'server)
+ '~opts)]
+ (.start server#)
+ (doto
+ (Thread.
+ #(loop []
+ (when (.isStarted server#)
+ (Thread/sleep 10000)
+ (recur))))
+ (.start))))))
+
+(defn hornetq-netty-opts
+ [port host user password]
+ {:transport :netty
+ :host (or host "localhost")
+ :port port
+ :user user
+ :password password})
+
+(defn hornetq-in-vm-opts
+ []
+ {:transport :in-vm})
+
+
+(defn- hornetq-user [project]
+ (or (System/getenv "LEIN_HORNETQ_USER")
+ (-> project :repl-options :hornetq :user)
+ ""))
+
+(defn- hornetq-password [project]
+ (or (System/getenv "LEIN_HORNETQ_PASSWORD")
+ (-> project :repl-options :hornetq :password)
+ ""))
+
+(defn- hornetq-port [project]
+ (Integer. ^String (or (System/getenv "LEIN_HORNETQ_PORT")
+ (-> project :repl-options :hornetq :port)
+ "5445")))
+
+(defn- hornetq-host [project]
+ (or (System/getenv "LEIN_HORNETQ_HOST")
+ (-> project :repl-options :hornetq :host)
+ "localhost"))
+
+(defn integer-or-bool [^String arg]
+ (println "integer-or-bool" arg)
+ (try
+ (Integer. arg)
+ (catch Exception _
+ (try
+ (boolean (Boolean. arg))
+ (catch Exception _
+ arg)))))
+
+(defn ^:no-project-needed ritz-hornetq
+ "Start a ritz repl session over HornetQ."
+ [project & args]
+ (debug "ritz-hornetq")
+ (let [[{:keys [log-level hornetq-server stomp user password] :as opts}
+ [port host]]
+ (cli args
+ ["-l" "--log-level" "Set the log level" :default nil]
+ ["-u" "--user" "HornetQ user name" :default ""]
+ ["-p" "--password" "HornetQ password" :default ""]
+ ["-h" "--hornetq-server" "Run a HornetQ server (can specify port)"
+ :default nil :parse-fn integer-or-bool]
+ ["-s" "--stomp"
+ "Enable STOMP in the HornetQ server (can specify port)"
+ :default nil :parse-fn integer-or-bool])
+ opts (update-in opts [:log-level] #(when % (keyword %)))
+ run-hornetq? (or hornetq-server stomp)
+ port (or port (hornetq-port project))
+ host (or host (hornetq-host project))
+ user (or user (hornetq-user project))
+ password (or password (hornetq-password project))]
+ (debug "ritz-hornetq opts" opts)
+ (start-nrepl-server
+ project
+ (when run-hornetq? (hornetq-server-form hornetq-server stomp))
+ (if run-hornetq?
+ (hornetq-in-vm-opts)
+ (hornetq-netty-opts port host user password))
+ opts)))
View
10 nrepl-hornetq/.gitignore
@@ -0,0 +1,10 @@
+/target
+/lib
+/classes
+/checkouts
+pom.xml
+*.jar
+*.class
+.lein-deps-sum
+.lein-failures
+.lein-plugins
View
47 nrepl-hornetq/README.md
@@ -0,0 +1,47 @@
+# ritz-nrepl-hornetq
+
+An nREPL server implemented with HornetQ
+
+## Usage
+
+### Server
+
+To start an nREPL server over HornetQ:
+
+ lein ritz-hornetq
+
+By default this connects to a HornetQ server on `localhost`, using the standard
+5445 port. You can specify port and host as arguments.
+
+ lein ritz-hornetq --user "me" --password "letmein" 55445 somehost
+
+You can also ask for an embedded HornetQ server to be started.
+
+ lein ritz-hornetq --hornetq-server 5445
+
+### Client
+
+Add `ritz-nrepl-hornetq` to your `project.clj` or `:user` profile plugins:
+
+ [ritz/ritz-nrepl-hornetq "0.5.1"]
+
+You can then use lein to start a repl against a running HornetQ server.
+
+ lein repl :connect hornetq://localhost:5445
+
+### Embedding
+
+Add `ritz/ritz-nrepl-hornet` to your projects dependencies. You start the server
+
+```clj
+(ns my-app
+ (:use [ritz.nrepl-hornetq :only [start-server]]))
+
+(start-server {:transport :netty :host "somehost" :port 5445})
+```
+
+## License
+
+Copyright © 2012 Hugo Duncan
+
+Distributed under the Eclipse Public License.
View
12 nrepl-hornetq/dev-resources/logback.xml
@@ -0,0 +1,12 @@
+<configuration scan="true" scanPeriod="5 seconds" debug="false">
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="ERROR">
+ <appender-ref ref="CONSOLE" />
+ </root>
+
+</configuration>
View
14 nrepl-hornetq/project.clj
@@ -0,0 +1,14 @@
+(defproject ritz/ritz-nrepl-hornetq "0.5.1-SNAPSHOT"
+ :description "nREPL transport for HornetQ"
+ :url "http://github.com/ritz"
+ :license {:name "Eclipse Public License"
+ :url "http://www.eclipse.org/legal/epl-v10.html"}
+ :dependencies [[org.clojure/clojure "1.4.0"]
+ [org.clojure/tools.nrepl "0.2.0-beta9"]
+ [cheshire "3.0.0"]
+ [hornetq-clj/client "0.2.0-SNAPSHOT"]
+ [ritz/ritz-repl-utils "0.5.1-SNAPSHOT"]]
+ :profiles {:dev {:dependencies [[hornetq-clj/server "0.2.0-SNAPSHOT"]
+ [clojure-complete "0.2.2"]
+ [org.slf4j/jul-to-slf4j "1.6.4"]
+ [ch.qos.logback/logback-classic "1.0.0"]]}})
View
79 nrepl-hornetq/src/ritz/nrepl_hornetq.clj
@@ -0,0 +1,79 @@
+(ns ritz.nrepl-hornetq
+ (:use
+ [clojure.tools.nrepl.server :only [handle* default-handler]]
+ [clojure.tools.nrepl.transport :only [recv]]
+ [ritz.logging :only [trace]]
+ [ritz.nrepl-hornetq.transport :only [make-transport ]]))
+
+
+;;; Need an embedded server, with no classpath magic, as well as a server with a
+;;; configurable classpath.
+
+(defn handle-messages
+ [{:keys [continue handler transport] :as server}]
+ (when @continue
+ (when-let [msg (recv transport)]
+ (trace "handle %s" msg)
+ (handle* msg handler transport)
+ (recur server))))
+
+(defn handle
+ "Handles requests received via `transport` using `handler`.
+ Returns nil when `recv` returns nil for the given transport."
+ [{:keys [continue handler transport] :as server}]
+ (trace "server handle")
+ (with-open [transport transport]
+ (handle-messages server))
+ (trace "server handle done"))
+
+(defn stop-server
+ "Stops a server started via `start-server`."
+ [server]
+ (trace "stop-server %s" server)
+ (send-off server (fn [server] (reset! (:continue server) nil) server)))
+
+(defn start-server
+ "Starts a HornetQ nREPL server. Configuration options include:
+
+Returns a handle to the server that is started, which may be stopped either via
+`stop-server`, (.close server), or automatically via `with-open`.
+
+consumer-queue
+: the name of the queue the nrepl-server should read from.
+
+service-queue
+: the name of the queue the nrepl-server should write to.
+
+queue-options
+: queue options for creating the queues.
+
+host
+: host name or ip of the HornetQ server
+
+port
+: port of the HornetQ server
+
+user
+: user login for the HornetQ server
+
+password
+: password for the HornetQ server
+
+transport
+: which HornetQ transport to use (:in-vm or :netty)
+
+session-options
+: options to pass to the HornetQ session
+
+"
+ [{:keys [consumer-queue producer-queue queue-options handler] :as options}]
+ (let [options (merge {:producer-queue "/nrepl/client"
+ :consumer-queue "/nrepl/server"}
+ options)
+ smap {:continue (atom true)
+ :transport (make-transport options)
+ :handler (or handler (default-handler))}
+ server (proxy [clojure.lang.Agent java.io.Closeable] [smap]
+ (close [] (stop-server this)))]
+ (send-off server handle)
+ server))
View
118 nrepl-hornetq/src/ritz/nrepl_hornetq/client.clj
@@ -0,0 +1,118 @@
+(ns ritz.nrepl-hornetq.client
+ "A client for a nrepl-hornetq server."
+ (:refer-clojure :exclude [send])
+ (:use
+ [clojure.stacktrace :only [print-cause-trace]]
+ [clojure.string :only [split]]
+ [clojure.tools.nrepl.transport :only [send recv]]
+ [clojure.tools.nrepl :only [url-connect] :as nrepl]
+ [ritz.nrepl-hornetq.transport :only [make-transport]]
+ [ritz.logging :only [trace]])
+ (:require
+ [clojure.tools.nrepl.middleware.interruptible-eval :as interruptible-eval]))
+
+
+(defn next-id [{:keys [id] :as client}]
+ (swap! id inc))
+
+(defn set-handler [{:keys [handlers] :as client} id response-handler]
+ (swap! handlers assoc id response-handler))
+
+(defn handler [{:keys [handlers] :as client} id]
+ (get @handlers id))
+
+(defn remove-handler [{:keys [handlers] :as client} id]
+ (swap! handlers dissoc id))
+
+(defn handle-message
+ [executor handler msg]
+ (trace "client/handle-message %s" msg)
+ (.execute executor #(handler msg)))
+
+(defn handle-messages
+ [{:keys [continue handlers transport executor] :as client}]
+ {:pre [transport]}
+ (trace "client/handle-messages")
+ (when @continue
+ (when-let [{:keys [id status] :as msg} (recv transport)]
+ (trace "client/handle-messages read a message")
+ (handle-message executor (handler client id) msg)
+ (when (#{:done :error :interrupted} status)
+ (trace "client/handle-messages remove handler %s" id)
+ (remove-handler client id))
+ (recur client))))
+
+(defn handle
+ "Handles replies received via `transport`."
+ [{:keys [transport] :as client}]
+ (try
+ (trace "client/handle")
+ (do ;; with-open [_ transport]
+ (handle-messages client)
+ (trace "client/handle done"))
+ (catch Exception e
+ (trace "client/handle exception %s" (with-out-str (print-cause-trace e)))
+ (throw e))))
+
+(defn send-message
+ [client msg response-handler]
+ (let [{:keys [id transport] :as client} @client
+ id (next-id client)]
+ (set-handler client id response-handler)
+ (send transport (assoc msg :id id))))
+
+(defn stop-client
+ "Stops a client started via `start-client`."
+ [{:keys [transport] :as client}]
+ (trace "stop-client %s" client)
+ (send-off client (fn [client] (reset! (:continue client) nil) client)))
+
+(defn start-client
+ [{:keys [executor in-vm] :as options}]
+ (let [options (merge {:producer-queue "/nrepl/server"
+ :consumer-queue "/nrepl/client"}
+ options)
+ smap {:continue (atom true)
+ :transport (make-transport options)
+ :id (atom 0)
+ :handlers (atom {})
+ :executor (or executor (#'interruptible-eval/configure-executor))}
+ client (proxy [clojure.lang.Agent java.io.Closeable] [smap]
+ (close [] (stop-client this)))]
+ (send-off client handle)
+ client))
+
+(def hornetq-defaults
+ {:host "localhost"
+ :port 5445
+ :transport :netty
+ :consumer-queue "/nrepl/client"
+ :producer-queue "/nrepl/server"})
+
+(defn query-params
+ "Return a map of query parameters (last value wins if key specified multiple
+ times."
+ [^java.net.URI uri]
+ (into {}
+ (map
+ #(vector (keyword (key %)) (last (val %)))
+ (.getParameters
+ (org.jboss.netty.handler.codec.http.QueryStringDecoder. uri)))))
+
+(defn uri-options
+ "Convert a hornetq uri to an options map for make-transport"
+ [uri]
+ (let [uri (#'nrepl/to-uri uri)
+ port (.getPort uri)
+ [user password] (when-let [user-info (.getUserInfo uri)]
+ (split user-info #"/"))]
+ (merge hornetq-defaults
+ (query-params uri)
+ (when (pos? port) {:port port})
+ (when user {:user user})
+ (when password {:password password})
+ {:host (.getHost uri)})))
+
+(defmethod url-connect "hornetq"
+ [uri]
+ (make-transport (uri-options uri)))
View
171 nrepl-hornetq/src/ritz/nrepl_hornetq/transport.clj
@@ -0,0 +1,171 @@
+(ns ritz.nrepl-hornetq.transport
+ "An nREPL transport using a HornetQ client."
+ (:require
+ clojure.tools.nrepl.transport)
+ (:use
+ [cheshire.core :as json]
+ [clojure.stacktrace :only [print-cause-trace]]
+ [hornetq-clj.core-client
+ :only [create-consumer create-producer create-queue
+ create-message query-queue read-message-string write-message-string
+ send-message session-factory session
+ create-session-factory]
+ :rename {session make-session send-message send-hornetq-message}]
+ [ritz.logging :only [trace]]))
+
+;;; # Session
+(defn hornetq-session
+ "Returns a function of no arguments that returns a session for the HornetQ
+server.
+
+Specify user, password, host, port, in-vm for the HornetQ server.
+
+Use session-options to specify:
+ xa, auto-commit-sends, auto-commit-acks, pre-acknowledge, and ack-batch-size."
+ [{:keys [user password host port transport locator-type session-options]
+ :or {user "" password "" locator-type :static}
+ :as options}]
+ (trace "hornetq-session %s" (:transport options))
+ (let [session-factory (create-session-factory
+ locator-type
+ (merge
+ {:block-on-durable-send true
+ :block-on-non-durable-send true}
+ options)
+ options)]
+ (trace "hornetq-session factory %s" session-factory)
+ #(make-session session-factory user password session-options)))
+
+(defn session-map
+ [s {:keys [consumer-queue producer-queue] :as options} session-fn]
+ (let [session (or (:session s) (doto (session-fn) (.start)))]
+ (-> s
+ (assoc :session session)
+ (update-in [:consumer]
+ #(or
+ %
+ (try
+ (trace "create-consumer %s" consumer-queue)
+ (create-consumer session consumer-queue options)
+ (catch Exception e
+ (trace "create consumer %s" e)))))
+ (update-in [:producer]
+ #(or
+ %
+ (try
+ (trace "create-producer %s" producer-queue)
+ (create-producer session producer-queue)
+ (catch Exception e
+ (trace "create producer %s" e))))))))
+
+(defn ensure-session
+ [{:keys [consumer-queue producer-queue] :as options} session-fn session-atom]
+ {:pre [consumer-queue producer-queue]}
+ (trace "ensure-session")
+ (let [s @session-atom]
+ (or (and (every? s [:session :consumer :producer]) s)
+ (swap! session-atom session-map options session-fn))))
+
+(defn session
+ [connection]
+ (:session connection))
+
+(defn consumer
+ [connection]
+ (:consumer connection))
+
+(defn producer
+ [connection]
+ (:producer connection))
+
+
+;;; # Queues
+(defn hornetq-make-queue
+ "Make a queue on the hornet-server"
+ [connection queue-name queue-options]
+ (trace "hornetq-make-queue %s" queue-name)
+ (create-queue (session connection) queue-name queue-options))
+
+(defn hornetq-queue-info
+ "Make a queue on the hornet-server"
+ [connection queue-name]
+ (trace "hornetq-queue-info %s" queue-name)
+ (query-queue (session connection) queue-name))
+
+(defn hornetq-ensure-queue
+ "Make a queue on the hornet-server if it doesn't already exist."
+ [connection queue-name queue-options]
+ (trace "hornetq-ensure-queue %s" queue-name)
+ (when-not (.isExists (hornetq-queue-info connection queue-name))
+ (hornetq-make-queue connection queue-name queue-options)))
+
+
+;;; # Send and receive
+(defn process-received-message
+ "Take a hornetq message, and return the nREPL message from it."
+ [message]
+ (trace "process-received-message")
+ (when message
+ (.acknowledge message)
+ (let [s (read-message-string message)
+ _ (trace "process-received-message s %s" s)
+ msg (json/parse-string s true)]
+ (trace "process-received-message msg %s" msg)
+ msg)))
+
+(defn send-message
+ "Send a message from the client to the nREPL server."
+ [{:keys [durable producer-queue] :as options}
+ {:keys [session producer] :as session}
+ msg]
+ {:pre [session producer]}
+ (trace "send-message %s to %s as %s"
+ msg producer-queue (json/generate-string msg))
+ (let [message (create-message session durable)]
+ (write-message-string message (json/generate-string msg))
+ (send-hornetq-message producer message producer-queue)))
+
+;;; # Types
+(defprotocol QueueBuilder
+ (ensure-queue [_ queue-name options]
+ "Make a queue using the specified options"))
+
+;;; # Transport
+(defrecord HornetQTransport [options session-fn connection]
+ clojure.tools.nrepl.transport/Transport
+ (recv [this]
+ (trace "recv")
+ (let [c (consumer (ensure-session options session-fn connection))]
+ (trace "recv consumer %s" c)
+ (assert (not (.isClosed c)))
+ (process-received-message (.receive c))))
+ (recv [this timeout] (process-received-message
+ (.receive
+ (consumer
+ (ensure-session options session-fn connection))
+ timeout)))
+ (send [this msg] (send-message
+ options
+ (ensure-session options session-fn connection)
+ msg))
+ QueueBuilder
+ (ensure-queue [this queue-name queue-options]
+ (hornetq-ensure-queue
+ (ensure-session options session-fn connection)
+ queue-name
+ queue-options))
+ java.io.Closeable
+ (close [_] (when-let [session (:session @connection)]
+ (.close session))))
+
+
+(defn make-transport
+ "Return an nREPL transport using a HornetQ message broker."
+ [{:keys [consumer-queue producer-queue queue-options]
+ :as options}]
+ {:pre [consumer-queue producer-queue]}
+ (let [transport (HornetQTransport.
+ options (hornetq-session options) (atom {}))]
+ (ensure-queue transport consumer-queue queue-options)
+ (ensure-queue transport producer-queue queue-options)
+ transport))
View
4 nrepl-hornetq/src/ritz_nrepl_hornetq/plugin.clj
@@ -0,0 +1,4 @@
+(ns ritz-nrepl-hornetq.plugin
+ "Loads the client url-connect method."
+ (:require
+ ritz.nrepl-hornetq.client))
View
67 nrepl-hornetq/test/ritz/nrepl_hornetq_test.clj
@@ -0,0 +1,67 @@
+(ns ritz.nrepl-hornetq-test
+ (:refer-clojure :exclude [send])
+ (:use
+ [clojure.pprint :only [pprint]]
+ [clojure.tools.nrepl :only [url-connect]]
+ [clojure.tools.nrepl.transport :only [send recv]]
+ [hornetq-clj.server
+ :only [server locate-queue] :rename {server hornetq-server}]
+ [ritz.logging :only [trace set-level]]
+ ritz.nrepl-hornetq
+ [ritz.nrepl-hornetq.client :only [start-client stop-client send-message]]
+ clojure.test)
+ (:import
+ [java.util.logging LogManager Logger Level]
+ org.slf4j.bridge.SLF4JBridgeHandler))
+
+
+(defn install-slf4j-bridge
+ []
+ (.. (LogManager/getLogManager) reset)
+ (SLF4JBridgeHandler/install)
+ (.. (Logger/getLogger "global") (setLevel Level/FINEST)))
+
+(defonce use-slf4 (or (install-slf4j-bridge) 1))
+
+ (set-level :trace)
+
+(defonce ^{:defonce true} server
+ (doto (hornetq-server {:in-vm true :netty 55445}) .start))
+
+(defn show-queues []
+ (println (locate-queue server "/nrepl/client"))
+ (println (locate-queue server "/nrepl/server")))
+
+(defn queues []
+ [(locate-queue server "/nrepl/client")
+ (locate-queue server "/nrepl/server")])
+
+(defn pprint-queues []
+ (pprint (map bean (queues))))
+
+(deftest start-test
+ (testing "start and stop"
+ (let [server (start-server {:transport :in-vm})]
+ (is server)
+ (stop-server server)))
+ (testing "simple eval"
+ (let [server (start-server {:transport :in-vm})
+ client (start-client {:transport :in-vm})
+ p (promise)]
+ (try
+ (is server)
+ (is client)
+ (is (:id @client))
+ (Thread/sleep 100)
+ (send-message
+ client
+ {:op "eval" :code "(let [x 1] (* 2 x))"}
+ #(deliver p (:value %)))
+ (is (= "2" @p))
+ (finally
+ (stop-client client)
+ (stop-server server))))))
+
+(deftest url-connect-test
+ (is (url-connect "hornetq://localhost/?transport=in-vm"))
+ (is (url-connect "hornetq://localhost:55445/?transport=netty")))
View
2 project.clj
@@ -4,7 +4,7 @@
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:scm {:url "git@github.com:pallet/ritz.git"}
- :sub ["repl-utils" "debugger" "nrepl" "swank" "lein-ritz"]
+ :sub ["repl-utils" "debugger" "nrepl" "nrepl-hornetq" "swank" "lein-ritz"]
:plugins [[lein-sub "0.2.3"]]
:aliases {"clean" ["sub" "clean"]
"install" ["sub" "install"]

0 comments on commit 4df592f

Please sign in to comment.