Skip to content

Commit

Permalink
Implement restartable checkpointing by serializing steps
Browse files Browse the repository at this point in the history
  • Loading branch information
jeroenvandijk committed Jun 30, 2013
1 parent 7328a16 commit ebe3e7d
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 104 deletions.
34 changes: 19 additions & 15 deletions src/adgoji/cascalog/cli.clj
Original file line number Diff line number Diff line change
Expand Up @@ -117,33 +117,37 @@
[success? msg] (validate-options tap-opts)]
(when-not success?
(handle-result banner success? msg))
(callback (into {} (map (fn [[k v]]
[k (if-let [tap-fn (:tap-fn v)]
(tap-fn (:args v))
v)])
tap-opts)))))
(let [graph (graph/workflow-compile callback)]
(graph (into {} (map (fn [[k v]]
[k (if-let [tap-fn (:tap-fn v)]
(tap-fn (:args v))
v)])
tap-opts))))))

;; :dot mode: pass the job (stored under :fn-like) to graph/dot-compile.
(defmethod run-mode :dot [{job :fn-like}]
  (graph/dot-compile job))
;; :dot mode: pass the job (stored under :callback) to graph/dot-compile.
(defmethod run-mode :dot [{job :callback}]
  (graph/dot-compile job))

(defmethod run-mode :preview [{:keys [fn-like]}]
(defn preview-graph [graph]
(let [tmp-path (str "/tmp/job-" (java.util.UUID/randomUUID))
dot-path (str tmp-path ".dot")
png-path (str tmp-path ".png")
dot-str (with-out-str (graph/dot-compile fn-like))
dot-str (with-out-str (graph/dot-compile graph))
_ (spit dot-path dot-str)
{dot? :exit error :error} (shell/sh "dot" "-Tpng" dot-path "-o" png-path)]
(if dot?
{exit-status :exit error :err} (shell/sh "dot" "-Tpng" dot-path "-o" png-path)]
(if (zero? exit-status)
(do (println "opening" png-path "...")
(shell/sh "open" png-path)
(System/exit 0))
(do (println "Error: graphviz not installed?" error)
(System/exit 1)))))

;; :preview mode: delegate to preview-graph with the job stored under :callback.
(defmethod run-mode :preview [{job :callback}]
  (preview-graph job))

;; TODO how can we make the printing of the workflow code pretty?
;; This doesn't work as expected http://clojuredocs.org/clojure_core/clojure.pprint
;; :debug mode: print whatever graph/mk-workflow builds for the job
;; (stored under :fn-like), using the fixed checkpoint directory below.
(defmethod run-mode :debug [{job :fn-like}]
  (let [workflow (graph/mk-workflow "/tmp/cascalog-checkpoint" job)]
    (println workflow)))
;; :debug mode: print whatever graph/mk-workflow builds for the job
;; (stored under :callback), using the fixed checkpoint directory below.
(defmethod run-mode :debug [{job :callback}]
  (let [workflow (graph/mk-workflow "/tmp/cascalog-checkpoint" job)]
    (println workflow)))

;; Fallback for any mode without its own method: report the unrecognized
;; mode through handle-result with a `false` success flag.
(defmethod run-mode :default [{:keys [opts banner]}]
  (let [message (str "Error: run mode '" (:mode opts) "' not recognized")]
    (handle-result banner false message)))
Expand All @@ -153,14 +157,14 @@

;; FIXME Graph meta contains data about output taps
(defn run-fn-cmd [job-fn cli-args graph-meta]
(let [{:keys [errors banner opts trailing-args] :as cli-validation} (validate-cli-args (derive-io-options job-fn)cli-args)]
(let [{:keys [errors banner opts trailing-args] :as cli-validation} (validate-cli-args (derive-io-options job-fn) cli-args)]
(run-mode {:callback job-fn
:graph-meta graph-meta
:opts opts :errors errors :banner banner
:trailing-args trailing-args})))

(defn run-graph-cmd [graph cli-args graph-meta]
(run-fn-cmd (graph/workflow-compile graph) cli-args graph-meta))
(run-fn-cmd graph cli-args graph-meta))

(defn run-graph-or-fn-cmd [graph-like output-mapping cli-args]
(if (fn? graph-like)
Expand Down
253 changes: 164 additions & 89 deletions src/adgoji/cascalog/graph.clj
Original file line number Diff line number Diff line change
@@ -1,57 +1,26 @@
(ns adgoji.cascalog.graph
(:require [plumbing.core :as gc]
;; from cascalog checkpoint
(:use [cascalog.api :only [with-job-conf]])
(:require [hadoop-util.core :as h]
[cascalog.conf :as conf]
[jackknife.core :as u]
[jackknife.seq :as seq])
(:import [java.util Collection]
[org.apache.log4j Logger]
[java.util.concurrent Semaphore]
[org.apache.hadoop.fs FileSystem Path]
[org.apache.hadoop.conf Configuration]
java.io.FileOutputStream
java.io.ObjectOutputStream
java.io.ObjectInputStream java.io.FileInputStream)

(:require
[clojure.stacktrace]
[plumbing.core :as gc]
[plumbing.graph :as graph]
[plumbing.fnk.pfnk :as pfnk]
[cascalog.api :refer [?- hfs-seqfile]]
[cascalog.checkpoint :refer [workflow] :as checkpoint]))

(defn- sym-dir
  "Returns the symbol made of `sym`'s name with \"-dir\" appended."
  [sym]
  (-> sym name (str "-dir") symbol))

(defn- sym-step
  "Returns the symbol made of `keyw`'s name with \"-step\" appended."
  [keyw]
  (-> keyw name (str "-step") symbol))

(defn- sym-fn
  "Returns a symbol with the same name as `keyw`."
  [keyw]
  (-> keyw name symbol))

(defn- mk-step-part
  "Builds the two forms for one workflow step: the step symbol followed by
  `([:deps ... (:tmp-dirs [<name>-dir])] body)`.

  `deps` is kept as-is when it is `:all`, turned into a vector when it is a
  non-empty collection, and becomes nil otherwise. The `:tmp-dirs` entry is
  only added when `tmp-dir` is truthy."
  [name deps tmp-dir body]
  (let [deps-value (cond
                     (= deps :all) deps
                     (seq deps)    (vec deps))
        base-opts  [:deps deps-value]
        step-opts  (if tmp-dir
                     (conj base-opts :tmp-dirs [(sym-dir name)])
                     base-opts)]
    (list (sym-step name)
          (list step-opts body))))

(defn- mk-state-fnk-part
  "Builds the form `(save-state name ((graph# name) fn-args))`."
  [name fn-args]
  (let [node-call (list (list 'graph# name) fn-args)]
    (list 'save-state name node-call)))

(defn- mk-query-state-fnk-part
  "Builds a `let` form that binds `tmp-seqfile` to an hfs-seqfile over the
  step's `<name>-dir` symbol, runs the step's query into it with `?-`
  (named after the step), and then saves `tmp-seqfile` as the step's state."
  [step-name fn-args]
  (let [seqfile-form (list `hfs-seqfile (sym-dir step-name))
        query-form   (list (list 'graph# step-name) fn-args)
        run-form     (list `?- (str (name step-name)) 'tmp-seqfile query-form)
        save-form    (list 'save-state step-name 'tmp-seqfile)]
    (list 'let ['tmp-seqfile seqfile-form]
          run-form
          save-form)))

(defn- mk-tmp-dir-fnk-step
  "Step builder for nodes that want a temp dir: the `<name>-dir` symbol is
  added to the node's args under :tmp-dir and declared as the step's tmp dir."
  [{:keys [name deps fn-args]}]
  (let [dir-sym (sym-dir name)]
    (->> (assoc fn-args :tmp-dir dir-sym)
         (mk-state-fnk-part name)
         (mk-step-part name deps dir-sym))))

(defn- mk-fnk-step
  "Step builder for plain nodes: no tmp dir, the body just saves the node's result."
  [{:keys [name deps fn-args]}]
  (let [body (mk-state-fnk-part name fn-args)]
    (mk-step-part name deps nil body)))

(defn- mk-final-fnk-step
  "Step builder for final nodes: the given deps are ignored and the step is
  declared with `:deps :all`."
  [{:keys [name fn-args]}]
  (let [body (mk-state-fnk-part name fn-args)]
    (mk-step-part name :all nil body)))

(defn- mk-query-fnk-step
  "Step builder for query nodes: declares `<name>-dir` as tmp dir and uses the
  query-running body from mk-query-state-fnk-part."
  [{:keys [name deps fn-args]}]
  (let [dir-sym (sym-dir name)]
    (->> (mk-query-state-fnk-part name fn-args)
         (mk-step-part name deps dir-sym))))

(defn mk-fn-args
  "Maps every key in `ks` to the form used to obtain its value: a
  `(fetch-state k)` form when `(g k)` is truthy, otherwise a plain symbol
  with the key's name."
  [g ks]
  (reduce (fn [args k]
            (let [value-form (if (g k)
                               (list 'fetch-state k)
                               (symbol (name k)))]
              (assoc args k value-form)))
          {}
          ks))
[cascalog.checkpoint :as checkpoint]))

(defn dependency-graph [g]
(reduce (fn [acc [k v]]
Expand All @@ -62,17 +31,6 @@
(defn steps-dependent
  "Looks up `k` in the result of `(dependency-graph g)` by invoking `k` on it."
  [g k]
  (let [dependents-by-step (dependency-graph g)]
    (k dependents-by-step)))

(defn mk-step
  "Builds the step forms for one graph entry `[k v]`.

  The node's input-schema keys become its args (via mk-fn-args); those that
  are also keys of `g` become step dependencies. The builder is picked from
  the ::fnk-type found in the node's metadata, falling back to mk-fnk-step."
  [g [k v]]
  (let [input-keys (keys (pfnk/input-schema v))
        step-deps  (map sym-step
                        (clojure.set/intersection (set input-keys) (set (keys g))))
        step-args  {:name    k
                    :deps    step-deps
                    :fn-args (mk-fn-args g input-keys)}
        builder    (case (::fnk-type (meta v))
                     :query   mk-query-fnk-step
                     :tmp-dir mk-tmp-dir-fnk-step
                     :final   mk-final-fnk-step
                     mk-fnk-step)]
    (builder step-args)))

;; Duplicate graphs fnk for convenience
(defmacro fnk
  "Expands to `gc/fnk` applied to the same arguments."
  [& args]
  (list* `gc/fnk args))
Expand All @@ -87,6 +45,7 @@
`(mk-graph-fnk {} {:alpha :alpha-tap})"
[graph output-mapping]
{:pre [(clojure.set/superset? (set (keys graph)) (set (keys output-mapping)))]}
(graphify (reduce (fn [g [k v]]
(if (seq (steps-dependent g k))
(let [output-node (-> v name (str "-sink") keyword)]
Expand All @@ -102,23 +61,6 @@
(update-in (pfnk/io-schemata prev-fn) [0] assoc v true))))))
graph (filter (comp graph key) output-mapping))))

;; Generates (as data, not yet evaluated) the code for a checkpointed workflow.
;;
;; 2-arity: converts `graph` with graph/->graph and returns a form shaped like
;;   (fn [graph#]
;;     (fnk [<input symbols>]
;;       (let [state (atom {}) save-state ... fetch-state ...]
;;         (do (checkpoint/workflow [tmp-dir] <step forms from mk-step>...)
;;             state))))
;; so the generated fnk ends by returning the `state` atom.
;; 3-arity: first narrows `orig-graph` with select-nodes and `output-mapping`,
;; then delegates to the 2-arity.
(defn mk-workflow
([tmp-dir graph]
(let [graph (graph/->graph graph)
;; one plain symbol per key of the graph's input schema
input-keys (map (comp symbol name key) (pfnk/input-schema graph))]
(list `fn ['graph#]
(list `fnk (vec input-keys)
(list
;; quoted helpers shared by all generated steps
'let '[state (atom {})
save-state (fn [k v] (swap! state assoc k v))
fetch-state (fn [k] (@state k))]
(list 'do (concat (list `checkpoint/workflow [tmp-dir])
(mapcat (partial mk-step graph) graph))
'state))))))
([tmp-dir orig-graph output-mapping]
(let [graph (select-nodes orig-graph output-mapping)]
(mk-workflow tmp-dir graph))))

(defn fnk-type
  "Returns the ::fnk-type entry of `fnk`'s metadata, or nil when absent."
  [fnk]
  (-> fnk meta ::fnk-type))

Expand All @@ -142,17 +84,150 @@
m (assoc (meta f) ::fnk-type :final)]
`(with-meta ~f ~m)))

;; TODO improve function signature
;; REMOVE input options
(defn workflow-compile
  "Evaluates the code generated by mk-workflow and applies the resulting
  function to `graph`.

  Arities:
    [graph]                           - uses \"/tmp/cascalog-checkpoint\"
    [tmp-dir graph]
    [graph input-m output-m]          - uses \"/tmp/cascalog-checkpoint\"
    [tmp-dir graph input-m output-m]  - restricts the graph via `output-m`

  `input-m` is accepted for compatibility but is not used."
  ([graph]
   (workflow-compile "/tmp/cascalog-checkpoint" graph))
  ([tmp-dir graph]
   ((eval (mk-workflow tmp-dir graph)) graph))
  ([graph input-m output-m]
   (workflow-compile "/tmp/cascalog-checkpoint" graph {} output-m))
  ([tmp-dir graph input-m output-m]
   ;; The evaluated workflow fn must be applied to the graph, as in the
   ;; 2-arity. Previously a misplaced paren invoked it with no arguments and
   ;; then returned `graph` itself.
   ;; NOTE(review): the generated code is built from the select-nodes result
   ;; but applied to the unfiltered `graph` here - confirm that is intended.
   ((eval (mk-workflow tmp-dir graph output-m)) graph)))


;; IDEA for wrapping all nodes in a graph
(defn transact
  "Returns `graph` with two extra nodes: `before-fnk` under `before-key` and
  `after-fnk` under `after-key`.

  The metadata of `after-fnk` is adjusted so that the first element of its
  :plumbing.fnk.pfnk/io-schemata lists every existing node of `graph` (mapped
  to true) in addition to what it already contained.

  Works for an empty `graph` too; the schema is then left as it was."
  [graph before-key before-fnk after-key after-fnk]
  (let [;; zipmap yields {} for an empty graph, so merge is always safe;
        ;; `(apply assoc m ())` would throw an arity error in that case.
        node-flags (zipmap (keys graph) (repeat true))
        adapted-after-fnk (vary-meta after-fnk update-in
                                     [:plumbing.fnk.pfnk/io-schemata 0]
                                     merge node-flags)]
    (assoc graph
           before-key before-fnk
           after-key adapted-after-fnk)))

(defn serializable?
  "True when `obj` implements java.io.Serializable; false otherwise (including nil)."
  [obj]
  (.isInstance java.io.Serializable obj))

(defn serialize-state
  "Writes the Java-serializable entries of the map `state` to `state-file`
  (a path string or java.io.File) using Java object serialization.

  Entries whose value is not Serializable are silently dropped. Missing
  parent directories of `state-file` are created first, so that saving the
  checkpoint cannot fail (and hide the original workflow error) merely
  because the directory does not exist on the local filesystem."
  [state-file state]
  (let [serializable-state (into {} (filter (comp serializable? val) state))
        ^java.io.File target (clojure.java.io/file state-file)]
    (clojure.java.io/make-parents target)
    (with-open [file-out (FileOutputStream. target)
                object-out (ObjectOutputStream. file-out)]
      (.writeObject object-out serializable-state))))

(defn deserialize-state
  "Reads one Java-serialized object from `state-file` and returns it.
  Both streams are closed when done."
  [state-file]
  (with-open [raw-in (FileInputStream. state-file)
              state-in (ObjectInputStream. raw-in)]
    (.readObject state-in)))

;; One step of a workflow.
;; NOTE(review): graph->nodes creates nodes with ::name, ::status and ::tmp-dir
;; (singular) and later adds ::runner-thread; none of these are basis keys
;; here, while ::tmp-dirs is declared but never set in the visible code.
(defstruct WorkflowNode ::tmp-dirs ::fn ::deps ::error ::value)
;; Shared execution context of a workflow run; instances are created by mk-workflow.
(defstruct Workflow ::fs ::checkpoint-dir ::graph-atom ::previous-steps ::sem ::log)

(defn mk-workflow
  "Creates the Workflow struct for one run.

  Obtains a filesystem handle and a logger, creates a semaphore with zero
  permits, and - when `<checkpoint-dir>/state.bin` exists as a local file -
  loads the previously saved step results from it. The checkpoint directory
  is then created through the filesystem handle. `initial-graph` is wrapped
  in an atom that accumulates step results."
  [checkpoint-dir initial-graph]
  (let [fs        (h/filesystem)
        log       (Logger/getLogger "checkpointed-workflow")
        sem       (Semaphore. 0)
        state-bin (clojure.java.io/file (str checkpoint-dir "/state.bin"))
        restored  (if (.exists state-bin)
                    (deserialize-state state-bin)
                    nil)]
    (h/mkdirs fs checkpoint-dir)
    (struct-map Workflow
                ::fs             fs
                ::checkpoint-dir checkpoint-dir
                ::graph-atom     (atom initial-graph)
                ::previous-steps restored
                ::sem            sem
                ::log            log)))

(defn- mk-runner
  "Returns a (not yet started) Thread that computes `node` within `workflow`.

  When run, the thread:
    - reuses the value stored for the node in the workflow's previous steps,
      if there is one, otherwise calls the node's ::fn with the current
      contents of the graph atom;
    - stores the value in the graph atom and in the node's ::value atom and
      marks the node :successful;
    - on any Throwable logs it, stores it in ::error and marks the node :failed;
    - always releases one permit on the workflow semaphore when done.

  The job conf is captured when this function is called and re-established
  inside the thread."
  [node workflow]
  (let [log (::log workflow)
        graph-atom (::graph-atom workflow)
        previous-steps (::previous-steps workflow)
        sem (::sem workflow)
        node-name (::name node)
        config conf/*JOB-CONF*]
    (Thread.
     (fn []
       (with-job-conf config
         (try
           ;; Reuse of a previous value does not take changes to the graph
           ;; into account; removing the checkpoint dir is the only way to
           ;; force recomputation in that case.
           ;; TODO smarter graph checking
           ;; `contains?` rather than `if-let`, so that a stored `false`
           ;; result is reused instead of being recomputed.
           (let [value (if (contains? previous-steps node-name)
                         (do (.info log (str "Skipping " node-name "..."))
                             (get previous-steps node-name))
                         (do (.info log (str "Calculating " node-name "..."))
                             ((::fn node) @graph-atom)))]
             (swap! graph-atom assoc node-name value)
             (reset! (::value node) value))
           (reset! (::status node) :successful)
           (catch Throwable t
             (.error log (str "Component failed " node-name) t)
             (reset! (::error node) t)
             (reset! (::status node) :failed))
           (finally (.release sem))))))))

(defn- fail-workflow!
  "Shuts a failed workflow down and throws.

  Interrupts and joins the threads of all nodes that are :running, writes the
  values of all :successful nodes to `<checkpoint-dir>/state.bin`, and finally
  throws a runtime exception naming the nodes that recorded an error."
  [workflow nodes-map]
  (let [log (::log workflow)
        nodes (vals nodes-map)
        ;; lazy on purpose: each selection is evaluated when it is first consumed
        with-status (fn [wanted] (filter #(= wanted @(::status %)) nodes))
        threads (map ::runner-thread (with-status :running))]
    (.info log "Workflow failed - interrupting components")
    (doseq [t threads]
      (.interrupt t))
    (.info log "Waiting for running components to finish")
    (doseq [t threads]
      (.join t))
    (.info log "Serializing succesful nodes")
    (serialize-state (str (::checkpoint-dir workflow) "/state.bin")
                     (into {} (for [n (with-status :successful)]
                                [(::name n) @(::value n)])))
    ;; TODO we would like to print the Exceptions that help us fix bugs, but it is unclear
    ;; how to reach those Exceptions (e.g. TupleExceptions)
    (let [failed-names (map ::name (filter (comp deref ::error) nodes))]
      (u/throw-runtime (str "Workflow failed during "
                            (clojure.string/join ", " failed-names))))))

(defn graph->nodes
  "Turns every `[key fnk]` entry of `graph` into a WorkflowNode and returns
  them in a map keyed by the graph key.

  Each node gets its own directory `<checkpoint-dir>/<key name>`, the subset
  of its input-schema keys that are themselves graph steps as ::deps, fresh
  atoms for ::status (:unstarted), ::error and ::value, and a runner thread
  from mk-runner. The node function is wrapped according to fnk-type:
    :tmp-dir - called with :tmp-dir added to its arguments;
    :query   - the returned query is executed into an hfs-seqfile in the
               node's directory and that seqfile becomes the node's value;
    otherwise the function is used unchanged."
  [workflow graph]
  (let [step-names (set (keys graph))
        checkpoint-dir (::checkpoint-dir workflow)]
    (into {}
          (for [[k f] graph]
            (let [tmp-dir (str checkpoint-dir "/" (name k))
                  ;; TODO can we do without the fnk macros, only needed for this:
                  wrapped (case (fnk-type f)
                            :tmp-dir (fn [deps]
                                       (f (assoc deps :tmp-dir tmp-dir)))
                            :query (fn [deps]
                                     (let [query (f deps)
                                           ;; Pass the query's :outfields (when present) on to
                                           ;; the seqfile to support convenience functions
                                           ;; such as (select-fields my-tap ["?a"])
                                           seqfile-opts (mapcat identity
                                                                (select-keys query [:outfields]))
                                           intermediate-seqfile (apply hfs-seqfile tmp-dir seqfile-opts)]
                                       ;; Run query and return seqfile
                                       (?- (name k) intermediate-seqfile query)
                                       intermediate-seqfile))
                            f)
                  node (struct-map WorkflowNode
                                   ::name k
                                   ::fn wrapped
                                   ::status (atom :unstarted)
                                   ::error (atom nil)
                                   ::value (atom nil)
                                   ::tmp-dir tmp-dir
                                   ::deps (filter step-names
                                                  (keys (pfnk/input-schema f))))]
              [k (assoc node ::runner-thread (mk-runner node workflow))])))))

(defn exec-workflow!
  "Runs the nodes of `workflow` as a dependency graph.

  Repeatedly takes a snapshot of all node statuses and then:
    - hands over to fail-workflow! (which throws) if any node has failed;
    - starts every :unstarted node whose dependencies are all :successful and
      waits on the workflow semaphore for some node to finish;
    - throws if nodes remain :unstarted while nothing is running and nothing
      can be started;
    - otherwise logs successful completion.

  The semaphore is only waited on when at least one node is running or was
  just started, so an empty `nodes` map completes immediately instead of
  blocking forever, and no new node is started once a failure is visible."
  [workflow nodes]
  (let [log (::log workflow)
        sem (::sem workflow)]
    (loop []
      (let [snapshot (into {} (for [[k node] nodes] [k @(::status node)]))
            statuses (set (vals snapshot))
            ;; dependencies that are not nodes themselves are ignored
            ready    (vec (for [[k node] nodes
                                :when (and (= :unstarted (snapshot k))
                                           (every? #(= :successful %)
                                                   (vals (select-keys snapshot (::deps node)))))]
                            node))]
        (cond
          (contains? statuses :failed)
          (fail-workflow! workflow nodes)

          (or (seq ready) (contains? statuses :running))
          (do
            (doseq [node ready]
              (reset! (::status node) :running)
              (.start (::runner-thread node)))
            ;; every running node releases exactly one permit when it finishes
            (.acquire sem)
            (recur))

          (contains? statuses :unstarted)
          (u/throw-runtime
           (str "Workflow cannot make progress, dependencies never satisfied for "
                (clojure.string/join ", " (for [[k status] snapshot
                                               :when (= :unstarted status)]
                                           k))))

          :else
          (.info log "Workflow completed successfully"))))
    ;; TODO make deleting checkpoint dir optional
    #_(h/delete (::fs workflow) (::checkpoint-dir workflow) true)))

(defn workflow-compile
  "Compiles `graph` into a fnk with the graph's own io-schemata.

  Calling the fnk with the input map creates a workflow checkpointed under
  \"/tmp/cascalog-checkpoint-graph\", builds the nodes for the graph and
  executes them; the fnk returns whatever exec-workflow! returns."
  [graph]
  (let [compiled (graph/->graph graph)
        runner   (fn [{:as graph-args}]
                   (let [workflow (mk-workflow "/tmp/cascalog-checkpoint-graph" graph-args)
                         nodes    (graph->nodes workflow compiled)]
                     (exec-workflow! workflow nodes)))]
    (pfnk/fn->fnk runner (pfnk/io-schemata compiled))))

;; Adapted from https://github.com/stuartsierra/flow/blob/master/src/com/stuartsierra/flow.clj#L138
(defn dot-compile
Expand Down

0 comments on commit ebe3e7d

Please sign in to comment.