Permalink
Browse files

support for *current-slave*

  • Loading branch information...
1 parent 0c5be10 commit cae0e1dcc0a9789c64e108dac3e240416768fc4f @jpalmucci committed Aug 11, 2009
Showing with 28 additions and 10 deletions.
  1. +1 −1 cl-future.asd
  2. +27 −9 future.lisp
View
@@ -3,7 +3,7 @@
(defpackage future
(:use :cl)
(:export #:terminate-children #:future #:get-future-value #:future-mapcar
- #:*total-slaves*))
+ #:*total-slaves* #:*current-slave* #:*spawn-child-hooks*))
(defvar *future-path* *load-truename*)
View
@@ -6,13 +6,23 @@
(defvar *total-slaves* 3
"The maximum number of slaves to run at any one time.")
+(defvar *free-slaves* (loop for i from 1 to *total-slaves* collect i))
+
+(defvar *current-slave* 0
+ "if we are running on a slave, the slave number (from 1 to
+ *total-slaves*), 0 for the controller. This is useful for making
+ sure that processes don't collide on shared resources (like ports,
+ for example). Only one process will be running using a given
+ *current-slave* number at any time.")
+
(defvar *futures-awaiting-status* (make-hash-table :test #'eql)
- "This table contains those futures that we haven't noticed have terminated yet.
-When wait comes around and reaps them, we remove them from the table"
-)
+ "This table contains those futures that we haven't noticed have
+terminated yet. When wait comes around and reaps them, we remove them
+from the table" )
(defvar *spawn-child-hooks* nil
- "List of functions that are executed in a newly spawned child before anything else is done")
+ "List of functions that are executed in a newly spawned child before
+ anything else is done")
(defun fork ()
@@ -31,7 +41,11 @@ When wait comes around and reaps them, we remove them from the table"
:documentation "The result returned by the child process")
(code :reader code
:initform nil
- :documentation "The exit code for the subprocess. Null if process has not been reaped")))
+ :documentation "The exit code for the subprocess. Null if process has not been reaped")
+ (slave :initform :slave
+ :documentation "The number (1 to *total-slaves*) of the
+ slave this future is currently running on. Zero if on the
+ controller.")))
(defmethod initialize-instance :after ((f future) &key &allow-other-keys)
(with-slots (pid) f
@@ -40,15 +54,16 @@ When wait comes around and reaps them, we remove them from the table"
f)))))
(defmethod read-result ((f future) status-code)
- (with-slots (pid result code) f
+ (with-slots (pid result code slave) f
(setf code status-code)
(let* ((path (format nil "/tmp/pid.~d" pid)))
(destructuring-bind (output stored-result)
(cl-store:restore path)
(princ output)
(setf result stored-result)
(delete-file path)
- (remhash pid *futures-awaiting-status*)))
+ (remhash pid *futures-awaiting-status*)
+ (push slave *free-slaves*)))
(cond ((not (eql code 0))
(error "Future terminated with error ~a" result)))))
@@ -70,6 +85,7 @@ When wait comes around and reaps them, we remove them from the table"
(defun terminate-children ()
"Kill all currently running children."
(maphash #'(lambda (key value)
+ (declare (ignore key))
(with-slots (pid result code) value
(ignore-errors (nix:kill (pid value) 9))
(setf result (make-condition 'simple-error :format-control "Future killed by terminate-children")
@@ -86,7 +102,8 @@ When wait comes around and reaps them, we remove them from the table"
(return-from execute-future
(make-instance 'future :pid nil :result (funcall fn)))))
(wait-for-slave)
- (let ((pid (fork)))
+ (let ((this-slave (pop *free-slaves*))
+ (pid (fork)))
(cond ((eql pid 0)
;; we are the child - evaluate the expression and write it to disk
(mapc #'funcall *spawn-child-hooks*)
@@ -101,6 +118,7 @@ When wait comes around and reaps them, we remove them from the table"
(*terminal-io* tw)
(*debug-io* tw)
(*query-io* tw)
+ (*current-slave* this-slave)
)
(let* ((output-pathname (format nil "/tmp/pid.~d" (nix:getpid)))
@@ -121,7 +139,7 @@ When wait comes around and reaps them, we remove them from the table"
(nix:exit 1))))))
(t
(setf (gethash pid *futures-awaiting-status*)
- (make-instance 'future :pid pid))))))
+ (make-instance 'future :pid pid :slave this-slave))))))
(defmacro future (&body body)
"Evaluate expr in parallel using a forked child process. Returns a

0 comments on commit cae0e1d

Please sign in to comment.