Skip to content

Commit

Permalink
Parallel streams... not very convincing
Browse files Browse the repository at this point in the history
  • Loading branch information
pkhuong committed Dec 20, 2011
1 parent cb7bc6c commit 29c8ae2
Showing 1 changed file with 40 additions and 31 deletions.
71 changes: 40 additions & 31 deletions parallel-primitives.lisp
@@ -1,14 +1,14 @@
(defpackage "PARALLEL"
(:use)
(:export "PROMISE" "PROMISE-VALUE" "LET"
(:export "PROMISE" "PROMISE-VALUE" "PROMISE-VALUE*" "LET"
"FUTURE" "FUTURE-VALUE" "FUTURE-VALUE*" "BIND"
"STREAM" "HEAD" "TAIL" "TAIL!" "TAIL-PREFETCH" "UNFOLD" "SCAN" "SCAN*" "FOREACH" "FOLD"
"DOTIMES" "MAP" "REDUCE"
"MAP-GROUP-REDUCE"))

(defpackage "PARALLEL-IMPL"
(:use "CL" "SB-EXT")
(:import-from "PARALLEL" "PROMISE" "PROMISE-VALUE"
(:import-from "PARALLEL" "PROMISE" "PROMISE-VALUE" "PROMISE-VALUE*"
"FUTURE" "FUTURE-VALUE"))

(in-package "PARALLEL-IMPL")
Expand Down Expand Up @@ -52,6 +52,19 @@
(%promise-wait promise :done)
(values-list (promise-%values promise)))

(defun parallel:promise-value* (promise)
(unless (promise-p promise)
(return-from parallel:promise-value* promise))
(multiple-value-call (lambda (&optional (value nil value-p) &rest args)
(cond ((promise-p value)
(parallel:promise-value* value))
(value-p
(multiple-value-call #'values
value (values-list args)))
(t
(values))))
(parallel:promise-value promise)))

(defmacro parallel:let ((&rest bindings) &body body)
(let* ((parallelp t)
(names '())
Expand Down Expand Up @@ -151,7 +164,7 @@
(:constructor %make-stream))
(index 0 :type (mod #.+chunk-size+))
(chunk nil :type simple-vector :read-only t)
(%next nil :type (or null function parallel:stream parallel:future)))
(%next nil :type (or null function parallel:stream parallel:promise)))

(defun make-stream (index chunk %next)
(and (< index (length chunk))
Expand All @@ -177,7 +190,7 @@
:chunk chunk
:%next (stream-%next stream))
(setf (stream-%next stream)
(future-value (stream-next stream)))))))
(promise-value* (stream-next stream)))))))

(defun parallel:tail! (stream)
(and (stream-p stream)
Expand All @@ -187,7 +200,7 @@
(prog1 stream
(setf (stream-index stream) index))
(setf (stream-%next stream)
(parallel:future-value* (stream-next stream)))))))
(promise-value* (stream-next stream)))))))

(defun parallel:tail-prefetch (stream)
(when (and (stream-p stream)
Expand Down Expand Up @@ -216,8 +229,7 @@
(car cache)
(setf (values (car cache) (cdr cache))
(values
(parallel:future
#()
(parallel:promise
(lambda ()
(multiple-value-bind (chunk live)
(make-chunk)
Expand Down Expand Up @@ -256,18 +268,20 @@
(car cache)
(setf (values (car cache) (cdr cache))
(values
(parallel:bind ((next (stream-next stream)))
(let ((next (promise-value* (stream-next stream))))
(and next
(multiple-value-bind (chunk live)
(make-chunk (stream-index stream) (stream-chunk next))
(setf stream next)
(and (plusp (length chunk))
(make-stream
0 chunk
(and live (stream-next next)
(let ((cache (list nil)))
(lambda ()
(next cache)))))))))
(promise
(lambda ()
(multiple-value-bind (chunk live)
(make-chunk (stream-index stream) (stream-chunk next))
(setf stream next)
(and (plusp (length chunk))
(make-stream
0 chunk
(and live (stream-next next)
(let ((cache (list nil)))
(lambda ()
(next cache)))))))))))
t)))))
(make-stream 0 (make-chunk (stream-index stream)
(stream-chunk stream))
Expand All @@ -283,8 +297,7 @@
(values seed seed finished)))
seed))

(defun parallel:fold (stream function seed &key (wait t))
(declare (type parallel:stream stream))
(defun parallel:fold (stream function seed)
(let ((function (if (functionp function)
function
(fdefinition function))))
Expand All @@ -299,29 +312,25 @@
(return nil)))
finally (return t)))
(iter (stream)
(declare (type parallel:stream stream))
(declare (type (or null parallel:stream) stream))
(unless stream
(return-from iter stream))
(let ((continue (consume-chunk (stream-index stream)
(stream-chunk stream))))
(if (not continue)
seed
(parallel:bind ((next (stream-next stream)))
(if next
(iter next)
seed))))))
(let ((future (iter stream)))
(if wait
(parallel:future-value* future)
future)))))
(iter (promise-value* (stream-next stream)))))))
(iter stream))))

(defun parallel:foreach (stream function &key (for-effect nil) (wait t))
(defun parallel:foreach (stream function &key (for-effect nil))
(declare (inline parallel:scan parallel:fold))
(let ((function (if (functionp function)
function
(fdefinition function))))
(cond (for-effect
(parallel:fold stream (lambda (value seed) seed
(funcall function value))
nil :wait wait)
nil)
nil)
(t
(parallel:scan stream (lambda (value seed) seed
Expand Down

0 comments on commit 29c8ae2

Please sign in to comment.