Skip to content
Newer
Older
100644 281 lines (260 sloc) 9.97 KB
4e01ed9 @pkhuong Thread pool with recursive task generation
authored
;;; Package for the work queue / thread pool.  The task vocabulary is
;;; imported from WORK-STACK and re-exported unchanged.
(defpackage "WORK-QUEUE"
  (:use "CL" "SB-EXT" "SB-THREAD")
  (:import-from "WORK-STACK"
                "TASK" "TASK-P"
                "BULK-TASK" "BULK-TASK-P"
                "TASK-DESIGNATOR")
  (:export
   ;; Task vocabulary (re-exported from WORK-STACK).
   "TASK" "TASK-P" "BULK-TASK" "BULK-TASK-P"
   "TASK-DESIGNATOR"
   ;; Queue structure, constructor and predicates.
   "QUEUE" "MAKE" "P" "ALIVE-P"
   ;; Submitting work to the shared FIFO, and shutting the pool down.
   "ENQUEUE" "ENQUEUE-ALL" "STOP"
   ;; Submitting work to the calling worker's own stack.
   "PUSH-SELF" "PUSH-SELF-ALL"
   ;; Recursive waiting from inside a worker.
   "PROGRESS-UNTIL"
   ;; Introspection of the calling thread's context.
   "CURRENT-QUEUE" "WORKER-ID" "WORKER-COUNT"))
