use disk at query time of notification payload creation #64390
Conversation
```clojure
 :public-csv-download :public-xlsx-download :public-json-download
 :embedded-csv-download :embedded-xlsx-download :embedded-json-download}]
 :embedded-csv-download :embedded-xlsx-download :embedded-json-download
 :pulse}]
```
for pulses (alerts) we set autocommit false so we don't put postgres results all in memory at once.
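For readers unfamiliar with this Postgres quirk, here is a minimal JDBC-interop sketch of the mechanism being referred to (plain java.sql calls; `datasource` is assumed to exist and this is not the Metabase driver code): Postgres only honors the fetch size, and therefore pages results instead of materializing them all, when autocommit is off.

```clojure
;; Illustrative only: with autocommit disabled, Postgres respects setFetchSize
;; and hands back rows in pages instead of loading the whole result set.
(with-open [conn (.getConnection datasource)]   ; `datasource` assumed to exist
  (.setAutoCommit conn false)
  (with-open [stmt (.createStatement conn)]
    (.setFetchSize stmt 1000)
    (with-open [rs (.executeQuery stmt "SELECT * FROM orders")]
      (loop [n 0]
        (if (.next rs)
          (recur (inc n))
          n)))))
```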
At first the RFF tripped me up somewhat, being quite big, but once you start reading, the logic is sound and the code is clean. I hope I'm not missing anything important; all I have is little nitpicks. This is really great!
```clojure
;; Already streaming - write row directly to file
(let [{:keys [^DataOutputStream output-stream ^File file]} @streaming-state]
(vswap! rows conj! row)
```
This vswap! tripped me up a little bit, I think it deserves a comment, like "buffering rows to avoid touching disk on every incoming row" or something.
Also, if you do not mind, I'd prefer `(if-not (zero? ...` in the next line; this way there is an early return of result and it'll be easier to visually match what's going on.
done. good call out.
```clojure
(write-row-block-to-stream! output-stream remaining-rows))
(.close output-stream)
(let [file-size (.length file)
      file-size-mb (/ file-size 1024.0 1024.0)]
```
Should this be human-readable-size too?
```clojure
([] (rf))
([acc]
 (analytics/inc! :metabase-query-processor/query
                 {:driver driver/*driver* :status "success"})
```
I'm not prepared enough to judge this 😁
I'm glad you called this out! I meant to put a comment. Each rff had been in charge, in its completion arity, of marking the analytics counter for a successful query. But we have this nice wrapper, used by all rffs, where we can put it instead, so we don't have to have each one do it: they can all be in charge of storing results and such, and the pipeline that assembles and uses the rff can mark a successful query on this completion step.
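A minimal sketch of the wrapper shape being described (assuming the `analytics/inc!` call shown in the diff; not the exact project code):

```clojure
;; Wrap an rff once so every reducing function it produces records the
;; query-success metric in its completion arity, instead of each rff doing it.
(defn wrap-with-success-metric
  [rff]
  (fn [metadata]
    (let [rf (rff metadata)]
      (fn
        ([] (rf))
        ([acc]
         (analytics/inc! :metabase-query-processor/query
                         {:driver driver/*driver* :status "success"})
         (rf acc))
        ([acc row] (rf acc row))))))
```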
```
@@ -1,14 +1,25 @@
 (ns metabase.notification.payload.temp-storage
-  "Util to put data into a temporary file and schedule it for deletion after a specified time period.
+  "Util to put data into a temporary file and delete after the notification sends. Cleanup happens with notification.send/do-after-notification-sent for dashboards and cards which calls cleanup! on each part. This is exetended to Object as a no-op and on the type defined here deletes the temporary file.
```
📚 The docstring says "exetended" but it should be "extended" (typo). 🎯
```clojure
  When getting query results for notifications (alerts, subscriptions) once the query row count
  exceeds [[metabase.notification.payload.execute/rows-to-disk-threshold]], we then start streaming all rows to
  disk. This ensures that smaller queries don't needlessly write to disk and then reload, while large results don't
  attempt to reside in memory and kill and instance.
```
🔍 The docstring says "kill and instance" but should be "kill an instance" (typo). 🦅
```clojure
;; Step arity - accumulate rows
([result row]
 ;; unconditionally incrememt row count and add rows to internal volatile transient collector. If we are
```
📖 Per the style guide, "acc" is an acceptable conventional abbreviation, but "incrememt" should be "increment" (typo). Also this comment is quite long and could be broken into multiple lines for better readability. 🎯
```clojure
(vswap! rows conj! row)
(if @streaming?
  ;; Already streaming - write row directly to file

```
This blank line is inside the notification-rf function definition. According to the style guide, there should be No Blank Lines Within Definition Forms: blank lines should not be placed in the middle of a function definition. Consider removing this line, or if you're trying to indicate grouping in a conditional, you might restructure this to avoid the blank line. 🚨
```clojure
  [qp-result]
  (update-in qp-result [:data :viz-settings] merge (get-in qp-result [:json_query :viz-settings])))

(def rows-to-disk-threshold
```
🔧 The var name is rows-to-disk-threshold but in the comment on line 133 it was spelled rows-to-disk-threadhold (with extra "h"). This inconsistency has been fixed here but worth noting the improvement! ✨
🔮 Code Review Summary

This PR introduces streaming query results to disk during notification payload creation to prevent OOM issues. The implementation is solid and follows most Metabase conventions!

🎯 Key Style Issues Found

✨ Good Practices Observed

🎴 Tarot Reading for This PR

Card Drawn: The Tower (Reversed) 🗼✨

Interpretation: The Tower reversed represents avoiding disaster rather than experiencing it! 🎊 This PR literally prevents the catastrophic collapse (OOM crashes) that was plaguing the notification system. Just as The Tower reversed indicates recovering from near-disaster and implementing protective measures, this code introduces safeguards (file size limits, streaming thresholds) to prevent the "tower" of your Metabase instance from crumbling under the weight of massive query results. The 1.3x buffer and 10MB limits are like architectural reinforcements preventing structural failure. The streaming approach is the foundation repair that will keep everything stable! 🏗️✨

🎰 Today's Lucky Lottery Numbers

7, 13, 42, 2000, 10485760, 86 🍀 (2000 = your row threshold; 10485760 = your byte limit; 86 = the number of changed files you didn't have to touch! Good luck!) 🎲✨
```
@@ -1,14 +1,25 @@
 (ns metabase.notification.payload.temp-storage
-  "Util to put data into a temporary file and schedule it for deletion after a specified time period.
+  "Util to put data into a temporary file and delete after the notification sends. Cleanup happens with notification.send/do-after-notification-sent for dashboards and cards which calls cleanup! on each part. This is exetended to Object as a no-op and on the type defined here deletes the temporary file.
```
🎨 Typo in namespace docstring 🐛
Line 2 has a typo: "exetended" should be "extended" 📝
```clojure
   :output-stream os}))

(defn- write-row-block-to-stream!
  "Write a single row to the output stream using nippy serialization."
```
📝 Docstring issue: misleading description
The docstring says "Write a single row" but the parameter name row-block and the actual usage (line 263 shows (persistent! @rows) being passed) suggests this writes multiple rows as a block. The docstring should accurately describe what the function does. 🤔
Consider: "Write a block of rows to the output stream using nippy serialization." ✍️
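For illustration, a block write matching that suggested docstring might look like this minimal sketch (assuming taoensso.nippy and a DataOutputStream; not necessarily the project's exact implementation):

```clojure
(defn- write-row-block-to-stream!
  "Write a block of rows to the output stream using nippy serialization."
  [^java.io.DataOutputStream output-stream row-block]
  ;; one nippy frame per block; the reader thaws frames until end of file
  (nippy/freeze-to-out! output-stream row-block))
```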
```clojure
;; Step arity - accumulate rows
([result row]
 ;; unconditionally incrememt row count and add rows to internal volatile transient collector. If we are
```
🔤 Typo in comment
"incrememt" should be "increment" 📚
```clojure
  (throw (ex-info "Temp file no longer exists" {:file file})))

(let [file-size (.length file)
      file-size-mb (/ file-size 1024.0 1024.0)]
```
💡 Consider using kebab-case for variable names
The variable file-size-mb uses the abbreviation mb. According to the style guide, you should prefer longer, more verbose names and avoid abbreviations unless they are well-known and conventional. While "MB" is fairly standard, file-size-megabytes would be clearer and more consistent with Clojure conventions. Same applies to other *-mb variables in this file. 📏
That said, mb in the context of file sizes is pretty universally understood, so this is a minor point! 🎯
```clojure
  [qp-result]
  (update-in qp-result [:data :viz-settings] merge (get-in qp-result [:json_query :viz-settings])))

(def rows-to-disk-threshold
```
📖 Missing helpful context in docstring
The docstring says "Maximum rows to hold in memory..." but doesn't explain what happens after the threshold is exceeded. Consider expanding it to: "Maximum rows to hold in memory when running notification queries. After this threshold is reached, query results are streamed straight to disk. See [[metabase.notification.payload.temp-storage]] for more details."
Oh wait, I see you do have this! But the second sentence feels tacked on. Consider making it flow better as one complete description. ✨
```clojure
  (log/debugf "File context: %s" (pr-str preamble))))

;; Read row count/marker
(let [count-or-marker (nippy/thaw-from-in! is)]
```
The variable name count-or-marker suggests it could be either a count (number) or a marker (keyword), but doesn't clearly indicate what determines which it is or what the marker value would be. The code shows ::streaming is the marker, but someone reading just the variable name wouldn't know that.
Consider row-count-or-streaming-marker or adding a comment explaining the dual nature. 🎯
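To make the dual nature concrete, a minimal sketch of the read side being described (the `read-*` helpers are hypothetical, named only for illustration):

```clojure
;; The first value thawed from the stream is either a plain row count or the
;; ::streaming marker; the marker means row blocks follow until end of file.
(let [count-or-marker (nippy/thaw-from-in! is)]
  (if (= ::streaming count-or-marker)
    (read-row-blocks-until-eof! is)       ; hypothetical helper
    (read-n-rows! is count-or-marker)))   ; hypothetical helper
```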
🎴 Tarot Reading for This PR 🎴

The Eight of Pentacles ⚒️✨

The Eight of Pentacles represents dedication to mastery, attention to detail, and iterative improvement through focused work. 🎯

This card resonates deeply with your PR's meticulous approach to solving OOM issues. Just as the craftsperson in the Eight of Pentacles carefully hones each pentacle one at a time, your code methodically handles each query result—streaming rows to disk with precision, monitoring file sizes, and implementing careful thresholds. 🔨💾

The card's energy of incremental refinement is evident in how you've evolved the notification system: from keeping everything in memory (novice level), to Ngoc's work dumping completed results to disk (journeyman level), to now streaming during query execution (master craftsperson level). Each iteration builds upon the last with careful attention to the craft. 🌟

The detailed logging, the thoughtful variable naming, the careful balance between memory and disk usage—all reflect the Eight of Pentacles' message: excellence through dedication to detail. The dual-mode reducing function (in-memory vs streaming) shows the wisdom of a true craftsperson who knows when to use each tool. 🛠️

Your thorough testing with 50MB queries, consideration of edge cases (weird types, truncation), and comprehensive performance analysis demonstrate the Eight of Pentacles' insistence on measuring twice, cutting once. 📏✂️

The only caution from this card: don't get so focused on perfecting each individual pentacle that you lose sight of completing the full set. But with settings for easy rollback and clear documentation, you've already heeded that wisdom! 🎓

🎰 Today's Lucky Lottery Numbers 🎰

6, 13, 20, 42, 54, 💾 (Bonus ball: 2000 - representing your threshold!) Play responsibly! 🍀✨
| (format "%.1f %s" current unit)))))) | ||
|
|
||
| (deftype TempFileStorage [^File file] | ||
| (deftype StreamingTempFileStorage [^File file context] |
🌟 Style violation: The type name should follow :model/ style for consistency with the rest of the codebase. Consider using a protocol or record type instead of deftype with a clearer naming convention.
Additionally, context parameter lacks a docstring explaining what context information should be provided.
```clojure
(toString [_]
  (str "#TempFileStorage{:file " file "}"))
  (if (.exists file)
    (format "#StreamingTempFileStorage{:file %s, :size %.2f KB, :context %s}"
```
🐛 Issue: The format string uses %.2f KB but passes the result of human-readable-size which already returns a formatted string like "7.7 mb". This will cause a type error.
| (format "#StreamingTempFileStorage{:file %s, :size %.2f KB, :context %s}" | |
| (format "#StreamingTempFileStorage{:file %s, :size %s, :context %s}" | |
| (.getName file) | |
| (human-readable-size (.length file)) | |
| (pr-str context)) |
```clojure
(if @streaming?
  ;; Already streaming - write row directly to file
  (let [{:keys [^DataOutputStream output-stream ^File file]} @streaming-state]
    (if-not (zero? (mod @row-count 5000))
```
✨ Consider extracting the magic number 5000 to a named constant at the top of the namespace, similar to how rows-to-disk-threshold is defined in execute.clj. This would make it easier to tune and understand the relationship between the two thresholds.
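A sketch of that suggestion (the constant name is hypothetical):

```clojure
;; Name the flush interval and define it next to rows-to-disk-threshold so
;; both tuning knobs live together.
(def ^:private streaming-flush-row-interval
  "While streaming to disk, flush buffered rows to the file every N rows."
  5000)

;; the step arity would then test
;; (zero? (mod @row-count streaming-flush-row-interval))
```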
```clojure
(if (and (pos? (notification.settings/notification-temp-file-size-max-bytes))
         (> (.length file) (* 1.3 (notification.settings/notification-temp-file-size-max-bytes))))
  (do (vswap! streaming-state assoc :notification/truncated? true)
      (log/warnf "Results have exceeded 1.3 times of `notification-temp-file-size-max-bytes` of %s (max: %s). Truncating query results. %s"
```
✨ Style: The log message uses backticks around notification-temp-file-size-max-bytes which isn't standard Clojure formatting. Based on CLAUDE.md conventions, var references should either be plain text or use proper markdown formatting in docstrings. In log messages, consider using plain text or proper interpolation.
Additionally, the 1.3 multiplier is a magic number that should be extracted to a named constant for clarity.
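A sketch of the second suggestion (the constant name is hypothetical):

```clojure
;; Give the 1.3x allowance a name so the size check and the warning message
;; stay in sync if the factor is ever tuned.
(def ^:private temp-file-size-overshoot-factor
  "Stop writing once the temp file grows past this multiple of the configured max size."
  1.3)
```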
```clojure
;; Completion arity
([result]
 {:pre [(map? (unreduced result))]}
```
🎯 The :pre condition checks (map? (unreduced result)) but doesn't verify that result contains the expected keys like :data. Consider adding more specific validation or a schema check here for robustness.
```clojure
  [_chart-type _render-type _timezone-id _card _dashcard _data]
  @card-error-rendered-info)

(mu/defmethod render :card-error/results-too-large :- ::RenderedPartCard
```
📝 Missing docstring for this new multimethod implementation. Per CLAUDE.md: "Every public var in src or enterprise/backend/src must have a useful docstring."
```clojure
;; suggested change: add a docstring
(mu/defmethod render :card-error/results-too-large :- ::RenderedPartCard
  "Render an error message when query results are too large to include in the notification.
  This occurs when result file size exceeds [[metabase.notification.settings/notification-temp-file-size-max-bytes]]."
  [_chart-type _render-type _timezone-id _card _dashcard _data]
  {:attachments nil,
   :content [:div
             {:style "font-family: Lato, \"Helvetica Neue\", Helvetica, Arial, sans-serif; color: #EF8C8C; font-weight: 700; padding: 16px;"}
             "The results returned from this question were too large. Perhaps add an aggregation or a filter."]})
```
```clojure
    @x
    x))

(defn maybe-realize-data-rows
```
📖 The updated docstring is good, but it should use proper markdown reference format for the var reference. Per CLAUDE.md, mentions of other vars should use [[metabase.notification.settings/notification-temp-file-size-max-bytes]] format.
```clojure
  [qp-result]
  (update-in qp-result [:data :viz-settings] merge (get-in qp-result [:json_query :viz-settings])))

(def rows-to-disk-threshold
```
✅ Nice improvement! The public def with docstring follows the style guide better than the previous ^:private var. The docstring properly references another namespace using [[metabase.notification.payload.temp-storage]].
* use disk at query time of notification payload creation
We have some nice work to dump query results to disk if they are larger
than a row-count threshold. But this presupposes that results can fit
into memory. It's completely possible, and relatively easy, to do some
kind of "select *" operation in a dashboard where the only limit that
prevents billions of rows coming back is the excel limit we add in.
This tries to optimistically use the disk. It creates a new rff that has
two modes: in-memory and a gzipped data output stream. The first is in
operation for the first 2,000 rows. If we never exceed this limit it
remains a regular reducing function. Once this threshold is exceeded, we
open a new file and stuff the rows in there and don't keep them in
memory any longer. This prevents us from having to hold the entire
result set in memory at once.
On the consumption side, we have a threshold such that if a file is
_larger_ than this, we won't attempt to load it and render it for static
viz. This is what lets us defensively decline to do an operation that
would reasonably lead to an OOM.
Optimizations for the future:
- give up on file creation once it exceeds the threshold size. We can
easily monitor the size of the file as we go and stop the transduction
or results.
- clear out files more frequently. This will probably litter lots of
temp files, some possibly quite large, on the disk. We need to more
frequently clear them up. The directory is marked as deleteOnExit, but
we need to ensure we don't put lots of files in there during process
lifetime.
- better error rendering of "too large" error. Right now it hits the
default error rendering (previously it would oom). We can make this
better.
- explore how parallel we can get now. Presumably we are now quite
memory efficient. Can we go back to 3, or 10, worker threads?
* fix typo and reintroduce constant for row threshold
* more references to typo constant
* missed one somehow?
* include pulses (alerts) for autocommit off for downloads
we want to stream results. and now we do into a file so this can respect
the page size which only happens in pg when using autocommit false
* special error msg for too large results/dont attach large results
feedback that query results were too large
alerts also by default include results. let's check for the
render/too-large and not include them in that case. Was actually getting
a seq error on the deftype that holds the file. maybe want to give that
a seqable method?
* fix tests
we had an unrealized temp storage file going through
* unify all to streaming before moving to temp-storage
* move streaming to the temp-storage namespace
* don't deref results if no one wants attachments
* don't log the error here. it's pretty normal
* doc strings
* better logging and truncate results above threshold
once we are above the size that we won't read the file, we don't have to
keep piping results to it.
Concretely, what does this get us? If someone absentmindedly did
`select * from transactions` it could be 100s of GBs or more. Without
this, we would fill up the disk for the instance even though we don't
try to render it in the email. But once we know that there's no way we
will read the contents of the file, we really don't have to write to it
any longer.
* Make settings for size limit and whether to enforce
maybe someone has a beefy machine and they don't mind chugging through
843mb of query results to get a graph. they can easily set
`MB_ENFORCE_NOTIFICATION_TEMP_FILE_SIZE_LIMIT` to false and the old
behavior will be restored. If someone doesn't like the 10mb limit and
prefers a 50mb limit they can do
`MB_NOTIFICATION_TEMP_FILE_SIZE_MAX_BYTES=52428800` and then they will
have the higher limit
* Huge performance improvements
The real gist here is removing the gzip on the output. This was crushing
our performance. 18 seconds instead of ~4 seconds. Real bad. And it's
better to not gzip anyways. We really want to be tied pretty closely to
the size of the results. GZipping uses a ton of CPU and gets it very
compact. But we want a number that is pretty representative of the
results. Repeated values get gzipped down to almost nothing and then
they could still OOM the instance. But 50mb of results might get
squashed down to ~8mb of text. We want to be tethered close to the size in
a csv or in memory.
Also changed the serialization to write chunks of rows and not a single
row. It's totally possible that the gzip fix did all of the work and
chunks or individual rows all get put together into the
bufferedoutputstream so it doesn't matter anyways. But the good thing is
that the format of this file is entirely the work of this namespace and
no one else gets to see it.
* Check for fidelity of regular results and disk based
timing wise: running the same card 356 (a join of people, products,
orders) and verify timing stays consistent (we are only 100ms longer
with disk)
```clojure
temp-storage=> (time
(-> (toucan2.core/select-one-fn :dataset_query :model/Card :id 356)
metabase.query-processor/process-query
:data :rows count))
"Elapsed time: 740.709583 msecs"
18760
temp-storage=>
(time (->
(metabase.notification.payload.execute/execute-card 1 356)
:result
(select-keys [:notification/truncated?
:status :running_time
:data.rows-file-size
:data])
(update :data (fn [d] (select-keys d [:rows])))))
"Elapsed time: 840.608 msecs"
{:status :completed,
:running_time 724,
:data.rows-file-size 8056845,
:data {:rows #object[metabase.notification.payload.temp_storage.StreamingTempFileStorage
"0x3336077d"
{:status :pending, :val nil}]}}
```
Compare in memory results:
```clojure
temp-storage=> (let [card-id 356
regular-rows (-> (toucan2.core/select-one-fn :dataset_query :model/Card :id card-id)
(metabase.query-processor/process-query)
:data :rows)
disk-rows (-> (metabase.notification.payload.execute/execute-card 1 356)
:result :data :rows)
dereffed @disk-rows]
(doseq [[f results] [["/tmp/results.from-disk" dereffed]
["/tmp/results.from-qp" regular-rows]]]
(with-open [w (clojure.java.io/writer f)]
(doseq [row results]
(io/copy (str (pr-str row) \newline) w))))
[(= regular-rows dereffed) (= (count regular-rows) (count dereffed))])
[true true]
```
and then compare the files written above:
```shell
/tmp
❯ diff results.from-disk results.from-qp
/tmp
❯ echo $?
0
```
* weird postgres types
* postgres 12 didn't like gen_random_uuid so just hardcode one
everyone else promised they wouldn't use
7e3cd49d-bfe1-4620-83dd-0c163719175c for anything important
* formatting, lints
* alignment
* reformat, remove unused aliases
* remove errant comment
* cleanup logic and add some comments
unconditionally add to rowcount and in-memory collection of rows. For
two of three branches, this is "done": (1) collecting in memory, and (2)
streaming to disk but our "batch" isn't full. Only when the batch is a
multiple of 5,000 rows (number completely picked at random, balancing
this) we can then write the row block to disk.
Totally possible this can switch to just writing to disk on each row and
rely on the buffered writer to handle batching. But the use of the
in-memory collection being similar across almost all of the branches
makes this solution feel a bit cleaner for the moment.
also does things like inverting `if-not` so the short branch is at the
top and not hanging at the end where it's easy to miss.
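A minimal sketch of the inversion described above (simplified; names follow the diff snippets earlier in this page, not the exact project code):

```clojure
;; With if-not, the short "keep buffering" branch comes first and result is
;; returned early; the longer flush-to-disk branch no longer hangs at the end.
(if-not (zero? (mod @row-count 5000))
  result
  (let [{:keys [output-stream]} @streaming-state]
    (write-row-block-to-stream! output-stream (persistent! @rows))
    (vreset! rows (transient []))
    result))
```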
* unify two file size settings
We used to have some notion of `enforce` and `file-size-limits`. Rather than
juggle both, you can just set the file size to 0 to make it unbounded.
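As a sketch of how the single setting covers both ideas (the helper name is hypothetical; it leans on the `pos?` guard visible in the diff snippets above):

```clojure
;; A max-bytes of 0 fails the pos? guard, so the size check is skipped and the
;; file size is effectively unbounded.
(defn- over-size-limit? [^java.io.File file max-bytes]
  (and (pos? max-bytes)
       (> (.length file) (* 1.3 max-bytes))))
```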
* reuse human-readable-size in more places, fix some typos
* typos, fixes
* prometheus metrics
* followup to metrics
I’ve added a new metric
```clojure
(prometheus/counter :metabase-notification/temp-storage
{:description "Number and type of temporary storage uses"
;; memory, disk, above-threshold, truncated, not-limited
:labels [:storage]})
```
to record the different scenarios of query results:
- stayed in `memory`
- wrote them to `disk` but remained under the threshold
- wrote to disk and they went `above-threshold`
- wrote to disk and they went so far above threshold that we abandoned the query so the results were `truncated`
- wrote to disk but the limit was set to 0 so the results were `not-limited`
and these look like: (replaced # with ♯ so git message will include)
```
❯ http get :9191/metrics | grep temp
♯ HELP metabase_notification_temp_storage_total Number and type of temporary storage uses
♯ TYPE metabase_notification_temp_storage_total counter
metabase_notification_temp_storage_total{storage="memory",} 2.0
metabase_notification_temp_storage_total{storage="disk",} 2.0
metabase_notification_temp_storage_total{storage="truncated",} 2.0
```
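For illustration, recording one of those scenarios would look roughly like this (the exact call site may differ; the counter and label are the ones defined above):

```clojure
;; Bump the counter with the label for the scenario we ended up in, e.g. rows
;; were written to disk but stayed under the size threshold.
(analytics/inc! :metabase-notification/temp-storage {:storage "disk"})
```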
* make predicates a bit more clojurey
* missed a refactor somehow
* better styling
thread the human readable size through
* styling changes
* bit of formatting
---------
Co-authored-by: Alexander Polyankin <alexander.polyankin@metabase.com>
OOMs from Notifications
A short history of how we got here:
We render alerts and dashboards with a graal js interpreter. To do this we need the resultsets of queries. We used to just gather these all in memory and then render them. This works super well for small datasets but exposes the instance to OOM issues if a large query is returned.
Ngoc (possibly others as well) made a nice addition in #51708 that uses disk to ensure that only a single resultset is in memory at any given time. The gist: run the query; if the results exceed some row count threshold, dump them to disk, and then refetch them into memory during rendering.
This still leaves us susceptible to OOM if any particular resultset is too large.
What ends up happening is that instances randomly die and emails are not sent.
The fix here
The gist of this fix is quite simple: if resultsets start getting too big, stream them to disk as they come from the db rather than after they are done. Then add two knobs: a) the size limit above which we don't attempt to render them and b) whether to enforce this limit. This lets anyone easily go back to the previous behavior if they want to allow large query results.
How it is implemented
We run our queries in a transducing context here: https://github.com/metabase/metabase/blob/master/src/metabase/query_processor/pipeline.clj#L78-L87 and the reducing function is created just above with `(rff metadata)`, which returns a reducing function. So this PR creates a new rff which has two modes: collecting results in memory, similar to the default one (https://github.com/metabase/metabase/blob/master/src/metabase/query_processor/reducible.clj#L15-L38) which is optionally wired up here: https://github.com/metabase/metabase/blob/master/src/metabase/query_processor.clj#L81, and streaming rows to a temporary file on disk.
If the number of rows remains under 2000, this reducing function behaves identically to the default one. If more rows are returned, it takes all accumulated rows so far and starts serializing them to disk. In this way we prevent having entire resultsets in memory, and we can also inspect the total size and decide whether we should abandon trying to render the results.
There is a limit above which we won't open the files (by default 10mb). The reducing function will monitor the file size as it creates it and stop running the query when the file is 1.3x this limit. This helps us not fill up the disk with query results we won't attempt to render anyway.
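As a rough sketch of that two-mode reducing function (heavily simplified, with hypothetical helper names; the real implementation lives in metabase.notification.payload.temp-storage and also handles metadata, cleanup, and truncation):

```clojure
;; Buffer rows in memory until the threshold, then spill to a temp file and
;; keep streaming to it in blocks.
(defn streaming-rff [rows-to-disk-threshold]
  (fn [metadata]
    (let [rows       (volatile! (transient []))
          row-count  (volatile! 0)
          streaming? (volatile! false)
          out        (volatile! nil)]          ; DataOutputStream once streaming
      (fn
        ([] {:data metadata})
        ([result]
         (if @streaming?
           (do (write-row-block-to-stream! @out (persistent! @rows)) ; hypothetical helper
               (.close ^java.io.DataOutputStream @out)
               (assoc-in result [:data :rows] ::stored-on-disk))
           (assoc-in result [:data :rows] (persistent! @rows))))
        ([result row]
         (vswap! row-count inc)
         (vswap! rows conj! row)
         (when (and (not @streaming?) (> @row-count rows-to-disk-threshold))
           (vreset! out (open-temp-file-stream!))                    ; hypothetical helper
           (vreset! streaming? true))
         (when (and @streaming? (zero? (mod @row-count 5000)))
           (write-row-block-to-stream! @out (persistent! @rows))
           (vreset! rows (transient [])))
         result)))))
```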
What it looks like
This dashboard has a query that generates 50mb of data and then a simple line chart of orders aggregated by month and averaged.
The email and the log output are shown in the PR screenshots (omitted here). The relevant logs for these two cards are:
the 50mb query
- "Row count reached threshold (2000), switching to streaming mode" - tried to not use disk but we should switch
- "Results have exceeded 1.3 times of `notification-temp-file-size-max-bytes` of 13.0 mb (max: 10.0 mb). Truncating query results." - we have exceeded the size that we would read off of disk, so stop running the query
- "💾 Stored 400338 rows to disk: 13.00 MB (never loaded into memory) (note query results were truncated) {mb-card_id=311, mb-dashboard_id=54, mb-notification_id=, mb-payload_type=:notification/dashboard}" - a summary of what is stored to disk, including notification id, dashboard, etc. This one does not have a notification id because I'm running the preview "send now" version.
The small query
- "✓ Completed with 12 rows in memory (under threshold) {mb-card_id=355, mb-dashboard_id=54, mb-notification_id=, mb-payload_type=:notification/dashboard}" - really nothing to see here. Simple line graph, 12 query results, no need to involve disk, etc. Happy path is very happy.
Rendering
We have a few lines about the first card's results being too large and being skipped
Optimizations for the future: see the list in the commit message above.
Correctness
Query results are coming back identical as long as they aren't truncated. Of course once we truncate they should not be the same. And we get a nice speed up as well. A query that returns 50mb of data:
Here it takes us 959 ms to abandon a query after 10mb. But the regular query processor spends 4272ms processing a query that would create 50mb of data.
Performance
At first I was using gzip to keep results smaller. This has two issues: performance and correctness. For performance, it was taking a 4 second query to 18 seconds and murdering performance. For correctness, we do want to have a good sense of how big results are in a csv or in memory. GZip is really good at compressing, so a few results that would be 50mb in a csv were ~8mb on disk gzipped. Which means it wasn't a great proxy for whether the query might OOM the instance, and it also slowed everything down.
Size:
The following query is my "50mb" query:
When I download this as a csv it takes 165M; when saved as a nippy file it takes:
For card 356, which is people join orders join products, I see
and the download sizes are:
So roughly in line with other formats.
Weird types:
How does this work with pgarrays, weird bigquery types, etc.? I suspect it has the same issues that caching does, which is to say it blows up, but it will also be limited by what can be fed to the js interpreter. That turns everything into json, so we probably just need to verify.
If these two work, then the pipeline works.
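If it helps, one way to spot-check such types at the REPL is a nippy round trip of a row containing awkward values (a hedged sketch, assuming taoensso.nippy; not a test that exists in this PR):

```clojure
;; nippy is the serialization the temp-storage file uses; if a value survives
;; freeze/thaw unchanged, the disk round trip won't alter it either.
(let [row [#uuid "7e3cd49d-bfe1-4620-83dd-0c163719175c"
           (bigdec "1234.5678")
           :some/keyword
           nil]]
  (= row (nippy/thaw (nippy/freeze row))))
```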