Permalink
Browse files

Add record-blockage

  • Loading branch information...
1 parent 7640f97 commit 32c04d60c7205cff101ca9a984cb1d50643d74b5 @jpalmucci committed Aug 20, 2010
Showing with 110 additions and 44 deletions.
  1. 0 {src → }/LICENSE
  2. +8 −0 README
  3. +2 −1 project.clj
  4. +37 −4 src/yield.clj
  5. +0 −39 test/clj_yield/core_test.clj
  6. +63 −0 test/test.clj
View
File renamed without changes.
View
8 README
@@ -30,3 +30,11 @@ as a simple loop.
sequence. Since 'yield' computes its output using a (single) background
thread, this isn't a problem. (Note: you can still spawn new threads
inside the body of a with-yielding.)
+
+3) I wanted to move a time consuming operation into another
+thread. Lazy sequences acting as buffered queues are great at
+sychronizing producer and consumer threads.
+
+##
+
+TODO: document record-blockage
View
@@ -1,5 +1,6 @@
(defproject clj-yield/clj-yield "1.0"
:description "A function like python's yield statement that can push items to a lazy sequence from arbitrary (non-lazy) code."
:dependencies [[org.clojure/clojure "1.2.0-RC1"]
- [org.clojure/clojure-contrib "1.2.0-RC1"]] )
+ [org.clojure/clojure-contrib "1.2.0-RC1"]
+ [clj-iterate/clj-iterate "0.9"]] )
View
@@ -1,5 +1,5 @@
(ns yield
- (:use clojure.test))
+ (:use clojure.test iterate))
;; ********************************************************************************
;;
@@ -71,10 +71,39 @@ Yielding 4
`(with-yielding* ~n
(bound-fn [~name]
- ~@body)))
+ ~@body)
+ (file-position 1)))
-(defn with-yielding* [n f]
- (let [queue (atom (java.util.concurrent.LinkedBlockingQueue. (int n)))
+(def *blockage-map* (atom nil))
+
+(defn make-persistent [map]
+ "remove the atoms from a blockage map"
+ (iterate/iter {for [loc {block :block non-block :non-block}] in map}
+ {assoc {:block @block :non-block @non-block} key loc}))
+
+(defmacro record-blockage [& body]
+ "Record where and how many times a with-yielding seq is blocked
+ because the consumers are not consuming fast enough. Good for
+ detecting bottlenecks"
+ `(let [old# @*blockage-map*]
+ (try
+ (reset! *blockage-map* {})
+ ~@body
+ (make-persistent @*blockage-map*)
+ (finally
+ (reset! *blockage-map* nil)))))
+
+
+(defn with-yielding* [n f pos]
+ (let [blockage-rec (and @*blockage-map*
+ (swap! *blockage-map*
+ (fn [map]
+ (if (contains? map pos)
+ map
+ (assoc map pos {:block (atom 0) :non-block (atom 0)}))))
+ (@*blockage-map* pos))
+
+ queue (atom (java.util.concurrent.LinkedBlockingQueue. (int n)))
ft (future
(try
(f queue)
@@ -83,6 +112,10 @@ Yielding 4
(.offer @queue e 10 java.util.concurrent.TimeUnit/DAYS))
(finally (.offer @queue *end-marker* 10 java.util.concurrent.TimeUnit/DAYS))))
get-ele (fn get-ele [guard]
+ (if blockage-rec
+ (if (= (.remainingCapacity ^java.util.concurrent.LinkedBlockingQueue @queue) 0)
+ (swap! (blockage-rec :block) inc)
+ (swap! (blockage-rec :non-block) inc)))
(let [ele (.take ^java.util.concurrent.LinkedBlockingQueue @queue) ]
(cond (= ele *end-marker*) ()
@@ -1,39 +0,0 @@
-(ns clj-yield.core-test
- (:use [yield] :reload-all)
- (:use [clojure.test]))
-
-(deftest with-yielding-test
- ;; check that the sequence is correctly generated
- (is (= (apply + (with-yielding [out 10]
- (loop [x 1000]
- (if (pos? x)
- (do
- (yield out x)
- (recur (dec x)))))))
- 500500))
-
- ;; Throw an exception, but don't read far enough to get it.
- ;; checks lazy semantics
- (is (= (first (with-yielding [out 10]
- (loop [x 1000]
- (if (pos? x)
- (do
- (yield out x)
- (recur (dec x)))))
- (throw (Exception. "exception"))))
- 1000))
-
- ;; hit the exception to test the exception throwing mechanism
- (is (= (try
- (count
- (with-yielding [out 10]
- (loop [x 1000]
- (if (pos? x)
- (do
- (yield out x)
- (recur (dec x)))))
- (throw (Exception. "exception"))))
- (catch Exception e :exception))
- :exception)))
-
-
View
@@ -0,0 +1,63 @@
+(ns test
+ (use yield)
+ (use iterate)
+ (use clojure.test))
+
+(defn slow-producer []
+ (iter {for x in (with-yielding [out 1]
+ (iter {for i from 1 to 5}
+ (Thread/sleep 500)
+ (yield out i)))}
+ {collect x}))
+
+(defn slow-consumer []
+ (iter {for x in (with-yielding [out 1]
+ (iter {for i from 1 to 5}
+ (yield out i)))}
+ (Thread/sleep 500)
+ {collect x}))
+
+(deftest with-yielding-test
+ ;; check that the sequence is correctly generated
+ (is (= (apply + (with-yielding [out 10]
+ (loop [x 1000]
+ (if (pos? x)
+ (do
+ (yield out x)
+ (recur (dec x)))))))
+ 500500))
+
+ ;; Throw an exception, but don't read far enough to get it.
+ ;; checks lazy semantics
+ (is (= (first (with-yielding [out 10]
+ (loop [x 1000]
+ (if (pos? x)
+ (do
+ (yield out x)
+ (recur (dec x)))))
+ (throw (Exception. "exception"))))
+ 1000))
+
+ ;; hit the exception to test the exception throwing mechanism
+ (is (= (try
+ (count
+ (with-yielding [out 10]
+ (loop [x 1000]
+ (if (pos? x)
+ (do
+ (yield out x)
+ (recur (dec x)))))
+ (throw (Exception. "exception"))))
+ (catch Exception e :exception))
+ :exception))
+
+ (is (= (first (vals (record-blockage
+ (slow-consumer))))
+ {:block 5, :non-block 1}))
+
+ (is (= (first (vals (record-blockage
+ (slow-producer))))
+ {:block 0, :non-block 6}))
+ )
+
+

0 comments on commit 32c04d6

Please sign in to comment.