In the beginning ... there was a function to test if a value is a prime number or not:

In [1]:
(def is_prime?
  (fn [v]
    (cond
      (= v 1) false
      (= v 2) true
      (= v 3) true
      (= (mod v 2) 0) false
      :else (loop [d 3]
              (cond (= (mod v d) 0) false
                    (> (* d d) v) true
                    :else (recur (+ d 2))
                    )
              )
      )
    )
  )

#'user/is_prime?

In [2]:
(is_prime? 11)

true

In [3]:
(is_prime? 44)

false

## Count primes in an interval


In [4]:
(filter is_prime? (range 1 200))

(2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 101 103 107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199)

In [5]:
(count (filter is_prime? (range 1 100)))

25

In [6]:
(time (count (filter is_prime? (range 1 100))))

"Elapsed time: 0.345498 msecs"


25

In [6]:
(time (count (filter is_prime? (range 1 20000000))))

"Elapsed time: 81392.513735 msecs"


1270607

The C version of this program needs approx. 7 seconds to count the number of primes between 1 and 20 million. Be careful when uncommenting the function above, because execution time can be 100 seconds and more.

Let's make it a function:

In [7]:
(defn count_primes [a b]
     (count (filter is_prime? (range a b))))

#'user/count_primes

In [8]:
(count_primes 1 100)

25

## Approaching pmap
Partitioning the work of counting primes in a large interval can be accelerated on a multi core CPU.

In [9]:
(defn partition_range [a b p]
    (map
        (fn [i] [(+ a (* i p)) (+ a (* (+ i 1) p))]) ; Create interval [a+i*p a+(i+1)*p]
        (range 0 (+ (quot (- b a) p) 1))
    )
)

#'user/partition_range

In [10]:
(partition_range 1 100 10)

([1 11] [11 21] [21 31] [31 41] [41 51] [51 61] [61 71] [71 81] [81 91] [91 101])

In [11]:
(defn count_primes_in_interval [i] (apply count_primes i))

#'user/count_primes_in_interval

In [12]:
(count_primes_in_interval [1 100])

25

In [14]:
(def n 20000000)
(def p 1000)

#'user/p

In [15]:
(time (reduce + (map count_primes_in_interval (partition_range 1 n p))))

"Elapsed time: 81145.136777 msecs"


1270607

In [16]:
(time (count (filter is_prime? (range 1 n))))

"Elapsed time: 82002.433942 msecs"


1270607

In [17]:
(time (reduce + (pmap count_primes_in_interval (partition_range 1 n p))))

"Elapsed time: 23779.277542 msecs"


1270607

## What about futures?

In [18]:
(def n_primes (future (count_primes 1 100)))

#'user/n_primes

In [19]:
@n_primes ; or (deref n_primes)

25

In [20]:
(time (reduce
       (fn [s f] (+ s @f))
       0
       (map 
        (fn [i] (future (count_primes_in_interval i)))
        (partition_range 1 n p))))

"Elapsed time: 23776.333454 msecs"


1270607

## Change focus: agents do the work

In [14]:
(defn work_package [a i] (+ a (apply count_primes i)))

#'user/work_package

In [15]:
(defn dispatch [worker intervals]
    (let [n_worker (count worker)]
        (loop [w 0
               intervals intervals]
          (if (seq intervals) ; common idiom to test for non-empty sequences
            (do ; required to execute mltiple expressions inside if case
              (send (worker (mod w n_worker)) work_package (first intervals))
              (recur (inc w) (rest intervals))
              )
          )
        )
    )
)

#'user/dispatch

In [16]:
(defn collect [worker]
  (doseq [w worker] (await w))
  (reduce + 0 (map deref worker))
  )

#'user/collect

In [17]:
(defn count_primes_with_agents [n p_size n_worker]
     (let [intervals (partition_range 1 n p_size)
           worker (vec (map (fn [_] (agent 0)) (range 0 n_worker)))]
       (dorun (dispatch worker intervals))
       (collect worker)        
       )
)

#'user/count_primes_with_agents

In [18]:
(time (count_primes_with_agents n p 8))

"Elapsed time: 8569.828809 msecs"


664579