
Iceberg MOR support - implement row-level deletes #21571

Merged · 1 commit merged into prestodb:master from row_level_deletes on Feb 21, 2024

Conversation

@s-akhtar-baig (Contributor) commented Dec 19, 2023

Description

Support row-level delete operations for Iceberg V2 tables. Use merge-on-read mode by default and create positional delete files.

Motivation and Context

GitHub issue: #20494

Impact

Delete operations will succeed on both partitioning and non-partitioning columns. If the delete mode is set to "copy-on-write" and the delete filters on non-partitioning columns, the following error is currently displayed:

This connector only supports delete where one or more partitions are deleted entirely

Test Plan

  • Added tests for delete operations in both modes.
  • Updated existing tests to account for the new changes.

Release Notes


== RELEASE NOTES ==

Iceberg Connector Changes
* Added support for row-level deletes on Iceberg V2 tables.
The delete mode can be changed from ``merge-on-read`` to ``copy-on-write`` by setting the table property ``delete_mode``.
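
For illustration, a hypothetical usage sketch in the style of this PR's tests (table and column names invented; assertUpdate and assertQueryFails are the test framework's standard helpers), showing the default merge-on-read behavior and the copy-on-write limitation described above:

// Hypothetical sketch; not taken verbatim from this PR's tests.
// With the default merge-on-read mode, a row-level delete on a
// non-partition column succeeds by writing a positional delete file.
assertUpdate("CREATE TABLE orders_mor (id integer, status varchar) WITH (partitioning = ARRAY['status'])");
assertUpdate("INSERT INTO orders_mor VALUES (1, 'OPEN'), (2, 'DONE')", 2);
assertUpdate("DELETE FROM orders_mor WHERE id = 1", 1);

// With delete_mode = 'copy-on-write', the same delete fails, since
// data-file rewriting is not implemented yet.
assertUpdate("CREATE TABLE orders_cow (id integer, status varchar) WITH (partitioning = ARRAY['status'], delete_mode = 'copy-on-write')");
assertQueryFails("DELETE FROM orders_cow WHERE id = 1",
        "This connector only supports delete where one or more partitions are deleted entirely");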

@s-akhtar-baig added the iceberg (Apache Iceberg related) label Dec 19, 2023
@@ -93,6 +93,26 @@ public String toJson()
}
}

public static PartitionData fromStructLike(StructLike partitionData, Type[] types)
Contributor:

Why convert to JSON and then convert back to a PartitionData? Why not just directly create a PartitionData?

Contributor Author:

Oh yes, this method needs refactoring. I have made changes; let me know how it looks.
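
For illustration, a direct construction might look like the following sketch (StructLike.get and Type.TypeID.javaClass are standard Iceberg APIs; the two-argument PartitionData constructor is assumed):

// Sketch of a JSON-free conversion, assuming PartitionData exposes a
// constructor taking the raw values and their types.
public static PartitionData fromStructLike(StructLike partitionData, Type[] types)
{
    Object[] values = new Object[types.length];
    for (int pos = 0; pos < types.length; pos++) {
        // Read each field using the Java class matching its Iceberg type,
        // instead of serializing the struct to JSON and parsing it back.
        values[pos] = partitionData.get(pos, types[pos].typeId().javaClass());
    }
    return new PartitionData(values, types);
}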

@s-akhtar-baig changed the title from "implement row level deletes" to "Iceberg MOR support - implement row-level deletes" Jan 9, 2024
@s-akhtar-baig marked this pull request as ready for review January 9, 2024 18:22
@s-akhtar-baig requested a review from a team as a code owner January 9, 2024 18:22

github-actions bot commented Jan 9, 2024

Codenotify: Notifying subscribers in CODENOTIFY files for diff d4bdf97...fa45b73.

Notify File(s)
@steveburnett presto-docs/src/main/sphinx/connector/iceberg.rst

@steveburnett (Contributor) left a comment:

LGTM! (docs)

Local build of the docs from this branch, everything looks fine. Thanks!

@s-akhtar-baig force-pushed the row_level_deletes branch 3 times, most recently from 60dd47d to 47af37b on January 10, 2024 19:35
Comment on lines 217 to 221
``iceberg.enable-merge-on-read-mode``   Enable reading base tables that          ``true``
                                        use merge-on-read for updates.

``iceberg.delete-mode``                 The delete mode for Iceberg V2           ``merge-on-read``
                                        tables. The available values are
                                        ``copy-on-write`` and ``merge-on-read``.
@hantangwangd (Member) commented Jan 12, 2024:

I am a little confused about the two properties here. Don't they mean the same thing? And should we first figure out whether delete-mode should be a connector property, a table property, or both? As far as I know, Iceberg treats write.delete.mode as a table property.
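
For reference, Iceberg's own API exposes this setting as a table property; a minimal sketch (obtaining the Table handle is omitted):

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

// TableProperties.DELETE_MODE is Iceberg's constant for "write.delete.mode".
static void useMergeOnReadDeletes(Table table)
{
    table.updateProperties()
            .set(TableProperties.DELETE_MODE, "merge-on-read")
            .commit();
}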

@s-akhtar-baig (Contributor Author):

@hantangwangd, thank you for reviewing these changes!

My thought process was to allow users to switch between these two modes by setting the connector property. Although data rewriting is not implemented at the moment, I am assuming we want to support it in the near future.


Yes, I am good with either keeping both or just one, considering the table property is not being used yet.

@tdcmeehan, thoughts on this?

Contributor:

I ask that whatever the decision (both or just one), the properties be documented.

If the decision is made to keep just one, the release note entry should be updated.

Contributor:

I think we should be consistent with the rest of the Iceberg ecosystem, which treats this as a table property.

Contributor Author:

That makes sense! And should we keep or remove the connector property?

Member:

Agree with @tdcmeehan. This will be clearer and avoid confusion for users, especially those already working with Iceberg.

Contributor:

@s-akhtar-baig let's remove it

Contributor Author:

I have made changes to use delete_mode as an Iceberg table property. I have also updated the documentation and release notes. FYI @hantangwangd @steveburnett @tdcmeehan.

Please let me know if I missed anything. Thanks!

@s-akhtar-baig force-pushed the row_level_deletes branch 2 times, most recently from 63f22ed to 05ad0ee on January 19, 2024 16:15
@yingsu00 self-requested a review January 23, 2024 15:23
@yingsu00 (Contributor) left a comment:

@s-akhtar-baig @tdcmeehan Overall this looks good, though I have a couple of questions.

  1. I see that this PR uses the UpdatablePageSource approach. Since the DeleteOperator can only set an UpdatablePageSource as its pageSource, and only TableScanOperator and ScanFilterAndProjectOperator can actually return an UpdatablePageSource object, the DeleteOperator must be in the same stage as the scan operator that reads the target table with the rows to be deleted. It cannot work with ExchangeOperator, which is also a SourceOperator, because its addSplit() doesn't return an actual object, just nullopt. This means the SemiJoin that joins the target table with the delete values has to be a broadcast join. What happens if both sides exceed the broadcast join data size limit? Will the query force a broadcast join, or will it fail?
  2. Trino has removed the UpdatablePageSource in "Remove legacy code for DELETE and UPDATE". What's your opinion on their change? Any plans to adopt a similar route?

By the way, I've attached a Presto plan and a Trino plan for your convenience.

Query:

explain analyze delete from supplier where nationkey in (select nationkey from nation n, region r where n.regionkey=r.regionkey and r.regionkey=3);

Presto: [query plan image]

Trino: [query plan image]

@hantangwangd (Member) commented Jan 25, 2024:

What will happen if both sides exceed the broadcast join limit data size? Will the query force broadcast join, or will the query fail?

It seems the semi join for a delete is forced to broadcast by the optimizer ReplicateSemiJoinInDelete. The comment in DetermineSemiJoinDistributionType elaborates on this point:

/**
 * This rule must run after the distribution type has already been set for delete queries,
 * since semi joins in delete queries must be replicated.
 * Once we have better pattern matching, we can fold that optimizer into this one.
 */
public class DetermineSemiJoinDistributionType

@yingsu00 (Contributor):

It seems the semi join for a delete is forced to broadcast by the optimizer ReplicateSemiJoinInDelete.

Thanks @hantangwangd for the information. It seems this is a limitation of this approach. Do you know why ExchangeOperator cannot be used as the SourceOperator here, so as to allow hash joins, which might be beneficial for larger data sets? Also, this UpdatablePageSource way of handling deletes/updates, with a delete sink inside a PageSource, seems a bit unusual to me. Do you folks want to consider the MergeWriter approach Trino has?

@hantangwangd (Member):

Do you know why ExchangeOperator cannot be used as the SourceOperator here, so as to allow hash joins, which might be beneficial for larger data sets? Also, this UpdatablePageSource way of handling deletes/updates, with a delete sink inside a PageSource, seems a bit unusual to me.

I think this approach imposes an implicit constraint between a SourceOperator and a DeleteOperator in the same driver: they must handle the same data file, one reading from it and the other writing a delete file for it. Currently an ExchangeOperator could not satisfy this constraint without further customization. Maybe that is the reason for this design.

@yingsu00 (Contributor):

I think this approach imposes an implicit constraint between a SourceOperator and a DeleteOperator in the same driver: they must handle the same data file, one reading from it and the other writing a delete file for it. Currently an ExchangeOperator could not satisfy this constraint without further customization. Maybe that is the reason for this design.

@hantangwangd Thanks for the discussion. But why do the delete file writes have to be done in a SourceOperator, and specifically in ScanFilterAndProject or TableScan? Can it be done just in the DeleteOperator? For a MOR delete, the base files are not updated at all, and the delete files are just a bunch of new files that can be written by the DeleteOperator. Even for a COW delete, it doesn't have to be the scan operators that write the updated data file, as long as the updated data to be written is passed to the DeleteOperator. The data coming out of the SemiJoin can be exactly that. The difference is that a MOR delete would produce the rows to be deleted, while a COW delete produces the rows that are not deleted. In neither case does the scan have to be in the same stage as the delete.

Taking one step back, even if we don't write the delete files in the DeleteOperator but still in a SourceOperator, why can't it be any SourceOperator, such as ExchangeOperator? As long as it pulls the data to be written from its upstream, it should be fine. They are not operating on the same data file in either MOR or COW. Why is there a constraint that they be in the same fragment?

@hantangwangd (Member):

@yingsu00 It's my pleasure to discuss.

But why do the delete file writes have to be done in a SourceOperator, and specifically in ScanFilterAndProject or TableScan? Can it be done just in the DeleteOperator?

As I understand it, the SourceOperator just builds and returns a factory for the DeleteOperator to create the UpdatablePageSource, which contains a supplier, deleteSinkSupplier. The IcebergDeletePageSink is created in the DeleteOperator, and the delete file writes are then done in the DeleteOperator. That's all about lazy loading and parameter closures. Of course, I agree that things don't have to be this way.

why can't it be any SourceOperator, such as ExchangeOperator? As long as it pulls the data to be written from its upstream, it should be fine.

In order to remove the constraint mentioned above, maybe we need to add a file path column besides the rowId in the output page of ConnectorPageSourceWithRowPositions. In that way, I think we could use an ExchangeOperator as a SourceOperator for the DeleteOperator.

If I have misunderstood anything, please let me know. Thanks.

@s-akhtar-baig (Contributor Author):

@yingsu00, thank you for reviewing and @hantangwangd, thank you for providing details! Much appreciated!

Regarding the delete workflow in this pull request, I tried to follow Presto's current implementation of delete and update as described at https://github.com/prestodb/presto/blob/master/presto-docs/src/main/sphinx/develop/delete-and-update.rst.

As you mentioned, there are constraints with this implementation, the one-to-one association of a data file to a delete file being one of the big ones. This presents future work opportunities, including coalescing delete files and writing equality deletes on the coordinator node.

Please let me know if I am missing anything. There is definitely an opportunity to optimize delete and merge operations; I propose we create a design document and start curating an optimized workflow.

@yingsu00 (Contributor):

@hantangwangd

In order to remove the constraint mentioned above, maybe we need to add a file path column besides the rowId in the output page of ConnectorPageSourceWithRowPositions.

Agreed, I think that will work. It's going to be an RLEBlock and will take very little space. The same goes for the partition information the DeleteOperator needs.
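
As a rough sketch of that idea (the helper name is invented; Presto's RunLengthEncodedBlock and Page APIs as I understand them), the scan side could stamp each output page with the data file path at negligible cost:

import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import io.airlift.slice.Slices;

import static com.facebook.presto.common.type.VarcharType.VARCHAR;

// Hypothetical helper: append an RLE-encoded file path column to a page of
// row positions, so a downstream DeleteOperator can tell which data file
// each rowId belongs to without sharing a page source with the scan.
static Page withFilePathColumn(Page rowIdPage, String dataFilePath)
{
    // A single-value block holding the file path...
    BlockBuilder builder = VARCHAR.createBlockBuilder(null, 1);
    VARCHAR.writeSlice(builder, Slices.utf8Slice(dataFilePath));
    // ...repeated for every position via run-length encoding, which costs
    // almost nothing regardless of the page's row count.
    Block filePath = new RunLengthEncodedBlock(builder.build(), rowIdPage.getPositionCount());

    Block[] blocks = new Block[rowIdPage.getChannelCount() + 1];
    for (int channel = 0; channel < rowIdPage.getChannelCount(); channel++) {
        blocks[channel] = rowIdPage.getBlock(channel);
    }
    blocks[blocks.length - 1] = filePath;
    return new Page(rowIdPage.getPositionCount(), blocks);
}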

As I understand it, the SourceOperator just builds and returns a factory for the DeleteOperator to create the UpdatablePageSource, which contains a supplier, deleteSinkSupplier. The IcebergDeletePageSink is created in the DeleteOperator, and the delete file writes are then done in the DeleteOperator. That's all about lazy loading and parameter closures.

Yes, the DeleteOperator initiates the delete by calling pageSource().deleteRows(rowIds), but the deleteSink and deleteSinkSupplier are owned by the IcebergUpdateablePageSource, which actually writes the delete files via this code within IcebergUpdateablePageSource:

@Override
public void deleteRows(Block rowIds)
{
    if (deleteSink == null) {
        deleteSink = deleteSinkSupplier.get();
    }
    deleteSink.appendPage(new Page(rowIds));
}

Do the deleteSink and deleteSinkSupplier have to be in the PageSource? We both agree that's not the case. All the information could be acquired in the DeleteOperator, if it were passed to the DeleteOperator from the SourceOperator. A PageSource owning a sink, and a loop in the Driver pipeline, seem a bit strange to me.

By lazy loading, did you mean that information like the file path, partition spec, etc. that is needed in addition to the rowIds could be lazily loaded? I think it can still be lazily loaded in the DeleteOperator even if we remove the driver loop and make the DeleteOperator own the deleteSink. But these values are the same for every deleted row (the base file path and partition spec are the same for all rows in the same split), so there's no point or benefit in lazily loading them, because they're already there. They would just be some RLEBlocks, and carrying them from the SourceOperator to the DeleteOperator doesn't incur much cost at all. Note that the rowIds and the delete keys cannot be lazily loaded anyway, since the SemiJoin before the DeleteOperator needs to load the values in order to perform the join. Other than these, I'm not sure what else could be lazily loaded. Maybe you were referring to something else?

Of course, I'm not asking to make these changes in this PR, but I wonder if somebody can try this approach in the near future. Trino has removed the UpdatablePageSource, and there must be some valid reasons behind it; maybe the issues we discussed above are some of them. I think an approach that doesn't introduce a sink in a source, or a loop in the driver pipeline, is cleaner. What do you think?

cc @agrawalreetika

@yingsu00 (Contributor):

Hi @s-akhtar-baig, yes, I know you were following the current framework. As I said, I'm not asking you to make these changes in this PR, but I wonder if somebody can do a thorough study and try this approach in the near future. We can merge this PR first so that we at least support positional deletes, and make improvements in the future.

This presents future work opportunities, including coalescing delete files and writing equality deletes on the coordinator node.
Equality deletes MAY be written on the coordinator if the delete values are known at planning time, but I would consider that an improvement over the general implementation. In the general implementation, we would just remove the join that joins the delete values with the target table, and the scan on the target table can be saved as well. I expect this to be much faster than positional deletes, especially when the target table is very large. Reads afterwards with the equality delete files would also run faster, because many equality deletes can be interpreted as TupleDomain filters and pushed down to the readers in the scan.
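
As a concrete example of that pushdown idea (a hedged sketch using Presto's TupleDomain API; the generic column handle stands in for the connector's real IcebergColumnHandle), an equality delete of a single BIGINT value can be flipped into a residual scan filter:

import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import com.google.common.collect.ImmutableMap;

import static com.facebook.presto.common.type.BigintType.BIGINT;

// Sketch: an equality delete "col = v" means the read keeps rows where
// "col <> v", expressible as two open ranges around the deleted value.
static <C> TupleDomain<C> equalityDeleteToFilter(C column, long deletedValue)
{
    Domain survivors = Domain.create(
            ValueSet.ofRanges(
                    Range.lessThan(BIGINT, deletedValue),
                    Range.greaterThan(BIGINT, deletedValue)),
            true);  // NULL never equals the deleted value, so NULL rows survive
    return TupleDomain.withColumnDomains(ImmutableMap.of(column, survivors));
}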

@hantangwangd (Member):

@yingsu00

Do the deleteSink and deleteSinkSupplier have to be in the PageSource? We both agree that's not the case. All the information could be acquired in the DeleteOperator, if it were passed to the DeleteOperator from the SourceOperator. A PageSource owning a sink, and a loop in the Driver pipeline, seem a bit strange to me.

Yes, I agree with you; things don't have to be this way. Another way, for example, would be to pass only the closure for the creation logic of the deleteSink and let the DeleteOperator actually create and hold the deleteSink, which might be a more natural design.

As for lazy loading, I meant that in this approach the IcebergUpdateablePageSource is not actually created until the driver begins running, that is, until TableScanOperator.getOutput() is invoked. So during operator construction, a closure (or factory) has to be passed around. And as you say, that's also not a reason why the SourceOperator must hold the deleteSink.

So overall, I agree with you that if we further unbind the constraint and clearly define the messages to transfer, we could make the things you are concerned about cleaner, and support an ExchangeOperator as the source of a DeleteOperator.
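
A hypothetical shape for that alternative, as a sketch (the class and field names are invented; IcebergDeletePageSink and appendPage come from the code quoted above):

import java.util.function.Supplier;

import com.facebook.presto.common.Page;

// Sketch of a DeleteOperator that owns its delete sink outright, removing
// the loop through the page source. Only the sink factory crosses the
// operator boundary.
class DeleteOperatorSketch
{
    private final Supplier<IcebergDeletePageSink> deleteSinkFactory;
    private IcebergDeletePageSink deleteSink;  // created lazily on first delete

    DeleteOperatorSketch(Supplier<IcebergDeletePageSink> deleteSinkFactory)
    {
        this.deleteSinkFactory = deleteSinkFactory;
    }

    void deleteRows(Page rowIds)
    {
        if (deleteSink == null) {
            deleteSink = deleteSinkFactory.get();
        }
        // The sink writes positional delete files for the rows in this page.
        deleteSink.appendPage(rowIds);
    }
}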

@s-akhtar-baig (Contributor Author) commented Jan 31, 2024:

@yingsu00, I am working on other items and may not get to this anytime soon. @hantangwangd, please feel free to work on it and let me know if I can be helpful in any way, shape, or form.

Thank you both for your feedback and sharing your insights!

@hantangwangd (Member) left a comment:

LGTM! Thank you for the great work. One last nit.

@@ -787,7 +791,7 @@ dropping the table from the metadata catalog using ``TRUNCATE TABLE``.
DELETE
^^^^^^^^

The Iceberg connector can delete data in one or more entire partitions from tables by using ``DELETE FROM``. For example, to delete from the table ``lineitem``::
Contributor:

Should we preserve the older version of the documentation for V1 tables?

Contributor Author:

@tdcmeehan @steveburnett, I have made the suggested changes. Please take a look when you get a chance, thanks!

Contributor:

Reviewed, nice work!


// Test with delete_mode set to copy-on-write
String tableNameCor = "test_delete_partitioned_cor";
@Language("RegExp") String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely";
Contributor:

Should the error message be changed to reflect that actually this is now configurable by a table property?

@tdcmeehan (Contributor) left a comment:

LGTM. Any final comments, @beinan @ChunxuTang @yingsu00?

@steveburnett (Contributor) left a comment:

Really, this review is only a nit, a compliment, and a suggestion (Default column) that is entirely optional for you to consider if you have time.

@steveburnett (Contributor) left a comment:

LGTM! (docs)

Pulled just-updated branch, new local build, everything looks good. Thanks for the quick response!

If you make further changes in the docs before merge, request review from me and I'll respond as quickly as I'm able.

@s-akhtar-baig (Contributor Author):

Sounds good, thanks @steveburnett!

@ChunxuTang (Member) left a comment:

@s-akhtar-baig This work is fantastic! The row-level deletion is a feature that the community strongly needs.

Just a quick question on the backward compatibility: After the change, would the Iceberg connector still be compatible with pre-existing V1 tables?

@ChunxuTang (Member):

BTW, could you resolve the conflicts with the main branch and make sure all tests pass?

@s-akhtar-baig (Contributor Author) left a comment:

@ChunxuTang, thank you for reviewing these changes!

Ah, yes. The older table versions were throwing Iceberg errors, and there were inconsistencies between the default format version set by Iceberg and by Presto.

  • I have made relevant changes to set the default format version to 2, which follows Iceberg's implementation.
  • For V1 tables, the delete mode is set to copy-on-write; for newer versions, the connector looks up the table properties.
  • Updated the docs.
  • Added a test to show backwards compatibility.

@ChunxuTang @tdcmeehan Please take a look and let me know what you think.

Session session = getSession();

String errorMessage = format("This connector only supports delete where one or more partitions are deleted entirely for table versions older than %d", MIN_FORMAT_VERSION_FOR_DELETE);
assertUpdate("CREATE TABLE " + tableName + " (id integer, value integer) WITH (format_version = '1', partitioning = Array['id'])");
Contributor Author:

Added test for delete operations on V1 tables.
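
A plausible continuation of the snippet above (the exact assertions are assumed, not quoted from the PR): row-level deletes should fail on the V1 table with the new error message, while whole-partition deletes still succeed.

// Hypothetical follow-up assertions for the V1 table created above.
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 10), (2, 20)", 2);
// Filtering on a non-partition column must fail on a V1 table...
assertQueryFails("DELETE FROM " + tableName + " WHERE value = 10", errorMessage);
// ...while deleting an entire partition is still supported.
assertUpdate("DELETE FROM " + tableName + " WHERE id = 1", 1);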

}
else {
    RowLevelOperationMode deleteMode = IcebergTableProperties.getDeleteMode(tableMetadata.getProperties());
    propertiesBuilder.put(DELETE_MODE, deleteMode.modeName());
Contributor Author:

Added checks to determine the delete mode for older table versions vs. newer ones.
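
For context, a sketch of how the full branch might read, combining this snippet with the version check shown below (the copy-on-write fallback for V1 tables is assumed for illustration; RowLevelOperationMode is Iceberg's enum):

if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
    // V1 tables cannot carry positional delete files, so fall back to
    // copy-on-write semantics (assumed fallback, for illustration).
    propertiesBuilder.put(DELETE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
}
else {
    RowLevelOperationMode deleteMode = IcebergTableProperties.getDeleteMode(tableMetadata.getProperties());
    propertiesBuilder.put(DELETE_MODE, deleteMode.modeName());
}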

@@ -74,13 +77,22 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
         .add(stringProperty(
                 FORMAT_VERSION,
                 "Format version for the table",
-                null,
+                DEFAULT_FORMAT_VERSION,
Contributor Author:

Changed default format version to 2 to follow Iceberg's implementation (https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/TableMetadata.java#L53C20-L53C48).


int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
    throw new PrestoException(NOT_SUPPORTED, format("This connector only supports delete where one or more partitions are deleted entirely for table versions older than %d", MIN_FORMAT_VERSION_FOR_DELETE));
Contributor Author:

Added a new error message for v1 tables.

@@ -251,14 +251,16 @@ Property Name Description
 ``location``           Optionally specifies the file system location URI for
                        the table.

-``format_version``     Optionally specifies the format version of the Iceberg
+``format_version``     Optionally specifies the format version of the Iceberg  ``2``
Contributor Author:

@steveburnett, I have made changes to the default format version and resolved other merge conflicts. Please review when you get a chance, thanks!

@steveburnett (Contributor) left a comment:

LGTM! (docs)

Pulled updated branch, made new local build of docs and reviewed new doc changes. Everything looks good, thanks!

@tdcmeehan merged commit 14620ca into prestodb:master Feb 21, 2024
57 checks passed
@s-akhtar-baig deleted the row_level_deletes branch February 21, 2024 20:42
@wanglinsong mentioned this pull request May 1, 2024