diff --git a/leftparen.scm b/leftparen.scm index 7f6e57e..1952edf 100644 --- a/leftparen.scm +++ b/leftparen.scm @@ -187,6 +187,8 @@ ;; task queues make-threaded-task-queue sleep-task-thread-for-at-least + task-inspector-lock + task-inspector-num-tasks-thunk ) diff --git a/task-queue.scm b/task-queue.scm index 901e731..19b881e 100644 --- a/task-queue.scm +++ b/task-queue.scm @@ -2,11 +2,38 @@ (require "util.scm") -(provide make-threaded-task-queue sleep-task-thread-for-at-least) +;; +;; Task Queues +;; +;; This library allows you to create synchronized "task queues". In normal use, +;; you add tasks (which are just thunks) onto the queue. Meanwhile, the queue itself +;; tries to evaluate the thunks, in FIFO order, one at a time. When the thread runs +;; out of tasks, then it suspends itself. But, when a new task is received, the +;; thread is automatically resumed, so, in effect, it's always ready to work. +;; +;; Use (make-threaded-task-queue) to generate a VALUES of a thread that is suspended +;; (which contains an empty task queue), and a procedure of one argument (a task thunk) +;; which is the task adder. You may never need to use the returned thread, but if +;; you want to put a task thread on hiatus, use, e.g., +;; (sleep-task-thread-for-at-least 30) +;; which suspends processing of tasks for at least 30 seconds. In practice, the time +;; the thread waits should be just every so slightly more than the given seconds value. +;; +;; We allow for "one off" LIFO queue semantics. That is, if you add a task using +;; the task adder function (assume you've named it add!), you can write +;; (add! (lambda () ...some task...) #:add-to-front #t) +;; and this task will be the next one to be chosen for evaluation. +;; + +(provide make-threaded-task-queue + sleep-task-thread-for-at-least + task-inspector-lock + task-inspector-num-tasks-thunk) ;; returns a VALUES of a thread that is suspended and a procedure for adding thunk tasks -; to the queue -(define (make-threaded-task-queue) +;; to the queue; if return-inspector is #t, return a third value which is an inspector +;; object, which can be accessed with the task-inspector-* fns. +(define (make-threaded-task-queue #:return-inspector (return-inspector #f)) (let ((queue (make-task-queue)) (just-created #t)) (letrec ((thread-thunk (lambda () @@ -19,29 +46,38 @@ (let ((wait-until (task-queue-msg-wait-until msg)) (now (current-seconds))) (if (> wait-until now) - ;; then we found a relevant wait until msg + ;; so we received a message that told us to + ;; wait...so sleep the thread for that long: (begin (sleep (- wait-until now)) (thread-thunk)) - ;; otherwise, the msg wasn't valid, so try again + ;; otherwise, the msg is no longer relevant, + ;; so try again: (lp))) - ;; o/w we attack the queue + ;; no msg? then attack the queue: (let ((more? (task-pop-and-handle! queue))) (if more? (thread-thunk) (begin (thread-suspend the-thread) (thread-thunk))))))))) (the-thread (thread thread-thunk))) - (values the-thread (lambda (task-thunk) - (task-push! queue task-thunk) - (thread-resume the-thread)))))) + (let ((add-task-fn (lambda (task-thunk #:add-to-front (add-to-front #f)) + (task-push! queue task-thunk #:add-to-front add-to-front) + (thread-resume the-thread)))) + (if return-inspector + (values the-thread add-task-fn (make-a-task-inspector queue)) + (values the-thread add-task-fn)))))) + +(define-struct task-inspector (lock num-tasks-thunk)) + +(define (make-a-task-inspector queue) + (make-task-inspector (task-queue-dat-lock queue) + (lambda () (length (task-queue-dat-lst queue))))) (define-struct task-queue-msg (wait-until)) (define (sleep-task-thread-for-at-least a-thread secs-to-wait) (thread-send a-thread (make-task-queue-msg (+ (current-seconds) secs-to-wait)))) -;; it should be FIFO to preserve semantics as much as possible - ;; IDEA: have tasks that are "infinite" ..i.e., once you execute one, it pops back on ;; to the end of the queue. @@ -50,11 +86,12 @@ (define (make-task-queue) (make-task-queue-dat (make-lock) '() #f)) -(define (tasks-todo? queue) - (not (null? (task-queue-dat-lst queue)))) +;; We implement the queue as a mutable list, where the next task to do is the +;; car of the list. We keep a last task pointer so that we can easily add to the end +;; of the queue. -;; executes the next task if one is available; returns #t if there are more tasks left -;; (after the one just executed). #f o/w. +;; executes the next task if one is available; returns #t if there *are* more tasks left +;; (after the one just executed). returns #f if no tasks are available. (define (task-pop-and-handle! queue) (sync-on-lock (task-queue-dat-lock queue) @@ -66,12 +103,18 @@ (next-task))) (tasks-todo? queue)))) -(define (task-push! queue task-thunk) +(define (task-push! queue task-thunk #:add-to-front (add-to-front #f)) (let ((cell-for-end (mcons task-thunk '()))) (sync-on-lock (task-queue-dat-lock queue) (if (not (tasks-todo? queue)) (set-task-queue-dat-lst! queue cell-for-end) - (set-mcdr! (task-queue-dat-last-task-ptr queue) cell-for-end)) - (set-task-queue-dat-last-task-ptr! queue cell-for-end)))) - + (if add-to-front + (set-task-queue-dat-lst! queue (mcons task-thunk (task-queue-dat-lst queue))) + (set-mcdr! (task-queue-dat-last-task-ptr queue) cell-for-end))) + (unless add-to-front + (set-task-queue-dat-last-task-ptr! queue cell-for-end))))) + +;; not synchronized, so make sure you use in a sync block. +(define (tasks-todo? queue) + (not (null? (task-queue-dat-lst queue))))