Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sandboxing cache issue #14388

Merged
merged 6 commits into from Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .dir-locals.el
Expand Up @@ -22,6 +22,7 @@
(merge-with 1)
(l/matche '(1 (:defn)))
(l/matcha '(1 (:defn)))
(p/defprotocol+ '(1 (:defn)))
(p.types/defprotocol+ '(1 (:defn)))
(p.types/def-abstract-type '(1 (:defn)))
(p.types/deftype+ '(2 nil nil (:defn)))
Expand Down
Expand Up @@ -2,6 +2,7 @@
(:require [clojure
[string :as str]
[test :refer :all]]
[clojure.core.async :as a]
[honeysql.core :as hsql]
[metabase
[driver :as driver]
Expand All @@ -11,13 +12,15 @@
[util :as u]]
[metabase-enterprise.sandbox.query-processor.middleware.row-level-restrictions :as row-level-restrictions]
[metabase-enterprise.sandbox.test-util :as mt.tu]
[metabase.api.common :as api]
[metabase.driver.sql.query-processor :as sql.qp]
[metabase.mbql
[normalize :as normalize]
[util :as mbql.u]]
[metabase.models
[permissions :as perms]
[permissions-group :as perms-group]]
[metabase.query-processor.middleware.cache-test :as cache-test]
[metabase.query-processor.util :as qputil]
[metabase.test.data.env :as tx.env]
[metabase.test.util :as tu]
Expand Down Expand Up @@ -551,3 +554,39 @@
:alias "Venue"}]
:order-by [[:asc $id]]
:limit 3})))))))

