Parallel fetching #1

Merged
merged 4 commits into from Sep 1, 2013

Conversation

Projects
None yet
3 participants
Contributor

orivej commented Sep 1, 2013

Reposting my comment at the blog:

I explored an implementation with lparallel ptrees.

PMAP-SOURCES builds a dependency ptree where each source, except for the first ones, depends on the previous source with the same hostname; then it adds a node which depends on all of the sources, and makes lparallel carry out its computation. CALL-WITH-SKIPPING is adjusted to establish its restart in worker treads and to guard its debug output with a mutex. UPDATE-WHAT-YOU-CAN takes additional optional argument to turn parallel run on.

The downside of ptrees is that they are, as it seems, uninterruptible.

Owner

quicklisp commented Sep 1, 2013

This is fantastic, thanks for doing it!

quicklisp added a commit that referenced this pull request Sep 1, 2013

@quicklisp quicklisp merged commit d2a6845 into quicklisp:master Sep 1, 2013

lmj commented Sep 6, 2013

Hello, it's an honor to see this used in quicklisp.

@orivej, it's not clear what you mean by uninterruptable. If you expected that interrupting (C-c-c) call-ptree would result in its child tasks being killed, you can easily achieve that using something like with-kill-on-abort. (The lparallel blog has other tips in the form of blog posts, a system which admittedly is not very organized and not obvious to users.)

None of the lparallel API defaults to such child-killing behavior (ouch, bad turn of phrase). lparallel can't know what the user wants, of course, so it defaults to the most general case. I originally wrote the ptree algorithm to parallelize Makefile-like tasks, a case where you probably don't want to kill child tasks. For example just because one C file failed to compile doesn't mean that compilation of unrelated files should be aborted.

[Update] Note this fetching could also be done by partitioning the data according to hostname then using pmap,

(pmap nil (lambda (part) (map nil #'fetch part)) :parts (length partition) partition)

Ptrees have an advantage for long-running stuff like fetching because a computation can be resumed after being aborted (either by error or user-interrupt), if you ever have need for that functionality.

Contributor

orivej commented Sep 7, 2013

@lmj, I have not yet looked at the implementation to give an informed answer, yet I mean that after interrupting CALL-PTREE, new tasks continue to be spawned, and KILL-TASKS can not stop it.

For example, this runs to completion under SBCL after being interrupted with C-c C-c and calling (lparallel:kill-tasks :default):

(let ((tree (lparallel:make-ptree))
      (range (loop for i from 0 to 100 collect i)))
  (dolist (i range) (lparallel:ptree-fn 
                     i nil (lambda (&aux (i i)) (print i) (sleep 1)) tree))
  (lparallel:ptree-fn 'all range (constantly nil) tree)
  (lparallel:call-ptree 'all tree))

lmj commented Sep 7, 2013

@orivej Yes you're right, I hadn't considered that task production can exceed consumption in this case. This particular tree has many parallelizable tasks, presumably many more than the number of workers one would choose. The ptree algorithm finds all the parallelizable tasks and queues them up. After kill-tasks kills the currently-running tasks, there are still more to execute. It doesn't continue executing the whole tree, just the tasks that were parallelizable at the time of the interrupt.

Instead of with-kill-on-abort (killing tasks is brutish anyway), one could use

(defmacro with-cancel-on-abort (&body body)
  (alexandria:with-gensyms (canceledp)
    `(let ((,canceledp nil))
       (alexandria:unwind-protect-case ()
           (macrolet ((cancelable-lambda (lambda-list &body body)
                        (multiple-value-bind (body declares)
                            (alexandria:parse-body body :documentation t)
                          `(lambda ,lambda-list
                             ,@declares
                             (unless ,',canceledp
                               ,@body)))))
             ,@body)
         (:abort (setf ,canceledp t))))))

In pmap-sources put everything inside with-cancel-on-abort and replace lambda with cancelable-lambda. I'll need to think about the best way to cover this case in the lparallel API.

I also notice the need for pmaphash, which I hadn't really considered. E.g.,

(defun pmaphash (function hash-table)
  (let ((channel (lparallel:make-channel)))
    (maphash (lambda (key value)
               (lparallel:submit-task channel function key value))
             hash-table)
    (loop repeat (hash-table-size hash-table)
          do (lparallel:receive-result channel))))

(defun partition (key iterate &key (test 'eql))
  (let ((partition (make-hash-table :test test)))
    (funcall iterate (lambda (element)
                       (push element (gethash (funcall key element)
                                              partition))))
    partition))

(defun pmap-sources (fun &optional (parallel-key #'source-host))
  (with-cancel-on-abort
    (pmaphash (lambda (key sources)
                (declare (ignore key))
                (map nil (cancelable-lambda (source) (map-source fun source))
                     sources))
              (partition parallel-key #'map-sources :test 'equal))))

In general deciding how to cancel a computation gracefully may require human judgement, as in the above case where cancelable-lambda is used with non-parallel map.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment