Add concurrent writes reconciliation for OPTIMIZE in Delta Lake #22443
Allow committing OPTIMIZE operations in a concurrent context by placing these operations right after any other previously concurrently completed write operations.
// Note: during writes we want to preserve original case of partition columns
List<String> partitionColumns = getPartitionColumns(
        optimizeHandle.getMetadataEntry().getOriginalPartitionColumns(),
        optimizeHandle.getTableColumns(),
        getColumnMappingMode(optimizeHandle.getMetadataEntry(), optimizeHandle.getProtocolEntry()));
Can we put it inside the commitOptimizeOperation function?
        .build())
        .forEach(MoreFutures::getDone);

assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (2, 10), (11, 20), (12, 20), (21, 30), (22, 30)");
Shouldn't we check the file count to verify that OPTIMIZE actually happened?
ExecutorService executor = newFixedThreadPool(threads);
String tableName = "test_concurrent_optimize_and_inserts_table_" + randomNameSuffix();

assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20)", 2);
Can you add more files, so that OPTIMIZE can rewrite something?
        .build())
        .forEach(MoreFutures::getDone);

assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (8, 10), (11, 20), (21, 30)");
Can we check that OPTIMIZE actually changed something?
Description
Allow committing OPTIMIZE operations in a concurrent context by placing these operations right after any other previously concurrently completed write operations.
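As a rough illustration of the idea described here, the sketch below models a transaction log in memory and commits an OPTIMIZE after whatever writes landed since its snapshot. It is not the code from this PR: the class `OptimizeReconciliationSketch`, its `Commit` type, and the conflict rule (reject only when a concurrent commit removed a file that OPTIMIZE rewrote) are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-in for a table's transaction log.
// None of these names come from Trino or Delta Lake.
class OptimizeReconciliationSketch
{
    // One committed log entry: the files it added and the files it removed.
    static final class Commit
    {
        final String operation;
        final Set<String> addedFiles;
        final Set<String> removedFiles;

        Commit(String operation, Set<String> addedFiles, Set<String> removedFiles)
        {
            this.operation = operation;
            this.addedFiles = addedFiles;
            this.removedFiles = removedFiles;
        }
    }

    private final List<Commit> log = new ArrayList<>();

    // Latest committed version; -1 when the log is empty.
    synchronized long currentVersion()
    {
        return log.size() - 1;
    }

    // Appends a commit from some other writer (INSERT, DELETE, ...).
    synchronized void append(Commit commit)
    {
        log.add(commit);
    }

    // Commits an OPTIMIZE that was planned against readVersion.
    // Assumed rule: the commit is safe unless a concurrent commit removed
    // one of the files that OPTIMIZE rewrote.
    synchronized long commitOptimize(long readVersion, Set<String> rewrittenFiles, Set<String> compactedFiles)
    {
        // Inspect every commit that landed after the snapshot OPTIMIZE read from.
        for (long version = readVersion + 1; version <= currentVersion(); version++) {
            Commit concurrent = log.get((int) version);
            for (String file : concurrent.removedFiles) {
                if (rewrittenFiles.contains(file)) {
                    throw new IllegalStateException(
                            "Conflicting concurrent " + concurrent.operation + " removed " + file);
                }
            }
        }
        // No conflict: place OPTIMIZE right after the concurrent writes.
        log.add(new Commit("OPTIMIZE", compactedFiles, rewrittenFiles));
        return currentVersion();
    }
}
```

In this model, an OPTIMIZE planned at version 0 that races with a single INSERT is committed as version 2 instead of failing, while a concurrent DELETE of a rewritten file still aborts it.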
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: