Sync protocol improvements #530
Conversation
// Denotes a column that has the new value for a primary key column
NEW_PK = 1;
// Denotes whether a particular column has changed in value
CHANGED = 2;
I am still not personally confident about how TOAST columns actually work, so we may or may not need to change more here. No big deal for now, just mentioning it.
// This means that if a row is changed multiple times, only the last change will be reflected in the
// output batch (meaning the last NewPk and Value role columns and the last Value column where the
// accompanying Changed field was `true`).
pub(super) fn compact_batches(
While I see the point about squashing changes, isn't it pretty wasteful to be converting to rows and then back to columns?
Also, can't you squash rows without actually converting to row format?
Fair point; for one, note that the row format was designed to be efficiently encodable:
https://github.com/apache/arrow-rs/blob/8752e01be642bce205984e16b44e06078413dc68/arrow-row/src/lib.rs#L20-L24
In fact, DataFusion itself relies on it for some core computations (also, we only ever transpose the PKs, though that isn't a big factor anyway).
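For reference, a minimal sketch of what the row conversion looks like (using the `arrow` crate directly; the two-column integer/string PK is purely illustrative, not our actual schema). The point is that equal PK tuples encode to byte-comparable keys, which makes grouping/compacting over them cheap:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::DataType;
use arrow::row::{RowConverter, SortField};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Two PK columns we want to compare row-by-row.
    let pk_a: ArrayRef = Arc::new(Int64Array::from(vec![1, 1, 2]));
    let pk_b: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "y"]));

    // The converter encodes each logical row into a single, memcmp-comparable byte key.
    let converter = RowConverter::new(vec![
        SortField::new(DataType::Int64),
        SortField::new(DataType::Utf8),
    ])?;
    let rows = converter.convert_columns(&[pk_a, pk_b])?;

    // Equal PK tuples encode to equal byte slices, so they can be used directly as
    // hash-map keys when chaining changes to the same row.
    assert_eq!(rows.row(0), rows.row(1));
    assert_ne!(rows.row(0), rows.row(2));
    Ok(())
}
```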
Sorting is pretty much the worst case for a columnar format, so it doesn't surprise me that they convert to rows in order to sort, but I don't think that's exactly what we're trying to do here. Or I'm not sure what you mean.
> Or I'm not sure what you mean
Ah, so what needs to happen above is the squashing (compaction) of all chains of changes to a given row (which may encompass multiple input rows) into a single output row, which will then be applied to the target table.
These chains can be of arbitrary length, from 1 (meaning a row is touched only once) to the batch row count (meaning the entire batch conveys changes for a single original row). In addition, various things can happen in each chain, such as value changes, PK changes and entire row deletion.
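To make the chain squashing concrete, here is a toy sketch of the idea (the `Change` struct with a single integer PK and value is entirely hypothetical, and it ignores the per-column `Changed` flags and TOAST handling):

```rust
use std::collections::HashMap;

// Purely illustrative change record; not the actual sync schema used in this PR.
#[derive(Clone, Debug, PartialEq)]
struct Change {
    old_pk: i64,
    new_pk: Option<i64>, // None == the row was deleted
    value: i64,
}

// Squash chains of changes so that each original PK maps to at most one final change.
fn squash(changes: &[Change]) -> HashMap<i64, Change> {
    // Maps the PK a chain currently ends at -> the PK the chain started from.
    let mut chain_start: HashMap<i64, i64> = HashMap::new();
    // Maps the starting PK of a chain -> its latest squashed change.
    let mut squashed: HashMap<i64, Change> = HashMap::new();

    for change in changes {
        // If this change continues an existing chain, attribute it to the chain's origin.
        let origin = chain_start.remove(&change.old_pk).unwrap_or(change.old_pk);
        if let Some(new_pk) = change.new_pk {
            chain_start.insert(new_pk, origin);
        }
        squashed.insert(origin, Change { old_pk: origin, ..change.clone() });
    }
    squashed
}

fn main() {
    // PK 1 is updated to 2 and then to 3; PK 5 is deleted.
    let changes = vec![
        Change { old_pk: 1, new_pk: Some(2), value: 10 },
        Change { old_pk: 2, new_pk: Some(3), value: 20 },
        Change { old_pk: 5, new_pk: None, value: 0 },
    ];
    let squashed = squash(&changes);
    assert_eq!(squashed[&1], Change { old_pk: 1, new_pk: Some(3), value: 20 });
    assert_eq!(squashed[&5].new_pk, None);
    println!("{squashed:?}");
}
```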
In principle I think this kind of computation can be formalized using recursive CTEs. However there are 3 things to note here:
- I briefly experimented with recursive CTEs and managed to get some basic (but not full) functionality that we need here. The simplest prototype worked OK, but slightly more involved queries made DF hang and spike the CPU, meaning there are some bugs there that I can't resolve now.
- Even if there were no issues in the execution, a recursive CTE implementation (using dataframe/plan/expression primitives) would be much more complex, since it would approach the problem in a roundabout way.
- Ultimately, even such an implementation would entail using `arrow-row` under the hood somewhere (e.g. in the grouping/window aggregation phases).
Yeah, I think normally we could do this with something like this:
WITH latest_rows AS (
    SELECT *, row_number() OVER (PARTITION BY pk ORDER BY timestamp DESC) AS _row_num
    FROM table
) SELECT [cols] FROM latest_rows WHERE _row_num = 1 AND NOT is_deleted
which would still process things row-by-row in the background but at least would reduce the amount of our custom code that we have to write.
But with PK-changing UPDATEs we have to be clever. It does raise the question of why we expect the API user to convert row-based changes into Arrow, only for this code to transpose them back into rows and then into columnar again (maybe V2 of this would be Avro changes instead).
We can always see how this performs under benchmarks (append-only, update-intensive, update-intensive with PK changes). Ultimately, we'll have to do this transpose/compact somewhere - on write or on every read.
src/frontend/flight/sync/utils.rs
Outdated
}

// Generates a pruning qualifier for the table based on the PK columns and the min/max values
// in the data sync entries.
I don't quite understand the point of this, like what is the benefit? The comment doesn't help me much
The goal is to reduce the amount of data that needs to be scanned/streamed/joined against from the base table.
Basically, we only need to touch those partitions which might contain PK values within the min/max bounds of the PKs in the sync data.
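For illustration, a rough sketch of what such a qualifier could look like as a DataFusion expression (the `pruning_qualifier` helper and the integer PK bounds are made up for the example; the real code derives the bounds from the sync entries):

```rust
use datafusion::logical_expr::{col, lit, Expr};

// Hypothetical helper: given per-PK-column (min, max) bounds observed in the sync data,
// build a qualifier of the form `pk1 >= min1 AND pk1 <= max1 AND pk2 >= ... AND ...`
// that delta-rs can use for partition/file pruning when scanning the base table.
fn pruning_qualifier(bounds: &[(&str, i64, i64)]) -> Option<Expr> {
    bounds
        .iter()
        .map(|(name, min, max)| col(*name).gt_eq(lit(*min)).and(col(*name).lt_eq(lit(*max))))
        .reduce(|acc, e| acc.and(e))
}

fn main() {
    let expr = pruning_qualifier(&[("pk_a", 1, 42), ("pk_b", 10, 20)]).unwrap();
    println!("{expr}");
}
```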
I'd add something like that to the comment in this case
// Generate a qualifier expression that, when applied to the table, will only return
// rows whose primary keys are affected by the changes in `entry`. This is so that
// we can only read the partitions from Delta Lake that we need to rewrite.
Aside: if my `entry` has two changes to PK=1 and PK=infinity, will this make me read the entire Delta table?
Got it, changing the doc.
> Aside: if my `entry` has two changes to PK=1 and PK=infinity, will this make me read the entire Delta table?
Yup, I don't think there's a way around it.
I think we can still work around it though - by manually reading the Delta metadata and cross-referencing it against each PK we know we have changes for, but that sounds like more of a future optimization to me.
But delta-rs already does that for us automatically, i.e. the scan uses the filter from this function to prune the partitions. So if we have PK=1..infinity (I'm assuming you mean a really large number encompassing all partitions here), then we must hit all partitions, otherwise we're not guaranteeing idempotency (e.g. we may double-insert)?
I was thinking just two changes, one hitting partition #1, one hitting the last partition, not a range of changes spanning all partitions. Surely in this case we can optimize by only rewriting these two partitions?
Ah, yeah that should be possible.
Not sure how complicated it would be, though; naively, in the general case we'd need a qualifier per change/sync row and consequently multiple scans/joins.
The less naive approach would be to segment all changes, group them by the partitions they hit, and then scan/join once per group.
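A rough sketch of that less naive approach, with everything hypothetical (single integer PK, and a made-up `PartitionStats` struct standing in for the per-file min/max stats in the Delta log):

```rust
use std::collections::HashMap;

// Hypothetical, simplified partition stats; in practice these would come from the
// Delta table's Add actions (per-file min/max column statistics).
struct PartitionStats {
    file: String,
    pk_min: i64,
    pk_max: i64,
}

// Group the changed PKs by the partition file whose [min, max] range contains them,
// so that each group can be scanned/joined independently instead of using one
// table-wide min/max qualifier.
fn group_changes_by_partition(
    changed_pks: &[i64],
    partitions: &[PartitionStats],
) -> HashMap<String, Vec<i64>> {
    let mut groups: HashMap<String, Vec<i64>> = HashMap::new();
    for &pk in changed_pks {
        for part in partitions {
            if pk >= part.pk_min && pk <= part.pk_max {
                groups.entry(part.file.clone()).or_default().push(pk);
            }
        }
    }
    groups
}

fn main() {
    let partitions = vec![
        PartitionStats { file: "part-0.parquet".into(), pk_min: 1, pk_max: 100 },
        PartitionStats { file: "part-9.parquet".into(), pk_min: 1_000_000, pk_max: 2_000_000 },
    ];
    // Two changes: one hits the first partition, one hits the last.
    let groups = group_changes_by_partition(&[1, 1_999_999], &partitions);
    assert_eq!(groups.len(), 2);
}
```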
src/frontend/flight/sync/schema.rs
Outdated
#[allow(dead_code)]
pub fn arrow_schema(&self) -> SchemaRef {
    self.schema.clone()
}
Why keep this code if it's dead, some future impl?
Potentially, but also I use it in the tests.
You can make it non-public then (it'll still be available to unit tests in the same file)
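For reference, a minimal sketch of that suggestion (the `SyncSchema` struct name and field are assumed from the snippet above): a private accessor is still reachable from a `#[cfg(test)]` module in the same file, so neither `pub` nor `#[allow(dead_code)]` is needed just for tests.

```rust
use arrow::datatypes::SchemaRef;

pub struct SyncSchema {
    schema: SchemaRef,
}

impl SyncSchema {
    // Private accessor: only used by tests in this file.
    fn arrow_schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    #[test]
    fn private_accessor_is_reachable_from_same_file_tests() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
        let sync_schema = SyncSchema { schema: schema.clone() };
        assert_eq!(sync_schema.arrow_schema(), schema);
    }
}
```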
The unit test is in another file, so it's simpler to remove and revise the test setup slightly.
The changes in this PR involve:
TODOs:
A lot more unit testing, and a bit more integration testing