Permalink
Browse files

A bit of code restructuring; behaviour unchanged, passes all tests.

  • Loading branch information...
1 parent 63553a4 commit 9b4d06a9bf5f9f93fa8646c6d09ffcd5400f5e80 @phmarek committed May 25, 2011
Showing with 118 additions and 115 deletions.
  1. +118 −115 src/threading-queue.lisp
View
@@ -414,59 +414,59 @@
(parse-options (pop steps) +all-options+
:aliases +option-aliases+
:only-keywords t))
- (max-cc-thr-var (gensym "MAX-CONCUR-THR"))
- (thread-count-var (gensym "THR-COUNT"))
- (finished-threads-var (gensym "FINI"))
(stop-marker-var (gensym "STOP-MARKER"))
- (max-thr-var (gensym "MAX-THREADS"))
(m-tq-expr `(make-threading-queue :stop-sym ,stop-marker-var))
(initial-contents (assoc-val :initial-contents global-defaults))
(initial-queue (assoc-val :initial-queue global-defaults))
(ic-var-user (assoc-val :queue-named global-defaults nil))
(want-result (assoc-val :want-result global-defaults))
(ic-var
(if (or initial-contents initial-queue)
- (or ic-var-user (gensym "INIT-CONTENTS")))))
+ (or ic-var-user (gensym "INIT-CONTENTS"))))
+ vars functions code
+ destination)
+ ;;
+ ;;
+ ;; Sanity checks
+ ;;
;(format t "glob-def: ~a~%" global-defaults)
- ;; this name is only for the first queue valid;
- ;; set to NIL, so that other statements can set values,
- ;; but no duplicates variables are generated
(if (assoc-val :uses-tq global-defaults)
(error "~&:uses-tq may not be used in the global options~&"))
(if (and initial-contents initial-queue)
(error "~&:initial-contents is incompatible with :initial-queue."))
+ ;; this name is only for the first queue valid;
+ ;; Set option to NIL, so that statements can set other names,
+ ;; but no duplicates variables are generated
(push (cons :queue-named nil) global-defaults)
;;
+ ;;
+ ;; Initializations
+ ;;
+ (when ic-var
+ (if initial-queue
+ (progn
+ (push `(,ic-var ,initial-queue) vars)
+ (push `(check-type ,ic-var %threading-queue) code))
+ (progn
+ ;; If we have initial contents, we provide an "initial queue" with the data.
+ (push `(,ic-var ,m-tq-expr) vars)
+ (push `(tq-put-list ,ic-var ,initial-contents) code)
+ (unless ic-var-user
+ (push `(tq-input-vanished ,ic-var) code)))))
+ (let ((init-code (assoc-val :init global-defaults)))
+ (when init-code
+ (push
+ `(progn
+ ;; Make the init-code work, regardless
+ ;; whether its (a) or ((a) (b))
+ ,@ (if (consp (first init-code))
+ init-code
+ (list init-code))) code)))
+ ;;
+ ;;
;; For each step, parse options and build the code.
+ ;;
(iter
- ;; Sadly (collecting) doesn't work in (initially).
- (when (first-iteration-p)
- (when ic-var
- (if initial-queue
- (progn
- (collecting `(,ic-var ,initial-queue)
- into vars)
- (collecting `(check-type ,ic-var %threading-queue)
- into code))
- (progn
- ;; If we have initial contents, we provide an "initial queue" with the data.
- (collecting `(,ic-var ,m-tq-expr)
- into vars)
- (collecting `(tq-put-list ,ic-var ,initial-contents)
- into code)
- (unless ic-var-user
- (collecting `(tq-input-vanished ,ic-var)
- into code)))))
- (let ((init-code (assoc-val :init global-defaults)))
- (when init-code
- (collecting
- `(progn
- ;; Make the init-code work, regardless
- ;; whether its (a) or ((a) (b))
- ,@ (if (consp (first init-code))
- init-code
- (list init-code)))
- into code))))
;;
;; Get statements
(for %stmt = (pop steps))
@@ -484,8 +484,7 @@
;; Other, per-statement, values
(for user-queue-name = (assoc-val :queue-named stmt-options))
(if user-queue-name
- (collecting `(tq-new-input ,user-queue-name)
- into code))
+ (push `(tq-new-input ,user-queue-name) code))
(for queue-name =
;; assoc-val default parameter cannot be used, as an
;; element with NIL would get used, too
@@ -495,93 +494,97 @@
initially ic-var)
;;
;; Code block building
- (for destination =
+ (setf destination
(if (or steps want-result)
; steps is here already changed (POP above), so not (CDR steps)
queue-name))
(for fns = (%make-start-fn prev-queue-name destination stmt-counter stmt-options stmt))
- (appending fns into functions)
+ (setf functions (nconc (reverse fns) functions))
(if destination
- (collecting `(,destination ,m-tq-expr) into vars))
- (collecting
+ (push `(,destination ,m-tq-expr) vars))
+ (push
`(new-thread #',(caar (last fns))
,(assoc-val :parallel stmt-options)
,prev-queue-name
,destination)
- into code)
+ code)
;;
;;
- (while steps)
- ;;
- ;; Result generation
- (finally
- (return
- `(let ((,max-cc-thr-var 0)
- (,stop-marker-var ,(assoc-val :stop-marker global-defaults))
- (,max-thr-var ,(assoc-val :max-concurrent-threads global-defaults))
- ,finished-threads-var
- (,thread-count-var (sb-thread:make-semaphore :count 0)))
- ;; the tqs depend on stop-marker-var
- (let , vars
- (labels
- , functions
- (declare (inline ,@(mapcar #'first functions)))
- ;; TODO: lower functions, upper functions & flet?
- ;; the statements given by the user shouldn't see those labels -
- ;; but our lambdas want to reference other lambdas, so we cannot use flet.
- ;; TODO: make normal functions? We'd have to pass a lot
- ;; of data, or use special variables ...
- (labels
- ((chg-thr-count (delta)
- (declare (type fixnum delta))
- ;; negative means more threads allowed (less active)
- (cond
- ((minusp delta)
- (sb-thread:signal-semaphore ,thread-count-var (- delta)))
- ((plusp delta)
- (iter (repeat delta)
- (sb-thread:wait-on-semaphore ,thread-count-var)))))
- (concur-set (to)
- (chg-thr-count (- ,max-cc-thr-var to))
- (setf ,max-cc-thr-var to))
- (new-thread (fn count prev-queue next-queue)
- (iter (repeat (or count 1))
- ;; todo: first thread doesn't increment
- ;; input+output count, so that the main
- ;; thread doesn't need to decrement again
- (chg-thr-count 1)
- (if next-queue
- (tq-new-input next-queue))
- (sb-thread:make-thread
- (lambda ()
- (unwind-protect (funcall fn)
- (push sb-thread:*current-thread* ,finished-threads-var)
- (chg-thr-count -1)
- (if next-queue
- (tq-input-vanished next-queue))
- nil)))
- ;; stop creating threads if there's
- ;; nothing more to do
- ;; TODO: in case of "upwards" injection
- ;; we have to create all threads
- (until (and prev-queue
- (tq-end-of-queue? prev-queue))))
- ;; Now remove the initial 1 from the semaphore
- (if next-queue
- (tq-input-vanished next-queue))))
- (assert (plusp ,max-thr-var))
- (concur-set ,max-thr-var)
- ,@ code
- ;; wait for end
- (concur-set 0)
- ;; return final data and collect threads
- (values
- ,(cond
- ((eq T want-result)
- `(tq-get ,destination t))
- ((null want-result)
- nil)
- (t want-result))
- (mapcar #'sb-thread:join-thread ,finished-threads-var)))))))))))
+ (while steps))
+ ;;
+ ;;
+ ;; Result generation
+ ;;
+ (let ((max-cc-thr-var (gensym "MAX-CONCUR-THR"))
+ (max-thr-var (gensym "MAX-THREADS"))
+ (finished-threads-var (gensym "FINI"))
+ (thread-count-var (gensym "THR-COUNT")))
+ `(let ((,max-cc-thr-var 0)
+ (,stop-marker-var ,(assoc-val :stop-marker global-defaults))
+ (,max-thr-var ,(assoc-val :max-concurrent-threads global-defaults))
+ ,finished-threads-var
+ (,thread-count-var (sb-thread:make-semaphore :count 0)))
+ ;; the tqs depend on stop-marker-var
+ (let ,(reverse vars)
+ (labels
+ ,(reverse functions)
+ (declare (inline ,@(mapcar #'first functions)))
+ ;; TODO: lower functions, upper functions & flet?
+ ;; the statements given by the user shouldn't see those labels -
+ ;; but our lambdas want to reference other lambdas, so we cannot use flet.
+ ;; TODO: make normal functions? We'd have to pass a lot
+ ;; of data, or use special variables ...
+ (labels
+ ((chg-thr-count (delta)
+ (declare (type fixnum delta))
+ ;; negative means more threads allowed (less active)
+ (cond
+ ((minusp delta)
+ (sb-thread:signal-semaphore ,thread-count-var (- delta)))
+ ((plusp delta)
+ (iter (repeat delta)
+ (sb-thread:wait-on-semaphore ,thread-count-var)))))
+ (concur-set (to)
+ (chg-thr-count (- ,max-cc-thr-var to))
+ (setf ,max-cc-thr-var to))
+ (new-thread (fn count prev-queue next-queue)
+ (iter (repeat (or count 1))
+ ;; todo: first thread doesn't increment
+ ;; input+output count, so that the main
+ ;; thread doesn't need to decrement again
+ (chg-thr-count 1)
+ (if next-queue
+ (tq-new-input next-queue))
+ (sb-thread:make-thread
+ (lambda ()
+ (unwind-protect (funcall fn)
+ (push sb-thread:*current-thread* ,finished-threads-var)
+ (chg-thr-count -1)
+ (if next-queue
+ (tq-input-vanished next-queue))
+ nil)))
+ ;; stop creating threads if there's
+ ;; nothing more to do
+ ;; TODO: in case of "upwards" injection
+ ;; we have to create all threads
+ (until (and prev-queue
+ (tq-end-of-queue? prev-queue))))
+ ;; Now remove the initial 1 from the semaphore
+ (if next-queue
+ (tq-input-vanished next-queue))))
+ (assert (plusp ,max-thr-var))
+ (concur-set ,max-thr-var)
+ ,@ (reverse code)
+ ;; wait for end
+ (concur-set 0)
+ ;; return final data and collect threads
+ (values
+ ,(cond
+ ((eq T want-result)
+ `(tq-get ,destination t))
+ ((null want-result)
+ nil)
+ (t want-result))
+ (mapcar #'sb-thread:join-thread ,finished-threads-var)))))))))

0 comments on commit 9b4d06a

Please sign in to comment.