# Parallelism

## Future

In [4]:
(ns joy.futures
  (:require (clojure [xml :as xml]))
  (:require (clojure [zip :as zip]))
  (:import  (java.util.regex Pattern)))

nil

In [5]:
; Simple example: the future body runs on another thread, sleeps 3 s and
; yields 42. The first @x blocks until the body has finished; the second @x
; reuses the already computed value, so the whole expression takes about
; 3 s rather than 6 s.
(time (let [x (future 
                  (do (Thread/sleep 3000) (+ 41 1)))]
          [@x @x]))

"Elapsed time: 3005.788292 msecs"


[42 42]

In [7]:
(defn feed->zipper
  "Parses the XML found at `uri-str` with clojure.xml/parse and wraps the
  resulting tree in an XML zipper."
  [uri-str]
  (let [tree (xml/parse uri-str)]
    (zip/xml-zip tree)))

#'joy.futures/feed->zipper

In [8]:
(defn normalize
  "Returns a zipper location whose children are the feed's entries.
  When the node at `feed` is already tagged :feed the location is returned
  unchanged; otherwise the location one level down is returned."
  [feed]
  (cond-> feed
    (not= :feed (:tag (first feed))) zip/down))

#'joy.futures/normalize

In [9]:
(defn feed-children
  "Parses the feed at `uri-str` and returns a lazy seq of its child nodes
  tagged :item or :entry."
  [uri-str]
  (let [root   (normalize (feed->zipper uri-str))
        entry? (fn [node] (contains? #{:item :entry} (:tag node)))]
    (filter entry? (zip/children root))))

#'joy.futures/feed-children

In [10]:
(defn title
  "Returns the first content item of the first :title child of `entry`,
  or nil when the entry has no content, no :title child, or an empty title."
  [entry]
  (let [title-node? (fn [node] (= :title (:tag node)))
        title-node  (first (filter title-node? (:content entry)))]
    (first (:content title-node))))

#'joy.futures/title

In [13]:
(defn count-text-task
  "Counts the case-insensitive matches of the regex `txt` in the text that
  `extractor` pulls out of each item/entry of the feed at `feed`.

  extractor - function of one feed entry returning its text, or nil
  txt       - regex source; it is prefixed with (?i) to ignore case
  feed      - feed location passed on to feed-children

  Returns the total number of matches over all entries."
  [extractor txt feed]
  (let [items (feed-children feed)
        re    (re-pattern (str "(?i)" txt))]
    (->> items
         ;; keep (not map): an entry without the extracted element yields
         ;; nil, and re-seq throws a NullPointerException on nil input.
         (keep extractor)
         (mapcat #(re-seq re %))
         count)))

#'joy.futures/count-text-task

In [14]:
; Count the case-insensitive matches of "Elixir" in the entry titles of one feed.
(count-text-task 
 title
 "Elixir"
 "http://feeds.feedburner.com/ElixirLang")

44

In [15]:
; Set of feed URLs searched by the multi-feed example that follows.
(def feeds #{"http://feeds.feedburner.com/ElixirLang" 
             "http://blog.fogus.me/feed/"})

#'joy.futures/feeds

In [18]:
; Start one future per feed, then sum the counts.
; `for` is lazy and the seq of a hash set is not chunked, so without doall
; each future would only be created when `map deref` reaches it - i.e. after
; the previous one has already been waited for, making the fetches sequential.
(let [results (doall
                (for [feed feeds]
                  (future
                    (count-text-task title "Elixir" feed))))]
    (reduce + (map deref results)))

44

In [22]:
(defmacro as-futures
  "Runs the given actions once per element of `args`, each run in its own
  future, then evaluates the result actions with the seq of futures bound
  to the name given after :as.

  Usage:
    (as-futures [<arg-name> <all-args>]
      <actions-using-args>
      :as <results-name>
      =>
      <actions-using-results>)

  All futures are started before the result actions run."
  [[a args] & body]
  (let [parts          (partition-by #{'=>} body)
        [acts _ [res]] (partition-by #{:as} (first parts))
        [_ _ task]     parts]
    ;; doall: `for` is lazy, so on an unchunked input (e.g. rest-args or a
    ;; set) each future would otherwise be created only when the result
    ;; actions reach it, serialising work that is meant to run in parallel.
    `(let [~res (doall (for [~a ~args] (future ~@acts)))]
       ~@task)))


;(as-futures [<arg-name> <all-args>]
;  <actions-using-args>
;  :as <results-name>
;  =>
;  <actions-using-results>)


#'joy.futures/as-futures

In [21]:
(defn occurrences
  "Sums, over every feed in `feeds`, the count-text-task result for
  `extractor` and `tag`; the per-feed counts are computed via as-futures."
  [extractor tag & feeds]
  (as-futures [url feeds]
    (count-text-task extractor tag url)
    :as pending
    =>
    (transduce (map deref) + pending)))

#'joy.futures/occurrences

In [23]:
; Total number of case-insensitive matches of "released" in the entry titles
; of three feeds.
(occurrences title "released"
  "http://blog.fogus.me/feed/"
  "http://feeds.feedburner.com/ElixirLang"
  "http://www.ruby-lang.org/en/feeds/news.rss")

31

## Promise

In [26]:
; Three promises that have no value yet; evaluating x prints it as :pending.
(def x (promise))
(def y (promise))
(def z (promise))
x

#promise[{:status :pending, :val nil} 0x411273c6]

In [29]:
; Deliver a value to x, then dereference it.
; NOTE(review): the recorded output below is 3, not 5 - presumably x had
; already been delivered with 3 in an earlier run of this cell (a promise
; keeps the first value it receives); confirm by re-running on a fresh promise.
(deliver x 5)
@x

3

In [32]:
; @y blocks until y has been delivered. The value is delivered from another
; thread after one second, so the deref below really has to wait for it
; (sleeping and delivering on this thread first would never block on @y).
(future (Thread/sleep 1000) (deliver y 3))
@y

3

In [37]:
; Delivering a second time has no effect on an already delivered promise.
(deliver x 6)
x; the value does not change because x was already delivered

#promise[{:status :ready, :val 3} 0x411273c6]

In [36]:
; reset! expects an atom; a promise is not one, so this call throws the
; ClassCastException shown below.
(reset! x 6)

Execution error (ClassCastException) at joy.futures/eval5682 (REPL:1).
clojure.core$promise$reify__8501 cannot be cast to clojure.lang.IAtom


class java.lang.ClassCastException: 

## pmap, pcalls

In [40]:
(defn sleeper
  "Sleeps for `s` seconds on the calling thread, then returns `thing`."
  [s thing]
  (let [millis (* 1000 s)]
    (Thread/sleep millis)
    thing))
; The argument (prn "done") is evaluated before sleeper is called, so "done"
; is printed first; sleeper then sleeps 2 s and returns prn's result, nil.
(sleeper 2 (prn "done"))

"done"


nil

In [42]:
; ->> threads each result into the last position of the next form, so this is
; (time (doall (pmap (comp inc (partial sleeper 2)) [1 2 3]))).
; doall forces the lazy pmap result inside the timed expression.
; NOTE(review): every element sleeps 2 s, so the 1.1 ms recorded below looks
; inconsistent with this code - presumably a stale output; confirm by re-running.
(->> [1 2 3]
     (pmap (comp inc (partial sleeper 2)))
     doall
     time)

"Elapsed time: 1.135583 msecs"


(2 3 4)

In [43]:
; -> threads into the first position: (time (doall (pcalls f1 f2 f3))).
; pcalls runs the no-argument functions in parallel and returns their results
; in argument order; the elapsed time follows the slowest call (3 s), not the
; sum of the sleeps.
(-> (pcalls
      #(sleeper 2 :first)
      #(sleeper 3 :second)
      #(keyword "3rd"))
    doall
    time)

"Elapsed time: 3007.429375 msecs"


(:first :second :3rd)

## reducer / fold

In [67]:
; clojure.core.reducers provides fold, used below as r/fold.
(require '[clojure.core.reducers :as r])
; A vector holding the 20,000,000 integers 0 .. 19,999,999.
(def big-vec (vec (range (* 20000 1000))))

#'joy.futures/big-vec

In [68]:
; Baseline: sum the whole vector sequentially with reduce.
(time (reduce + big-vec))

"Elapsed time: 1823.102 msecs"


199999990000000

In [69]:
; Same sum with r/fold, which can reduce chunks of a vector in parallel.
(time (r/fold + big-vec))

"Elapsed time: 182.029834 msecs"


199999990000000

In [61]:
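; reduce is guaranteed to process the elements sequentially, in order,
; whereas r/fold is not: it may split the collection into chunks, reduce the
; chunks in parallel and then combine the partial results. In other words,
; r/fold parallelizes the work and reduce does not (so the combining function
; given to r/fold should be associative, as + is here).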