Threading Queue - splitting work across several threads
    (threading-feed (:initial-contents '(1 2 3 4 5))
      (:parallel 3
        (sleep 1)
        *)
      (format t "output single-threaded: ~a~%" *)
      (:parallel 2
        (format t "output multi-threaded: ~a~%" *)))
This Common Lisp library provides an easy way to split work over several workers.
Elements are returned with a second value of T, so a NIL element comes out as (values NIL T). As soon as a queue is empty, it returns (values NIL NIL) and won't call your code anymore. This is similar to the
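As a rough illustration of this two-value convention, here is a plain Common Lisp sketch. It does not use TSQ at all; MAKE-LIST-READER and DRAIN are hypothetical helpers. It shows why a NIL element survives: only the second value decides whether data remains.

```lisp
;;; Standalone sketch of the two-value queue protocol; not the TSQ API.
;;; MAKE-LIST-READER and DRAIN are hypothetical helpers.

(defun make-list-reader (items)
  "Return a closure that hands out ITEMS one at a time:
(values element T) while data remains, (values NIL NIL) afterwards."
  (lambda ()
    (if items
        (values (pop items) t)
        (values nil nil))))

(defun drain (reader)
  "Call READER until its second value is NIL; collect the primary values."
  (let ((result '()))
    (loop
      (multiple-value-bind (element valid) (funcall reader)
        (unless valid
          (return (nreverse result)))
        (push element result)))))

;; (drain (make-list-reader '(1 nil 3)))  => (1 NIL 3)
```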
There are several options that can be set globally and per step. Using per-step options in the global section makes them defaults for the individual steps.
- :initial-contents NIL; evaluated
This expression is evaluated once, and specifies the elements that get fed into the first step.
If this is not specified, the first step is called repeatedly until it returns NIL; a verbatim NIL element can be inserted into the queue by returning a non-NIL second value.
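The stopping rule for a first step without :initial-contents can be modeled in plain Common Lisp as below. This is a single-threaded sketch, not TSQ code; COLLECT-FROM-GENERATOR and EXAMPLE-GENERATOR are hypothetical names.

```lisp
;;; Standalone model of a first step without :initial-contents; not TSQ code.
;;; COLLECT-FROM-GENERATOR and EXAMPLE-GENERATOR are hypothetical.

(defun collect-from-generator (generator)
  "Call GENERATOR until it returns NIL with a NIL (or missing) second value.
Returning (values NIL T) passes a literal NIL element through."
  (let ((result '()))
    (loop
      (multiple-value-bind (value more) (funcall generator)
        (unless (or value more)
          (return (nreverse result)))
        (push value result)))))

(defun example-generator ()
  "Yield 1, a verbatim NIL, 3, and then signal the end of input."
  (let ((n 0))
    (lambda ()
      (incf n)
      (case n
        (1 1)
        (2 (values nil t))   ; non-NIL second value: NIL is real data
        (3 3)
        (t nil)))))          ; plain NIL: end of input

;; (collect-from-generator (example-generator))  => (1 NIL 3)
```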
- :initial-queue NIL
This makes the first step read its input from an existing TQ.
- :max-concurrent-threads 512; evaluated
The maximum number of simultaneously running threads.
- :want-result T
If this is T (the default), the return values of the last step are accumulated and returned. With NIL, only a single NIL is returned; this is useful if only the side effects of the process matter.
Any expression can be used here; e.g. (progn (+ 2 2)) is valid, and makes the first value returned from
- :named NIL
Specifies the name of the BLOCK that is put around the generated code.
- :init-code NIL
This specifies code to be placed before the queueing operations start.
- :before-stopping NIL
Code given here is put in the main thread, just before waiting for the threads to terminate.
This is a good place to clean up queues defined with :queue-named, assuming that the :max-concurrent-threads limit allows execution to get there.
Most of the following options can also be given in the global section, where they act as defaults for each step.
- :parallel NIL aka :threads; evaluated
How many threads should be allocated for this step?
This is an upper limit; if there's no input left, no additional threads will be started.
- :arg-name *DEFAULT-ARG-NAME*
This symbol is used as the argument name of the generated functions; the default is *, see the example above.
- :batch-size NIL; evaluated
If you want to handle multiple elements at once, give this option a positive fixnum.
With :batch-size 3, the step gets a list of up to 3 elements each time it is called, and its return value must be a list, although that list may contain more or fewer elements. Using :batch-size 1 will give you (and expect) lists, too!
With NIL you get and return single elements.
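The batching contract described above (lists of up to N elements in, a list of any length out) can be sketched single-threaded in plain Common Lisp. RUN-BATCHED is a hypothetical helper that only models the semantics; it is not how TSQ implements :batch-size.

```lisp
;;; Single-threaded model of :batch-size semantics; not the TSQ implementation.
;;; RUN-BATCHED is a hypothetical helper.

(defun run-batched (items batch-size step)
  "Call STEP with lists of up to BATCH-SIZE elements taken from ITEMS.
STEP must return a list (of any length); the lists are concatenated."
  (let ((output '()))
    (loop while items
          do (let ((batch (loop repeat batch-size
                                while items
                                collect (pop items))))
               (setf output (append output (funcall step batch)))))
    output))

;; Sum each batch of up to 3 elements: one output element per call.
;; (run-batched '(1 2 3 4 5) 3 (lambda (batch) (list (reduce #'+ batch))))
;;   => (6 9)
;; Return more elements than received: duplicate each one.
;; (run-batched '(1 2) 1 (lambda (batch) (append batch batch)))
;;   => (1 1 2 2)
```

Note that even with a batch size of 1 the step still receives and returns lists, matching the remark about :batch-size 1.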
- :at-end T
This specifies the code that gets run at the end of each thread; the returned values are accumulated into the second value of
- :queue-named NIL
Normally the queue variable names are generated via GENSYM; with this option, the queue that passes data further down can be given a name. This is useful if you want to back-inject values (to get a kind of loop), or to return a queue and pass it to other functions.
If this is used in the global options, the queue holding the :initial-contents gets a name.
Please note that you need to take care of the input reference counting yourself; one input reference is kept for named queues, and you must clean that up via
- :call-with-fns NIL
This option takes a LAMBDA that gets called with three parameters: a closure to read from the input queue (with an optional count parameter, see :batch-size above), a closure that puts all its arguments onto the output queue, and a closure that puts the list it gets as its first argument onto the output queue.
If this is used, no other statements may be given for that step.
    (threading-feed ()
      ...
      (:call-with-fns
        (lambda (qget qput-rest qput-list)
          (declare (ignore qput-list))
          ;; ITER comes from the ITERATE library.
          (iter (for (values items valid) = (funcall qget 3))
                (while valid)
                (funcall qput-rest (apply #'+ items))))))
The example takes up to 3 numbers from the input queue and passes their sum to the output queue. Please note the check for end-of-queue (via the second value, bound to VALID).
The :parallel option is still available, and the end-of-data signalling on the output queue is done automatically.
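To make the roles of the three closures concrete, here is a single-threaded Common Lisp model over plain lists. CALL-WITH-LIST-FNS is hypothetical, and the closures only mimic the behaviour described above; in particular, this sketch assumes that QPUT-LIST puts the individual elements of its list onto the output, which is an interpretation rather than confirmed TSQ behaviour. The summing example is rewritten with LOOP so that the sketch does not need ITERATE.

```lisp
;;; Single-threaded model of the three closures passed by :call-with-fns.
;;; CALL-WITH-LIST-FNS is hypothetical; these are not TSQ's own functions.

(defun call-with-list-fns (input fn)
  "Call FN with QGET, QPUT-REST and QPUT-LIST closures over the list INPUT.
Return everything FN put onto the output, in order."
  (let ((output '()))
    (flet ((qget (&optional count)
             ;; Without COUNT: one element; with COUNT: a list of up to COUNT.
             (cond ((null input) (values nil nil))
                   ((null count) (values (pop input) t))
                   (t (values (loop repeat count
                                    while input
                                    collect (pop input))
                              t))))
           (qput-rest (&rest items)   ; puts all of its arguments
             (dolist (item items) (push item output)))
           (qput-list (items)         ; assumed: puts the elements of one list
             (dolist (item items) (push item output))))
      (funcall fn #'qget #'qput-rest #'qput-list))
    (nreverse output)))

;; The summing example, written with LOOP instead of ITERATE:
;; (call-with-list-fns '(1 2 3 4 5 6 7)
;;   (lambda (qget qput-rest qput-list)
;;     (declare (ignore qput-list))
;;     (loop (multiple-value-bind (items valid) (funcall qget 3)
;;             (unless valid (return))
;;             (funcall qput-rest (apply #'+ items))))))
;;   => (6 15 7)
```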
- :filter NIL
This option takes an expression, and discards all elements for which it returns NIL.
(threading-feed (:initial-contents '(1 2 3 4 5 6)) (:filter (oddp *)))
This yields 1, 3 and 5.
:arg-name can be used here, too.
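The combination of :filter and :arg-name amounts to turning an expression into a one-argument predicate. The following plain Common Lisp sketch models that idea; STEP-LAMBDA and FILTER-STEP are illustrative names only and say nothing about how TSQ generates its code.

```lisp
;;; Hypothetical model of :filter with a custom :arg-name; not TSQ code.
;;; STEP-LAMBDA and FILTER-STEP are illustrative names only.

(defmacro step-lambda (arg-name &body body)
  "Turn BODY into a one-argument function whose parameter is ARG-NAME."
  `(lambda (,arg-name) ,@body))

(defun filter-step (predicate items)
  "Keep the elements of ITEMS for which PREDICATE returns non-NIL."
  (remove-if-not predicate items))

;; The :filter example from above, with the argument named X instead of *:
;; (filter-step (step-lambda x (oddp x)) '(1 2 3 4 5 6))  => (1 3 5)
```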
This example illustrates which parameters are evaluated and which are not.
I'm aware that (incf) and (decf) are not atomic, that the special variable *thread-count* might be per-thread, etc.
    (threading-feed (:parallel ((cpu-count) (floor (cpu-count) 2))
                     :batch-size (if (filtering?) 1 NIL)
                     :at-start (incf *thread-count*)
                     :at-end (decf *thread-count*))
      ...)
The READ function family uses the other style of signaling the end of data: their second value is T to signal end-of-data. If a step consists of only a single expression, and its first symbol is included in threading-queue:*reverse-2nd-value*, a bit of code to reverse that meaning is put around the expression.
So a typical case might look like this:
    (with-open-file (stream "/etc/hosts")
      (threading-feed ()
        (read-line stream nil nil)
        (:parallel 3
          (cons * (expensive-operation *)))
        (format t "Result: ~a => ~a~%" (car *) (cdr *))))
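The second-value reversal for READ-LINE can be sketched in plain Common Lisp as a small wrapper. READ-LINE-FORWARD and COLLECT-LINES are hypothetical names, not what TSQ actually generates. In standard Common Lisp, READ-LINE's second value is also true for a final line that lacks a trailing newline; in this sketch such a line still gets through, because its primary value is non-NIL and the loop only stops on (values NIL NIL).

```lisp
;;; Sketch of the second-value reversal applied to READ-LINE.
;;; READ-LINE-FORWARD and COLLECT-LINES are hypothetical, not TSQ code.

(defun read-line-forward (stream)
  "Call READ-LINE, then invert its second value so that NIL means end-of-data."
  (multiple-value-bind (line missing-newline-p) (read-line stream nil nil)
    (values line (not missing-newline-p))))

(defun collect-lines (stream)
  "Read lines until the wrapper returns (values NIL NIL)."
  (let ((lines '()))
    (loop
      (multiple-value-bind (line more) (read-line-forward stream)
        (unless (or line more)
          (return (nreverse lines)))
        (push line lines)))))

;; (with-input-from-string (s (format nil "alpha~%beta"))
;;   (collect-lines s))
;;   => ("alpha" "beta")
```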
Because TSQ uses sb-concurrency:mailbox (and some other SBCL specifics) internally, it is currently limited to SBCL.
Patches are welcome.
TSQ was written by Philipp Marek,
philipp BAT marek POT priv LOT at. Please excuse the obfuscation.
TSQ is released under the LLGPL, see http://www.cliki.net/LGPL.