-
Notifications
You must be signed in to change notification settings - Fork 5
/
fork_middle.clj
55 lines (50 loc) · 1.91 KB
/
fork_middle.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
(ns parallel.fork-middle
(:require [clojure.core.reducers :as r])
(:import [java.util.concurrent Callable ForkJoinPool ForkJoinTask]
[java.util ArrayList List]))
;; Ask the compiler to warn about interop calls it cannot resolve statically,
;; so missing type hints in the definitions below are reported at compile time.
(set! *warn-on-reflection* true)
;; A Callable task that processes the index range [low, high] of array `a`
;; from the outside in.
;;
;; Fields:
;;   a       - the object array handed through to `f`
;;   low     - lower index of the range this task covers
;;   high    - upper index of the range this task covers
;;   radius  - width of the edge handled at each level before recursing
;;   mapf    - passed through untouched as the first argument of `f`
;;   f       - invoked as (f mapf low high radius a)
;;
;; `call` behaves as follows:
;;   * When what would remain after trimming `radius` from both ends is no
;;     larger than `radius`, `f` is invoked once on the whole range with a
;;     radius of (inc (quot (- high low) 2)) and its result is returned.
;;   * Otherwise a child ForkMiddle over [low + radius, high - radius] is
;;     forked, `f` is invoked on the current range with `radius` (its return
;;     value is discarded), and the joined result of the child is returned.
;;
;; Throws IllegalArgumentException when the range would have to be split but
;; `radius` is not positive: the child range would then never shrink and the
;; task would fork forever.
(deftype ForkMiddle [^objects a
                     ^int low ^int high ^int radius
                     ^Callable mapf ^Callable f]
  Callable
  (call [_]
    (let [span      (- high low)
          remaining (- span (* 2 radius))]
      (cond
        ;; Base case: too little left in the middle to be worth another fork.
        (<= remaining radius)
        (f mapf low high (inc (quot span 2)) a)

        ;; A zero or negative radius would make the child cover the same (or a
        ;; wider) range, so recursion could never reach the base case.
        (<= radius 0)
        (throw (IllegalArgumentException.
                 (str "radius must be positive to split the range, got: " radius)))

        :else
        (#'r/fjinvoke
          (fn []
            (let [middle (ForkMiddle. a (+ low radius) (- high radius) radius mapf f)
                  ;; Fork the inner range first so it runs while this task
                  ;; handles the edges.
                  task   (.fork (ForkJoinTask/adapt middle))]
              (f mapf low high radius a)
              (.join ^ForkJoinTask task))))))))
(defn submit
  "Builds a root ForkMiddle task spanning indices 0 through (dec (alength a))
  of the array `a`, submits it to the reducers fork/join pool and blocks
  until it completes, returning the joined result.
  `mapf`, `f` and `radius` are handed to the task unchanged."
  [mapf f radius ^objects a]
  (let [last-index        (dec (alength a))
        ^ForkJoinPool fjp @r/pool
        root              (ForkMiddle. a 0 last-index radius mapf f)]
    (.join (.submit fjp root))))
;; Different strategy, similar results.
; (defn fork-tasks
; "Fork a collection of tasks by recursively
; splitting into halves."
; [^List tasks]
; (let [cnt (.size tasks)]
; (cond
; (= 1 cnt) (.call ^Callable (.get tasks 0))
; (> cnt 1)
; (let [mid (quot cnt 2)]
; (#'r/fjinvoke
; (fn []
; (let [task (#'r/fjtask #(fork-tasks (.subList tasks mid cnt)))]
; (#'r/fjfork task)
; (fork-tasks (.subList tasks 0 mid))
; (#'r/fjjoin task))))))))
; (defn submit
; "A forking strategy that chops off chunks
; at the edges and forks the rest in the middle."
; [mapf f ^long radius ^objects a]
; (let [tasks (ArrayList.)]
; (loop [low 0 high (dec (alength a))]
; (if (> (- (- high low) (* 2 radius)) radius)
; (do
; (.add tasks #(f mapf low high radius a))
; (recur (+ low radius) (- high radius)))
; (.add tasks #(f mapf low high (inc (quot (- high low) 2)) a))))
; (fork-tasks tasks)))