Skip to content

Commit

Permalink
add stealing scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
lmj committed May 5, 2012
1 parent 8577929 commit 10bd989
Show file tree
Hide file tree
Showing 13 changed files with 544 additions and 140 deletions.
2 changes: 1 addition & 1 deletion bench/bench.lisp
Expand Up @@ -137,7 +137,7 @@ results are riffled for comparison."
(mapcar (compose 'funcall 'args-fn) specs))))))))) (mapcar (compose 'funcall 'args-fn) specs)))))))))


(defun call-with-temp-kernel (worker-count fn) (defun call-with-temp-kernel (worker-count fn)
(let1 *kernel* (make-kernel worker-count) (let1 *kernel* (make-kernel worker-count :spin-count 10000)
(unwind-protect (funcall fn) (unwind-protect (funcall fn)
(end-kernel :wait t)))) (end-kernel :wait t))))


Expand Down
8 changes: 6 additions & 2 deletions bench/profile.lisp
Expand Up @@ -74,9 +74,13 @@


(defun enable-profiling () (defun enable-profiling ()
(profile-fns #.(home-functions-in-packages-passing (profile-fns #.(home-functions-in-packages-passing
(curry 'match-package-p "lparallel"))) (lambda (pkg)
;; causes recursion problem in profiler (or (match-package-p "lparallel" pkg)
(match-package-p "bordeaux-threads" pkg)
#+(and sbcl lparallel.with-stealing-scheduler)
(match-package-p "sb-concurrency" pkg)))))
#+(or) #+(or)
;; causes recursion problem in profiler
(profile-fns (sort map-into map reduce))) (profile-fns (sort map-into map reduce)))


(defun profile (&rest args) (defun profile (&rest args)
Expand Down
10 changes: 9 additions & 1 deletion lparallel.asd
Expand Up @@ -28,6 +28,12 @@
;;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE ;;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
;;; OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ;;; OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


;;; default to stealing scheduler on sbcl
#.(when (and (find :sbcl *features*)
(not (find :lparallel.without-stealing-scheduler *features*)))
(pushnew :lparallel.with-stealing-scheduler *features*)
(values))

