Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

a bunch of stuff

  • Loading branch information...
commit ccbac8fe00d6c102f4e002967fbd9b60af683877 1 parent cae0e1d
@jpalmucci authored
Showing with 48 additions and 11 deletions.
  1. +48 −11 future.lisp
View
59 future.lisp
@@ -24,6 +24,9 @@ from the table" )
"List of functions that are executed in a newly spawned child before
anything else is done")
+(defvar *before-spawn-hooks* nil
+ "List of functions that are executed in the parent before forking
+ the child")
(defun fork ()
"We need to handle fork differently in different lisps or else they get really confused"
@@ -42,7 +45,7 @@ from the table" )
(code :reader code
:initform nil
:documentation "The exit code for the subprocess. Null if process has not been reaped")
- (slave :initform :slave
+ (slave :initarg :slave
:documentation "The number (1 to *total-slaves*) of the
slave this future is currently running on. Zero if on the
controller.")))
@@ -93,6 +96,7 @@ from the table" )
*futures-awaiting-status*)
;; reap them so they don't confuse us later on
(loop while (> (nix:waitpid 0) 0))
+ (setq *free-slaves* (loop for i from 1 to *total-slaves* collect i))
(clrhash *futures-awaiting-status*))
(defun execute-future (fn)
@@ -102,12 +106,10 @@ from the table" )
(return-from execute-future
(make-instance 'future :pid nil :result (funcall fn)))))
(wait-for-slave)
+ (mapc #'funcall *before-spawn-hooks*)
(let ((this-slave (pop *free-slaves*))
(pid (fork)))
(cond ((eql pid 0)
- ;; we are the child - evaluate the expression and write it to disk
- (mapc #'funcall *spawn-child-hooks*)
-
(let* ((in (make-string-input-stream ""))
(out (make-string-output-stream))
(tw (make-two-way-stream in out))
@@ -125,10 +127,13 @@ from the table" )
(*is-slave* t))
(handler-case
- (let ((result (funcall fn)))
+ (let ((result (progn
+ ;; we are the child - evaluate the expression and write it to disk
+ (mapc #'funcall *spawn-child-hooks*)
+ (funcall fn))))
(cl-store:store (list (get-output-stream-string out) result)
output-pathname)
- (nix:exit 0)
+ (nix:exit 0)
(close tw) (close in) (close out)
(nix:exit 0))
@@ -163,8 +168,40 @@ evaluated values. Blocks if a future is still running."
(t
expr)))
-(defun future-mapcar (fn list &key (chunk-size 1))
- (let ((futures (mapcar #'(lambda (x)
- (future (funcall fn x)))
- list)))
- (mapcar #'get-future-value futures)))
+(defun future-mapcar (fn list)
+ (cond ((< (length list) (* *total-slaves* 3.0))
+ (get-future-value (mapcar #'(lambda (x) (future (funcall fn x))) list)))
+ (t
+ ;; list is big enough that we can split it into big chunks to minimize # of forks required
+ (let* ((chunk-size (/ (length list) (* *total-slaves* 3.0)))
+ (cur-chunk nil)
+ (chunks nil))
+ ;; split the input list into chunks of size "chunk-size"
+ ;; (or as close as possiblt)
+ ;; note that because we are pushing, everything is reversed
+ (loop for e in list
+ with cur-count = 0.0
+ do
+ (cond ((< cur-count chunk-size)
+ (push e cur-chunk)
+ (incf cur-count)))
+ (cond ((>= cur-count chunk-size)
+ (jp:pvalues cur-count chunk-size)
+ (push cur-chunk chunks)
+ (setq cur-chunk nil
+ cur-count (- cur-count chunk-size)))))
+ (cond (cur-chunk
+ (push cur-chunk chunks)))
+ (jp:pvalues chunk-size chunks)
+
+ ;;evaluate the chunks in child processes
+ (let ((results (get-future-value
+ (mapcar #'(lambda (sublist)
+ (future (mapcar fn sublist)))
+ chunks)))
+ (sorted-results nil))
+ ;; reverse things again so they come back in the correct order
+ (loop for result in results do
+ (loop for sublist-result in result do
+ (push sublist-result sorted-results)))
+ sorted-results)))))
Please sign in to comment.
Something went wrong with that request. Please try again.