diff --git a/src/metabase/query_processor/middleware/metrics.clj b/src/metabase/query_processor/middleware/metrics.clj index 32aa8f48b6d31..70c9370b3c5ab 100644 --- a/src/metabase/query_processor/middleware/metrics.clj +++ b/src/metabase/query_processor/middleware/metrics.clj @@ -1,120 +1,182 @@ (ns metabase.query-processor.middleware.metrics (:require [medley.core :as m] + [metabase.lib.convert :as lib.convert] + [metabase.lib.core :as lib] [metabase.lib.metadata :as lib.metadata] [metabase.lib.util :as lib.util] [metabase.lib.util.match :as lib.util.match] [metabase.lib.walk :as lib.walk] [metabase.util :as u] - [metabase.util.log :as log])) + [metabase.util.log :as log] + [metabase.util.malli :as mu])) (defn- replace-metric-aggregation-refs [x lookup] - (lib.util.match/replace - x - [:metric & (_ :guard (fn [[{:keys [join-alias]} metric-id]] - (contains? lookup [join-alias metric-id])))] - (let [[_ {:keys [join-alias]} metric-id] &match - {replacement :aggregation metric-name :name} (get lookup [join-alias metric-id])] + (lib.util.match/replace x + [:metric _ metric-id] + (if-let [{replacement :aggregation metric-name :name} (get lookup metric-id)] (update (lib.util/fresh-uuids replacement) 1 merge - {:name (u/slugify (or join-alias metric-name))} - (select-keys (get &match 1) [:lib/uuid :name]))))) + {:name metric-name} + (select-keys (get &match 1) [:lib/uuid :name])) + (throw (ex-info "Incompatible metric" {:match &match + :lookup lookup}))))) + +(defn- find-metric-ids + [x] + (lib.util.match/match x + [:metric _ (id :guard pos-int?)] + id)) + +(defn- fetch-referenced-metrics + [query stage] + (let [metric-ids (find-metric-ids stage)] + (->> metric-ids + (lib.metadata/bulk-metadata-or-throw query :metadata/card) + (into {} + (map (fn [card-metadata] + (let [metric-query (lib.convert/->pMBQL + ((requiring-resolve 'metabase.query-processor.preprocess/preprocess) + (lib/query query (:dataset-query card-metadata))))] + [(:id card-metadata) + {:query metric-query + :aggregation (first (lib/aggregations metric-query)) + :name (:name card-metadata)}])))) + not-empty))) + +(defn- expression-with-name-from-source + [query [_ {:lib/keys [expression-name]} :as expression]] + (lib/expression query 0 expression-name expression)) + +(defn- temp-query-at-stage-path + [query stage-path] + (cond-> query + stage-path (lib.walk/query-for-stage-at-path stage-path) + stage-path :query)) + +(defn splice-compatible-metrics + "Splices in metric definitions that are compatible with the query." + [query path expanded-stages] + (if-let [lookup (fetch-referenced-metrics query (:aggregation (first expanded-stages)))] + (let [new-query (reduce + (fn [query [_metric-id {metric-query :query}]] + (if (and (= (lib.util/source-table-id query) (lib.util/source-table-id metric-query)) + (= 1 (lib/stage-count metric-query))) + (let [{:keys [expressions filters]} (lib.util/query-stage metric-query 0)] + (as-> query $q + (reduce expression-with-name-from-source $q expressions) + (reduce lib/filter $q filters) + (update-in $q [:stages 0 :aggregation] replace-metric-aggregation-refs lookup))) + (throw (ex-info "Incompatible metric" {:query query + :metric metric-query})))) + (temp-query-at-stage-path query path) + lookup)] + (:stages new-query)) + expanded-stages)) + +(defn- find-metric-transition + "Finds an unadjusted transition between a metric source-card and the next stage." + [query expanded-stages] + (some (fn [[[idx-a stage-a] [_idx-b stage-b]]] + (let [stage-a-source (:qp/stage-is-from-source-card stage-a) + metric-metadata (some->> stage-a-source (lib.metadata/card query))] + (when (and + stage-a-source + (not= stage-a-source (:qp/stage-is-from-source-card stage-b)) + (= (:type metric-metadata) :metric) + ;; This indicates this stage has not been processed + ;; because metrics must have aggregations + ;; if it is missing, then it has been removed in this process + (:aggregation stage-a)) + [idx-a metric-metadata]))) + (partition-all 2 1 (m/indexed expanded-stages)))) + +(defn- update-metric-transition-stages + "Adjusts source-card metrics referencing themselves in the next stage. + + The final stage of the metric is adjusted by removing: + ``` + :aggregation + :breakout + :order-by + ``` + + `:fields` are added explictly to pass previous-stage fields onto the following-stage + + The following stages will have `[:metric {} id]` clauses + replaced with the actual aggregation of the metric." + [query stage-path expanded-stages idx metric-metadata] + (mu/disable-enforcement + (let [[pre-transition-stages [last-metric-stage following-stage & following-stages]] (split-at idx expanded-stages) + metric-name (:name metric-metadata) + metric-aggregation (-> last-metric-stage :aggregation first) + stage-query (temp-query-at-stage-path query stage-path) + last-metric-stage-number idx + stage-query (update-in stage-query + [:stages idx] + (fn [stage] + (dissoc stage :breakout :order-by :aggregation :fields :lib/stage-metadata))) + ;; Needed for field references to resolve further in the pipeline + stage-query (lib/with-fields stage-query idx (lib/fieldable-columns stage-query idx)) + new-metric-stage (lib.util/query-stage stage-query last-metric-stage-number) + lookup {(:id metric-metadata) + {:name metric-name :aggregation metric-aggregation}} + new-following-stage (replace-metric-aggregation-refs + following-stage + lookup) + combined-stages (vec (remove nil? (concat pre-transition-stages + [new-metric-stage new-following-stage] + following-stages)))] + combined-stages))) (defn- adjust-metric-stages "`expanded-stages` are the result of :stages from fetch-source-query. All source-card stages have been spliced in already. - `metric-ref-lookup` this is a volatile that holds references to the original aggragation clause (count, sum etc...) - it is used to replace `[:metric {} id]` clauses. This depends on the order of `walk` as each join is touched depth-first, - a ref-lookup will be added for any metrics found during the stage. - To adjust: We look for the transition between the last stage of a metric and the next stage following it. We adjust those two stages - as explained in `expand`. " - [query expanded-stages metric-ref-lookup] + [query path expanded-stages] ;; Find a transition point, if it exists - (let [[idx metric-metadata] (some (fn [[[_idx-a stage-a] [idx-b stage-b]]] - (let [stage-a-source (:qp/stage-is-from-source-card stage-a) - metric-metadata (some->> stage-a-source (lib.metadata/card query))] - (when (and - stage-a-source - (not= stage-a-source (:qp/stage-is-from-source-card stage-b)) - (= (:type metric-metadata) :metric) - ;; This indicates this stage has not been processed - ;; because metrics must have aggregations - ;; if it is missing, then it has been removed in this process - (:aggregation stage-a)) - [idx-b metric-metadata]))) - (partition-all 2 1 (m/indexed expanded-stages)))] - (if idx - (let [[pre-transition-stages following-stages] (split-at idx expanded-stages) - metric-name (:name metric-metadata) - last-metric-stage (last pre-transition-stages) - metric-aggregation (-> last-metric-stage :aggregation first) - new-metric-stage (cond-> last-metric-stage - :always (dissoc :aggregation :fields :lib/stage-metadata) - (seq following-stages) (dissoc :breakout :order-by :limit)) - ;; Store lookup for metric references created in this set of stages. - ;; These will be adjusted later if these stages are in a join - _ (vswap! metric-ref-lookup assoc [nil (:id metric-metadata)] {:name metric-name :aggregation metric-aggregation}) - new-following-stages (replace-metric-aggregation-refs - following-stages - @metric-ref-lookup) - combined-stages (vec (remove nil? (concat (butlast pre-transition-stages) [new-metric-stage] new-following-stages)))] - (recur query combined-stages metric-ref-lookup)) + (let [[idx metric-metadata] (find-metric-transition query expanded-stages) + [first-stage] expanded-stages] + (cond + idx + (let [new-stages (update-metric-transition-stages query path expanded-stages idx metric-metadata)] + (recur (assoc-in query (conj path :stages) new-stages) path new-stages)) + + (:source-table first-stage) + (splice-compatible-metrics query path expanded-stages) + + :else expanded-stages))) (defn adjust - "Adjusts the final and following stages of `:source-card` of `:type` `:metric`. + "Looks for `[:metric {} id]` clause references and adjusts the query accordingly. Expects stages to have been processed by `fetch-source-query/resolve-source-cards` such that source card stages have been spliced in across the query. - The final stage of metric is adjusted by: - ``` - :aggregation - always removed - :breakout - removed if there are following-stages - :order-by - removed if there are following-stages - :limit - removed if there are following-stages - :fields - always removed - ``` - - Stages following this, and stages further up the query hierarchy will have - `[:metric {} id]` clauses replaced with the actual aggregation of the metric. - " + Metrics can be referenced in two scenarios: + 1. Compatible source table metrics. + Multiple metrics can be referenced in the first stage of a query that references a `:source-table` + Those metrics must: + - Be single stage metrics. + - Have the same `:source-table` as the query + 2. Metric source cards can reference themselves. + A query built from a `:source-card` of `:type :metric` can reference itself." [query] - (let [;; Once the stages are processed any ref-lookup missing a join alias must have - ;; come from this join's stages, so further references must include the join alias. - metric-ref-lookup (volatile! {}) - query (lib.walk/walk + (let [query (lib.walk/walk query - (fn [_query path-type _path stage-or-join] - (case path-type - :lib.walk/join - (let [result (update stage-or-join :stages #(adjust-metric-stages query % metric-ref-lookup))] - ;; Once the stages are processed any ref-lookup missing a join alias must have - ;; come from this join's stages, so further references must include the join alias. - (vswap! metric-ref-lookup #(m/map-kv (fn [[lookup-alias lookup-card] v] - [(if-not lookup-alias - [(:alias stage-or-join) lookup-card] - [lookup-alias lookup-card]) - (lib.util.match/replace - v - [:field (_ :guard (complement :join-alias)) _] - (update &match 1 assoc :join-alias (:alias stage-or-join)))]) - %)) - result) - stage-or-join))) - new-stages (adjust-metric-stages query (:stages query) metric-ref-lookup)] + (fn [_query path-type path stage-or-join] + (when (= path-type :lib.walk/join) + (update stage-or-join :stages #(adjust-metric-stages query path %)))))] (u/prog1 - (replace-metric-aggregation-refs - (assoc query :stages new-stages) - @metric-ref-lookup) - (when-let [match (lib.util.match/match-one <> - [:metric {} _] &match)] + (update query :stages #(adjust-metric-stages query nil %)) + (when-let [metric (lib.util.match/match-one <> + [:metric _ _] &match)] (log/warn "Failed to replace metric" - (pr-str {:match match - :lookup @metric-ref-lookup})))))) + (pr-str {:metric metric})))))) diff --git a/test/metabase/query_processor/middleware/metrics_test.clj b/test/metabase/query_processor/middleware/metrics_test.clj index 95d350a1f7fa9..ed5e84ca7ca13 100644 --- a/test/metabase/query_processor/middleware/metrics_test.clj +++ b/test/metabase/query_processor/middleware/metrics_test.clj @@ -1,6 +1,7 @@ (ns metabase.query-processor.middleware.metrics-test (:require [clojure.test :refer [deftest is]] + [medley.core :as m] [metabase.lib.convert :as lib.convert] [metabase.lib.core :as lib] [metabase.lib.metadata :as lib.metadata] @@ -26,53 +27,63 @@ ([query] (mock-metric meta/metadata-provider query)) ([metadata-provider query] - (let [metric {:lib/type :metadata/card - :id (swap! counter inc) - :database-id (meta/id) - :name "Mock metric" - :type :metric - :dataset-query query}] + (mock-metric metadata-provider query nil)) + ([metadata-provider query card-details] + (let [metric (merge {:lib/type :metadata/card + :id (swap! counter inc) + :database-id (meta/id) + :name "Mock metric" + :type :metric + :dataset-query query} + card-details)] [metric (lib/composed-metadata-provider metadata-provider (lib.tu/mock-metadata-provider {:cards [metric]}))]))) + (def adjust - (comp metrics/adjust fetch-source-query/resolve-source-cards)) + (comp #'metrics/adjust #'fetch-source-query/resolve-source-cards)) + +(deftest ^:parallel adjust-basic-source-table-test + (let [[source-metric mp] (mock-metric) + query (-> (lib/query mp (meta/table-metadata :products)) + (lib/aggregate (lib.metadata/metric mp (:id source-metric))))] + (is (=? {:stages [{:source-table (meta/id :products) + :aggregation [[:avg {} [:field {} (meta/id :products :rating)]]]}]} + (adjust query))))) + +(deftest ^:parallel adjust-basic-source-metric-test + (let [[source-metric mp] (mock-metric) + query (lib/query mp source-metric)] + (is (=? + {:stages [{:source-table (meta/id :products)} + {:aggregation [[:avg {} [:field {} (meta/id :products :rating)]]]}]} + (adjust query))))) (deftest ^:parallel adjust-aggregation-metric-ref-test (let [[source-metric mp] (mock-metric) - query (-> (lib/query mp source-metric) + query (-> (lib/query mp (meta/table-metadata :products)) (lib/aggregate (lib/+ (lib.options/ensure-uuid [:metric {} (:id source-metric)]) 1)))] (is (=? - {:stages [{:source-table (meta/id :products)} - {:aggregation [[:avg {} [:field {} (meta/id :products :rating)]] - [:+ {} [:avg {} [:field {} (meta/id :products :rating)]] 1]]}]} + {:stages [{:source-table (meta/id :products) + :aggregation [[:+ {} [:avg {} [:field {} (meta/id :products :rating)]] 1]]}]} (adjust query))))) (deftest ^:parallel adjust-aggregation-metric-ordering-test (let [[source-metric mp] (mock-metric) - query (-> (lib/query mp source-metric) + query (-> (lib/query mp (meta/table-metadata :products)) (lib/aggregate (lib/+ (lib.options/ensure-uuid [:metric {} (:id source-metric)]) 1)))] (doseq [agg-ref (map lib.options/uuid (lib/aggregations query)) :let [ordered (lib/order-by query (lib.options/ensure-uuid [:aggregation {} agg-ref])) adjusted (adjust ordered)]] (is (=? {:stages - [{:source-table (meta/id :products)} - {:aggregation - [[:avg {} [:field {} (meta/id :products :rating)]] - [:+ {} [:avg {} [:field {} (meta/id :products :rating)]] 1]] + [{:source-table (meta/id :products) + :aggregation + [[:+ {} [:avg {} [:field {} (meta/id :products :rating)]] 1]] :order-by [[:asc {} [:aggregation {} agg-ref]]]}]} adjusted))))) -(deftest ^:parallel adjust-basic-test - (let [[source-metric mp] (mock-metric) - query (-> (lib/query mp source-metric))] - (is (=? - {:stages [{:source-table (meta/id :products) :aggregation complement} - {:aggregation [[:avg {} [:field {} (meta/id :products :rating)]]]}]} - (adjust query))))) - (deftest ^:parallel adjust-join-test (let [[source-metric mp] (mock-metric) query (-> (lib/query mp source-metric) @@ -113,22 +124,6 @@ {:filters [[:= {} [:field {} (meta/id :products :category)] "Widget"]]}]} (adjust query))))) -(deftest ^:parallel adjust-multi-metric-test - (let [[first-metric mp] (mock-metric (lib/filter (basic-metric-query) (lib/> (meta/field-metadata :products :price) 1))) - [second-metric mp] (mock-metric mp (-> (lib/query mp first-metric) - (lib/filter (lib/< (meta/field-metadata :products :price) 100)))) - query (-> (lib/query mp second-metric) - (lib/filter (lib/= (meta/field-metadata :products :category) "Widget")))] - (is (=? - {:stages [{:source-table (meta/id :products) - :filters [[:> {} [:field {} (meta/id :products :price)] 1]] - :aggregation complement} - {:filters [[:< {} [:field {} (meta/id :products :price)] 100]] - :aggregation complement} - {:filters [[:= {} [:field {} (meta/id :products :category)] "Widget"]] - :aggregation some?}]} - (adjust query))))) - (deftest ^:parallel adjust-mixed-multi-source-test (let [[first-metric mp] (mock-metric lib.tu/metadata-provider-with-mock-cards (-> (lib/query lib.tu/metadata-provider-with-mock-cards (:products lib.tu/mock-cards)) @@ -148,21 +143,31 @@ :aggregation some?}]} (adjust query))))) -(deftest ^:parallel adjust-joined-metric-test - (let [[source-metric mp] (mock-metric) - query (as-> (lib/query mp (meta/table-metadata :orders)) $q - (lib/join $q (lib/join-clause (lib.metadata/card mp (:id source-metric)) - [(lib/= (meta/field-metadata :orders :product-id) - (meta/field-metadata :products :id))])) - (lib/aggregate $q (lib.options/ensure-uuid [:metric {:join-alias "Mock metric - Product"} (:id source-metric)])))] - (is (=? - ;; joins get an extra, empty stage from 'fetch-source-query' - {:stages [{:joins [{:stages [{:source-table (meta/id :products)} {}]}] - :aggregation [[:avg {:name "mock_metric___product"} - [:field {} (meta/id :products :rating)]]]}]} - (adjust query))))) +(deftest ^:parallel question-based-on-metric-based-on-metric-based-on-metric-test + (let [[first-metric mp] (mock-metric) + [second-metric mp] (mock-metric mp (lib/query mp first-metric)) + [third-metric mp] (mock-metric mp (lib/query mp second-metric)) + query (lib/query mp third-metric)] + (is (=? {:stages [{:aggregation complement} + {:aggregation complement} + {:aggregation complement} + {:aggregation [[:avg {} [:field {} (meta/id :products :rating)]]]}]} + (adjust query))))) -(deftest ^:parallel e2e-results-test +(deftest ^:parallel joined-question-based-on-metric-based-on-metric-based-on-metric-test + (let [[first-metric mp] (mock-metric) + [second-metric mp] (mock-metric mp (lib/query mp first-metric)) + [question mp] (mock-metric mp (lib/query mp second-metric) {:type :question}) + query (-> (lib/query mp (meta/table-metadata :products)) + (lib/join (lib/join-clause question [(lib/= 1 1)])))] + (is (=? {:stages [{:joins [{:stages [{:aggregation complement} + {:aggregation complement} + {:aggregation [[:avg {} [:field {} (meta/id :products :rating)]]]} + ;; Empty stage added by resolved-source-cards to nest join + #(= #{:lib/type :qp/stage-had-source-card :source-query/model?} (set (keys %)))]}]}]} + (adjust query))))) + +(deftest ^:parallel e2e-source-metric-results-test (let [mp (lib.metadata.jvm/application-database-metadata-provider (mt/id)) source-query (-> (lib/query mp (lib.metadata/table mp (mt/id :products))) (lib/filter (lib/< (lib.metadata/field mp (mt/id :products :price)) 3)) @@ -176,6 +181,23 @@ (mt/rows (qp/process-query source-query)) (mt/rows (qp/process-query query)))))))) +(deftest ^:parallel e2e-source-table-results-test + (let [mp (lib.metadata.jvm/application-database-metadata-provider (mt/id)) + source-query (-> (lib/query mp (lib.metadata/table mp (mt/id :products))) + (lib/filter (lib/< (lib.metadata/field mp (mt/id :products :price)) 30)) + (lib/aggregate (lib/avg (lib.metadata/field mp (mt/id :products :rating)))))] + (mt/with-temp [:model/Card source-metric {:dataset_query (lib.convert/->legacy-MBQL source-query) + :database_id (mt/id) + :name "new_metric" + :type :metric}] + (let [query (-> (lib/query mp (lib.metadata/table mp (mt/id :products))) + (lib/filter (lib/< (lib.metadata/field mp (mt/id :products :rating)) 3)) + (lib/aggregate (lib.metadata/metric mp (:id source-metric))))] + (is (= + (mt/rows (qp/process-query (-> source-query + (lib/filter (lib/< (lib.metadata/field mp (mt/id :products :rating)) 3))))) + (mt/rows (qp/process-query query)))))))) + (deftest ^:parallel e2e-source-card-test (let [mp (lib.metadata.jvm/application-database-metadata-provider (mt/id)) source-query (-> (lib/query mp (lib.metadata/table mp (mt/id :products))) @@ -194,6 +216,43 @@ (mt/rows (qp/process-query query)))))))) +(deftest ^:parallel execute-multi-stage-metric + (let [mp (lib.metadata.jvm/application-database-metadata-provider (mt/id)) + stage-one (-> (lib/query mp (lib.metadata/table mp (mt/id :orders))) + (lib/breakout (lib/with-temporal-bucket + (lib.metadata/field mp (mt/id :orders :created_at)) + :month)) + (lib/aggregate (lib/count)) + (lib/append-stage)) + stage-one-cols (lib/visible-columns stage-one) + source-query (-> stage-one + (lib/breakout (m/find-first (comp #{"Created At: Month"} :display-name) stage-one-cols)) + (lib/aggregate (lib/avg (m/find-first (comp #{"Count"} :display-name) stage-one-cols))))] + (mt/with-temp [:model/Card source-metric {:dataset_query (lib.convert/->legacy-MBQL source-query) + :database_id (mt/id) + :name "new_metric" + :type :metric}] + (let [query (lib/query mp (lib.metadata/card mp (:id source-metric)))] + (is (=? (mt/rows (qp/process-query source-query)) + (mt/rows (qp/process-query query)))))))) + +(deftest ^:parallel execute-single-stage-metric + (let [mp (lib.metadata.jvm/application-database-metadata-provider (mt/id)) + source-query (-> (lib/query mp (lib.metadata/table mp (mt/id :products))) + (lib/aggregate (lib/count)))] + (mt/with-temp [:model/Card source-metric {:dataset_query (lib.convert/->legacy-MBQL source-query) + :database_id (mt/id) + :name "new_metric" + :type :metric} + :model/Card next-metric {:dataset_query (-> (lib/query mp (lib.metadata/card mp (:id source-metric))) + (lib/filter (lib/= (lib.metadata/field mp (mt/id :products :category)) "Gadget"))) + :database_id (mt/id) + :name "new_metric" + :type :metric}] + (let [query (lib/query mp (lib.metadata/card mp (:id next-metric)))] + (is (=? [[53]] + (mt/rows (qp/process-query query)))))))) + (deftest ^:parallel available-metrics-test (let [mp (lib.metadata.jvm/application-database-metadata-provider (mt/id)) source-query (-> (lib/query mp (lib.metadata/table mp (mt/id :products)))