
Add concurrent writes reconciliation for DELETE pushdown/TRUNCATE in Delta Lake #18521

Conversation

@findinpath (Contributor) commented Aug 3, 2023

Description

Allow committing operations based on delete pushdown / truncate
in a concurrent context by placing these operations right after
any other write operations that completed concurrently.

Disallow committing the operation in any of the following cases:

  • a table schema change has been committed in the meantime
  • a table protocol change has been committed in the meantime
  • add files committed in the meantime would have to be read by
    the current operation
  • remove files committed in the meantime conflict with the
    add files read by the current operation

The current changes also take into consideration the delta.isolationLevel
table property of the Delta Lake table for DELETE/TRUNCATE operations.
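A minimal sketch of these reconciliation checks, using hypothetical names rather than the connector's actual API:

```java
import java.util.Map;
import java.util.Set;

// Illustrative only: the type and method names below are assumptions made for
// this sketch, not the actual Trino Delta Lake connector code.
final class ConflictCheckSketch
{
    // Summary of a concurrently committed ("winning") transaction log entry
    interface CommitSummary
    {
        boolean containsMetadataUpdate();

        boolean containsProtocolUpdate();

        Set<Map<String, String>> addedFilesCanonicalPartitionValues();

        Set<Map<String, String>> removedFilesCanonicalPartitionValues();
    }

    static void checkNoConflicts(CommitSummary winningCommit, Set<Map<String, String>> readPartitionValues)
    {
        if (winningCommit.containsMetadataUpdate()) {
            throw new IllegalStateException("Table schema changed in the meantime");
        }
        if (winningCommit.containsProtocolUpdate()) {
            throw new IllegalStateException("Table protocol changed in the meantime");
        }
        // Add files conflict when the current operation would have read them
        // (approximated here at partition granularity).
        if (winningCommit.addedFilesCanonicalPartitionValues().stream().anyMatch(readPartitionValues::contains)) {
            throw new IllegalStateException("Files added in the meantime would be read by the current operation");
        }
        // Remove files conflict when they overlap the partitions read by the current operation.
        if (winningCommit.removedFilesCanonicalPartitionValues().stream().anyMatch(readPartitionValues::contains)) {
            throw new IllegalStateException("Files removed in the meantime conflict with the current operation's reads");
        }
    }
}
```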

A relevant example, taken from the Databricks documentation, regarding the
distinction between WriteSerializable and Serializable isolation levels:

For example, consider txn1, a long-running delete, and txn2,
which blindly inserts data into the table.
txn2 and txn1 complete and are recorded in the order
txn2, txn1
in the history of the table.
According to the history, the data inserted in txn2 should not exist
in the table. For the Serializable level, a reader would never see data
inserted by txn2. However, for the WriteSerializable level, a reader
could at some point see the data inserted by txn2.

A few words about the WriteSerializable isolation level, taken from the delta.io javadocs:

This isolation level will ensure snapshot isolation consistency guarantee
between write operations only.
In other words, if only the write operations are considered, then
there exists a serializable sequence between them that would produce the same
result as seen in the table.

Additional context and related issues

INSERT scaffolding PRs:

Depends on the fix from #21330

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text:

# Delta Lake
* Add support for concurrent `DELETE` pushdown / `TRUNCATE` queries. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Aug 3, 2023
@github-actions github-actions bot added the delta-lake Delta Lake connector label Aug 3, 2023
@findinpath findinpath force-pushed the findinpath/delta-concurrent-update-merge-delete-reconciliation branch from 258382d to b262cb7 on August 3, 2023 13:13
@findinpath findinpath self-assigned this Sep 5, 2023
@findinpath findinpath force-pushed the findinpath/delta-concurrent-update-merge-delete-reconciliation branch from b262cb7 to b05405f on January 28, 2024 22:23
@findinpath findinpath force-pushed the findinpath/delta-concurrent-update-merge-delete-reconciliation branch from b05405f to 01347f4 on March 28, 2024 20:42
@findinpath findinpath changed the title WiP Add concurrent writes reconciliation for UPDATE/MERGE/DELETE in Delta Lake Add concurrent writes reconciliation for UPDATE/MERGE/DELETE in Delta Lake Mar 28, 2024
@findinpath findinpath force-pushed the findinpath/delta-concurrent-update-merge-delete-reconciliation branch 7 times, most recently from 2bbb70e to 6b94cfe on April 1, 2024 07:04
@findinpath findinpath marked this pull request as ready for review April 1, 2024 07:04
@findinpath findinpath force-pushed the findinpath/delta-concurrent-update-merge-delete-reconciliation branch from 6b94cfe to 39f1647 on April 1, 2024 08:35
@findinpath findinpath requested review from alexjo2144 and ebyhr and removed request for alexjo2144 April 3, 2024 13:49
@findinpath findinpath force-pushed the findinpath/delta-concurrent-update-merge-delete-reconciliation branch from 39f1647 to 74ab9dc on April 3, 2024 13:50
@findinpath findinpath requested a review from pajaks April 4, 2024 10:52

return metadata.executeDelete(connectorSession, table.getConnectorHandle());
List<ConnectorTableHandle> sourceConnectorHandles = sourceTableHandles.stream()
        .filter(handle -> handle.getCatalogHandle().equals(catalogHandle))
Member:

Any reason why we want only handles from the same catalog?

Contributor Author:

This is intentional, to ensure that the schema and table name point to the very same table.

If we had catalog1.schema1.table1 and catalog2.schema1.table1, we couldn't tell whether these are one and the same table (they may point to completely different tables even though their schema and table names match).

@@ -64,7 +65,8 @@ public Result apply(TableFinishNode node, Captures captures, Context context)
                 .map(newHandle -> new TableDeleteNode(
                         context.getIdAllocator().getNextId(),
                         newHandle,
-                        getOnlyElement(node.getOutputSymbols())))
+                        getOnlyElement(node.getOutputSymbols()),
+                        ImmutableList.of(tableScan.getTable())))
Member:

Isn't `tableScan.getTable()` the target table?

Contributor Author:

The scanned table is what is being read.
See also a few lines above, which likewise indicates that this is the input table:

metadata.applyDelete(context.getSession(), tableScan.getTable())

@@ -59,15 +61,16 @@ else if (transactionLogEntry.getAdd() != null) {
            addedFilesCanonicalPartitionValuesBuilder.add(transactionLogEntry.getAdd().getCanonicalPartitionValues());
        }
        else if (transactionLogEntry.getRemove() != null) {
            removedFilesFound = true;
            Map<String, String> partitionValues = transactionLogEntry.getRemove().partitionValues();
            removedFilesCanonicalPartitionValuesBuilder.add(partitionValues == null ? ImmutableMap.of() : canonicalizePartitionValues(partitionValues));
Member:

Why do we add an empty map to the set?

Contributor Author:

Good catch. In the unlikely case that the winning commit does not contain information about the partition values of the removed file, we should fail the operation.

Member:

Should we also fix `addFiles` in the same way?

@findinpath findinpath force-pushed the findinpath/delta-concurrent-update-merge-delete-reconciliation branch from ead4b33 to e4de37b on April 10, 2024 08:44
@findinpath (Contributor Author):

@ebyhr please run the PR with secrets.

@ebyhr (Member) commented Apr 10, 2024:

/test-with-secrets sha=e4de37b413e10dd0a9d255c95a868ccb1b4e4f01

github-actions bot commented Apr 10, 2024:

The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/8637812363

@findinpath findinpath force-pushed the findinpath/delta-concurrent-update-merge-delete-reconciliation branch 2 times, most recently from e29ae61 to cda867b on April 11, 2024 20:59
@ebyhr (Member) commented Apr 12, 2024:

/test-with-secrets sha=cda867bfcfe862af396c91026220e33692f5e4ce

github-actions bot commented Apr 12, 2024:

The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/8656675392

@findinpath findinpath force-pushed the findinpath/delta-concurrent-update-merge-delete-reconciliation branch from cda867b to 1979c9c on April 12, 2024 20:30
@findinpath findinpath force-pushed the findinpath/delta-concurrent-update-merge-delete-reconciliation branch from 1979c9c to bb6c2e3 on April 19, 2024 10:33
In case of commit retries, start seeking the latest
version of the table from the version read during the
latest attempt to commit the insert operation.
In the context of concurrent operations, this strategy
spares the unnecessary HEAD operations incurred by iterating
incrementally from the version read when the table handle was instantiated.
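A rough sketch of that retry strategy; `commitWithRetries`, `findLatestTableVersion`, and `TransactionConflictException` are assumed names standing in for the actual connector code:

```java
// Illustrative sketch only, not the Trino Delta Lake connector API.
final class CommitRetrySketch
{
    private static final int MAX_ATTEMPTS = 5;

    static void commitWithRetries(long tableHandleReadVersion)
    {
        long attemptVersion = tableHandleReadVersion;
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            try {
                writeTransactionLogEntry(attemptVersion + 1);
                return;
            }
            catch (TransactionConflictException e) {
                // Resume seeking the latest version from the version of the failed
                // attempt, instead of re-walking the log from tableHandleReadVersion.
                attemptVersion = findLatestTableVersion(attemptVersion);
            }
        }
        throw new IllegalStateException("Failed to commit after " + MAX_ATTEMPTS + " attempts");
    }

    static void writeTransactionLogEntry(long version)
            throws TransactionConflictException
    {
        // Placeholder: atomically create the transaction log entry for `version`,
        // failing if that version was committed concurrently.
        throw new UnsupportedOperationException();
    }

    static long findLatestTableVersion(long startVersion)
    {
        // Placeholder: probe _delta_log with HEAD requests starting at startVersion.
        return startVersion + 1;
    }

    static final class TransactionConflictException
            extends Exception
    {
    }
}
```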
@findinpath findinpath force-pushed the findinpath/delta-concurrent-update-merge-delete-reconciliation branch from bb6c2e3 to 10b2d55 on April 23, 2024 15:10
@ebyhr (Member) commented Apr 25, 2024:

/test-with-secrets sha=10b2d559dfce2926e352613677558fec0bc68e45


The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/8826682787

@findinpath findinpath force-pushed the findinpath/delta-concurrent-update-merge-delete-reconciliation branch from 10b2d55 to 31db21d on April 26, 2024 10:51
@findinpath findinpath changed the title Add concurrent writes reconciliation for UPDATE/MERGE/DELETE in Delta Lake Add concurrent writes reconciliation for DELETE pushdown/TRUNCATE in Delta Lake Apr 26, 2024
Allow committing pushdown DELETE operations in
a concurrent context by placing these operations right after
any other write operations that completed concurrently.

Disallow committing the operation in any of the following cases:

- a table schema change has been committed in the meantime
- a table protocol change has been committed in the meantime
- add files committed in the meantime would have to be read by
  the current operation
- remove files committed in the meantime conflict with the
  add files read by the current operation

The current changes also take into consideration the `delta.isolationLevel`
table property of the Delta Lake table for DELETE operations.

A relevant example, taken from the Databricks documentation, regarding the
distinction between `WriteSerializable` and `Serializable` isolation levels:

> For example, consider `txn1`, a long-running delete, and `txn2`,
> which blindly inserts data into the table.
> `txn2` and `txn1` complete and are recorded in the order
> `txn2, txn1`
> in the history of the table.
> According to the history, the data inserted in `txn2` should not exist
> in the table. For the `Serializable` level, a reader would never see data
> inserted by `txn2`. However, for the `WriteSerializable` level, a reader
> could at some point see the data inserted by `txn2`.

A few words about the `WriteSerializable` isolation level, taken from the delta.io javadocs:

> This isolation level will ensure snapshot isolation consistency guarantee
> between write operations only.
> In other words, if only the write operations are considered, then
> there exists a serializable sequence between them that would produce the same
> result as seen in the table.
@findinpath findinpath force-pushed the findinpath/delta-concurrent-update-merge-delete-reconciliation branch from 31db21d to f7829ae on April 29, 2024 09:39
@ebyhr ebyhr merged commit e5f536e into trinodb:master May 8, 2024
99 checks passed
@github-actions github-actions bot added this to the 447 milestone May 8, 2024
Labels
cla-signed delta-lake Delta Lake connector

4 participants