Browse files

Done basic future creation and yield interface.

  • Loading branch information...
1 parent f2f2085 commit 4f9c1e69771cce3820dbd59ac90d94cf02a32ab2 @vsedach committed Nov 28, 2010
Showing with 112 additions and 28 deletions.
  1. +3 −2 LICENSE
  2. +3 −3 eager-future2.asd
  3. +8 −19 future.lisp
  4. +72 −0 make-future.lisp
  5. +18 −3 package.lisp
  6. +8 −1 scheduler.lisp
@@ -5,8 +5,9 @@ unless otherwise noted.
This library is free software; you can redistribute it and/or modify
it under the terms of the Lisp Lesser General Public License version
-3, which consists of the GNU Lesser General Public License version 3
-as published by the Free Software Foundation and the Franz preamble.
+3, which consists of the GNU Lesser General Public License, either
+version 3 or (at your option) any later version, as published by the
+Free Software Foundation, and the Franz preamble.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -1,10 +1,10 @@
-(asdf:defsystem :eager-future
+(asdf:defsystem :eager-future2
:name "eager-future2"
:author "Vladimir Sedach <>"
:license "LLGPLv3"
:serial t
:components ((:file "package")
(:file "scheduler")
- (:file "future")
- )
+ (:file "make-future")
+ (:file "future"))
:depends-on (:bordeaux-threads :trivial-garbage))
@@ -6,15 +6,16 @@
(lock :reader lock :initform (make-lock "future lock"))
(computing-thread :accessor computing-thread :initform nil)
(wait-list :accessor wait-list :initform ())
- (interrupt-tag :reader interrupt-tag :initarg :interrupt-tag)))
+ (future-id :reader future-id :initarg :future-id)))
+(defun make-future (task future-id)
+ (make-instance 'future :task task :future-id future-id))
(defun ready-to-yield? (future)
"Returns t if the future values have been computed, nil otherwise."
(with-lock-held ((lock future))
(slot-boundp future 'values)))
-(defvar %computing-future nil) ;; protocol for random pool threads
(defun force (future &rest values)
"If the future has not yet yielded a value, installs the given
values as the yield-values of the future (stopping any ongoing
@@ -27,12 +28,8 @@ computation of the future)."
(with-lock-held ((car x))
(condition-notify (cdr x))))
(with-slots (computing-thread) future
- (when (and (not (eq computing-thread (current-thread))) (thread-alive-p computing-thread))
- (ignore-errors ;; should probably log them or something
- (interrupt-thread computing-thread
- (lambda ()
- (when (eq %computing-future (interrupt-tag future))
- (throw 'task-done nil))))))
+ (unless (eq computing-thread (current-thread))
+ (abort-scheduled-future-task computing-thread (future-id future)))
(setf computing-thread nil
(wait-list future) nil
(task future) nil)))
@@ -51,7 +48,7 @@ computation of the future)."
(setf any-computing? t))
(push (cons select-lock notifier) (wait-list future))))
(unless any-computing?
- (schedule-future (first futures))))
+ (schedule-future (first futures) :speculative)))
(loop (dolist (future futures)
(with-lock-held ((lock future))
(when (slot-boundp future 'values)
@@ -80,12 +77,4 @@ In case of an eager future, blocks until the value is available."
(go compute)))))
select (select future) (go done)
compute (multiple-value-call #'force future (funcall (task future)))
- done (values-list (%values future))))
-;;; implementations
-;;; ask for a delayed future - get a future right back
-;;; ask for an eager future - schedule a future to run immediately and give you the future object
-;;; ask for a speculative future - put future on work queue and return the future
+ done (return-from yield (values-list (%values future)))))
@@ -0,0 +1,72 @@
+(in-package #:eager-future2)
+(defvar *default-future-type* :speculative
+ "One of :eager, :speculative (default) or :lazy.
+If eager, any newly created futures start their computation immediately.
+If speculative, newly created futures are computed when thread pool threads are available, in FIFO future creation order.
+If lazy, newly created futures are not computed until asked to yield their values.")
+(defvar *computing-future* nil
+ "Part of scheduling protocol for thread-pooled futures.")
+(defun abort-scheduled-future-task (thread future-id)
+ (when (thread-alive-p thread)
+ (ignore-errors ;; should probably log them or something
+ (interrupt-thread thread (lambda ()
+ (when (eql *computing-future* future-id)
+ (throw 'task-done nil)))))))
+(defun make-scheduler-task (future-ptr)
+ (lambda ()
+ (catch 'task-done
+ (flet ((get-future () (or (weak-pointer-value future-ptr) (throw 'task-done nil))))
+ (let ((*computing-future* (future-id (get-future))))
+ (with-lock-held ((lock (get-future)))
+ (if (slot-boundp (get-future) 'values)
+ (throw 'task-done nil)
+ (setf (computing-thread (get-future)) (current-thread))))
+ (finalize (get-future) (let ((thread (current-thread))
+ (future-id *computing-future*))
+ (lambda () (abort-scheduled-future-task thread future-id))))
+ (let ((values (multiple-value-list (funcall (task (get-future))))))
+ (apply #'force (get-future) values)))))))
+(defun schedule-future (future future-type)
+ (let ((scheduler-task (make-scheduler-task (make-weak-pointer future))))
+ (ccase future-type
+ (:eager (schedule-immediate scheduler-task))
+ (:speculative (schedule-last scheduler-task)))))
+(defun pcall (thunk &optional (future-type *default-future-type*))
+ "Given a function of no arguments, returns an object (called a
+future) that can later be used to retrieve the values computed by the
+future-type (by default the value of *default-future-type*) can either
+be :eager, :speculative, or :lazy. See the documentation of
+*default-future-type* for an explanation of the different future
+The function is called in an unspecified dynamic environment."
+ (let ((future (make-future thunk (gensym "FUTURE-ID"))))
+ (unless (eq future-type :lazy)
+ (schedule-future future future-type))
+ future))
+(defmacro pexec (&body body)
+ "A shorthand for (pcall (lambda () ...))."
+ `(pcall (lambda () ,@body)))
+(defmacro plet ((&rest bindings) &body body)
+ "Like LET, but all bindings are evaluated asynchronously."
+ (let ((bindings (mapcar (lambda (x) (if (consp x) x (list x nil)))
+ bindings)))
+ (let ((syms (mapcar (lambda (x) (gensym (string (car x))))
+ bindings)))
+ `(let ,(loop for (nil exp) in bindings
+ for sym in syms
+ collect `(,sym (pexec ,exp)))
+ (symbol-macrolet ,(loop for (var nil) in bindings
+ for sym in syms
+ collect `(,var (yield ,sym)))
+ ,@body)))))
@@ -1,5 +1,20 @@
(cl:defpackage #:eager-future2
(:use #:cl #:bordeaux-threads #:trivial-garbage)
- (:export #:yield #:ready-to-yield? #:select #:force
- #:advise-thread-pool-size
- ))
+ (:export
+ ;; making futures
+ #:*default-future-type*
+ #:pcall
+ #:pexec
+ #:plet
+ ;; dealing with futures
+ #:ready-to-yield?
+ #:yield
+ #:select
+ #:force
+ ;; thread pool management
+ #:thread-pool-size
+ #:advise-thread-pool-size
+ ))
@@ -26,13 +26,20 @@
:name "Eager Future2 Worker")
(with-lock-held (*thread-counter-lock*) (incf *total-threads*)))
+(defun thread-pool-size ()
+ (with-lock-held (*thread-counter-lock*)
+ *total-threads*))
(defun advise-thread-pool-size (new-size)
(with-lock-held (*thread-counter-lock*)
(if (< *total-threads* new-size)
(loop repeat (- new-size *total-threads*) do (make-pool-thread))
(with-lock-held (*thread-pool-lock*)
(loop repeat (- *total-threads* new-size) do
- (push (lambda () (throw 'die)) *waiting-tasks*))))))
+ (push (lambda () (throw 'die nil)) *waiting-tasks*))))))
+(eval-when (:load-toplevel)
+ (advise-thread-pool-size 10))
(defun schedule-last (task)
(with-lock-held (*thread-pool-lock*)

0 comments on commit 4f9c1e6

Please sign in to comment.