(deftest dont-cache-sandboxes-test
(cache-test/with-mock-cache [save-chan]
(mt/with-gtaps {:gtaps {:venues (venues-category-mbql-gtap-def)}
:attributes {"cat" 50}}
(letfn [(run-query []
(qp/process-query (assoc (mt/mbql-query venues {:aggregation [[:count]]})
:cache-ttl 100)))]
(testing "Run the query, should not be cached"
(let [result (run-query)]
(is (= nil
(:cached result)))
(is (= [[10]]
(mt/rows result)))))
(testing "Cache entry should be saved within 5 seconds"
(let [[_ chan] (a/alts!! [save-chan (a/timeout 5000)])]
(is (= save-chan
chan))))

(testing "Run it again, should be cached"
(let [result (run-query)]
(is (= true
(:cached result)))
(is (= [[10]]
(mt/rows result)))))
(testing "Run the query with different User attributes, should not get the cached result"
(mt.tu/with-user-attributes :rasta {"cat" 40}
;; re-bind current user so updated attributes come in to effect
(mt/with-test-user :rasta
(is (= {"cat" 40}
(:login_attributes @api/*current-user*)))
(let [result (run-query)]
(is (= nil
(:cached result)))
(is (= [[9]]
(mt/rows result)))))))))))
6 changes: 4 additions & 2 deletions src/metabase/async/util.clj
Expand Up @@ -46,7 +46,9 @@
;; * `result-chan` will get the result of `(f)`, *after* `done-chan` is closed
(let [done-chan (a/promise-chan)
result-chan (a/promise-chan)
f* (bound-fn []
binds (clojure.lang.Var/getThreadBindingFrame)
f* (fn []
(clojure.lang.Var/resetThreadBindingFrame binds)
(let [result (try
(f)
(catch Throwable e
Expand All @@ -56,7 +58,7 @@
(when (some? result)
(a/>!! result-chan result)))
(a/close! result-chan))
futur (.submit ^ThreadPoolExecutor (var-get (resolve 'clojure.core.async/thread-macro-executor)) ^Runnable f*)]
futur (.submit ^ThreadPoolExecutor @#'a/thread-macro-executor ^Runnable f*)]
;; if `result-chan` gets a result/closed *before* `done-chan`, it means it was closed by the caller, so we should
;; cancel the thread running `f*`
(a/go
Expand Down
16 changes: 14 additions & 2 deletions src/metabase/models/card.clj
Expand Up @@ -115,8 +115,20 @@
;; Cards with queries they wouldn't be allowed to run!
(when *current-user-id*
(when-not (query-perms/can-run-query? query)
(throw (Exception. (tru "You do not have permissions to run ad-hoc native queries against Database {0}."
(:database query))))))
(let [required-perms (try
(query-perms/perms-set query :throw-exceptions? true)
(catch Throwable e
e))]
(throw (ex-info (tru "You do not have permissions to run ad-hoc native queries against Database {0}."
(:database query))
{:status-code 403
:query query
:required-perms (if (instance? Throwable required-perms)
:error
required-perms)
:actual-perms @api/*current-user-permissions-set*}
(when (instance? Throwable required-perms)
required-perms))))))
;; make sure this Card doesn't have circular source query references
(check-for-circular-source-query-references card)
(collection/check-collection-namespace Card (:collection_id card))))
Expand Down
9 changes: 4 additions & 5 deletions src/metabase/models/query/permissions.clj
Expand Up @@ -126,11 +126,10 @@
;; if for some reason we can't expand the Card (i.e. it's an invalid legacy card) just return a set of permissions
;; that means no one will ever get to see it (except for superusers who get to see everything)
(catch Throwable e
(when throw-exceptions?
(throw e))
(log/error (tru "Error calculating permissions for query: {0}" (.getMessage e))
"\n"
(u/pprint-to-str (u/filtered-stacktrace e)))
(let [e (ex-info "Error calculating permissions for query" {:query query} e)]
(when throw-exceptions?
(throw e))
(log/error e))
#{"/db/0/"}))) ; DB 0 will never exist

(s/defn ^:private perms-set* :- #{perms/ObjectPath}
Expand Down
4 changes: 2 additions & 2 deletions src/metabase/query_processor.clj
Expand Up @@ -76,6 +76,8 @@
"The default set of middleware applied to queries ran via `process-query`."
[#'mbql-to-native/mbql->native
#'check-features/check-features
#'limit/limit
#'cache/maybe-return-cached-results
#'optimize-datetime-filters/optimize-datetime-filters
#'auto-parse-filter-values/auto-parse-filter-values
#'wrap-value-literals/wrap-value-literals
Expand All @@ -89,7 +91,6 @@
#'resolve-joins/resolve-joins
#'add-implicit-joins/add-implicit-joins
#'large-int-id/convert-id-to-string
#'limit/limit
#'format-rows/format-rows
#'desugar/desugar
#'binning/update-binning-strategy
Expand All @@ -110,7 +111,6 @@
#'resolve-database-and-driver/resolve-database-and-driver
#'fetch-source-query/resolve-card-id-source-tables
#'store/initialize-store
#'cache/maybe-return-cached-results
#'validate/validate-query
#'normalize/normalize
#'add-rows-truncated/add-rows-truncated
Expand Down
5 changes: 3 additions & 2 deletions src/metabase/query_processor/context/default.clj
Expand Up @@ -32,8 +32,9 @@
{:data metadata})

([result]
{:pre [(map? result)]}
(-> result
{:pre [(map? (unreduced result))]}
;; if the result is a clojure.lang.Reduced, unwrap it so we always get back the standard-format map
(-> (unreduced result)
(assoc :row_count @row-count
:status :completed)
(assoc-in [:data :rows] @rows)))
Expand Down
16 changes: 8 additions & 8 deletions src/metabase/query_processor/middleware/cache.clj
Expand Up @@ -130,15 +130,15 @@
([]
(rf))

([acc]
;; if results are in the 'normal format' then use the final metadata from the cache rather than
;; whatever `acc` is right now since we don't run the entire post-processing pipeline for cached results
(let [normal-format? (and (map? acc) (seq (get-in acc [:data :cols])))
acc* (-> (if normal-format?
@final-metadata
acc)
([result]
(let [normal-format? (and (map? (unreduced result))
(seq (get-in (unreduced result) [:data :cols])))
result* (-> (if normal-format?
(merge-with merge @final-metadata (unreduced result))
(unreduced result))
(assoc :cached true, :updated_at last-ran))]
(rf acc*)))
(rf (cond-> result*
(reduced? result) reduced))))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you call reduced if the result is already reduced?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh i see now its just moved from elsewhere.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the original reduced? was Reduced then we call (reduced result*) to make that a Reduced as well. Basically wrap it back up if it was wrapped


([acc row]
(if (map? row)
Expand Down
2 changes: 1 addition & 1 deletion src/metabase/query_processor/middleware/cache/impl.clj
Expand Up @@ -134,7 +134,7 @@
(reduce [_ rf init]
(loop [acc init]
(if (reduced? acc)
(reduced acc)
acc
(let [row (thaw! is)]
(if (= ::eof row)
acc
Expand Down
2 changes: 1 addition & 1 deletion src/metabase/query_processor/middleware/limit.clj
Expand Up @@ -25,7 +25,7 @@
(let [result' (rf result row)
new-row-count (vswap! row-count inc)]
(if (>= new-row-count max-rows)
(reduced result')
(ensure-reduced result')
result'))))))

(defn limit
Expand Down
17 changes: 11 additions & 6 deletions test/metabase/api/card_test.clj
Expand Up @@ -461,12 +461,17 @@
(mt/with-temp Collection [collection]
(perms/grant-collection-readwrite-permissions! (perms-group/all-users) collection)
(mt/with-model-cleanup [Card]
(mt/user-http-request :rasta :post 202 "card"
(assoc (card-with-name-and-query card-name)
:collection_id (u/get-id collection), :collection_position 1))
(is (= #metabase.models.card.CardInstance{:collection_id true, :collection_position 1}
(some-> (db/select-one [Card :collection_id :collection_position] :name card-name)
(update :collection_id (partial = (u/get-id collection))))))))))))
(is (schema= {:collection_id (s/eq (u/get-id collection))
:collection_position (s/eq 1)
:name (s/eq card-name)
s/Keyword s/Any}
(mt/user-http-request :rasta :post 202 "card"
(assoc (card-with-name-and-query card-name)
:collection_id (u/get-id collection), :collection_position 1))))
(is (schema= {:collection_id (s/eq (u/get-id collection))
:collection_position (s/eq 1)
s/Keyword s/Any}
(db/select-one Card :name card-name)))))))))

(deftest need-permission-for-collection
(testing "You need to have Collection permissions to create a Card in a Collection"
Expand Down
Expand Up @@ -10,7 +10,7 @@

(def ^:private objects [{:metadata? true} -200.0 3 "HELLO!" {:x 100, :y #t "2020-02-02", :z #{:a :b :c}} (Z. 100)])

(defn- deserialize
(defn deserialize
"Deserialize objects serialized with `serialize-async` using reducing function `rf`."
([^bytes bytea]
(deserialize bytea (fn [metadata]
Expand Down
57 changes: 52 additions & 5 deletions test/metabase/query_processor/middleware/cache_test.clj
Expand Up @@ -13,16 +13,23 @@
[test :as mt]
[util :as u]]
[metabase.driver.sql-jdbc.execute :as sql-jdbc.execute]
[metabase.middleware.session :as session]
[metabase.models
[permissions :as perms]
[permissions-group :as group]]
[metabase.query-processor
[streaming :as qp.streaming]
[util :as qputil]]
[metabase.query-processor.middleware.cache :as cache]
[metabase.query-processor.middleware.cache
[impl :as impl]
[impl-test :as impl-test]]
[metabase.query-processor.middleware.cache-backend.interface :as i]
[metabase.query-processor.middleware.cache.impl :as impl]
[metabase.test
[fixtures :as fixtures]
[util :as tu]]
[pretty.core :as pretty]))
[pretty.core :as pretty]
[schema.core :as s]))

(use-fixtures :once (fixtures/initialize :db))

Expand All @@ -35,6 +42,9 @@
"Gets a message whenever old entries are purged from the test backend."
nil)

(defprotocol ^:private CacheContents
(^:private contents [cache-backend]))

(defn- test-backend
"In in-memory cache backend implementation."
[save-chan purge-chan]
Expand All @@ -45,7 +55,12 @@
(str "\n"
(metabase.util/pprint-to-str 'blue
(for [[hash {:keys [created]}] @store]
[hash (metabase.util/format-nanoseconds (.getNano (t/duration created (t/instant))))]))))
[hash (u/format-nanoseconds (.getNano (t/duration created (t/instant))))]))))

CacheContents
(contents [_]
(into {} (for [[k v] store]
[k (impl-test/deserialize v)])))

i/CacheBackend
(cached-results [this query-hash max-age-seconds respond]
Expand All @@ -62,7 +77,7 @@
(respond nil))))

(save-results! [this query-hash results]
(let [hex-hash (buddy.core.codecs/bytes->hex query-hash)]
(let [hex-hash (codecs/bytes->hex query-hash)]
(swap! store assoc hex-hash {:results results
:created (t/instant)})
(log/tracef "Save results for %s --> store: %s" hex-hash this))
Expand All @@ -76,7 +91,7 @@
(log/tracef "Purge old entries --> store: %s" this)
(a/>!! purge-chan ::purge)))))

(defn- do-with-mock-cache [f]
(defn do-with-mock-cache [f]
(mt/with-open-channels [save-chan (a/chan 1)
purge-chan (a/chan 1)]
(mt/with-temporary-setting-values [enable-query-caching true
Expand Down Expand Up @@ -401,3 +416,35 @@
(is (= max-bytes (count (transduce identity
(#'cache/save-results-xform 0 {} (byte 0) conj)
(repeat max-bytes [1]))))))))

(deftest perms-checks-should-still-apply-test
(testing "Double-check that perms checks still happen even for cached results"
(mt/with-temp-copy-of-db
(perms/revoke-permissions! (group/all-users) (mt/db))
(mt/with-test-user :rasta
(with-mock-cache [save-chan]
(letfn [(run-forbidden-query []
(qp/process-query (assoc (mt/mbql-query checkins {:aggregation [[:count]]})
:cache-ttl 100)))]
(testing "Shouldn't be allowed to run a query if we don't have perms for it"
(is (thrown-with-msg?
clojure.lang.ExceptionInfo
#"You do not have permissions to run this query"
(run-forbidden-query))))
(testing "Run forbidden query as superuser to populate the cache"
(session/with-current-user (mt/user->id :crowberto)
(is (= [[1000]]
(mt/rows (run-forbidden-query))))))
(testing "Cache entry should be saved within 5 seconds"
(let [[_ chan] (a/alts!! [save-chan (a/timeout 5000)])]
(is (= save-chan
chan))))
(testing "Run forbidden query again as superuser again, should be cached"
(session/with-current-user (mt/user->id :crowberto)
(is (schema= {:cached (s/eq true), s/Keyword s/Any}
(run-forbidden-query)))))
(testing "Run query as regular user, should get perms Exception even though result is cached"
(is (thrown-with-msg?
clojure.lang.ExceptionInfo
#"You do not have permissions to run this query"
(run-forbidden-query))))))))))
17 changes: 10 additions & 7 deletions test/metabase/test/data/impl.clj
Expand Up @@ -112,12 +112,14 @@
;; make sure we're returing an up-to-date copy of the DB
(Database (u/get-id db))
(catch Throwable e
(db/delete! Database :id (u/get-id db))
(throw (ex-info "Failed to create test database"
{:driver driver
:database-name database-name
:connection-details connection-details}
e)))))
(let [e (ex-info "Failed to create test database"
{:driver driver
:database-name database-name
:connection-details connection-details}
e)]
(println (u/pprint-to-str 'red (Throwable->map e)))
(db/delete! Database :id (u/get-id db))
(throw e)))))
(catch Throwable e
(let [message (format "Failed to create %s '%s' test database" driver database-name)]
(println message "\n" e)
Expand Down Expand Up @@ -261,7 +263,8 @@
(try
(copy-db-tables-and-fields! old-db-id new-db-id)
(do-with-db new-db f)
(finally (db/delete! Database :id new-db-id))))))
(finally
(db/delete! Database :id new-db-id))))))


;;; +----------------------------------------------------------------------------------------------------------------+
Expand Down