Skip to content

Commit

Permalink
Merge branch 'feature/var-search' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Apr 17, 2012
2 parents 9b438eb + 1be1971 commit 92fe78f
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 68 deletions.
4 changes: 2 additions & 2 deletions src/clj/cascalog/api.clj
Expand Up @@ -293,7 +293,7 @@
(p/predicate p/predicate-macro
(fn [invars outvars]
(for [[op & vars] (pred-macro-fn invars outvars)]
[op nil vars]))))
[op vars]))))

(defmacro predmacro
"A more general but more verbose way to create predicate macros.
Expand All @@ -318,7 +318,7 @@ destructuring in a predicate macro, the & symbol must be stringified
as well."
[outvars preds]
(let [outvars (v/vars->str outvars)
preds (for [[p & vars] preds] [p nil (v/vars->str vars)])]
preds (for [[p & vars] preds] [p (v/vars->str vars)])]
(rules/build-rule outvars preds)))

(defn union
Expand Down
48 changes: 23 additions & 25 deletions src/clj/cascalog/predicate.clj
Expand Up @@ -14,7 +14,7 @@
;; along with this program. If not, see <http://www.gnu.org/licenses/>.

(ns cascalog.predicate
(:use [cascalog.util :only (uuid multifn? substitute-if)]
(:use [cascalog.util :only (uuid multifn? substitute-if search-for-var)]
[jackknife.seq :only (transpose)]
[clojure.tools.macro :only (name-with-attributes)])
(:require [jackknife.core :as u]
Expand Down Expand Up @@ -196,7 +196,7 @@
args))

(defn- simpleop-build-predicate
[op _ hof-args infields outfields options]
[op hof-args infields outfields options]
(predicate operation
(apply op (hof-prepend hof-args infields :fn> outfields :> Fields/ALL))
infields
Expand All @@ -207,7 +207,7 @@
(w/fn-spec (cons avar args)))

(defn- simpleagg-build-predicate
[buffer? op _ hof-args infields outfields options]
[buffer? op hof-args infields outfields options]
(predicate aggregator buffer?
nil
identity
Expand All @@ -226,7 +226,7 @@
(defmethod predicate-default-var ::tap [& args] :>)
(defmethod hof-predicate? ::tap [& args] false)
(defmethod build-predicate-specific ::tap
[tap _ _ infields outfields options]
[tap _ infields outfields options]
(let [sourcename (uuid)
pname (init-pipe-name options)
pipe (w/assemble (w/pipe sourcename)
Expand All @@ -244,7 +244,7 @@
(defmethod predicate-default-var :generator [& args] :>)
(defmethod hof-predicate? :generator [& args] false)
(defmethod build-predicate-specific :generator
[gen _ _ infields outfields options]
[gen _ infields outfields options]
(let [pname (init-pipe-name options)
trapmap (merge (:trapmap gen)
(init-trap-map options))
Expand All @@ -262,7 +262,7 @@
(defmethod predicate-default-var ::parallel-aggregator [& args] :>)
(defmethod hof-predicate? ::parallel-aggregator [& args] false)
(defmethod build-predicate-specific ::parallel-aggregator
[pagg _ _ infields outfields options]
[pagg _ infields outfields options]
(let [init-spec (w/fn-spec (:init-var pagg))
combine-spec (w/fn-spec (:combine-var pagg))
cascading-agg (ClojureParallelAggregator. (w/fields outfields)
Expand All @@ -286,7 +286,7 @@
(defmethod predicate-default-var ::parallel-buffer [& args] :>)
(defmethod hof-predicate? ::parallel-buffer [op & args] (:hof? op))
(defmethod build-predicate-specific ::parallel-buffer
[pbuf _ hof-args infields outfields options]
[pbuf hof-args infields outfields options]
(let [temp-vars (v/gen-nullable-vars ((:num-intermediate-vars-fn pbuf)
infields
outfields))
Expand Down Expand Up @@ -323,9 +323,10 @@
(defmethod predicate-default-var ::vanilla-function [& args] :<)
(defmethod hof-predicate? ::vanilla-function [& args] false)
(defmethod build-predicate-specific ::vanilla-function
[_ opvar _ infields outfields options]
(u/safe-assert opvar "Functions must have vars associated with them.")
(let [[func-fields out-selector] (if (not-empty outfields)
[afn _ infields outfields options]
(let [opvar (search-for-var afn)
_ (u/safe-assert opvar "Vanilla functions must have vars associated with them.")
[func-fields out-selector] (if (not-empty outfields)
[outfields Fields/ALL]
[nil nil])
assembly (w/filter opvar infields :fn> func-fields :> out-selector)]
Expand Down Expand Up @@ -354,7 +355,7 @@
(defmethod predicate-default-var :filter [& args] :<)
(defmethod hof-predicate? :filter [op & args] (:hof? (meta op)))
(defmethod build-predicate-specific :filter
[op _ hof-args infields outfields options]
[op hof-args infields outfields options]
(let [[func-fields out-selector] (if (not-empty outfields)
[outfields Fields/ALL]
[nil nil])
Expand All @@ -369,7 +370,7 @@
(defmethod predicate-default-var ::cascalog-function [& args] :>)
(defmethod hof-predicate? ::cascalog-function [op & args] false)
(defmethod build-predicate-specific ::cascalog-function
[op _ _ infields outfields options]
[op _ infields outfields options]
(predicate operation
(w/raw-each (w/fields infields)
(CascalogFunctionExecutor. (w/fields outfields) op)
Expand All @@ -381,7 +382,7 @@
(defmethod predicate-default-var ::cascading-filter [& args] :<)
(defmethod hof-predicate? ::cascading-filter [op & args] false)
(defmethod build-predicate-specific ::cascading-filter
[op _ _ infields outfields options]
[op _ infields outfields options]
(u/safe-assert (#{0 1} (count outfields))
"Must emit 0 or 1 fields from filter")
(let [c-infields (w/fields infields)
Expand All @@ -398,9 +399,8 @@
(defmethod predicate-default-var :cascalog-tap [& args] :>)
(defmethod hof-predicate? :cascalog-tap [op & args] false)
(defmethod build-predicate-specific :cascalog-tap
[gen _ _ infields outfields options]
[gen _ infields outfields options]
(build-predicate-specific (:source gen)
nil
nil
infields
outfields
Expand All @@ -409,19 +409,17 @@
(defmethod predicate-default-var ::data-structure [& args] :>)
(defmethod hof-predicate? ::data-structure [op & args] false)
(defmethod build-predicate-specific ::data-structure
[tuples _ _ infields outfields options]
[tuples _ infields outfields options]
(build-predicate-specific (w/memory-source-tap tuples)
nil
nil
infields
outfields
options))

;; TODO: Does this need other multimethods?
(defmethod build-predicate-specific :generator-filter
[op _ _ infields outfields options]
[op _ infields outfields options]
(let [gen (build-predicate-specific (:generator op)
nil
nil
infields
outfields
Expand All @@ -430,12 +428,12 @@

;; TODO: Document: what is this?
(defmethod build-predicate-specific :outconstant-equal
[_ _ _ infields outfields options]
(-> (build-predicate-specific = #'= _ infields outfields options)
[_ _ infields outfields options]
(-> (build-predicate-specific = _ infields outfields options)
(assoc :allow-on-genfilter? true)))

(defmethod build-predicate-specific ::cascalog-buffer
[op _ _ infields outfields options]
[op _ infields outfields options]
(predicate aggregator
true
nil
Expand Down Expand Up @@ -520,18 +518,18 @@
[(conj newfields f) dupvars assem]))]
(reduce update [[] [] identity] infields)))

(defn mk-option-predicate [[op _ _ infields _]]
(defn mk-option-predicate [[op _ infields _]]
(predicate option op infields))

(defn build-predicate
"Build a predicate. Calls down to build-predicate-specific for
predicate-specific building and adds constant substitution and null
checking of ? vars."
[options op opvar hof-args orig-infields outvars]
[options op hof-args orig-infields outvars]
(let [outvars (replace-ignored-vars outvars)
[infields infield-subs] (variable-substitution orig-infields)
[infields dupvars duplicate-assem] (fix-duplicate-infields infields)
predicate (build-predicate-specific op opvar hof-args
predicate (build-predicate-specific op hof-args
infields outvars options)
new-outvars (concat outvars (keys infield-subs) dupvars)
in-insertion-assembly (when (seq infields)
Expand Down
40 changes: 20 additions & 20 deletions src/clj/cascalog/rules.clj
Expand Up @@ -441,10 +441,10 @@
(merge DEFAULT-OPTIONS)))

(defn- mk-var-uniquer-reducer [out?]
(fn [[preds vmap] [op opvar hof-args invars outvars]]
(fn [[preds vmap] [op hof-args invars outvars]]
(let [[updatevars vmap] (v/uniquify-vars (if out? outvars invars) out? vmap)
[invars outvars] (if out? [invars updatevars] [updatevars outvars])]
[(conj preds [op opvar hof-args invars outvars]) vmap])))
[(conj preds [op hof-args invars outvars]) vmap])))

;; TODO: Move mk-drift-map to graph?

Expand All @@ -460,33 +460,33 @@
[out-vars raw-predicates drift-map]))

(defn split-outvar-constants
[[op opvar hof-args invars outvars]]
[[op hof-args invars outvars]]
(let [[new-outvars newpreds] (reduce
(fn [[outvars preds] v]
(if (v/cascalog-var? v)
[(conj outvars v) preds]
(let [newvar (v/gen-nullable-var)]
[(conj outvars newvar)
(conj preds [(p/predicate p/outconstant-equal)
nil nil [v newvar] []])])))
nil [v newvar] []])])))
[[] []]
outvars)]
(cons [op opvar hof-args invars new-outvars] newpreds)))
(cons [op hof-args invars new-outvars] newpreds)))

(defn- rewrite-predicate [[op opvar hof-args invars outvars :as predicate]]
(defn- rewrite-predicate [[op hof-args invars outvars :as predicate]]
(if-not (and (p/generator? op) (seq invars))
predicate
(if (= 1 (count outvars))
[(p/predicate p/generator-filter op (first outvars)) nil hof-args [] invars]
[(p/predicate p/generator-filter op (first outvars)) hof-args [] invars]
(throw-illegal (str "Generator filter can only have one outvar -> "
outvars)))))

(defn- parse-predicate [[op opvar vars]]
(defn- parse-predicate [[op vars]]
(let [[vars hof-args] (if (p/hof-predicate? op)
[(rest vars) (s/collectify (first vars))]
[vars nil])
{invars :<< outvars :>>} (p/parse-variables vars (p/predicate-default-var op))]
[op opvar hof-args invars outvars]))
[op hof-args invars outvars]))

(defn- unzip-generators
"Returns a vector containing two sequences; the subset of the
Expand All @@ -500,20 +500,20 @@
to be used as a set, false otherwise."
[parsed-pred]
(and (p/generator? (first parsed-pred))
(not-empty (nth parsed-pred 3))))
(not-empty (nth parsed-pred 2))))

(defn- gen-as-set-ungrounding-vars
"Returns a sequence of ungrounding vars present in the
generators-as-sets contained within the supplied sequence of parsed
predicates (of the form `[op opvar hof-args invars outvars]`)."
predicates (of the form `[op hof-args invars outvars]`)."
[parsed-preds]
(mapcat (comp (partial filter v/unground-var?)
#(->> % (take-last 2) (apply concat)))
(filter gen-as-set? parsed-preds)))

(defn- parse-ungrounding-outvars
"For the supplied sequence of parsed cascalog predicates of the form
`[op opvar hof-args invars outvars]`, returns a vector of two
`[op hof-args invars outvars]`, returns a vector of two
entries: a sequence of all output ungrounding vars that appear
within generator predicates, and a sequence of all ungrounding vars
that appear within non-generator predicates."
Expand All @@ -536,7 +536,7 @@
(not-empty gen-as-set-vars)
(throw-illegal (str "Can't have unground vars in generators-as-sets."
(vec gen-as-set-vars)
" violate(s) the rules."))
" violate(s) the rules.\n\n" (pr-str parsed-preds)))

(not-empty extra-vars)
(throw-illegal (str "Ungrounding vars must originate within a generator. "
Expand Down Expand Up @@ -587,12 +587,12 @@
(swap! replacements assoc v new-name)
new-name))

(defn- pred-macro-updater [[replacements ret] [op opvar vars]]
(defn- pred-macro-updater [[replacements ret] [op vars]]
(let [newvars (postwalk #(if (v/cascalog-var? %)
(new-var-name! replacements %)
%)
vars)]
[replacements (conj ret [op opvar newvars])]))
[replacements (conj ret [op newvars])]))

(defn collectify-nil-as-seq
  "Passes `v` through `s/collectify` when `v` is truthy; returns nil
  when `v` is nil or false."
  [v]
  (when v
    (s/collectify v)))
Expand Down Expand Up @@ -626,7 +626,7 @@
(pred-fn invars outvars)))

(defn- expand-predicate-macros [raw-predicates]
(mapcat (fn [[p _ vars :as raw-predicate]]
(mapcat (fn [[p vars :as raw-predicate]]
(let [p (if (var? p) (var-get p) p)]
(if (p/predicate-macro? p)
(expand-predicate-macros (expand-predicate-macro p vars))
Expand All @@ -636,10 +636,10 @@
(defn normalize-raw-predicates
"support passing around ops as vars."
[raw-predicates]
(for [[p v vars] raw-predicates]
(for [[p vars] raw-predicates]
(if (var? p)
[(var-get p) p vars]
[p v vars])))
[(var-get p) vars]
[p vars])))

(defn build-rule [out-vars raw-predicates]
(let [raw-predicates (-> raw-predicates
Expand All @@ -653,7 +653,7 @@
raw-predicates))))

(defn mk-raw-predicate [[op-sym & vars]]
[op-sym (u/try-resolve op-sym) (v/vars->str vars)])
[op-sym (v/vars->str vars)])

(defn- pluck-tuple [tap]
(with-open [it (.openForRead tap (hadoop/job-conf (conf/project-conf)))]
Expand Down
12 changes: 9 additions & 3 deletions src/clj/cascalog/util.clj
Expand Up @@ -40,9 +40,6 @@
[(conj newseq newval) (merge subs sub)]))
[[] {}] aseq))

(defn try-resolve [obj]
(when (symbol? obj) (resolve obj)))

(defn multi-set
"Returns a map of elem to count"
[aseq]
Expand Down Expand Up @@ -163,3 +160,12 @@
[(if (keyword? k)
(name k)
(str k)) v])))

(defn search-for-var
  "Returns the first var whose current value is `=` to `val`, or nil
  when no such var is found.

  Namespaces backing `(loaded-libs)` are searched first, in the order
  `loaded-libs` returns them; every other namespace known to the
  runtime (`all-ns`) is searched afterwards. The fallback lets vars
  defined in namespaces that were never loaded as libs be found as
  well. Every mapping in a namespace is inspected (`ns-map`), so
  referred vars are candidates too."
  [val]
  (let [;; `find-ns` yields nil (dropped by `keep`) for a lib symbol
        ;; that has no namespace of the same name, where calling
        ;; `ns-map` on the bare symbol would throw.
        lib-nses   (keep find-ns (loaded-libs))
        ;; Namespaces not covered above, e.g. ones created with
        ;; `ns`/`in-ns`/`create-ns` rather than `require`/`use`.
        other-nses (remove (set lib-nses) (all-ns))]
    (->> (concat lib-nses other-nses)
         (mapcat ns-map)
         (map second)
         ;; `ns-map` also contains imported classes, hence the `var?` guard.
         (filter #(and (var? %) (= (var-get %) val)))
         first)))

0 comments on commit 92fe78f

Please sign in to comment.