Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
removed unnecessary opvars from predicates
  • Loading branch information
Nathan Marz committed Apr 17, 2012
1 parent 0ee0225 commit 1be1971
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 65 deletions.
4 changes: 2 additions & 2 deletions src/clj/cascalog/api.clj
Expand Up @@ -293,7 +293,7 @@
(p/predicate p/predicate-macro
(fn [invars outvars]
(for [[op & vars] (pred-macro-fn invars outvars)]
[op nil vars]))))
[op vars]))))

(defmacro predmacro
"A more general but more verbose way to create predicate macros.
Expand All @@ -318,7 +318,7 @@ destructuring in a predicate macro, the & symbol must be stringified
as well."
[outvars preds]
(let [outvars (v/vars->str outvars)
preds (for [[p & vars] preds] [p nil (v/vars->str vars)])]
preds (for [[p & vars] preds] [p (v/vars->str vars)])]
(rules/build-rule outvars preds)))

(defn union
Expand Down
41 changes: 19 additions & 22 deletions src/clj/cascalog/predicate.clj
Expand Up @@ -196,7 +196,7 @@
args))

(defn- simpleop-build-predicate
[op _ hof-args infields outfields options]
[op hof-args infields outfields options]
(predicate operation
(apply op (hof-prepend hof-args infields :fn> outfields :> Fields/ALL))
infields
Expand All @@ -207,7 +207,7 @@
(w/fn-spec (cons avar args)))

(defn- simpleagg-build-predicate
[buffer? op _ hof-args infields outfields options]
[buffer? op hof-args infields outfields options]
(predicate aggregator buffer?
nil
identity
Expand All @@ -226,7 +226,7 @@
(defmethod predicate-default-var ::tap [& args] :>)
(defmethod hof-predicate? ::tap [& args] false)
(defmethod build-predicate-specific ::tap
[tap _ _ infields outfields options]
[tap _ infields outfields options]
(let [sourcename (uuid)
pname (init-pipe-name options)
pipe (w/assemble (w/pipe sourcename)
Expand All @@ -244,7 +244,7 @@
(defmethod predicate-default-var :generator [& args] :>)
(defmethod hof-predicate? :generator [& args] false)
(defmethod build-predicate-specific :generator
[gen _ _ infields outfields options]
[gen _ infields outfields options]
(let [pname (init-pipe-name options)
trapmap (merge (:trapmap gen)
(init-trap-map options))
Expand All @@ -262,7 +262,7 @@
(defmethod predicate-default-var ::parallel-aggregator [& args] :>)
(defmethod hof-predicate? ::parallel-aggregator [& args] false)
(defmethod build-predicate-specific ::parallel-aggregator
[pagg _ _ infields outfields options]
[pagg _ infields outfields options]
(let [init-spec (w/fn-spec (:init-var pagg))
combine-spec (w/fn-spec (:combine-var pagg))
cascading-agg (ClojureParallelAggregator. (w/fields outfields)
Expand All @@ -286,7 +286,7 @@
(defmethod predicate-default-var ::parallel-buffer [& args] :>)
(defmethod hof-predicate? ::parallel-buffer [op & args] (:hof? op))
(defmethod build-predicate-specific ::parallel-buffer
[pbuf _ hof-args infields outfields options]
[pbuf hof-args infields outfields options]
(let [temp-vars (v/gen-nullable-vars ((:num-intermediate-vars-fn pbuf)
infields
outfields))
Expand Down Expand Up @@ -323,7 +323,7 @@
(defmethod predicate-default-var ::vanilla-function [& args] :<)
(defmethod hof-predicate? ::vanilla-function [& args] false)
(defmethod build-predicate-specific ::vanilla-function
[afn _ _ infields outfields options]
[afn _ infields outfields options]
(let [opvar (search-for-var afn)
_ (u/safe-assert opvar "Vanilla functions must have vars associated with them.")
[func-fields out-selector] (if (not-empty outfields)
Expand Down Expand Up @@ -355,7 +355,7 @@
(defmethod predicate-default-var :filter [& args] :<)
(defmethod hof-predicate? :filter [op & args] (:hof? (meta op)))
(defmethod build-predicate-specific :filter
[op _ hof-args infields outfields options]
[op hof-args infields outfields options]
(let [[func-fields out-selector] (if (not-empty outfields)
[outfields Fields/ALL]
[nil nil])
Expand All @@ -370,7 +370,7 @@
(defmethod predicate-default-var ::cascalog-function [& args] :>)
(defmethod hof-predicate? ::cascalog-function [op & args] false)
(defmethod build-predicate-specific ::cascalog-function
[op _ _ infields outfields options]
[op _ infields outfields options]
(predicate operation
(w/raw-each (w/fields infields)
(CascalogFunctionExecutor. (w/fields outfields) op)
Expand All @@ -382,7 +382,7 @@
(defmethod predicate-default-var ::cascading-filter [& args] :<)
(defmethod hof-predicate? ::cascading-filter [op & args] false)
(defmethod build-predicate-specific ::cascading-filter
[op _ _ infields outfields options]
[op _ infields outfields options]
(u/safe-assert (#{0 1} (count outfields))
"Must emit 0 or 1 fields from filter")
(let [c-infields (w/fields infields)
Expand All @@ -399,9 +399,8 @@
(defmethod predicate-default-var :cascalog-tap [& args] :>)
(defmethod hof-predicate? :cascalog-tap [op & args] false)
(defmethod build-predicate-specific :cascalog-tap
[gen _ _ infields outfields options]
[gen _ infields outfields options]
(build-predicate-specific (:source gen)
nil
nil
infields
outfields
Expand All @@ -410,19 +409,17 @@
(defmethod predicate-default-var ::data-structure [& args] :>)
(defmethod hof-predicate? ::data-structure [op & args] false)
(defmethod build-predicate-specific ::data-structure
[tuples _ _ infields outfields options]
[tuples _ infields outfields options]
(build-predicate-specific (w/memory-source-tap tuples)
nil
nil
infields
outfields
options))

;; TODO: Does this need other multimethods?
(defmethod build-predicate-specific :generator-filter
[op _ _ infields outfields options]
[op _ infields outfields options]
(let [gen (build-predicate-specific (:generator op)
nil
nil
infields
outfields
Expand All @@ -431,12 +428,12 @@

;; TODO: Document: what is this?
(defmethod build-predicate-specific :outconstant-equal
[_ _ _ infields outfields options]
(-> (build-predicate-specific = #'= _ infields outfields options)
[_ _ infields outfields options]
(-> (build-predicate-specific = _ infields outfields options)
(assoc :allow-on-genfilter? true)))

(defmethod build-predicate-specific ::cascalog-buffer
[op _ _ infields outfields options]
[op _ infields outfields options]
(predicate aggregator
true
nil
Expand Down Expand Up @@ -521,18 +518,18 @@
[(conj newfields f) dupvars assem]))]
(reduce update [[] [] identity] infields)))

(defn mk-option-predicate [[op _ _ infields _]]
(defn mk-option-predicate [[op _ infields _]]
(predicate option op infields))

(defn build-predicate
"Build a predicate. Calls down to build-predicate-specific for
predicate-specific building and adds constant substitution and null
checking of ? vars."
[options op opvar hof-args orig-infields outvars]
[options op hof-args orig-infields outvars]
(let [outvars (replace-ignored-vars outvars)
[infields infield-subs] (variable-substitution orig-infields)
[infields dupvars duplicate-assem] (fix-duplicate-infields infields)
predicate (build-predicate-specific op opvar hof-args
predicate (build-predicate-specific op hof-args
infields outvars options)
new-outvars (concat outvars (keys infield-subs) dupvars)
in-insertion-assembly (when (seq infields)
Expand Down
40 changes: 20 additions & 20 deletions src/clj/cascalog/rules.clj
Expand Up @@ -441,10 +441,10 @@
(merge DEFAULT-OPTIONS)))

(defn- mk-var-uniquer-reducer [out?]
(fn [[preds vmap] [op opvar hof-args invars outvars]]
(fn [[preds vmap] [op hof-args invars outvars]]
(let [[updatevars vmap] (v/uniquify-vars (if out? outvars invars) out? vmap)
[invars outvars] (if out? [invars updatevars] [updatevars outvars])]
[(conj preds [op opvar hof-args invars outvars]) vmap])))
[(conj preds [op hof-args invars outvars]) vmap])))

;; TODO: Move mk-drift-map to graph?

Expand All @@ -460,33 +460,33 @@
[out-vars raw-predicates drift-map]))

(defn split-outvar-constants
[[op opvar hof-args invars outvars]]
[[op hof-args invars outvars]]
(let [[new-outvars newpreds] (reduce
(fn [[outvars preds] v]
(if (v/cascalog-var? v)
[(conj outvars v) preds]
(let [newvar (v/gen-nullable-var)]
[(conj outvars newvar)
(conj preds [(p/predicate p/outconstant-equal)
nil nil [v newvar] []])])))
nil [v newvar] []])])))
[[] []]
outvars)]
(cons [op opvar hof-args invars new-outvars] newpreds)))
(cons [op hof-args invars new-outvars] newpreds)))

(defn- rewrite-predicate [[op opvar hof-args invars outvars :as predicate]]
(defn- rewrite-predicate [[op hof-args invars outvars :as predicate]]
(if-not (and (p/generator? op) (seq invars))
predicate
(if (= 1 (count outvars))
[(p/predicate p/generator-filter op (first outvars)) nil hof-args [] invars]
[(p/predicate p/generator-filter op (first outvars)) hof-args [] invars]
(throw-illegal (str "Generator filter can only have one outvar -> "
outvars)))))

(defn- parse-predicate [[op opvar vars]]
(defn- parse-predicate [[op vars]]
(let [[vars hof-args] (if (p/hof-predicate? op)
[(rest vars) (s/collectify (first vars))]
[vars nil])
{invars :<< outvars :>>} (p/parse-variables vars (p/predicate-default-var op))]
[op opvar hof-args invars outvars]))
[op hof-args invars outvars]))

(defn- unzip-generators
"Returns a vector containing two sequences; the subset of the
Expand All @@ -500,20 +500,20 @@
to be used as a set, false otherwise."
[parsed-pred]
(and (p/generator? (first parsed-pred))
(not-empty (nth parsed-pred 3))))
(not-empty (nth parsed-pred 2))))

(defn- gen-as-set-ungrounding-vars
"Returns a sequence of ungrounding vars present in the
generators-as-sets contained within the supplied sequence of parsed
predicates (of the form `[op opvar hof-args invars outvars]`)."
predicates (of the form `[op hof-args invars outvars]`)."
[parsed-preds]
(mapcat (comp (partial filter v/unground-var?)
#(->> % (take-last 2) (apply concat)))
(filter gen-as-set? parsed-preds)))

(defn- parse-ungrounding-outvars
"For the supplied sequence of parsed cascalog predicates of the form
`[op opvar hof-args invars outvars]`, returns a vector of two
`[op hof-args invars outvars]`, returns a vector of two
entries: a sequence of all output ungrounding vars that appear
within generator predicates, and a sequence of all ungrounding vars
that appear within non-generator predicates."
Expand All @@ -536,7 +536,7 @@
(not-empty gen-as-set-vars)
(throw-illegal (str "Can't have unground vars in generators-as-sets."
(vec gen-as-set-vars)
" violate(s) the rules."))
" violate(s) the rules.\n\n" (pr-str parsed-preds)))

(not-empty extra-vars)
(throw-illegal (str "Ungrounding vars must originate within a generator. "
Expand Down Expand Up @@ -587,12 +587,12 @@
(swap! replacements assoc v new-name)
new-name))

(defn- pred-macro-updater [[replacements ret] [op opvar vars]]
(defn- pred-macro-updater [[replacements ret] [op vars]]
(let [newvars (postwalk #(if (v/cascalog-var? %)
(new-var-name! replacements %)
%)
vars)]
[replacements (conj ret [op opvar newvars])]))
[replacements (conj ret [op newvars])]))

(defn collectify-nil-as-seq [v]
(if v (s/collectify v)))
Expand Down Expand Up @@ -626,7 +626,7 @@
(pred-fn invars outvars)))

(defn- expand-predicate-macros [raw-predicates]
(mapcat (fn [[p _ vars :as raw-predicate]]
(mapcat (fn [[p vars :as raw-predicate]]
(let [p (if (var? p) (var-get p) p)]
(if (p/predicate-macro? p)
(expand-predicate-macros (expand-predicate-macro p vars))
Expand All @@ -636,10 +636,10 @@
(defn normalize-raw-predicates
"support passing around ops as vars."
[raw-predicates]
(for [[p v vars] raw-predicates]
(for [[p vars] raw-predicates]
(if (var? p)
[(var-get p) p vars]
[p v vars])))
[(var-get p) vars]
[p vars])))

(defn build-rule [out-vars raw-predicates]
(let [raw-predicates (-> raw-predicates
Expand All @@ -653,7 +653,7 @@
raw-predicates))))

(defn mk-raw-predicate [[op-sym & vars]]
[op-sym (u/try-resolve op-sym) (v/vars->str vars)])
[op-sym (v/vars->str vars)])

(defn- pluck-tuple [tap]
(with-open [it (.openForRead tap (hadoop/job-conf (conf/project-conf)))]
Expand Down
3 changes: 0 additions & 3 deletions src/clj/cascalog/util.clj
Expand Up @@ -40,9 +40,6 @@
[(conj newseq newval) (merge subs sub)]))
[[] {}] aseq))

(defn try-resolve [obj]
(when (symbol? obj) (resolve obj)))

(defn multi-set
"Returns a map of elem to count"
[aseq]
Expand Down
9 changes: 3 additions & 6 deletions test/cascalog/predicate_test.clj
Expand Up @@ -12,7 +12,6 @@
(deftest test-map-pred
(let [pred (build-predicate {}
timesplusone
(var timesplusone)
nil
["?f1" "?f2"]
["?q"])
Expand All @@ -36,7 +35,6 @@
(deftest test-variable-substitution
(let [pred (build-predicate {}
addplusone
(var addplusone)
nil
["?f1" "?f2" 3 4 "?f3"]
["?s" "?s2"])
Expand All @@ -63,7 +61,6 @@
(let [pred (build-predicate
{}
nilop
(var nilop)
nil
["?i"]
["?o1" "!o2"])
Expand All @@ -79,7 +76,7 @@
:else [(inc n)]))

(deftest test-mapcat-pred
(let [pred (build-predicate {} many-vals nil nil ["?a"] ["?b"])
(let [pred (build-predicate {} many-vals nil ["?a"] ["?b"])
source-data {:fields ["?a"]
:tuples [[1] [2] [3] [4]]}
sink-data {:fields ["?b"]
Expand All @@ -89,15 +86,15 @@
(future-fact "Test filter predicate.")

(deftest test-vanilla-filter
(let [pred (build-predicate {} odd? (var odd?) nil ["?f"] [])
(let [pred (build-predicate {} odd? nil ["?f"] [])
source-data {:fields ["?f"]
:tuples [[1] [2] [3] [4] [6] [9] [10]]}
sink-data {:fields ["?f"]
:tuples [[1] [3] [9]]} ]
(test-assembly source-data sink-data (:assembly pred))))

(deftest test-filter-func
(let [pred (build-predicate {} odd? (var odd?) nil ["?f"] ["?o"])
(let [pred (build-predicate {} odd? nil ["?f"] ["?o"])
source-data {:fields ["?f"]
:tuples [[1] [2] [3] [4] [6] [9] [10]]}
sink-data {:fields ["?f" "?o"]
Expand Down
12 changes: 0 additions & 12 deletions test/cascalog/util_test.clj
Expand Up @@ -2,18 +2,6 @@
(:use cascalog.util
midje.sweet))

(def p 5)

(facts "Test try-resolve."
(binding [*ns* (find-ns 'cascalog.util-test)]
(try-resolve 'qqq) => nil
(try-resolve '+) => #'+
(let [a 1]
(try-resolve 'a) => nil)
(try-resolve 10) => nil
(try-resolve [1 2 3]) => nil
(try-resolve 'p) => #'p))

(tabular
(fact "test-all-pairs"
(all-pairs ?input) => ?result)
Expand Down

0 comments on commit 1be1971

Please sign in to comment.