Skip to content

Commit

Permalink
Fixing serialising on hdfs
Browse files Browse the repository at this point in the history
  • Loading branch information
jeroenvandijk committed Jul 10, 2013
1 parent 9a14926 commit 975252f
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions src/adgoji/cascalog/checkpoint.clj
Expand Up @@ -18,16 +18,21 @@
(defn serializable?
  "Returns true when `obj` implements java.io.Serializable and false
  otherwise (nil is never serializable)."
  [obj]
  ;; Class#isInstance is what `instance?` delegates to; nil yields false.
  (.isInstance java.io.Serializable obj))

(defn serialize-state [state-file state]
(let [serializable-state (into {} (filter (comp serializable? val) state))]
(with-open [file-out (FileOutputStream. state-file)
object-out (ObjectOutputStream. file-out)]
(defn serialize-state
  "Writes the serializable entries of `state` to `state-file` on the
  filesystem `fs` using Java object serialization.

  fs         - filesystem handle offering `.create` (callers in this file
               pass the value returned by `h/filesystem`; presumably a
               Hadoop FileSystem - confirm against the ns imports).
  state-file - path of the target file, as a string.
  state      - map of name -> value. Entries whose value does not satisfy
               `serializable?` are silently dropped before writing.

  Any existing file at `state-file` is overwritten."
  [fs state-file state]
  (let [serializable-state (into {} (filter (comp serializable? val) state))
        state-file-path (Path. state-file)]
    ;; `.create` is called with overwrite=true, which creates a missing file
    ;; and replaces an existing one, so no separate exists/createNewFile
    ;; step is needed (that step could also leave an empty, unreadable file
    ;; behind if the process died before the real write).
    ;; Both streams are bound inside `with-open` so the underlying output
    ;; stream is closed even when the ObjectOutputStream constructor (which
    ;; writes the stream header) or `.writeObject` throws.
    (with-open [out-stream (.create fs state-file-path true)
                object-out (ObjectOutputStream. out-stream)]
      (.writeObject object-out serializable-state))))

(defn deserialize-state
  "Reads a single Java-serialized object from the local file `state-file`
  and returns it. Both streams are closed before returning."
  [state-file]
  (with-open [raw-in (FileInputStream. state-file)]
    (with-open [obj-in (ObjectInputStream. raw-in)]
      (.readObject obj-in))))
(defn deserialize-state
  "Reads the object previously stored at `state-file` on the filesystem
  `fs` and returns it, or returns nil when the file does not exist.

  fs         - filesystem handle offering `.exists` and `.open` (callers in
               this file pass the value returned by `h/filesystem`;
               presumably a Hadoop FileSystem - confirm against the ns
               imports).
  state-file - path of the state file, as a string.

  NOTE(review): this uses Java deserialization, which trusts the file
  contents; the file is expected to be one written by `serialize-state`."
  [fs state-file]
  (let [state-file-path (Path. state-file)]
    (when (.exists fs state-file-path)
      ;; Bind the raw stream inside `with-open` as well: the
      ;; ObjectInputStream constructor reads the stream header and throws on
      ;; an empty or truncated file, which previously leaked the open
      ;; filesystem stream.
      (with-open [in-stream (.open fs state-file-path)
                  object-in (ObjectInputStream. in-stream)]
        (.readObject object-in)))))

;; Struct for a single node of the workflow graph. Code elsewhere in this file
;; derefs the ::value slot (`@(::value n)`), so it is expected to hold a
;; deref-able reference; the meaning of the other slots is not visible here.
(defstruct WorkflowNode ::tmp-dirs ::fn ::deps ::error ::value)
;; Struct for workflow-wide state. It is filled positionally via
;; `(struct Workflow fs checkpoint-dir (atom initial-graph) previous-steps sem log options)`,
;; so the key order here must match that call.
(defstruct Workflow ::fs ::checkpoint-dir ::graph-atom ::previous-steps ::sem ::log ::options)
Expand All @@ -36,8 +41,7 @@
(let [fs (h/filesystem)
log (Logger/getLogger "checkpointed-workflow")
sem (Semaphore. 0)
state-file (clojure.java.io/file (str checkpoint-dir "/state.bin"))
previous-steps (when (.exists state-file) (deserialize-state state-file))]
previous-steps (deserialize-state fs (str checkpoint-dir "/state.bin"))]
(h/mkdirs fs checkpoint-dir)
(struct Workflow fs checkpoint-dir (atom initial-graph) previous-steps sem log options)))

Expand Down Expand Up @@ -109,7 +113,7 @@
(.info log "Waiting for running components to finish")
(doseq [t threads] (.join t))
(.info log "Serializing succesful nodes")
(serialize-state (str (::checkpoint-dir workflow) "/state.bin")
(serialize-state (::fs workflow) (str (::checkpoint-dir workflow) "/state.bin")
(into {} (map (fn [n] [(::name n) @(::value n)]) succesful-nodes)))

(run-failure-callback workflow failed-nodes)
Expand Down Expand Up @@ -139,4 +143,5 @@
(run-success-callback workflow)
))))
;; TODO make deleting checkpoint dir optional
(.info log "Cleaning checkpoint dir")
(h/delete (::fs workflow) (::checkpoint-dir workflow) true)))

0 comments on commit 975252f

Please sign in to comment.