Permalink
Browse files

Merge pull request #2 from utahstreetlabs/socket-agents

Allow socket reuse by isolating sockets inside agents
  • Loading branch information...
2 parents 1379b58 + 0acb8c3 commit b2f959b5743219f348627fa832dc8f62e2baa6ae @z00b z00b committed Mar 22, 2012
Showing with 65 additions and 12 deletions.
  1. +50 −9 src/accession/core.clj
  2. +15 −3 test/accession/test/timeout.clj
View
@@ -111,18 +111,59 @@
(.setTcpNoDelay true)
(.setKeepAlive true)))
+(defn- socket-and-streams
+ [spec]
+ (let [socket (doto (socket spec) (.setSoTimeout (:timeout spec)))
+ in (DataInputStream. (BufferedInputStream. (.getInputStream socket)))
+ out (.getOutputStream socket)]
+ [socket in out spec]))
+
+(defn- close-socket-and-streams
+ [[socket in out _]]
+ (.close socket) (.close in) (.close out))
+
+(def socket-atom (atom {}))
+
+(defn reset-sockets! []
+ (swap! socket-atom (fn [hash]
+ (doseq [s-and-s (map deref (vals hash))]
+ (close-socket-and-streams s-and-s))
+ {})))
+
+(defn- socket-agent
+ [spec]
+ (when (not (@socket-atom spec))
+ (swap! socket-atom #(assoc % spec (agent (socket-and-streams spec)))))
+ (@socket-atom spec))
+
(defn request
"Responsible for actually making the request to the Redis
- server. Sets the timeout on the socket if one was specified."
+ server. Sets the timeout on the socket if one was specified.
+
+Uses a long lived open socket owned by an agent to execute the request.
+If the socket throws an exception reading or writing, close it and start
+a new one but do not retry the query.
+
+Throwables thrown in the agent will be manually rethrown in the caller
+thread."
[conn & query]
- (with-open [socket (doto (socket conn)
- (.setSoTimeout (:timeout conn)))
- in (DataInputStream. (BufferedInputStream. (.getInputStream socket)))
- out (.getOutputStream socket)]
- (.write out (.getBytes (apply str query)))
- (if (next query)
- (doall (repeatedly (count query) #(response in)))
- (response in))))
+ (let [p (promise)]
+ (send (socket-agent conn)
+ (fn [[socket in out spec :as s-and-s]]
+ (try
+ (.write out (.getBytes (apply str query)))
+ (deliver p (if (next query)
+ (doall (repeatedly (count query) #(response in)))
+ (response in)))
+ s-and-s
+ (catch Throwable e
+ (deliver p e) (close-socket-and-streams s-and-s)
+ (socket-and-streams spec)))))
+ (let [result (deref p)]
+ ;; this seems potentially slow due to reflection - benchmark, maybe use protocol
+ (if (instance? Throwable result)
+ (throw result)
+ result))))
(defn receive-message
"Used in conjunction with an open channel to handle messages that
@@ -9,7 +9,7 @@
(if-let [request (.readLine rdr)]
(do
(with-open [os (.getOutputStream conn)]
- (do (Thread/sleep 1000)
+ (do (Thread/sleep 500)
(.write os (.getBytes "*3\r\n$3\r\none\r\n$3\r\ntwo\r\n$5\r\nthree\r\n"))
(.flush os)
(.close conn)))))))
@@ -28,13 +28,25 @@
(defn stop-server [server]
(.close (:socket server)))
-#_(deftest test-timeout
+(deftest test-timeout
+ (redis/reset-sockets!)
(let [server (start-server)
conn (redis/connection-map {:port 9000})]
(do (is (= (redis/with-connection conn (redis/get "foo")) ["one" "two" "three"]))
(stop-server server)))
(let [server (start-server)
- conn (redis/connection-map {:port 9000 :timeout 500})]
+ conn (redis/connection-map {:port 9000 :timeout 100})]
(do (is (thrown? Exception
(redis/with-connection conn (redis/get "foo"))))
(stop-server server))))
+
+(deftest test-socket-failure
+ (redis/reset-sockets!)
+ (let [server (start-server)
+ conn (redis/connection-map {:port 9000})]
+ (do (is (= (redis/with-connection conn (redis/get "foo")) ["one" "two" "three"]))
+ (stop-server server)
+ (is (thrown? Exception (redis/with-connection conn (redis/get "foo"))))
+ (let [server (start-server)]
+ (is (= (redis/with-connection conn (redis/get "foo")) ["one" "two" "three"]))
+ (stop-server server)))))

0 comments on commit b2f959b

Please sign in to comment.