(defsystem :lparallel (defsystem :lparallel
:version "1.2.2" :version "1.2.2"
:description "Parallelism for Common Lisp" :description "Parallelism for Common Lisp"
Expand Down Expand Up @@ -76,13 +82,15 @@ See http://lparallel.com for documentation and examples.
(:file "queue") (:file "queue")
(:file "counter") (:file "counter")
(:file "biased-queue") (:file "biased-queue")
(:file "spin-queue")
(:module "kernel" (:module "kernel"
:serial t :serial t
:components ((:file "util") :components ((:file "util")
(:file "thread-locals") (:file "thread-locals")
(:file "handling") (:file "handling")
(:file "classes") (:file "classes")
(:file "central-scheduler") #-lparallel.with-stealing-scheduler (:file "central-scheduler")
#+lparallel.with-stealing-scheduler (:file "stealing-scheduler")
(:file "core") (:file "core")
(:file "timeout"))) (:file "timeout")))
(:file "kernel-util") (:file "kernel-util")
Expand Down
1 change: 1 addition & 0 deletions packages-test.lisp
Expand Up @@ -34,6 +34,7 @@
#:lparallel.thread-util #:lparallel.thread-util
#:lparallel.raw-queue #:lparallel.raw-queue
#:lparallel.queue #:lparallel.queue
#:lparallel.spin-queue
#:lparallel.kernel #:lparallel.kernel
#:lparallel.cognate #:lparallel.cognate
#:lparallel.defpun #:lparallel.defpun
Expand Down
17 changes: 16 additions & 1 deletion packages.lisp
Expand Up @@ -128,12 +128,26 @@
#:dec-counter #:dec-counter
#:counter-value)) #:counter-value))


(defpackage #:lparallel.spin-queue
(:use #:cl
#:lparallel.util
#:lparallel.raw-queue
#:lparallel.counter)
(:export #:spin-queue
#:make-spin-queue
#:push-spin-queue
#:pop-spin-queue
#:peek-spin-queue
#:spin-queue-count
#:spin-queue-empty-p))

(defpackage #:lparallel.kernel (defpackage #:lparallel.kernel
(:use #:cl (:use #:cl
#:lparallel.util #:lparallel.util
#:lparallel.thread-util #:lparallel.thread-util
#:lparallel.queue #:lparallel.queue
#:lparallel.biased-queue) #:lparallel.biased-queue
#:lparallel.spin-queue)
(:export #:make-kernel (:export #:make-kernel
#:kernel-worker-count #:kernel-worker-count
#:check-kernel #:check-kernel
Expand All @@ -148,6 +162,7 @@
#:kill-tasks #:kill-tasks
#:task-handler-bind) #:task-handler-bind)
(:export #:*kernel* (:export #:*kernel*
#:*kernel-spin-count*
#:*task-category* #:*task-category*
#:*task-priority*) #:*task-priority*)
(:export #:transfer-error (:export #:transfer-error
Expand Down
16 changes: 5 additions & 11 deletions src/kernel/central-scheduler.lisp
Expand Up @@ -14,9 +14,12 @@


(alias-function biased-queue-lock lparallel.biased-queue::lock) (alias-function biased-queue-lock lparallel.biased-queue::lock)


(alias-function make-scheduler make-biased-queue) (defun make-scheduler (workers spin-count)
(declare (ignore workers spin-count))
(make-biased-queue))


(defun/type schedule-task (scheduler task priority) (scheduler task t) t (defun/type schedule-task (scheduler task priority)
(scheduler (or task null) t) t
(declare #.*normal-optimize*) (declare #.*normal-optimize*)
(ccase priority (ccase priority
(:default (push-biased-queue task scheduler)) (:default (push-biased-queue task scheduler))
Expand All @@ -34,12 +37,3 @@
;; don't steal nil, the end condition flag ;; don't steal nil, the end condition flag
(when (peek-biased-queue/no-lock scheduler) (when (peek-biased-queue/no-lock scheduler)
(pop-biased-queue/no-lock scheduler)))) (pop-biased-queue/no-lock scheduler))))

(setf (macro-function 'with-locked-scheduler)
(macro-function 'with-locked-biased-queue))

(alias-function scheduler-empty-p/no-lock biased-queue-empty-p/no-lock)

(defun/type distribute-tasks/no-lock (scheduler tasks) (scheduler sequence) t
(dosequence (task tasks)
(push-biased-queue/no-lock task scheduler)))
22 changes: 20 additions & 2 deletions src/kernel/classes.lisp
Expand Up @@ -37,9 +37,27 @@


(defslots worker () (defslots worker ()
((thread :reader thread) ((thread :reader thread)
(running-category :reader running-category :initform nil))) (running-category :reader running-category :initform nil)
(index :reader worker-index :type fixnum)
(from-worker :reader from-worker :initform (make-queue) :type queue)
(to-worker :reader to-worker :initform (make-queue) :type queue)
#+lparallel.with-stealing-scheduler
(tasks :reader tasks :type spin-queue)))


(deftype scheduler () 'biased-queue) #+lparallel.with-stealing-scheduler
(defslots scheduler ()
((workers :type simple-vector)
(wait-cvar :initform (make-condition-variable))
(wait-lock :initform (make-lock))
(wait-count :initform 0 :type fixnum)
(notify-count :initform 0)
(spin-count)
(low-priority-tasks :initform (make-spin-queue) :type spin-queue)))

#-lparallel.with-stealing-scheduler
(progn
(deftype scheduler () 'biased-queue)
(defun tasks (scheduler) (declare (ignore scheduler))))


(locally (declare #.*full-optimize*) (locally (declare #.*full-optimize*)
(defslots optimizer () (defslots optimizer ()
Expand Down
148 changes: 75 additions & 73 deletions src/kernel/core.lisp
Expand Up @@ -59,16 +59,28 @@
(exec-task/non-worker task))) (exec-task/non-worker task)))
t)) t))


(defun/type handshake/to-worker (worker) (worker) t
(with-worker-slots (from-worker to-worker) worker
(push-queue 'proceed to-worker)
(assert (eq 'ok (pop-queue from-worker)))))

(defun/type handshake/from-worker (worker) (worker) t
(with-worker-slots (from-worker to-worker) worker
(assert (eq 'proceed (pop-queue to-worker)))
(push-queue 'ok from-worker)))

(defun/type replace-worker (kernel worker) (kernel worker) t (defun/type replace-worker (kernel worker) (kernel worker) t
(with-kernel-slots (workers workers-lock) kernel (with-kernel-slots (workers workers-lock) kernel
(with-lock-held (workers-lock) (with-lock-held (workers-lock)
(let1 index (position worker workers :test #'eq) (let1 index (position worker workers :test #'eq)
(assert index) (assert index)
(assert (eql index (with-worker-slots (index) worker
index)))
(unwind-protect/ext (unwind-protect/ext
:prepare (warn "lparallel: Replacing lost or dead worker.") :prepare (warn "lparallel: Replacing lost or dead worker.")
:main (multiple-value-bind (new-worker guard) (make-worker kernel) :main (let1 new-worker (make-worker kernel index (tasks worker))
(setf (svref workers index) new-worker) (setf (svref workers index) new-worker)
(funcall guard)) (handshake/to-worker new-worker))
:abort (warn "lparallel: Worker replacement failed! ~ :abort (warn "lparallel: Worker replacement failed! ~
Kernel is defunct -- call `end-kernel'.")))))) Kernel is defunct -- call `end-kernel'."))))))


Expand Down Expand Up @@ -105,30 +117,45 @@
context context
kernel)))) kernel))))


(defun/type make-worker (kernel) (kernel) (values worker function) #+lparallel.with-stealing-scheduler
(defun %make-worker (index tasks)
(make-worker-instance :thread nil :index index :tasks tasks))

#-lparallel.with-stealing-scheduler
(defun %make-worker (index tasks)
(declare (ignore tasks))
(make-worker-instance :thread nil :index index))

(defun make-worker (kernel index tasks)
(with-kernel-slots (worker-info) kernel (with-kernel-slots (worker-info) kernel
(with-worker-info-slots (bindings name) worker-info (with-worker-info-slots (bindings name) worker-info
(let* ((worker (make-worker-instance :thread nil)) (let* ((worker (%make-worker index tasks))
(guard (make-queue)) (worker-thread (with-worker-slots (from-worker to-worker) worker
(worker-thread (with-thread (:bindings bindings :name name) (with-thread (:bindings bindings :name name)
(pop-queue guard) (unwind-protect/ext
(enter-worker-loop kernel worker)))) :prepare (handshake/from-worker worker)
:main (enter-worker-loop kernel worker)
:cleanup (push-queue 'exit from-worker))))))
(with-worker-slots (thread) worker (with-worker-slots (thread) worker
(setf thread worker-thread)) (setf thread worker-thread))
(values worker (lambda () (push-queue 'proceed guard))))))) worker))))


(defvar *optimizer* nil) (defvar *optimizer* nil)


(defgeneric make-optimizer-data (specializer) (defgeneric make-optimizer-data (specializer)
(:method ((specializer (eql nil))) (:method ((specializer (eql nil)))
(declare (ignore specializer)))) (declare (ignore specializer))))


(defvar *kernel-spin-count* 10 ; need data to determine a good number
"Default value of the `spin-count' argument to `make-kernel'.")

(defun make-kernel (worker-count (defun make-kernel (worker-count
&key &key
(bindings `((*standard-output* . ,*standard-output*) (bindings `((*standard-output* . ,*standard-output*)
(*error-output* . ,*error-output*))) (*error-output* . ,*error-output*)))
(worker-context #'funcall) (worker-context #'funcall)
(name "lparallel-worker")) (name "lparallel-worker")
(spin-count *kernel-spin-count*))
"Create a kernel with `worker-count' number of worker threads. "Create a kernel with `worker-count' number of worker threads.
`bindings' is an alist for establishing thread-local variables inside `bindings' is an alist for establishing thread-local variables inside
Expand All @@ -141,6 +168,9 @@ function which must be funcalled. It begins the worker loop and will
not return until the worker exits. Default value of `worker-context' not return until the worker exits. Default value of `worker-context'
is #'funcall. is #'funcall.
When a worker discovers that no tasks are available, `spin-count' is
the number of stealing iterations done by the worker before sleeping.
`name' is a string identifier for worker threads. It corresponds to `name' is a string identifier for worker threads. It corresponds to
the string returned by `bordeaux-threads:thread-name'." the string returned by `bordeaux-threads:thread-name'."
(check-type worker-count (integer 1 #.most-positive-fixnum)) (check-type worker-count (integer 1 #.most-positive-fixnum))
Expand All @@ -153,21 +183,22 @@ the string returned by `bordeaux-threads:thread-name'."
:name name)) :name name))
(workers (make-array worker-count)) (workers (make-array worker-count))
(kernel (make-kernel-instance (kernel (make-kernel-instance
:scheduler (make-scheduler) :scheduler (make-scheduler workers spin-count)
:workers workers :workers workers
:workers-lock (make-lock) :workers-lock (make-lock)
:worker-info worker-info :worker-info worker-info
:optimizer-data (make-optimizer-data *optimizer*)))) :optimizer-data (make-optimizer-data *optimizer*))))
(with-kernel-slots (workers worker-info) kernel (with-kernel-slots (workers worker-info) kernel
(with-worker-info-slots (bindings) worker-info (with-worker-info-slots (bindings) worker-info
(push (cons '*kernel* kernel) bindings) (push (cons '*kernel* kernel) bindings)
(let1 guards () (let1 index 0
(map-into workers (map-into workers
(lambda () (lambda ()
(multiple-value-bind (worker guard) (make-worker kernel) (make-worker kernel
(push guard guards) (prog1 index (incf index))
worker))) (make-spin-queue))))
(mapc #'funcall guards)))) (dosequence (worker workers)
(handshake/to-worker worker)))))
kernel)) kernel))


(defun check-kernel () (defun check-kernel ()
Expand Down Expand Up @@ -294,8 +325,8 @@ return value is the number of tasks that would have been killed if
(when *kernel* (when *kernel*
(when (null task-category) (when (null task-category)
(error "task category cannot be NIL in KILL-TASKS")) (error "task category cannot be NIL in KILL-TASKS"))
(with-kernel-slots (workers scheduler) *kernel* (with-kernel-slots (workers workers-lock) *kernel*
(with-locked-scheduler scheduler (with-lock-held (workers-lock)
(let1 victims (map 'vector (let1 victims (map 'vector
#'thread #'thread
(remove-if-not (lambda (worker) (remove-if-not (lambda (worker)
Expand All @@ -306,79 +337,50 @@ return value is the number of tasks that would have been killed if
(map nil #'destroy-thread victims)) (map nil #'destroy-thread victims))
(length victims)))))) (length victims))))))


(defun kernel-idle-p/no-lock (kernel)
(with-kernel-slots (scheduler workers) kernel
(and (scheduler-empty-p/no-lock scheduler)
(notany #'running-category workers))))

(defun kernel-idle-p (kernel)
(with-kernel-slots (scheduler) kernel
(with-locked-scheduler scheduler
(kernel-idle-p/no-lock kernel))))

(defun wait-for-tasks (channel kernel)
(loop
(when (kernel-idle-p kernel)
(return))
(let1 *task-priority* :low
(submit-task channel (lambda ())))
(receive-result channel)))

(defmacro/once with-idle-kernel (&once channel &once kernel &body body)
(with-gensyms (retry)
`(tagbody ,retry
(wait-for-tasks ,channel ,kernel)
(with-locked-scheduler (scheduler ,kernel)
(unless (kernel-idle-p/no-lock ,kernel)
(go ,retry))
,@body))))

(defun shutdown (channel kernel) (defun shutdown (channel kernel)
(with-kernel-slots (scheduler) kernel (let1 *task-priority* :low
(with-idle-kernel channel kernel (submit-task channel (lambda ())))
(distribute-tasks/no-lock (receive-result channel)
scheduler (make-array (%kernel-worker-count kernel) (with-kernel-slots (scheduler workers) kernel
:initial-element nil))))) (repeat (length workers)
(schedule-task scheduler nil :low))
(dosequence (worker workers)
(assert (eq 'exit (pop-queue (from-worker worker)))))))


(defun end-kernel (&key wait) (defun end-kernel (&key wait)
"Sets `*kernel*' to nil and ends all workers gracefully. "Sets `*kernel*' to nil and ends all workers gracefully.
But hang on -- are you certain you wish to do this? `end-kernel' is an `end-kernel' should not be used as a substitute for properly waiting
expensive operation involving heavy locking to detect a finished on tasks with `receive-result' or otherwise.
state. Creating and destroying threads is also expensive. A kernel is
meant to be your trusted friend for the lifetime of the Lisp process.
Having more than one kernel is fine; simply use `let' to bind a kernel
instance to `*kernel*' when you need it. Use `kill-tasks' to terminate
deadlocked or infinite looping tasks.
If `wait' is nil (the default) then `end-kernel' returns immediately. If `wait' is nil (the default) then `end-kernel' returns immediately.
Current tasks are waited upon by a separate shutdown manager thread. Workers are waited upon by a separate shutdown manager thread.
If `wait' is non-nil then `end-kernel' blocks until all tasks are If `wait' is non-nil then `end-kernel' blocks until all workers are
complete. No shutdown manager thread is created. If you are merely finished. No shutdown manager thread is created.
waiting on tasks then you almost certainly want to use
`receive-result' instead. However there are rare cases where waiting
on a temporary kernel is warranted, for example when benchmarking with
a variety of kernels.
A list of the implementation-defined worker thread objects is A list of the implementation-defined worker thread objects is
returned. If `wait' is nil then the shutdown manager thread is also returned. If `wait' is nil then the shutdown manager thread is also
returned as the first element in the list." returned as the first element in the list.
Note that creating and destroying kernels is relatively expensive. A
kernel typically exists for lifetime of the Lisp process. Having more
than one kernel is fine -- simply use `let' to bind a kernel instance
to `*kernel*' when you need it. Use `kill-tasks' to terminate
deadlocked or infinite looping tasks."
(when *kernel* (when *kernel*
(let ((kernel *kernel*) (let ((kernel *kernel*)
(channel (make-channel))) (channel (make-channel)))
(setf *kernel* nil) (setf *kernel* nil)
(labels ((call-shutdown () (with-kernel-slots (workers) kernel
(shutdown channel kernel)) (let1 threads (map 'list #'thread workers)
(spawn-shutdown ()
(make-thread #'call-shutdown
:name "lparallel kernel shutdown manager")))
(let1 threads (map 'list #'thread (workers kernel))
(cond (wait (cond (wait
(call-shutdown) (shutdown channel kernel)
threads) threads)
(t (t
(cons (spawn-shutdown) threads)))))))) (cons (with-thread (:name "lparallel kernel shutdown manager")
(shutdown channel kernel))
threads))))))))


;;; deprecated ;;; deprecated
#-abcl #-abcl
Expand Down

0 comments on commit 10bd989

Please sign in to comment.