Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial commit

  • Loading branch information...
commit 6e770cc20b3e52d16cc584ad0bc57d8e7a12e57d 0 parents
@jeroenvandijk authored
10 .gitignore
@@ -0,0 +1,10 @@
+/target
+/lib
+/classes
+/checkouts
+pom.xml
+*.jar
+*.class
+.lein-deps-sum
+.lein-failures
+.lein-plugins
145 README.md
@@ -0,0 +1,145 @@
+# Cascalog Graph
+
+** WARNING - ALPHA QUALITY, USE AT YOUR OWN RISK, SUBJECT TO BUGS AND API CHANGES **
+
+Cascalog Graph is a library that combines two ideas: [workflow checkpoint](https://github.com/nathanmarz/cascalog-contrib/tree/master/cascalog.checkpoint) and [declarative abstractions](http://blog.getprismatic.com/blog/2012/10/1/prismatics-graph-at-strange-loop.html).
+
+The result is a library that provides the following:
+
+* an easy way to declare dependencies between Cascalog queries
+* modularity of Cascalog workflows; they can be merged now, allowing easy reuse of common parts.
+* Easier debugging, because you can focus on certain parts of a workflow without the need to copy code.
+* A way to visualize complex flows
+* Maybe more, if we can think of new execution strategies e.g. for logging, profiling, debugging etc. [see the Graph blog post](http://blog.getprismatic.com/blog/2012/10/1/prismatics-graph-at-strange-loop.html)
+
+## Constraints
+
+* The name of a function is the name that is used to match dependencies
+* Internal dependencies of a graph are treated as intermediate steps which will always output to a temporary seqfile dir.
+* External dependencies are treated as endpoints, and never as intermediate dependencies, thus a requirement for these steps is to define their own output strategy.
+
+## Motivation
+
+Cascalog.checkpoint is great for when you start to build complex flows. It makes your jobs easier to manage, more robust, and it allows you to easily run parts of a job in parallel. Also as a result of all of this, my jobs did often run much faster. So Cascalog.checkpoint is a must for me when writing new jobs. However, when jobs grow more complex or when you have jobs that have workflow parts in common, you will notice that the `workflow` macro makes things hard to reuse and a lot of the code needed feels verbose. This library solves this by generating the workflow code for you based on steps you provide. In the future it might be worthwhile to integrate with Cascalog.checkpoint more directly, but that depends on the success of this library.
+
+## Releases
+
+Cascalog-Graph is on [Clojars](https://clojars.org/adgoji/cascalog-graph)
+
+Latest release is 0.0.1
+
+[Leiningen](https://github.com/technomancy/leiningen) dependency information:
+
+ [adgoji/cascalog-graph "0.0.1"]
+
+The Git master branch is at version 0.0.1-SNAPSHOT.
+
+## Usage
+
+The following example is a copy of [Stuart Sierra's flow example](https://github.com/stuartsierra/flow) molded into Cascalog Graph form.
+
+ (require '[cascalog.api :as cascalog]
+ '[adgoji.cascalog.graph :as g])
+
+ (def result (g/flow-fn [gamma delta epsilon output-tap]
+ (?<- output-tap
+ [?result]
+ (gamma ?idx ?gamma)
+ (delta ?idx ?delta)
+ (epsilon ?idx ?epsilon)
+ (+ ?gamma ?delta ?epsilon :> ?result))))
+
+ (def gamma (g/flow-fn [alpha beta]
+ (<- [?idx ?gamma]
+ (alpha ?idx ?alpha)
+ (beta ?idx ?beta)
+ (+ ?alpha ?beta :> ?gamma))))
+
+ (def delta (g/flow-fn [alpha gamma]
+ (<- [?idx ?delta]
+ (alpha ?idx ?alpha)
+ (gamma ?idx ?gamma)
+ (+ ?gamma ?alpha :> ?delta))))
+
+ (def epsilon (g/flow-fn [gamma delta]
+ (<- [?idx ?epsilon]
+ (gamma ?idx ?gamma)
+ (delta ?idx ?delta)
+ (+ ?gamma ?delta :> ?epsilon))))
+
+ (def complete-flow (g/fns-to-flow #'result #'gamma #'delta #'epsilon))
+
+ (def complete-flow (g/fns-to-flow #'result #'gamma #'delta #'epsilon))
+
+Print workflow for debugging
+
+ user=> (g/pp-workflow complete-flow)
+ gamma-step ([:deps nil :tmp-dirs [gamma-dir]]
+ :gamma:
+ (cascalog.api/?- (cascalog.api/hfs-seqfile gamma-dir) (user/gamma {:beta beta, :alpha alpha})))
+ delta-step ([:deps [gamma-step] :tmp-dirs [delta-dir]]
+ :delta:
+ (cascalog.api/?- (cascalog.api/hfs-seqfile delta-dir) (user/delta {:gamma (cascalog.api/hfs-seqfile gamma-dir), :alpha alpha})))
+ epsilon-step ([:deps [delta-step gamma-step] :tmp-dirs [epsilon-dir]]
+ :epsilon:
+ (cascalog.api/?- (cascalog.api/hfs-seqfile epsilon-dir) (user/epsilon {:gamma (cascalog.api/hfs-seqfile gamma-dir), :delta (cascalog.api/hfs-seqfile delta-dir)})))
+ result-step ([:deps [epsilon-step delta-step gamma-step]]
+ :result:
+ (user/result {:gamma (cascalog.api/hfs-seqfile gamma-dir), :delta (cascalog.api/hfs-seqfile delta-dir), :output-tap output-tap, :epsilon (cascalog.api/hfs-seqfile epsilon-dir)}))
+ nil
+
+
+Output dot for visualization:
+
+ user=> (g/dot complete-flow)
+ digraph "flow" {
+ "alpha" -> "delta" ;
+ "alpha" -> "gamma" ;
+ "epsilon" -> "result" ;
+ "output-tap" -> "result" ;
+ "delta" -> "epsilon" ;
+ "delta" -> "result" ;
+ "beta" -> "gamma" ;
+ "gamma" -> "epsilon" ;
+ "gamma" -> "result" ;
+ "gamma" -> "delta" ;
+ }
+ nil
+
+Write that out to a file:
+
+ (g/write-dotfile process-flow "flow.dot")
+
+Then run Graphviz:
+
+ $ dot -Tpng -o flow.png flow.dot
+
+Run the workflow
+
+ ((g/mk-workflow-fn complete-flow) {:output-tap (cascalog/stdout) :alpha 1 :beta 2 })
+
+ ...
+
+ RESULTS
+ -----------------------
+ 14
+
+## Todo
+
+* Provide better examples
+* Add tests
+* Internal code cleanup
+* Gather feedback from Cascalog.Checkpoint users
+
+## Credits
+
+* Prismatic for coining the ideas of Graph (see [blogpost]([http://blog.getprismatic.com/blog/2012/10/1/prismatics-graph-at-strange-loop.html))
+* Stuart Sierra for building flow [an implementation of the Graph idea](https://github.com/stuartsierra/flow) and an important dependency for this project
+* Sam Ritchie and Contributors for building Cascalog.checkpoint that makes this library actually do anything
+* [AdGoji](http://www.adgoji.com/) for giving me time to work on this
+
+## License
+
+Copyright © 2013 Jeroen van Dijk
+
+Distributed under the Eclipse Public License, the same as Clojure.
35 dev/src/example.clj
@@ -0,0 +1,35 @@
+(ns example
+ (:require
+ [cascalog.api :as cascalog :refer :all]
+ [adgoji.cascalog.graph :as g]))
+
+(def result (g/flow-fn [gamma delta epsilon output-tap]
+ (?<- output-tap
+ [?result]
+ (gamma ?idx ?gamma)
+ (delta ?idx ?delta)
+ (epsilon ?idx ?epsilon)
+ (+ ?gamma ?delta ?epsilon :> ?result))))
+
+(def gamma (g/flow-fn [alpha beta]
+ (<- [?idx ?gamma]
+ (alpha ?idx ?alpha)
+ (beta ?idx ?beta)
+ (+ ?alpha ?beta :> ?gamma))))
+
+(def delta (g/flow-fn [alpha gamma]
+ (<- [?idx ?delta]
+ (alpha ?idx ?alpha)
+ (gamma ?idx ?gamma)
+ (+ ?gamma ?alpha :> ?delta))))
+
+(def epsilon (g/flow-fn [gamma delta]
+ (<- [?idx ?epsilon]
+ (gamma ?idx ?gamma)
+ (delta ?idx ?delta)
+ (+ ?gamma ?delta :> ?epsilon))))
+
+(def complete-flow (g/fns-to-flow #'result #'gamma #'delta #'epsilon))
+
+(defn -main [& args]
+ ((g/mk-workflow-fn complete-flow) {:output-tap (cascalog/stdout) :alpha [[0 1]] :beta [[0 2]] }))
21 project.clj
@@ -0,0 +1,21 @@
+(defproject adgoji/cascalog-graph "0.0.1"
+ :description "Graph implementation for Cascalog"
+ :url "http://github.com/jeroenvandijk/cascalog-graph"
+ :license { :name "Eclipse Public License"
+ :url "http://www.eclipse.org/legal/epl-v10.html" }
+
+ :dependencies [
+ [com.stuartsierra/flow "0.1.0"]
+ [org.clojure/tools.namespace "0.2.2"]
+
+ [cascalog "1.9.0"]
+ [cascalog-checkpoint "0.2.0"]
+ ]
+
+ :profiles {
+ :dev {
+ :source-paths ["dev/src"]
+ :dependencies [[org.clojure/clojure "1.4.0"]
+ [org.apache.hadoop/hadoop-core "0.20.2-dev" :exclusions [log4j org.slf4j/slf4j-log4j12 org.slf4j/slf4j-api]]]
+ }
+ })
194 src/adgoji/cascalog/graph.clj
@@ -0,0 +1,194 @@
+(ns adgoji.cascalog.graph
+ (:require [com.stuartsierra.flow :as f]
+ [clojure.tools.namespace.dependency :as dep]
+ [cascalog.api :refer [?- hfs-seqfile]]
+ [cascalog.checkpoint :refer [workflow] :as checkpoint]))
+
+(defn- sym-dir [sym]
+ (symbol (str (name sym) "-dir")))
+
+(defn- sym-step [keyw]
+ (symbol (str (name keyw) "-step")))
+
+(defn- sym-fn [keyw]
+ (symbol (name keyw)))
+
+(defn- mk-intermediate-command [tmp-dir-sym fn-sym input-tap-sym]
+ (list `?- (list `hfs-seqfile tmp-dir-sym) (list fn-sym input-tap-sym)))
+
+(defn- mk-intermediate-step [{:keys [name deps fn-sym fn-args fn-doc]}]
+ (let [tmp-dir (sym-dir name)
+ step-sym (sym-step name)
+ doc-str (str name ": " fn-doc)]
+ (list
+ step-sym (list [:deps (if (empty? deps) nil (vec deps)) :tmp-dirs [tmp-dir]]
+ doc-str
+ (mk-intermediate-command tmp-dir (sym-fn fn-sym) fn-args)))))
+
+(defn- mk-endpoint-step [{:keys [name deps fn-sym fn-args fn-doc]}]
+ (let [step-sym (sym-step name)
+ doc-str (str name ": " fn-doc)]
+ (list
+ step-sym (list [:deps (if (empty? deps) nil (vec deps))]
+ doc-str
+ (list (sym-fn fn-sym) fn-args)))))
+
+(defn- required-args [flow]
+ (mapcat (comp :com.stuartsierra.flow/inputs meta) (vals flow)))
+
+;; We need this function in the mk-workflow-fn macro, and therefore it needs to be public?
+(defn external-args [flow]
+ (clojure.set/difference (set (required-args flow)) (set (keys flow))))
+
+(defn- internal-args [flow]
+ (clojure.set/difference (set (required-args flow)) (set (external-args flow))))
+
+(defn calc-steps [flow]
+ (let [ext-args (external-args flow)
+ inputs (set (mapcat (comp :com.stuartsierra.flow/inputs meta) (vals flow)))]
+ (reduce (fn [acc [k v]]
+ (let [m (meta v)
+ deps (remove ext-args (:com.stuartsierra.flow/inputs m))
+ args-deps (:com.stuartsierra.flow/inputs m)
+ aliases (:aliases m)
+ args-names (map #(get aliases % %) args-deps)
+ step-deps (remove (external-args flow) args-deps)
+ intermediate? (not (nil? (inputs k)))]
+ (assoc acc k { :args-deps args-deps :args-names args-names :step-deps step-deps :deps deps :intermediate? intermediate? :fn v }))) {} flow)))
+
+(defn- mk-taps [deps internal-args]
+ (map
+ (fn [k]
+ (let [k-sym (symbol (name k))]
+ (if (internal-args k)
+ (list `hfs-seqfile (sym-dir k-sym))
+ k-sym)))
+ deps))
+
+(defn- flow-graph [flow]
+ (reduce (fn [graph [output f]]
+ (reduce (fn [g input] (dep/depend g output input))
+ graph (::com.stuartsierra.flow/inputs (meta f))))
+ (dep/graph) flow))
+
+(defn- sort-graph [flow steps]
+ (let [sorted-keys (dep/topo-sort (flow-graph flow))]
+ (map #(list % (% steps)) (filter (set (keys steps)) sorted-keys))))
+
+(defn- pp-step [step]
+ (let [[name & [[conf doc-str & [body]]]] step]
+ [(str name " (" conf)
+ (str " " doc-str)
+ (str " " body ")")]))
+
+;;; Public API
+
+(defn flow->workflow [flow]
+ (->> (calc-steps flow)
+ (sort-graph flow)
+ (mapcat (fn [[k v]]
+ (let [f-m (-> v :fn meta)
+ f (str (:ns f-m) "/" (name (:name f-m)))
+ fn-doc (:doc f-m)
+ dir (sym-dir k)
+ step-deps (map sym-step (:step-deps v))
+ arg-keys (:args-deps v)
+ args-names (:args-names v)
+ args-taps (mk-taps arg-keys (internal-args flow))
+ f-args (zipmap args-names args-taps)]
+ (if (:intermediate? v)
+ (mk-intermediate-step {:name k :deps step-deps :fn-doc fn-doc :fn-sym f :fn-args f-args})
+ (mk-endpoint-step {:name k :deps step-deps :fn-doc fn-doc :fn-sym f :fn-args f-args})))))))
+
+(defn pp-workflow
+ "Pretty prints a complete checkpoint workflow for debugging"
+ [flow]
+ (->> (flow->workflow flow)
+ (partition 2)
+ (mapcat pp-step)
+ (clojure.string/join "\n")
+ println))
+
+(defn fns-to-flow
+ "Given a list of flow functions a graph will be generated that can be printed or executed"
+ [& fns]
+ (apply hash-map (mapcat
+ (fn [f]
+ (let [m (meta f)
+ name (keyword (:name m))
+ v (var-get f)
+ v (with-meta v (assoc (meta v) :name name :ns (:ns m)))
+ ]
+ [name v])) fns)))
+
+;; TODO remove duplication between rename-meta,rename-meta-all,update-flow,update-flow-all
+(defn rename-meta-all
+ "Rename steps and all inputs of a graph"
+ [v inner-deps func]
+ (let [m (meta v)
+ inputs (:com.stuartsierra.flow/inputs m)
+ new-inputs (map func inputs)
+ aliases (zipmap new-inputs inputs)]
+ (with-meta v (assoc m :com.stuartsierra.flow/inputs new-inputs :aliases aliases))))
+
+(defn rename-meta
+ "Rename steps of a graph"
+ [v inner-deps func]
+ (let [m (meta v)
+ [inner-inputs external-inputs] (split-with inner-deps (:com.stuartsierra.flow/inputs m))
+ new-inputs (map func inner-inputs)
+ aliases (zipmap new-inputs inner-inputs)
+ all-inputs (set (concat new-inputs external-inputs))]
+ (with-meta v (assoc m :com.stuartsierra.flow/inputs all-inputs :aliases aliases))))
+
+(defn update-flow
+ "Update step names of a graph"
+ [graph func]
+ (let [inner-deps (set (keys graph))]
+ (reduce (fn [acc [k v]] (assoc acc (func k) (rename-meta v inner-deps func))) {} graph)))
+
+(defn update-flow-all
+ "Update step names and input names of a graph
+ Useful for reusing a graph structure with different inputs
+ "
+ [graph func]
+ (let [inner-deps (set (keys graph))]
+ (reduce (fn [acc [k v]] (assoc acc (func k) (rename-meta-all v inner-deps func))) {} graph)))
+
+(declare flow-fn)
+
+(defmacro mk-workflow-fn
+ "Create a function that can run a workflow
+ Accepts keyword arguments"
+ [flow]
+ (let [workflow# (eval `(flow->workflow ~flow))
+ external-symbols# (mapv (comp symbol name) (eval `(external-args ~flow)))]
+ ;; Use flow-fn to generate a function that matches keywords and asserts on presence
+ ;; The :inputs metadata is also useful for introspection
+ `(f/flow-fn ~external-symbols#
+ (assert ~external-symbols#)
+ (checkpoint/workflow ["/tmp/cascalog-checkpoint"] ~@workflow#))))
+
+;; Steal functions from Flow library, without the need for people to manage the dependency themselves
+
+;; TODO can we make a defn-like form of flow-fn?
+(defmacro flow-fn
+ "Returns a function for use in a flow. The function will take a
+ single map argument. inputs is either a destructuring map form or a
+ vector of symbols to destructure as with {:keys [...]}."
+ [inputs & body]
+ `(f/flow-fn ~inputs ~@body))
+
+(def dot
+ "Prints a representation of the flow to standard output,
+ suitable for input to the Graphviz 'dot' program. Options are
+ key-value pairs from:
+
+ :graph-name string/symbol/keyword naming the graph. Must not be
+ a Graphiviz reserved word such as \"graph\"."
+ f/dot)
+
+(def write-dotfile
+ "Writes a Graphviz dotfile for a Flow. options are the same as for
+ 'dot'."
+ f/write-dotfile)
7 test/cascalog_graph/core_test.clj
@@ -0,0 +1,7 @@
+(ns cascalog-graph.core-test
+ (:use clojure.test
+ cascalog-graph.core))
+
+(deftest a-test
+ (testing "FIXME, I fail."
+ (is (= 0 1))))
Please sign in to comment.
Something went wrong with that request. Please try again.