Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fix race condition in seque

Signed-off-by: Stuart Halloway <stu@thinkrelevance.com>
  • Loading branch information...
commit 0e3a535711044c68c34c34caa26c8cb05e10a346 1 parent 54f790e
@amalloy amalloy authored stuarthalloway committed
Showing with 40 additions and 11 deletions.
  1. +18 −11 src/clj/clojure/core.clj
  2. +22 −0 test/clojure/test_clojure/agents.clj
View
29 src/clj/clojure/core.clj
@@ -4756,18 +4756,25 @@
n-or-q
(LinkedBlockingQueue. (int n-or-q)))
NIL (Object.) ;nil sentinel since LBQ doesn't support nils
- agt (agent (seq s))
+ agt (agent (lazy-seq s)) ; never start with nil; that signifies we've already put eos
+ log-error (fn [q e]
+ (if (.offer q q)
+ (throw e)
+ e))
fill (fn [s]
- (try
- (loop [[x & xs :as s] s]
- (if s
- (if (.offer q (if (nil? x) NIL x))
- (recur xs)
- s)
- (.put q q))) ; q itself is eos sentinel
- (catch Exception e
- (.put q q)
- (throw e))))
+ (when s
+ (if (instance? Exception s) ; we failed to .offer an error earlier
+ (log-error q s)
+ (try
+ (loop [[x & xs :as s] (seq s)]
+ (if s
+ (if (.offer q (if (nil? x) NIL x))
+ (recur xs)
+ s)
+ (when-not (.offer q q) ; q itself is eos sentinel
+ ()))) ; empty seq, not nil, so we know to put eos next time
+ (catch Exception e
+ (log-error q e))))))
drain (fn drain []
(lazy-seq
(let [x (.take q)]
View
22 test/clojure/test_clojure/agents.clj
@@ -151,6 +151,28 @@
(.join))
(is (= @a :thread-binding))))
+;; check for a race condition that was causing seque to leak threads from the
+;; send-off pool. Specifically, if we consume all items from the seque, and
+;; the LBQ continues to grow, it means there was an agent action blocking on
+;; the .put, which would block indefinitely outside of this test.
+(deftest seque-threads
+ (let [queue-size 5
+ slow-seq (for [x (take (* 2 queue-size) (iterate inc 0))]
+ (do (Thread/sleep 25)
+ x))
+ small-lbq (java.util.concurrent.LinkedBlockingQueue. queue-size)
+ worker (seque small-lbq slow-seq)]
+ (doall worker)
+ (is (= worker slow-seq))
+ (Thread/sleep 250) ;; make sure agents have time to run or get blocked
+ (let [queue-backlog (.size small-lbq)]
+ (is (<= 0 queue-backlog queue-size))
+ (when-not (zero? queue-backlog)
+ (.take small-lbq)
+ (Thread/sleep 250) ;; see if agent was blocking, indicating a thread leak
+ (is (= (.size small-lbq)
+ (dec queue-backlog)))))))
+
; http://clojure.org/agents
; agent
Please sign in to comment.
Something went wrong with that request. Please try again.