diff --git a/src/clj/cascalog/api.clj b/src/clj/cascalog/api.clj index ebce34ea..0a595454 100644 --- a/src/clj/cascalog/api.clj +++ b/src/clj/cascalog/api.clj @@ -293,7 +293,7 @@ (p/predicate p/predicate-macro (fn [invars outvars] (for [[op & vars] (pred-macro-fn invars outvars)] - [op nil vars])))) + [op vars])))) (defmacro predmacro "A more general but more verbose way to create predicate macros. @@ -318,7 +318,7 @@ destructuring in a predicate macro, the & symbol must be stringified as well." [outvars preds] (let [outvars (v/vars->str outvars) - preds (for [[p & vars] preds] [p nil (v/vars->str vars)])] + preds (for [[p & vars] preds] [p (v/vars->str vars)])] (rules/build-rule outvars preds))) (defn union diff --git a/src/clj/cascalog/predicate.clj b/src/clj/cascalog/predicate.clj index e0654325..121c39d1 100644 --- a/src/clj/cascalog/predicate.clj +++ b/src/clj/cascalog/predicate.clj @@ -14,7 +14,7 @@ ;; along with this program. If not, see <http://www.gnu.org/licenses/>. (ns cascalog.predicate - (:use [cascalog.util :only (uuid multifn? substitute-if)] + (:use [cascalog.util :only (uuid multifn? substitute-if search-for-var)] [jackknife.seq :only (transpose)] [clojure.tools.macro :only (name-with-attributes)]) (:require [jackknife.core :as u] @@ -196,7 +196,7 @@ args)) (defn- simpleop-build-predicate - [op _ hof-args infields outfields options] + [op hof-args infields outfields options] (predicate operation (apply op (hof-prepend hof-args infields :fn> outfields :> Fields/ALL)) infields @@ -207,7 +207,7 @@ (w/fn-spec (cons avar args))) (defn- simpleagg-build-predicate - [buffer? op _ hof-args infields outfields options] + [buffer? op hof-args infields outfields options] (predicate aggregator buffer? nil identity @@ -226,7 +226,7 @@ (defmethod predicate-default-var ::tap [& args] :>) (defmethod hof-predicate? 
::tap [& args] false) (defmethod build-predicate-specific ::tap - [tap _ _ infields outfields options] + [tap _ infields outfields options] (let [sourcename (uuid) pname (init-pipe-name options) pipe (w/assemble (w/pipe sourcename) @@ -244,7 +244,7 @@ (defmethod predicate-default-var :generator [& args] :>) (defmethod hof-predicate? :generator [& args] false) (defmethod build-predicate-specific :generator - [gen _ _ infields outfields options] + [gen _ infields outfields options] (let [pname (init-pipe-name options) trapmap (merge (:trapmap gen) (init-trap-map options)) @@ -262,7 +262,7 @@ (defmethod predicate-default-var ::parallel-aggregator [& args] :>) (defmethod hof-predicate? ::parallel-aggregator [& args] false) (defmethod build-predicate-specific ::parallel-aggregator - [pagg _ _ infields outfields options] + [pagg _ infields outfields options] (let [init-spec (w/fn-spec (:init-var pagg)) combine-spec (w/fn-spec (:combine-var pagg)) cascading-agg (ClojureParallelAggregator. (w/fields outfields) @@ -286,7 +286,7 @@ (defmethod predicate-default-var ::parallel-buffer [& args] :>) (defmethod hof-predicate? ::parallel-buffer [op & args] (:hof? op)) (defmethod build-predicate-specific ::parallel-buffer - [pbuf _ hof-args infields outfields options] + [pbuf hof-args infields outfields options] (let [temp-vars (v/gen-nullable-vars ((:num-intermediate-vars-fn pbuf) infields outfields)) @@ -323,9 +323,10 @@ (defmethod predicate-default-var ::vanilla-function [& args] :<) (defmethod hof-predicate? 
::vanilla-function [& args] false) (defmethod build-predicate-specific ::vanilla-function - [_ opvar _ infields outfields options] - (u/safe-assert opvar "Functions must have vars associated with them.") - (let [[func-fields out-selector] (if (not-empty outfields) + [afn _ infields outfields options] + (let [opvar (search-for-var afn) + _ (u/safe-assert opvar "Vanilla functions must have vars associated with them.") + [func-fields out-selector] (if (not-empty outfields) [outfields Fields/ALL] [nil nil]) assembly (w/filter opvar infields :fn> func-fields :> out-selector)] @@ -354,7 +355,7 @@ (defmethod predicate-default-var :filter [& args] :<) (defmethod hof-predicate? :filter [op & args] (:hof? (meta op))) (defmethod build-predicate-specific :filter - [op _ hof-args infields outfields options] + [op hof-args infields outfields options] (let [[func-fields out-selector] (if (not-empty outfields) [outfields Fields/ALL] [nil nil]) @@ -369,7 +370,7 @@ (defmethod predicate-default-var ::cascalog-function [& args] :>) (defmethod hof-predicate? ::cascalog-function [op & args] false) (defmethod build-predicate-specific ::cascalog-function - [op _ _ infields outfields options] + [op _ infields outfields options] (predicate operation (w/raw-each (w/fields infields) (CascalogFunctionExecutor. (w/fields outfields) op) @@ -381,7 +382,7 @@ (defmethod predicate-default-var ::cascading-filter [& args] :<) (defmethod hof-predicate? ::cascading-filter [op & args] false) (defmethod build-predicate-specific ::cascading-filter - [op _ _ infields outfields options] + [op _ infields outfields options] (u/safe-assert (#{0 1} (count outfields)) "Must emit 0 or 1 fields from filter") (let [c-infields (w/fields infields) @@ -398,9 +399,8 @@ (defmethod predicate-default-var :cascalog-tap [& args] :>) (defmethod hof-predicate? 
:cascalog-tap [op & args] false) (defmethod build-predicate-specific :cascalog-tap - [gen _ _ infields outfields options] + [gen _ infields outfields options] (build-predicate-specific (:source gen) - nil nil infields outfields @@ -409,9 +409,8 @@ (defmethod predicate-default-var ::data-structure [& args] :>) (defmethod hof-predicate? ::data-structure [op & args] false) (defmethod build-predicate-specific ::data-structure - [tuples _ _ infields outfields options] + [tuples _ infields outfields options] (build-predicate-specific (w/memory-source-tap tuples) - nil nil infields outfields @@ -419,9 +418,8 @@ ;; TODO: Does this need other multimethods? (defmethod build-predicate-specific :generator-filter - [op _ _ infields outfields options] + [op _ infields outfields options] (let [gen (build-predicate-specific (:generator op) - nil nil infields outfields @@ -430,12 +428,12 @@ ;; TODO: Document: what is this? (defmethod build-predicate-specific :outconstant-equal - [_ _ _ infields outfields options] - (-> (build-predicate-specific = #'= _ infields outfields options) + [_ _ infields outfields options] + (-> (build-predicate-specific = _ infields outfields options) (assoc :allow-on-genfilter? true))) (defmethod build-predicate-specific ::cascalog-buffer - [op _ _ infields outfields options] + [op _ infields outfields options] (predicate aggregator true nil @@ -520,18 +518,18 @@ [(conj newfields f) dupvars assem]))] (reduce update [[] [] identity] infields))) -(defn mk-option-predicate [[op _ _ infields _]] +(defn mk-option-predicate [[op _ infields _]] (predicate option op infields)) (defn build-predicate "Build a predicate. Calls down to build-predicate-specific for predicate-specific building and adds constant substitution and null checking of ? vars." 
- [options op opvar hof-args orig-infields outvars] + [options op hof-args orig-infields outvars] (let [outvars (replace-ignored-vars outvars) [infields infield-subs] (variable-substitution orig-infields) [infields dupvars duplicate-assem] (fix-duplicate-infields infields) - predicate (build-predicate-specific op opvar hof-args + predicate (build-predicate-specific op hof-args infields outvars options) new-outvars (concat outvars (keys infield-subs) dupvars) in-insertion-assembly (when (seq infields) diff --git a/src/clj/cascalog/rules.clj b/src/clj/cascalog/rules.clj index 0239f83d..d5cdbd8f 100644 --- a/src/clj/cascalog/rules.clj +++ b/src/clj/cascalog/rules.clj @@ -441,10 +441,10 @@ (merge DEFAULT-OPTIONS))) (defn- mk-var-uniquer-reducer [out?] - (fn [[preds vmap] [op opvar hof-args invars outvars]] + (fn [[preds vmap] [op hof-args invars outvars]] (let [[updatevars vmap] (v/uniquify-vars (if out? outvars invars) out? vmap) [invars outvars] (if out? [invars updatevars] [updatevars outvars])] - [(conj preds [op opvar hof-args invars outvars]) vmap]))) + [(conj preds [op hof-args invars outvars]) vmap]))) ;; TODO: Move mk-drift-map to graph? @@ -460,7 +460,7 @@ [out-vars raw-predicates drift-map])) (defn split-outvar-constants - [[op opvar hof-args invars outvars]] + [[op hof-args invars outvars]] (let [[new-outvars newpreds] (reduce (fn [[outvars preds] v] (if (v/cascalog-var? v) @@ -468,25 +468,25 @@ (let [newvar (v/gen-nullable-var)] [(conj outvars newvar) (conj preds [(p/predicate p/outconstant-equal) - nil nil [v newvar] []])]))) + nil [v newvar] []])]))) [[] []] outvars)] - (cons [op opvar hof-args invars new-outvars] newpreds))) + (cons [op hof-args invars new-outvars] newpreds))) -(defn- rewrite-predicate [[op opvar hof-args invars outvars :as predicate]] +(defn- rewrite-predicate [[op hof-args invars outvars :as predicate]] (if-not (and (p/generator? 
op) (seq invars)) predicate (if (= 1 (count outvars)) - [(p/predicate p/generator-filter op (first outvars)) nil hof-args [] invars] + [(p/predicate p/generator-filter op (first outvars)) hof-args [] invars] (throw-illegal (str "Generator filter can only have one outvar -> " outvars))))) -(defn- parse-predicate [[op opvar vars]] +(defn- parse-predicate [[op vars]] (let [[vars hof-args] (if (p/hof-predicate? op) [(rest vars) (s/collectify (first vars))] [vars nil]) {invars :<< outvars :>>} (p/parse-variables vars (p/predicate-default-var op))] - [op opvar hof-args invars outvars])) + [op hof-args invars outvars])) (defn- unzip-generators "Returns a vector containing two sequences; the subset of the @@ -500,12 +500,12 @@ to be used as a set, false otherwise." [parsed-pred] (and (p/generator? (first parsed-pred)) - (not-empty (nth parsed-pred 3)))) + (not-empty (nth parsed-pred 2)))) (defn- gen-as-set-ungrounding-vars "Returns a sequence of ungrounding vars present in the generators-as-sets contained within the supplied sequence of parsed - predicates (of the form `[op opvar hof-args invars outvars]`)." + predicates (of the form `[op hof-args invars outvars]`)." [parsed-preds] (mapcat (comp (partial filter v/unground-var?) #(->> % (take-last 2) (apply concat))) @@ -513,7 +513,7 @@ (defn- parse-ungrounding-outvars "For the supplied sequence of parsed cascalog predicates of the form - `[op opvar hof-args invars outvars]`, returns a vector of two + `[op hof-args invars outvars]`, returns a vector of two entries: a sequence of all output ungrounding vars that appear within generator predicates, and a sequence of all ungrounding vars that appear within non-generator predicates." @@ -536,7 +536,7 @@ (not-empty gen-as-set-vars) (throw-illegal (str "Can't have unground vars in generators-as-sets." 
(vec gen-as-set-vars) - " violate(s) the rules.")) + " violate(s) the rules.\n\n" (pr-str parsed-preds))) (not-empty extra-vars) (throw-illegal (str "Ungrounding vars must originate within a generator. " @@ -587,12 +587,12 @@ (swap! replacements assoc v new-name) new-name)) -(defn- pred-macro-updater [[replacements ret] [op opvar vars]] +(defn- pred-macro-updater [[replacements ret] [op vars]] (let [newvars (postwalk #(if (v/cascalog-var? %) (new-var-name! replacements %) %) vars)] - [replacements (conj ret [op opvar newvars])])) + [replacements (conj ret [op newvars])])) (defn collectify-nil-as-seq [v] (if v (s/collectify v))) @@ -626,7 +626,7 @@ (pred-fn invars outvars))) (defn- expand-predicate-macros [raw-predicates] - (mapcat (fn [[p _ vars :as raw-predicate]] + (mapcat (fn [[p vars :as raw-predicate]] (let [p (if (var? p) (var-get p) p)] (if (p/predicate-macro? p) (expand-predicate-macros (expand-predicate-macro p vars)) @@ -636,10 +636,10 @@ (defn normalize-raw-predicates "support passing around ops as vars." [raw-predicates] - (for [[p v vars] raw-predicates] + (for [[p vars] raw-predicates] (if (var? p) - [(var-get p) p vars] - [p v vars]))) + [(var-get p) vars] + [p vars]))) (defn build-rule [out-vars raw-predicates] (let [raw-predicates (-> raw-predicates @@ -653,7 +653,7 @@ raw-predicates)))) (defn mk-raw-predicate [[op-sym & vars]] - [op-sym (u/try-resolve op-sym) (v/vars->str vars)]) + [op-sym (v/vars->str vars)]) (defn- pluck-tuple [tap] (with-open [it (.openForRead tap (hadoop/job-conf (conf/project-conf)))] diff --git a/src/clj/cascalog/util.clj b/src/clj/cascalog/util.clj index 76a0de3e..3296e632 100644 --- a/src/clj/cascalog/util.clj +++ b/src/clj/cascalog/util.clj @@ -40,9 +40,6 @@ [(conj newseq newval) (merge subs sub)])) [[] {}] aseq)) -(defn try-resolve [obj] - (when (symbol? obj) (resolve obj))) - (defn multi-set "Returns a map of elem to count" [aseq] @@ -163,3 +160,12 @@ [(if (keyword? 
k) (name k) (str k)) v]))) + +(defn search-for-var [val] + (->> (loaded-libs) + (map ns-map) + (mapcat identity) + (map second) + (filter #(and (var? %) (= (var-get %) val))) + first )) + diff --git a/test/cascalog/predicate_test.clj b/test/cascalog/predicate_test.clj index be358006..7dccc6bc 100644 --- a/test/cascalog/predicate_test.clj +++ b/test/cascalog/predicate_test.clj @@ -12,7 +12,6 @@ (deftest test-map-pred (let [pred (build-predicate {} timesplusone - (var timesplusone) nil ["?f1" "?f2"] ["?q"]) @@ -36,7 +35,6 @@ (deftest test-variable-substitution (let [pred (build-predicate {} addplusone - (var addplusone) nil ["?f1" "?f2" 3 4 "?f3"] ["?s" "?s2"]) @@ -63,7 +61,6 @@ (let [pred (build-predicate {} nilop - (var nilop) nil ["?i"] ["?o1" "!o2"]) @@ -79,7 +76,7 @@ :else [(inc n)])) (deftest test-mapcat-pred - (let [pred (build-predicate {} many-vals nil nil ["?a"] ["?b"]) + (let [pred (build-predicate {} many-vals nil ["?a"] ["?b"]) source-data {:fields ["?a"] :tuples [[1] [2] [3] [4]]} sink-data {:fields ["?b"] @@ -89,7 +86,7 @@ (future-fact "Test filter predicate.") (deftest test-vanilla-filter - (let [pred (build-predicate {} odd? (var odd?) nil ["?f"] []) + (let [pred (build-predicate {} odd? nil ["?f"] []) source-data {:fields ["?f"] :tuples [[1] [2] [3] [4] [6] [9] [10]]} sink-data {:fields ["?f"] @@ -97,7 +94,7 @@ (test-assembly source-data sink-data (:assembly pred)))) (deftest test-filter-func - (let [pred (build-predicate {} odd? (var odd?) nil ["?f"] ["?o"]) + (let [pred (build-predicate {} odd? nil ["?f"] ["?o"]) source-data {:fields ["?f"] :tuples [[1] [2] [3] [4] [6] [9] [10]]} sink-data {:fields ["?f" "?o"] diff --git a/test/cascalog/util_test.clj b/test/cascalog/util_test.clj index 2d9d1fb9..6269692c 100644 --- a/test/cascalog/util_test.clj +++ b/test/cascalog/util_test.clj @@ -2,18 +2,6 @@ (:use cascalog.util midje.sweet)) -(def p 5) - -(facts "Test try-resolve." 
- (binding [*ns* (find-ns 'cascalog.util-test)] - (try-resolve 'qqq) => nil - (try-resolve '+) => #'+ - (let [a 1] - (try-resolve 'a) => nil) - (try-resolve 10) => nil - (try-resolve [1 2 3]) => nil - (try-resolve 'p) => #'p)) - (tabular (fact "test-all-pairs" (all-pairs ?input) => ?result)