
Streaming/single-pass transducing query processor #11832

Merged
merged 116 commits into from
Feb 19, 2020

Conversation

Member

@camsaul camsaul commented Jan 31, 2020

Like #11554, but applies to all queries executed everywhere. This will fix issues #5373, #10073, #11441, #4216 and probably many others as well.

Supersedes #9702 and #11554

This PR reworks the query processor to transduce the result rows so they can be consumed (e.g., written to JSON) immediately. This is a massive improvement both from a memory standpoint (since only a single row needs to be in memory at any given moment) and from an API endpoint response-time standpoint (since we can start streaming result rows immediately rather than having to wait for the entire result set).

I replaced our usage of clojure.java.jdbc with a custom low-level JDBC-based implementation that is dramatically more efficient (for example, it resolves the methods used to read columns from results once for the entire result set instead of once per row) and much more explicit about closing the resources it holds on to (the PreparedStatement and ResultSet are closed immediately when the results stop being consumed, either because we've reached the end or because the query was canceled).

The new pattern also eliminates some hacky situations where we had to perform multiple passes to preprocess a query. Only a single pass is needed and recursive preprocessing is no longer necessary. This results in dramatically less application DB load.

The asynchronous architecture is also drastically simplified, resulting in less time spent waiting to run queries and easier-to-debug code.

Preliminary Profiling Results: 3.25x faster

How much faster is the reworked QP? I used the following snippet to run 10,000 queries in parallel on my Ryzen 9 3950X (16-core/32-thread) machine with 32 GB of RAM:

(driver/with-driver :postgres
  (u/profile "10000 queries"
    (dorun
     (pmap
      (fn [_]
        (dotimes [_ 100]
          (-> (mt/run-mbql-query :checkins) :data :rows count))) ; or :venues
      (range 100)))))

Results: a ~3-6x increase in throughput across the 10,000 queries!

🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎

| QP | Run 10k queries -- venues table (100 rows) | Average query duration -- venues | Run 10k queries -- checkins table (1000 rows) | Average query duration -- checkins |
| --- | --- | --- | --- | --- |
| Current impl | 35.3 seconds | ~3.5 ms | 234.0 seconds | ~23.4 ms |
| New transducing QP impl | 5.9 seconds | ~0.59 ms | 72.0 seconds | ~7.2 ms |

The results should be even more dramatic for larger result sets, but I didn't have one handy. Note also that these results only show time to complete the entire query, but do not show time until the first results start streaming, which will be dramatically faster in the new QP.

🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎 🏎

Patterns

NOTE: These examples are slightly out-of-date; I'll update them in a bit

This PR unifies the patterns used by QP middleware so all middleware is async and uses the same pattern (as opposed to the current situation where it is about half sync/half async).

Query processor middleware has the signature

(defn middleware [qp]
  (fn [query xformf chans]
    (qp query xformf chans)))

query is, of course, the query; xformf is a function that takes results metadata and returns a transducer used to reduce the result rows; chans is a map of core.async promise channels used for various purposes.

Preprocessing the query can be done in-line by modifying query, e.g.:

(defn middleware [qp]
  (fn [query xformf {:keys [raise-chan], :as chans}]
    (try
      (qp (modify-query query) xformf chans)
      (catch Throwable e
        (a/>!! raise-chan e)))))

Post-processing results (i.e., modifying the results metadata or the rows) can be done by composing transducing functions. xformf is called with results metadata before reducing results, like:

(xformf metadata) -> xform

You can compose the xformfs as follows:

(defn my-xform [metadata]
  (fn [rf]
    (fn
      ([]        (rf))
      ([acc]     (transform-final-result (rf acc)))
      ([acc row] (rf acc (transform-row row))))))

(defn middleware [qp]
  (fn [query xformf chans]
    (qp
     query
     (fn [metadata]
       (let [metadata' (transform-metadata metadata)]
         (comp (my-xform metadata') (xformf metadata'))))
     chans)))

chans is a map of core.async channels used for different purposes. For writing most middleware you should only need raise-chan or canceled-chan. Instead of throwing Exceptions directly, send them to raise-chan, since the query processor is asynchronous, as demonstrated in the example above. canceled-chan can be used to listen for a message if the query is canceled before completion. Other channels exist for listening to various stages of the QP pipeline or for the final result.
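As a sketch of how a middleware might use canceled-chan (this example is illustrative, not code from this PR; log-cancelation-middleware and its log message are hypothetical, and a/go, a/<! are from clojure.core.async):

(defn log-cancelation-middleware [qp]
  (fn [query xformf {:keys [canceled-chan], :as chans}]
    ;; park a go block on canceled-chan; it receives a message only if the
    ;; query is canceled before completion, so this runs exactly in that case
    (a/go
      (when (a/<! canceled-chan)
        (println "Query was canceled before completion")))
    (qp query xformf chans)))

A real middleware would release resources here rather than just logging.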

Driver Interface Changes

driver/execute-query is deprecated in favor of driver/execute-reducible-query. execute-reducible-query has the signature

(driver/execute-reducible-query driver query chans return-results)

and should call return-results with the initial results metadata and an object that can be reduced to get the resulting rows. Because the driver effectively decorates the reduction process by managing the context in which return-results is called, it can hold handles to resources (e.g., connections or java.sql.ResultSets) open for the duration of the reduction and close them afterward.

return-results has the signature:

(return-results results-metadata rows)

The utility function qp.util.reducible/reducible-rows is introduced to simplify creating reducible, streaming results.

Example impl:

(defmethod driver/execute-reducible-query :my-driver
  [_ query chans return-results]
  (with-open [results (run-query! query)]
    (return-results
     {:cols [{:name "my_col"}]}
     (qp.util.reducible/reducible-rows (get-row results) chans))))

Other notes

Using other reducing functions

The reducing function can be passed as a parameter to the query processor, which means you can use alternate reducing functions to stream results in CSV format, XLSX format, or anything else you might desire -- no more need to take the "standard" result format and then transform it, as with the old QP. This functionality could also be used to improve fingerprinters and make it possible to fingerprint rows more easily with less memory usage.
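To illustrate the idea, here is a sketch (not code from this PR; csv-rff and its writer argument are hypothetical, and str/join is clojure.string/join) of a reducing function that streams rows as CSV lines instead of accumulating them in memory:

(defn csv-rff [^java.io.Writer w]
  (fn [metadata]
    (fn
      ;; init: write a header row from the column metadata, start counting at 0
      ([]
       (.write w (str (str/join "," (map :name (:cols metadata))) "\n"))
       0)
      ;; completion: return only the row count -- no rows are retained
      ([row-count] {:row-count row-count})
      ;; step: write one CSV line per row and bump the count
      ([row-count row]
       (.write w (str (str/join "," (map str row)) "\n"))
       (inc row-count)))))

A real implementation would also need to escape commas and quotes in values.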

rffs and access to results metadata

The reducing function (optionally) passed to the query processor is actually a function that, when passed results metadata, returns a reducing function. It has the signature

(rff metadata) -> rf

I've been using the shorthand rff to refer to these functions because they are functions that return reducing functions. rff is thus short for reducing-function-function. xformf is similar because it is a function that returns a transducer; xform is already a common Clojure abbreviation for transducer functions.

This is useful if you want to do something like write a reducing function that returns rows as maps rather than vectors (you need the column names from the results metadata for the keys).

Example:

(defn- maps-rff [metadata]
  (let [ks (mapv (comp keyword :name) (:cols metadata))]
    (fn
      ([] [])

      ([acc] {:data (merge metadata {:rows acc})})

      ([acc row]
       (conj acc (zipmap ks row))))))
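Hypothetical usage (the IDs in the query map are illustrative), passing maps-rff to process-query the same way the file-writing example below passes its rff:

(qp/process-query
 {:database 1
  :type     :query
  :query    {:source-table 2, :limit 3}}
 maps-rff)
;; rows in the result come back as maps keyed by column name rather than
;; as vectors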

Managing resources in the reducing fn

The reducing function can actually be a special function created with qp.build/decorated-reducing-fn that decorates the reduction process itself. This makes it possible to write a reducer that opens a file only if a query completes successfully, then streams results to it:

(defn- print-rows-to-writer-rff [filename]
  (qp.build/decorated-reducing-fn
   (fn [reduce-with-rff]
     (with-open [w (io/writer filename)]
       (reduce-with-rff
        (fn [metadata]
          (fn
            ([] 0)
            ([row-count] {:rows row-count})
            ([row-count row]
             (.write w (format "ROW %d -> %s\n" (inc row-count) (pr-str row)))
             (inc row-count)))))))))

(defn print-rows-to-file-example
  "Writes results to a file. Only opens file if query is successful!"
  []
  (qp/process-query
   (driver/with-driver :postgres
     {:database (mt/id)
      :type     :query
      :query    {:source-table (mt/id :venues), :limit 20}})
   (print-rows-to-writer-rff "/tmp/results.txt")))

TODO:

  • About 95% of things are working, with the main exception being caching. I'm still working out how to cache results without retaining the entire result set in memory -- I'm considering options like streaming rows to a temp file and then loading that into the DB if we decide to cache.
  • I need to tweak quite a few tests and remove a lot of code and namespaces that are no longer needed.
  • Probably need to tweak the API response logic to properly stream QP rows when they first come in.

@camsaul camsaul requested a review from sbelak as a code owner January 31, 2020 04:36
Contributor

@sbelak sbelak left a comment

This is super dope. I'll try to make fingerprinting single-pass; we should be able to get rid of that extra transduce on rows, and that will give me a better feel for the changes.

src/metabase/driver/sql_jdbc/reducible_execute.clj (outdated)
internal-only-dims
(->> (hydrate cols :values :dimensions)
     (keep-indexed col->dim-map)
     (filter identity))
Contributor

Why filter here? keep already skips nils.

Member Author

Great question. I just moved that code, so it was already like that; I'll have to read it carefully and see if we can take it out.

(defn- add-column-info-xform [{query-type :type, :as query} metadata]
  (fn [rf]
    (redux/post-complete
     (redux/juxt rf ((take column-info-sample-size) conj))
Contributor

If we comp (filter some?) and take here, it will fix #6924 (driver.common/values->base-type needs to be changed slightly to return :type/* on empty values, though). Or better yet, we change driver.common/values->base-type to be a transducer and put all the logic about how much to take there (that way we can also make it more efficient by, e.g., returning early if the first column-info-sample-size/2 values have the same type).

Member Author

That is a great idea 👍

Contributor

I'm happy to do this if you want to split the work.

Member Author

Yeah, that would be greatly appreciated. Let me get this to a point where the tests are passing first, though, so you're not fighting stuff I haven't fixed yet.

src/metabase/query_processor/middleware/limit.clj (outdated)
;; (Driver) execute-query-reducible:
;;
;; (execute-query-reducible: driver query respond raise canceled-chan) -> reducible result
;;
Member Author

These comments are out-of-date; please ignore. They were from a different pattern I experimented with.

@camsaul camsaul merged commit aced697 into master Feb 19, 2020
@camsaul camsaul deleted the reducible-qp branch February 19, 2020 20:51