Skip to content
Browse files

Remove potentially blocking call to .put on LinkedBlockingQueue

  • Loading branch information...
1 parent 6f5aee8 commit d7e738ea3b83564dd2fea7642ffeb016b9616c31 @marktriggs committed
Showing with 20 additions and 4 deletions.
  1. +20 −4 src/marcgrep/core.clj
View
24 src/marcgrep/core.clj
@@ -13,7 +13,7 @@
[java.io BufferedReader ByteArrayInputStream FileOutputStream PrintWriter]
[java.util Date]
[java.security MessageDigest]
- [java.util.concurrent LinkedBlockingQueue]
+ [java.util.concurrent LinkedBlockingQueue TimeUnit]
(org.mortbay.jetty Server Request Response))
(:require [marcgrep.predicates :as predicates]
[compojure.route :as route]
@@ -218,6 +218,20 @@ predicate to each record and sends matches to the appropriate destination."
(recur))))))))
+(defn offer-while
+ "Keep trying to add an element to a queue, stopping every now and then to
+check whether a predicate still holds. If the predicate becomes false, just
+give up and go home.
+
+Returns true if adding the item eventually succeeds. False otherwise."
+ [^LinkedBlockingQueue queue item pred]
+ (loop []
+ (or (.offer queue item (long 10) TimeUnit/SECONDS)
+ (if (pred)
+ (recur)
+ false))))
+
+
(defn run-jobs
"Run a list of jobs. Spread the work across a bunch of workers and take care
of gathering up and collating their results."
@@ -239,9 +253,11 @@ of gathering up and collating their results."
(if (every? future-done? workers)
nil ; slackers!
(if-let [record (.next marc-records)]
- (do (.put queue record)
- (recur))
- (.put queue :eof)))))
+ (when (offer-while queue record
+ #(not-every? future-done? workers))
+ (recur))
+ (offer-while queue :eof
+ #(not-every? future-done? workers))))))
(doseq [worker workers] @worker)

0 comments on commit d7e738e

Please sign in to comment.
Something went wrong with that request. Please try again.