Skip to content
This repository has been archived by the owner on May 2, 2023. It is now read-only.

Commit

Permalink
start working on S3 processes to look for directories and start watchers
Browse files Browse the repository at this point in the history
  • Loading branch information
pingles committed May 22, 2014
1 parent e41b5d8 commit b782943
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 3 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -7,3 +7,4 @@ pom.xml.asc
*.class
/.lein-*
/.nrepl-port
/etc/config.edn
24 changes: 24 additions & 0 deletions dev/user.clj
@@ -0,0 +1,24 @@
(ns user
(:require [uswitch.blueshift.system :refer (build-system)]
[clojure.tools.namespace.repl :refer (refresh)]
[com.stuartsierra.component :as component]))

;; Holds the development system for REPL use. Starts as nil; `init` installs a
;; freshly built system, and `start`/`stop` replace the value with the result of
;; component/start and component/stop respectively.
(def system nil)

(defn init
  "Reads ./etc/config.edn, builds a new (unstarted) system from it and stores
  that system in the `system` var, replacing whatever was there."
  []
  (let [config (-> "./etc/config.edn" slurp read-string)
        fresh  (build-system config)]
    (alter-var-root #'system (fn [_] fresh))))

(defn start
  "Starts the system held in the `system` var and stores the started value
  back into the var."
  []
  (alter-var-root #'system (fn [current] (component/start current))))

(defn stop
  "Stops the system held in the `system` var, if there is one, and stores the
  stopped value back into the var. When no system is present the var is set
  to nil."
  []
  (alter-var-root #'system
                  (fn [current]
                    (if current
                      (component/stop current)
                      nil))))

(defn go
  "Builds a fresh system from configuration (`init`) and then starts it
  (`start`). Returns whatever `start` returns."
  []
  (do
    (init)
    (start)))

(defn reset
  "Stops the current system, then hands control to tools.namespace's `refresh`,
  asking it to call `user/go` afterwards so the system is rebuilt and started
  again."
  []
  (do
    (stop)
    (refresh :after 'user/go)))
4 changes: 4 additions & 0 deletions etc/config.edn.example
@@ -0,0 +1,4 @@
;; Example Blueshift configuration. Copy to etc/config.edn (which is
;; git-ignored) and fill in real values.
;;
;; :credentials   - map handed to the S3 client when listing the bucket
;; :bucket        - name of the S3 bucket that is polled for directories
;; :poll-interval - :seconds is the pause between two polls of the bucket
{:s3 {:credentials {:access-key ""
:secret-key ""}
:bucket "blueshift-data"
:poll-interval {:seconds 30}}}
15 changes: 12 additions & 3 deletions project.clj
@@ -1,6 +1,15 @@
(defproject blueshift "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:description "Automate importing S3 data into Amazon Redshift"
:url "https://github.com/uswitch/blueshift"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.5.1"]])
:dependencies [[org.clojure/clojure "1.6.0"]
[org.clojure/tools.logging "0.2.6"]
[com.stuartsierra/component "0.2.1"]
[org.clojure/core.async "0.1.303.0-886421-alpha"]
[org.clojure/tools.cli "0.3.1"]
[clj-aws-s3 "0.3.9"]]
:profiles {:dev {:dependencies [[org.slf4j/slf4j-simple "1.7.7"]
[org.clojure/tools.namespace "0.2.3"]]
:source-paths ["./dev"]}}
:main uswitch.blueshift.main)
21 changes: 21 additions & 0 deletions src/uswitch/blueshift/main.clj
@@ -0,0 +1,21 @@
(ns uswitch.blueshift.main
(:require [clojure.tools.logging :refer (info)]
[clojure.tools.cli :refer (parse-opts)]
[uswitch.blueshift.system :refer (build-system)]
[com.stuartsierra.component :refer (start)])
(:gen-class))

(def cli-options
  "Option specifications handed to clojure.tools.cli/parse-opts by -main."
  [;; -c / --config: where to find the EDN configuration file.
   ["-c" "--config CONFIG" "Path to EDN configuration file"
    :default  "./etc/config.edn"
    :validate [string?]]
   ;; -h / --help: boolean flag, -main prints the usage summary and exits.
   ["-h" "--help"]])

(defn -main
  "Command-line entry point.

  Parses `args` with `cli-options`, then:
    * on invalid options prints the errors and the usage summary to stderr
      and exits with status 1;
    * on --help prints the usage summary and exits with status 0;
    * otherwise reads the EDN configuration file named by --config, builds
      the system, starts it and keeps the process alive."
  [& args]
  (let [{:keys [options summary errors]} (parse-opts args cli-options)]
    ;; parse-opts reports unknown flags / failed validation in :errors;
    ;; surface them instead of silently carrying on with defaults.
    (when errors
      (binding [*out* *err*]
        (doseq [message errors]
          (println message))
        (println summary))
      (System/exit 1))
    (when (:help options)
      (println summary)
      (System/exit 0))
    (let [{:keys [config]} options]
      (info "Starting Blueshift with configuration" config)
      ;; NOTE(security): clojure.core/read-string can evaluate code embedded in
      ;; the input (#=) when *read-eval* is true. The file is only meant to
      ;; hold data, so disable reader evaluation while parsing it.
      (start (build-system (binding [*read-eval* false]
                             (read-string (slurp config)))))
      ;; The system's work happens in core.async go blocks, which run on
      ;; daemon threads and therefore do not keep the JVM alive on their own.
      ;; Park the main thread so the process keeps running until it is killed.
      @(promise))))
133 changes: 133 additions & 0 deletions src/uswitch/blueshift/s3.clj
@@ -0,0 +1,133 @@
(ns uswitch.blueshift.s3
(:require [com.stuartsierra.component :refer (Lifecycle system-map using start stop)]
[clojure.tools.logging :refer (info error)]
[aws.sdk.s3 :refer (list-objects)]
[clojure.set :refer (difference)]
[clojure.core.async :refer (go-loop chan >! <! alts! timeout close!)]))

(defn listing
  "Returns a vector of every object in `bucket`, requesting further pages until
  S3 reports that the listing is no longer truncated.

  `opts` are keyword/value pairs that are turned into the options map passed to
  `list-objects`; a :marker entry is managed by this function and overrides any
  supplied by the caller."
  [credentials bucket & opts]
  (let [options (apply hash-map opts)]
    (loop [marker  nil
           results []]
      (let [{:keys [next-marker truncated? objects]}
            (list-objects credentials bucket (assoc options :marker marker))
            ;; Add this page *before* deciding whether to stop: the last page
            ;; (truncated? false) carries objects too, and previously they were
            ;; dropped - a bucket that fits in one page came back empty.
            ;; `into` also avoids building a deep chain of lazy concats.
            results (into results objects)]
        (if truncated?
          (recur next-marker results)
          results)))))

(defn directories
  "Returns the :common-prefixes S3 reports when listing `bucket` with \"/\" as
  the delimiter.

  With two arguments the listing is made at the top of the bucket; with a
  `path` (which must end in \"/\") the listing is restricted to that prefix."
  ([credentials bucket]
     (let [response (list-objects credentials bucket {:delimiter "/"})]
       (:common-prefixes response)))
  ([credentials bucket path]
     {:pre [(.endsWith path "/")]}
     (let [request  {:prefix path :delimiter "/"}
           response (list-objects credentials bucket request)]
       (:common-prefixes response))))

(defn leaf-directories
  "Returns the directories of `bucket` that contain no sub-directories.

  Starts from the top-level directories and keeps a queue of directories still
  to inspect; each directory's children are appended to the queue, and a
  directory without children is added to the front of the result."
  [credentials bucket]
  (loop [pending (directories credentials bucket)
         leaves  nil]
    (if-let [[dir & remaining] (seq pending)]
      (let [children (directories credentials bucket dir)
            leaves'  (if (seq children)
                       leaves
                       (cons dir leaves))]
        (recur (concat remaining children) leaves'))
      leaves)))



(defn close-channels
  "Closes the channel stored in `state` under each of the keys `ks` (keys with
  no value are skipped) and returns `state` with all of those keys removed."
  [state & ks]
  (doseq [k ks
          :let  [channel (get state k)]
          :when channel]
    (close! channel))
  (apply dissoc state ks))




;; Component representing the watcher for a single directory of a bucket.
;; At present both lifecycle methods only log and return the record unchanged.
(defrecord Watcher [bucket directory]
  Lifecycle
  (start [this]
    (let [location (str bucket "/" directory)]
      (info "Starting Watcher for" location)
      this))
  (stop [this]
    (let [location (str bucket "/" directory)]
      (info "Stopping Watcher for" location)
      this)))



(defn spawn-watcher!
  "Creates a Watcher for `directory` in `bucket`, starts it and returns the
  started component."
  [bucket directory]
  ;; Return what `start` produces. `doto` would hand back the instance as it
  ;; was *before* starting; components are immutable records, so any state
  ;; added by `start` would be lost and `stop` would later be called on a
  ;; component that was never started.
  (start (Watcher. bucket directory)))

;; Component that listens on the poller's :new-directories-ch and starts one
;; Watcher per directory announced there. Started watchers are collected in an
;; atom stored under :watchers so that `stop` can stop them again.
(defrecord Spawner [poller]
  Lifecycle
  (start [this]
    (info "Starting Spawner")
    (let [ch       (:new-directories-ch poller)
          bucket   (:bucket poller)
          watchers (atom nil)]
      (go-loop []
        ;; <! yields nil once the channel has been closed (the Poller closes
        ;; it on stop). Leave the loop in that case; recurring unconditionally
        ;; would spin forever on nil.
        (when-let [dirs (<! ch)]
          (doseq [dir dirs]
            (swap! watchers conj (spawn-watcher! bucket dir)))
          (recur)))
      (assoc this :watchers watchers)))
  (stop [this]
    (info "Stopping Spawner")
    (when-let [watchers (:watchers this)]
      (doseq [watcher @watchers] (stop watcher)))
    (dissoc this :watchers)))

(defn spawner
  "Returns an unstarted Spawner whose `poller` dependency is not yet set."
  []
  (->Spawner nil))

(defn- poll-leaf-directories
  "Returns the set of leaf directories currently found in `bucket`, or nil when
  the listing throws. The failure is logged so the caller can simply retry on
  its next poll."
  [credentials bucket]
  (try
    (set (leaf-directories credentials bucket))
    (catch Exception e
      (error e "Unable to list directories in bucket" bucket)
      nil)))

;; Component that periodically lists the leaf directories of `bucket` and puts
;; the set of directories not seen on the previous poll onto
;; :new-directories-ch. Closing :control-ch (done by `stop`) ends the loop.
(defrecord Poller [credentials bucket poll-interval-seconds]
  Lifecycle
  (start [this]
    (info "Starting S3 Poller. Polling" bucket "every" poll-interval-seconds "seconds")
    (let [new-directories-ch (chan)
          control-ch         (chan)]
      (go-loop [dirs nil]
        ;; A failed listing must not end polling: an uncaught exception would
        ;; terminate the go block for good. On failure keep the previously
        ;; known directories and try again after the next interval.
        (let [available-dirs (or (poll-leaf-directories credentials bucket) dirs)
              new-dirs       (difference available-dirs dirs)]
          (when (seq new-dirs)
            (info "New directories:" new-dirs "spawning watchers")
            (>! new-directories-ch new-dirs))
          ;; Wait for either the poll interval to elapse or the control
          ;; channel to deliver/close; only the timeout continues the loop.
          (let [[_ c] (alts! [(timeout (* 1000 poll-interval-seconds)) control-ch])]
            (when-not (= c control-ch)
              (recur available-dirs)))))
      (assoc this :control-ch control-ch :new-directories-ch new-directories-ch)))
  (stop [this]
    (info "Stopping S3 Poller")
    (close-channels this :control-ch :new-directories-ch)))

(defn poller
  "Builds an unstarted Poller from the :s3 section of `config`, taking the
  credentials, the bucket name and the poll interval in seconds from it."
  [config]
  (let [s3-config (:s3 config)
        interval  (:poll-interval s3-config)]
    (map->Poller {:credentials           (:credentials s3-config)
                  :bucket                (:bucket s3-config)
                  :poll-interval-seconds (:seconds interval)})))


;; Component that logs, prefixed with `prefix`, every message taken from the
;; channel found under key `chan-k` of `component`. The loop ends when a take
;; yields nil/false (e.g. the channel was closed); `stop` does nothing.
(defrecord PrintSink [prefix chan-k component]
  Lifecycle
  (start [this]
    (let [source (get component chan-k)]
      (go-loop []
        (when-let [message (<! source)]
          (info prefix message)
          (recur))))
    this)
  (stop [this]
    this))

(defn print-sink
  "Returns an unstarted PrintSink that will log messages with `prefix`, reading
  from the channel stored under `chan-k` of its (not yet set) `component`."
  [prefix chan-k]
  (->PrintSink prefix chan-k nil))


(defn s3-system
  "Assembles the S3 sub-system: a :poller built from `config` and a :spawner
  that declares a dependency on that :poller."
  [config]
  (let [poller-component  (poller config)
        spawner-component (-> (spawner)
                              (using [:poller]))]
    (system-map :poller  poller-component
                :spawner spawner-component)))
6 changes: 6 additions & 0 deletions src/uswitch/blueshift/system.clj
@@ -0,0 +1,6 @@
(ns uswitch.blueshift.system
(:require [uswitch.blueshift.s3 :refer (s3-system)]
[com.stuartsierra.component :refer (system-map)]))

(defn build-system
  "Returns the top-level system map for `config`; it currently holds a single
  entry, :s3-system."
  [config]
  (let [s3 (s3-system config)]
    (system-map :s3-system s3)))

0 comments on commit b782943

Please sign in to comment.