Skip to content

Commit

Permalink
working checkpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
ngrunwald committed Dec 5, 2017
1 parent 3e17721 commit 482fbb6
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 1 deletion.
6 changes: 6 additions & 0 deletions checkpoints.org
@@ -0,0 +1,6 @@
* Install gs nio (the Google Cloud Storage NIO filesystem provider)

- install gcloud sdk
- gcloud auth login
- gcloud config set project [YOUR PROJECT ID]
- gcloud auth application-default login
3 changes: 2 additions & 1 deletion project.clj
Expand Up @@ -11,7 +11,8 @@
[org.clojure/math.combinatorics "0.1.4"]
[org.clojure/tools.logging "0.3.1"]
[clj-time "0.13.0"]
[superstring "2.1.0"]]
[superstring "2.1.0"]
[com.google.cloud/google-cloud-nio "0.20.0-alpha"]]
:source-paths ["src/clj"]
:java-source-paths ["src/java"]
:profiles {:dev {:dependencies [[junit/junit "4.12"]
Expand Down
45 changes: 45 additions & 0 deletions src/clj/datasplash/checkpoint.clj
@@ -0,0 +1,45 @@
(ns datasplash.checkpoint
(:require [taoensso.nippy :as nippy]
[cheshire.core :as json])
(:import [java.nio.file Paths Files StandardOpenOption LinkOption]
[java.net URI]
[java.util Iterator]
[java.io DataInputStream EOFException]))

;; Keyword -> java.nio.file.StandardOpenOption lookup table.
(def open-options
  {:create StandardOpenOption/CREATE})

;; Registry of checkpoint file formats, keyed by format keyword.
;; Each entry is a map with two functions:
;;   :coerce-path-fn  takes a java.nio.file.Path and opens an input on it.
;;                    The returned object must have a .close method, because
;;                    read-elements-from-paths closes every input it exhausts.
;;   :read-fn         takes that input and returns the next element, or nil
;;                    when the input is exhausted (read-elements-from-paths
;;                    also treats an EOFException as end of input).
(def formats
  {:nippy {:read-fn nippy/thaw-from-in!
           :coerce-path-fn
           (fn [path]
             (DataInputStream.
              (Files/newInputStream path (make-array StandardOpenOption 0))))}
   :json {:read-fn
          ;; One JSON document per line; keys are decoded as keywords.
          (fn [^java.io.BufferedReader rdr]
            (when-let [line (.readLine rdr)]
              (json/decode line true)))
          :coerce-path-fn
          ;; A BufferedReader is used rather than (.iterator (Files/lines path)):
          ;; a java.util.Iterator has no .close method, so closing it failed at
          ;; the end of every file and the underlying Stream was never released.
          (fn [path]
            (Files/newBufferedReader path))}})

(defn regular-file?
  "Returns true when `path` (a java.nio.file.Path) denotes a regular file.
  No LinkOption is supplied to Files/isRegularFile."
  [path]
  (let [no-link-options (into-array LinkOption [])]
    (Files/isRegularFile path no-link-options)))

(defn read-elements-from-paths
  "Returns a sequence of every element stored in `paths`, read one file after
  another, or nil when there is nothing to read.

  `format-fns` is a map with:
    :coerce-path-fn  opens a path and returns an input object with a .close method
    :read-fn         reads the next element from that input; returning nil or
                     throwing EOFException marks the end of that input

  The first element is read eagerly; the remainder is produced lazily. Each
  input is closed as soon as it is exhausted, so an input stays open while the
  sequence is only partially consumed.

  The 3-arity form continues from an already opened `current-input` and then
  proceeds through `other-paths`."
  ([{:keys [read-fn coerce-path-fn] :as format-fns} current-input other-paths]
   (let [element (try
                   (read-fn current-input)
                   (catch EOFException _ nil))]
     ;; Only nil marks end of input: a decoded `false` is a legitimate element
     ;; and must not cut the file short.
     (if (nil? element)
       (do
         (.close current-input)
         (if-let [remaining (seq other-paths)]
           ;; recur keeps the stack flat when many consecutive files are empty.
           (recur format-fns (coerce-path-fn (first remaining)) (rest remaining))
           nil))
       (cons element
             (lazy-seq
              (read-elements-from-paths format-fns current-input other-paths))))))
  ([{:keys [coerce-path-fn] :as format-fns} paths]
   ;; Guard against an empty collection instead of calling coerce-path-fn on nil.
   (when-let [remaining (seq paths)]
     (read-elements-from-paths format-fns
                               (coerce-path-fn (first remaining))
                               (rest remaining)))))

(defn read-checkpoint
  "Reads every element of the checkpoint stored in the directory at `path`.

  `path` is a URI string (it is passed to URI/create and then Paths/get).
  The options map takes :format, which must be a key of `formats`.

  Returns the sequence produced by read-elements-from-paths over the regular
  files found directly inside the directory, or nil when there are none.
  Throws IllegalArgumentException when :format is not a known format."
  [path {:keys [format]}]
  (let [format-fns (or (get formats format)
                       (throw (IllegalArgumentException.
                               (str "Unknown checkpoint format: " (pr-str format)))))
        dir (Paths/get (URI/create path))
        ;; The listing is realized eagerly inside with-open so the directory
        ;; stream is closed instead of leaking; a lazy filter would be read
        ;; after the stream had already been closed.
        paths (with-open [ds (Files/newDirectoryStream dir)]
                (filterv regular-file? (seq ds)))]
    (when (seq paths)
      (read-elements-from-paths format-fns paths))))

0 comments on commit 482fbb6

Please sign in to comment.