14
15 ;;; Work-unit queue/stack, with thread-pool
16 ;;;
17 ;;; Normal work queue: created with a fixed number of worker threads,
18 ;;; and a shared FIFO of work units (c.f. work-stack).
19 ;;;
20 ;;; However, each worker also has a work-stack. This way, tasks can
21 ;;; spawn new tasks recursively, while enjoying temporal locality and
22 ;;; skipping in front of the rest of the queue.
23 ;;;
24 ;;; ENQUEUE/ENQUEUE-ALL insert work units in the queue.
25 ;;;
26 ;;; PUSH-SELF/PUSH-SELF-ALL insert work units in the worker's local
27 ;;; stack, or, if not executed by a worker, punt to ENQUEUE/ENQUEUE-ALL.
28 ;;;
29 ;;; Note that the work-stacks support task-stealing, so pushing to the
30 ;;; local stack does not reduce parallelism.
4e01ed9 @pkhuong Thread pool with recursive task generation
authored
31
32 (in-package "WORK-QUEUE")
33
(defstruct (queue
            (:constructor %make-queue))
  "State shared between the worker threads of one pool and its clients.
Instances are built through %MAKE-QUEUE; slots without a usable default
signal an error naming the missing initarg."
  ;; One mutex per worker; worker I holds element I while waiting on CVAR.
  (locks (error "Missing required initarg ~S for ~S" :locks 'queue)
   :type (simple-array mutex 1)
   :read-only t)
  ;; Wait queue broadcast by ENQUEUE, ENQUEUE-ALL and STOP.
  (cvar (make-waitqueue) :type waitqueue
   :read-only t)
  ;; Number of worker threads.
  (nthread (error "Missing required initarg ~S for ~S" :nthread 'queue)
   :type (and unsigned-byte fixnum)
   :read-only t)
  ;; A cons whose CAR is :RUNNING while the pool is alive and :DONE once
  ;; it has been stopped.  The cons itself is shared, never replaced.
  (state (error "Missing required initarg ~S for ~S" :state 'queue)
   :type cons
   :read-only t)
  ;; Shared FIFO of work units.
  (queue (sb-queue:make-queue) :type sb-queue:queue)
  ;; One work stack per worker, indexed by worker id.
  (stacks (error "Missing required initarg ~S for ~S" :stacks 'queue)
   :type (simple-array work-stack:stack 1)
   :read-only t)
  ;; Worker thread objects, indexed by worker id.
  (threads (error "Missing required initarg ~S for ~S" :threads 'queue)
   :type (simple-array t 1)
   :read-only t))
49
(declaim (inline p))
(defun p (x)
  "Return true when X is a work QUEUE structure, and false otherwise."
  (typep x 'queue))
53
bae856f @pkhuong Split up in work-stack and work-queue
authored
(defun grab-task (queue stacks i)
  "Try once to obtain a work unit, without blocking.
QUEUE is the shared FIFO, STACKS the vector of per-worker stacks and I
the index of the stack to try first.  The FIFO is checked first; then
every stack is visited once, starting at I and wrapping around, trying
to steal from it and re-checking the FIFO after each failed steal.
Returns the work unit, or NIL when nothing was found."
  (or (sb-queue:dequeue queue)
      (loop with count = (length stacks)
            for offset from 0 below count
            thereis (or (work-stack:steal
                         (aref stacks (mod (+ i offset) count)))
                        (sb-queue:dequeue queue)))))
65
693a555 @pkhuong Thread pool diffs
authored
(defvar *worker-id* nil
  "Index of the current worker thread within its pool.
Globally NIL; worker threads rebind it to their index.")

(defvar *worker-hint* 0
  "Hint passed along when the current thread pushes on a work stack.
Globally 0; worker threads rebind it to index/thread-count as a
double float.")

(defvar *current-queue* nil
  "Weak pointer to the work queue served by the current thread.
Globally NIL; worker threads rebind it.  Read it through the
CURRENT-QUEUE function.")
693a555 @pkhuong Thread pool diffs
authored
69
b18cf37 @pkhuong Recursive waiting in thread-pool, and leak more information to help w…
authored
(declaim (inline current-queue worker-id worker-count))
(defun current-queue (&optional default)
  "Return the work queue the calling thread is working for.
When *CURRENT-QUEUE* is NIL, return DEFAULT instead.  When it holds a
weak pointer, return whatever WEAK-POINTER-VALUE yields for it; DEFAULT
is not consulted in that case, even if the pointer has been broken."
  (let ((weak-queue *current-queue*))
    (cond (weak-queue (weak-pointer-value weak-queue))
          (t default))))
b8af6ce @pkhuong Default bindings in worker threads, use it to refer to the right para…
authored
75
b18cf37 @pkhuong Recursive waiting in thread-pool, and leak more information to help w…
authored
(defun worker-id ()
  "Return the value of *WORKER-ID* for the calling thread: the worker's
index inside a worker thread, NIL where it has not been rebound."
  *worker-id*)
78
1ec0d76 @pkhuong More bugfixes in parallel primitives
authored
(defun worker-count (&optional (queue (current-queue)))
  "Return the number of worker threads of QUEUE, or NIL when QUEUE is
NIL.  QUEUE defaults to the calling thread's current queue."
  (when queue
    (queue-nthread queue)))
b18cf37 @pkhuong Recursive waiting in thread-pool, and leak more information to help w…
authored
81
(defun loop-get-task (state lock cvar queue stacks i
                      &optional max-time)
  "Wait until a work unit is available and return it.
STATE is the pool's state cons, LOCK and CVAR the mutex and wait queue
used for sleeping, QUEUE the shared FIFO, STACKS the per-worker stacks
and I the index of the stack to try first.

Returns NIL as soon as the CAR of STATE is :DONE, the work unit when
GRAB-TASK finds one, or :TIMEOUT when MAX-TIME is supplied and the sum
of the sleep timeouts used so far exceeds it.  Only the timeouts handed
to CONDITION-WAIT are accumulated; spinning time is not counted."
  (flet ((attempt ()
           ;; Either exit leaves LOOP-GET-TASK directly.
           (when (eql (car state) :done)
             (return-from loop-get-task nil))
           (let ((found (grab-task queue stacks i)))
             (when found
               (return-from loop-get-task found)))))
    (declare (inline attempt))
    (let ((nap 1e-4)                    ; current sleep timeout, in seconds
          (slept 0d0)                   ; sum of the timeouts used so far
          (spin t))                     ; spin before sleeping?
      (declare (single-float nap)
               (double-float slept))
      (loop
        (cond (spin
               ;; 128 attempts, each followed by a longer busy-wait.
               (loop for round from 0 below 128
                     do (attempt)
                        (loop repeat (* round 128)
                              do (spin-loop-hint))))
              (t
               (attempt)))
        ;; Don't do this at home.
        (setf spin nil)
        (with-mutex (lock)
          ;; A true return means we were woken up: spin again next round.
          ;; On a false return the mutex is grabbed again -- presumably
          ;; because a timed-out CONDITION-WAIT may return without holding
          ;; it while WITH-MUTEX still has to release it (TODO confirm
          ;; against the SBCL version in use).
          (if (condition-wait cvar lock :timeout nap)
              (setf spin t)
              (grab-mutex lock)))
        (when (and max-time
                   (> (incf slept nap) max-time))
          (return :timeout))
        ;; Back off by 10% per round, capped at one second.
        (setf nap (min 1.0 (* nap 1.1)))))))
fd1fc48 @pkhuong More spin loop before going to slow mutex/condvar path waiting for mo…
authored
113
4013654 @pkhuong Factor our worker loop logic
authored
(declaim (inline %worker-loop))
(defun %worker-loop (weak-queue index hint &optional poll-function wait-time)
  "Run one round of worker activity for worker INDEX.
WEAK-QUEUE is a weak pointer to the work queue; when it no longer yields
a queue the function returns NIL at once.  HINT is passed along when a
bulk task is pushed on the worker's own stack.

A round drains the worker's own stack, obtains one work unit through
LOOP-GET-TASK, pushes or executes it, and drains the stack again.

When POLL-FUNCTION is supplied it is called before every unit run from
the own stack; a true result is returned immediately.  In that mode
LOOP-GET-TASK waits at most WAIT-TIME (default 1) and a timeout triggers
one more poll before returning.  Without POLL-FUNCTION, WAIT-TIME is
ignored and the wait is unbounded.

Returns the true value produced by POLL-FUNCTION, or NIL otherwise."
  (let ((wqueue (weak-pointer-value weak-queue)))
    (unless wqueue
      (return-from %worker-loop))
    (let* ((max-wait (and poll-function (or wait-time 1)))
           (state (queue-state wqueue))
           (cvar (queue-cvar wqueue))
           (lock (aref (queue-locks wqueue) index))
           (fifo (queue-queue wqueue))
           (stacks (queue-stacks wqueue))
           (own-stack (aref stacks index)))
      (labels ((poll ()
                 (when poll-function
                   (let ((result (funcall poll-function)))
                     (when result
                       (return-from %worker-loop result)))))
               (drain ()
                 ;; Run units from our own stack until it has nothing
                 ;; left, polling before each one and bailing out once
                 ;; the pool is marked :DONE.
                 (loop
                   (poll)
                   (unless (work-stack:run-one own-stack)
                     (return))
                   (when (eq (car state) :done)
                     (return-from %worker-loop)))))
        (declare (inline poll drain))
        (drain)
        (let ((task (loop-get-task state lock cvar
                                   fifo stacks index
                                   max-wait)))
          ;; FIFO is only cleared on the last line of this function, so
          ;; this test effectively checks TASK.
          (unless (and task fifo)
            (return-from %worker-loop))
          (if poll-function
              (when (eq task :timeout)
                (poll)
                (return-from %worker-loop))
              (assert (not (eq task :timeout))))
          (if (bulk-task-p task)
              (work-stack:push own-stack task hint)
              (work-stack:execute-task task))
          (drain)
          ;; NOTE(review): presumably drops the reference before the
          ;; caller scrubs the control stack -- confirm.
          (setf fifo nil))))))
4013654 @pkhuong Factor our worker loop logic
authored
155
b8af6ce @pkhuong Default bindings in worker threads, use it to refer to the right para…
authored
(defun %make-worker (wqueue i &optional binding-names binding-compute)
  "Create and return worker thread number I of WQUEUE.
BINDING-NAMES is a list of special variables to bind in the new thread
and BINDING-COMPUTE the matching list of values; an element that is a
function is called, inside the new thread, to produce the value.

The thread binds *WORKER-ID*, *CURRENT-QUEUE* and *WORKER-HINT*, then
runs %WORKER-LOOP repeatedly, scrubbing its control stack after each
round, until the CAR of the queue's state is :DONE."
  (declare (type queue wqueue))
  (let* ((state (queue-state wqueue))
         (nthread (queue-nthread wqueue))
         (hint (float (/ i nthread) 1d0))
         ;; The thread function only refers to the queue through this weak
         ;; pointer (and the shared state cons), never through WQUEUE.
         (weak-queue (make-weak-pointer wqueue))
         (thread-name (format nil "Work queue worker ~A/~A" i nthread)))
    (flet ((resolve (spec)
             (if (functionp spec) (funcall spec) spec))
           (run-worker ()
             (let ((*worker-id* i)
                   (*current-queue* weak-queue)
                   (*worker-hint* hint))
               (progv binding-names (mapcar #'resolve binding-compute)
                 (loop
                   (flet ((inner ()
                            (%worker-loop weak-queue i hint)))
                     ;; Kept out of line so each round runs in its own
                     ;; frame, which has exited by the time the stack is
                     ;; scrubbed.
                     (declare (notinline inner))
                     (inner)
                     (sb-sys:scrub-control-stack)
                     (when (eq (car state) :done)
                       (return))))))))
      (make-thread #'run-worker :name thread-name))))
4e01ed9 @pkhuong Thread pool with recursive task generation
authored
176
b18cf37 @pkhuong Recursive waiting in thread-pool, and leak more information to help w…
authored
(defun progress-until (condition)
  "Keep executing work units in the calling worker until CONDITION holds.
CONDITION is a function, or a function name resolved with FDEFINITION,
called with no argument.  Returns the first true value it yields.  If
the pool is stopped first, CONDITION is checked one last time and NIL is
returned when it is still false.

Signals an error when the calling thread has no current queue."
  (let* ((predicate (if (functionp condition)
                        condition
                        (fdefinition condition)))
         (id (worker-id))
         (weak-queue *current-queue*)
         (state (queue-state
                 (or (current-queue)
                     (error "Not in recursive wait?!"))))
         (wait-time 1d-3))
    (labels ((check ()
               (let ((value (funcall predicate)))
                 (when value
                   (return-from progress-until value))))
             (inner ()
               ;; CHECK doubles as the poll function of the worker loop.
               (%worker-loop weak-queue id *worker-hint*
                             #'check wait-time)))
      (declare (notinline inner)
               (inline check))
      (loop
        (check)
        (inner)
        (sb-sys:scrub-control-stack)
        (when (eql :done (car state))
          (return)))
      (check)
      nil)))
b18cf37 @pkhuong Recursive waiting in thread-pool, and leak more information to help w…
authored
202
1bc70e7 @pkhuong Reorganisation, nicer inheritance for parallel futures
authored
(defun make (nthread &optional constructor &rest arguments)
  "Create a work queue served by NTHREAD worker threads and return it.
CONSTRUCTOR, when non-NIL, is called instead of %MAKE-QUEUE; it receives
the standard slot initargs followed by ARGUMENTS.  A :BINDINGS entry in
ARGUMENTS is removed before the call: its value is an alist of
\(variable . value-or-function) that each worker thread binds on startup.

A finalizer marks the pool :DONE and wakes the workers once the queue
object is garbage."
  (declare (type (and unsigned-byte fixnum) nthread)
           (dynamic-extent arguments))
  (let* ((threads (make-array nthread))
         (default-bindings (getf arguments :bindings))
         (extra-arguments (loop for (key value) on arguments by #'cddr
                                unless (eql key :bindings)
                                  append (list key value)))
         (wqueue (apply (or constructor #'%make-queue)
                        :locks (map-into (make-array nthread) #'make-mutex)
                        :cvar (make-waitqueue)
                        :nthread nthread
                        :state (list :running)
                        :queue (sb-queue:make-queue)
                        :stacks (map-into (make-array nthread) #'work-stack:make)
                        :threads threads
                        extra-arguments)))
    ;; The finalizer closes over the state cons and the wait queue only,
    ;; not over WQUEUE itself.
    (let ((state (queue-state wqueue))
          (cvar (queue-cvar wqueue)))
      (finalize wqueue (lambda ()
                         (setf (car state) :done)
                         (condition-broadcast cvar))))
    (loop with binding-names = (mapcar #'car default-bindings)
          with binding-values = (mapcar #'cdr default-bindings)
          for i from 0 below nthread
          do (setf (aref threads i)
                   (%make-worker wqueue i binding-names binding-values))
          finally (return wqueue))))
4e01ed9 @pkhuong Thread pool with recursive task generation
authored
230
(defun stop (queue)
  "Mark QUEUE as :DONE and wake up every thread waiting on its wait
queue.  Returns NIL."
  (declare (type queue queue))
  (let ((state (queue-state queue)))
    (setf (first state) :done))
  (condition-broadcast (queue-cvar queue))
  nil)
4e01ed9 @pkhuong Thread pool with recursive task generation
authored
236
(defun alive-p (queue)
  "Return true while QUEUE's state is :RUNNING, i.e. it has not been
marked :DONE."
  (declare (type queue queue))
  (eq :running (first (queue-state queue))))
240
9bf46f7 @pkhuong Fix some uses of *current-queue* to go through function
authored
(defun enqueue (task &optional (queue (current-queue)))
  "Append TASK to the shared FIFO of QUEUE and wake up waiting workers.
QUEUE defaults to the calling thread's current queue and must be alive.
Returns NIL."
  (declare (type task-designator task)
           (type queue queue))
  (assert (alive-p queue))
  (let ((fifo (queue-queue queue))
        (cvar (queue-cvar queue)))
    (sb-queue:enqueue task fifo)
    (condition-broadcast cvar))
  nil)
248
9bf46f7 @pkhuong Fix some uses of *current-queue* to go through function
authored
(defun enqueue-all (tasks &optional (queue (current-queue)))
  "Append every element of the sequence TASKS, in order, to the shared
FIFO of QUEUE, then wake up waiting workers once.  QUEUE defaults to the
calling thread's current queue and must be alive.  Returns NIL."
  (declare (type queue queue))
  (assert (alive-p queue))
  (let ((fifo (queue-queue queue)))
    (flet ((add (task)
             (sb-queue:enqueue task fifo)))
      (map nil #'add tasks)))
  (condition-broadcast (queue-cvar queue))
  nil)
693a555 @pkhuong Thread pool diffs
authored
258
9bf46f7 @pkhuong Fix some uses of *current-queue* to go through function
authored
(defun push-self (task &optional (queue (current-queue)))
  "Push TASK on the calling worker's own stack in QUEUE.
When the calling thread has no worker id, fall back to ENQUEUE on the
shared FIFO.  QUEUE must be alive, and a calling worker must be the
thread registered under its id in QUEUE."
  (declare (type queue queue)
           (type task-designator task))
  (assert (alive-p queue))
  (let ((id *worker-id*))
    (if (null id)
        (enqueue task queue)
        (progn
          (assert (eql (aref (queue-threads queue) id)
                       *current-thread*))
          (work-stack:push (aref (queue-stacks queue) id)
                           task *worker-hint*)))))
693a555 @pkhuong Thread pool diffs
authored
270
9bf46f7 @pkhuong Fix some uses of *current-queue* to go through function
authored
(defun push-self-all (tasks &optional (queue (current-queue)))
  "Push all of TASKS on the calling worker's own stack in QUEUE.
When the calling thread has no worker id, fall back to ENQUEUE-ALL on
the shared FIFO.  QUEUE must be alive, and a calling worker must be the
thread registered under its id in QUEUE."
  (declare (type queue queue))
  (assert (alive-p queue))
  (let ((id *worker-id*))
    (if (null id)
        (enqueue-all tasks queue)
        (progn
          (assert (eql (aref (queue-threads queue) id)
                       *current-thread*))
          (work-stack:push-all (aref (queue-stacks queue) id)
                               tasks *worker-hint*)))))
Something went wrong with that request. Please try again.