
Support table write commit in Presto on Spark #13854

Merged
merged 5 commits into prestodb:master on Apr 27, 2020

Conversation

wenleix (Contributor) commented Dec 12, 2019

This is required by Presto-on-Spark (#13856) in case of job failures/retries: data written by failed tasks shouldn't be visible.

@arhimondr arhimondr changed the title [POC] Support table write in Presto-on-Spark with Spark task failure Support table write commit in Presto on Spark Apr 17, 2020
shixuan-fan (Contributor) left a comment


"Refactor HiveWriterFactory" LGTM
"Rename PageSinkProperties#isPartitionCommitRequired" Changing to commit is a little bit ambiguous because it might entangle with commit mechanism for transactions (TransactionalMetadata). Maybe isWriteCommitRequired? Honestly I don't have a good name. Also, the unsupported error message in connectors might also need to be changed to reflect the new API name

shixuan-fan (Contributor) left a comment


"Rename partition / lifespan commit into table write commit" LGTM
"Introduce ConnectorCommitStrategy" LGTM % nits

shixuan-fan (Contributor) left a comment


"Support table write commit in Presto on Spark" Looks good with question

wenleix (Contributor, Author) commented Apr 18, 2020

@shixuan-fan:

"Rename PageSinkProperties#isPartitionCommitRequired" Changing to commit is a little bit ambiguous because it might entangle with commit mechanism for transactions (TransactionalMetadata). Maybe isWriteCommitRequired? Honestly I don't have a good name. Also, the unsupported error message in connectors might also need to be changed to reflect the new API name

That's a great point. When I thought about this back in December 2019, even the old name PartitionCommit felt a bit ambiguous -- it's not really a "commit", but more like "make the data durable" (recall the classic ACID properties in databases). At the time, however, the "partition" prefix made it less confusing: we knew we were doing the "commit" at lifespan/partition granularity.

Still, it's difficult to give it an appropriate name under the connector abstraction, as this "rename" operation sits at a lower level than the connector abstraction.

Looking at it again after four months, another way to model it is as a two-stage commit, where this is the "commit" at the PageSink level:

  • The PageSink is a sink for pages, and it has its own "commit" that makes the data visible to the connector. Think of it this way: you can append pages to the sink, but before you commit them to the PageSink, they are not visible to the connector (e.g., dot files in HDFS).
  • After the data is committed to the PageSink, the engine performs the final-stage transaction commit so the data becomes visible to the engine.

In that case, PageSink#isCommitRequired seems like a reasonable name (or we could call it PageSink#isSinkCommitRequired?). We might need some documentation/comments to explain the two-stage commit: the first stage is the PageSink-level commit (which is optional), and the second stage is the connector-level commit (which is required).
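
To make the two-stage split concrete, here is a minimal Java sketch; StagedSink, StagedConnector, and the Page placeholder are hypothetical names for illustration, not Presto's actual SPI:

// Hypothetical sketch of the two-stage commit described above; all types
// here are illustrative placeholders, not Presto's actual SPI.
interface Page {}

interface StagedSink
{
    // Writes land in hidden staging locations (e.g. dot files in HDFS);
    // nothing appended here is visible to the connector yet.
    void appendPage(Page page);

    // Stage 1 (page-sink commit, optional): expose the staged data to the
    // connector, e.g. by renaming dot files to their final names.
    void commitSink();
}

interface StagedConnector
{
    // Stage 2 (connector commit, required): after all sinks have committed,
    // the engine performs the final transaction commit, making the data
    // visible to the engine and to other queries.
    void commitTransaction();
}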

arhimondr (Member) commented

@shixuan-fan @wenleix I like calling this concept pageSinkCommit. I will do the renames.

arhimondr (Member) commented Apr 22, 2020

@shixuan-fan @wenleix

Maybe isWriteCommitRequired? Honestly, I don't have a good name. Also, the unsupported error message in connectors might need to be changed to reflect the new API name

Since we decided to go with pageSinkCommit, I first thought of renaming it to PageSinkProperties#isPageSinkCommitRequired, but that sounded a bit tautological. So I decided to keep the name PageSinkProperties#isCommitRequired. I did the renames in other, more ambiguous places, though.

Also, the unsupported error message in connectors might need to be changed to reflect the new API name

Updated

shixuan-fan (Contributor) left a comment


LGTM

if (pageSinkCommitRequired) {
    return TASK_COMMIT;
}
if (stageExecutionDescriptor.isRecoverableGroupedExecution()) {
A Contributor left a comment

Should we check whether isRecoverableGroupedExecution is true and return LIFESPAN_COMMIT first? I feel like isRecoverableGroupedExecution is a stronger predicate than pageSinkCommitRequired. What do you think?

A Member left a comment

I agree. Let me do that.
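
For reference, a sketch of the agreed reordering. The LIFESPAN_COMMIT and TASK_COMMIT values mirror the snippet above; NO_COMMIT as the fallback, the stubbed types, and the method itself are assumptions for illustration, not the PR's exact code:

// Sketch of the reordered check discussed above; NO_COMMIT as the fallback
// and the stubbed types are assumptions for this sketch.
enum ConnectorCommitStrategy { NO_COMMIT, LIFESPAN_COMMIT, TASK_COMMIT }

interface StageExecutionDescriptor
{
    boolean isRecoverableGroupedExecution();
}

final class CommitStrategySelector
{
    static ConnectorCommitStrategy getCommitStrategy(
            boolean pageSinkCommitRequired,
            StageExecutionDescriptor stageExecutionDescriptor)
    {
        // Recoverable grouped execution is the stronger predicate, so check
        // it first: lifespan-level commit already covers the per-task case.
        if (stageExecutionDescriptor.isRecoverableGroupedExecution()) {
            return ConnectorCommitStrategy.LIFESPAN_COMMIT;
        }
        if (pageSinkCommitRequired) {
            return ConnectorCommitStrategy.TASK_COMMIT;
        }
        return ConnectorCommitStrategy.NO_COMMIT;
    }
}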

arhimondr (Member) commented

@wenleix @shixuan-fan

The problem with this commit protocol is that the rename of all the files happens on the coordinator (or the driver in Spark). In Presto this is not an issue, as the coordinator receives partition updates continuously, as soon as each TableWriter finishes writing. In Spark, however, all partition updates are delivered at once, only after the writing has completely finished. This creates an unnecessary "hiccup" at the very end of the query, as the coordinator has to rename thousands of files in a loop. It also puts additional stress on the file system, as a very large number of files has to be renamed in a very short period of time.

I just had an interesting discussion with @sameeragarwal, and it turns out Spark's commit protocol does not require files to be renamed on the driver. In Spark the output file names are deterministic: as long as the target file name is the same across task attempts, tasks are allowed to speculatively "rename-overwrite" destination files without the risk of introducing duplicate data.

It feels like ideally we would want a commit protocol similar to the one Spark has. I wonder if we still want to keep the current approach as a temporary, transitional solution?

Thoughts?
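
To illustrate the idea (a minimal sketch assuming a hypothetical helper, not Spark's actual commit protocol code): because the final file name is a pure function of the partition/task id rather than the attempt id, any retried or speculative attempt renames onto the same target, so the commit is idempotent.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class DeterministicTaskCommit
{
    // Hypothetical helper: commits one task's staged output file.
    static void commitTaskOutput(Path stagingFile, Path outputDir, int partitionId)
            throws IOException
    {
        // The target name depends only on the partition id, never on the
        // attempt id, so every attempt of this task computes the same name.
        Path target = outputDir.resolve(String.format("part-%05d", partitionId));

        // REPLACE_EXISTING makes the rename idempotent across attempts: a
        // speculative or retried attempt simply overwrites the same file,
        // so no duplicate data can appear.
        Files.move(stagingFile, target, StandardCopyOption.REPLACE_EXISTING);
    }
}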

arhimondr (Member) commented

@sameeragarwal

From what I understand, Spark supports dynamic partitions. What happens if the partition key is non-deterministic?

For example, the first run adds a file to partition 'p1' and commits; the second run doesn't add any files to 'p1', but instead adds some files to 'p2'?

sameeragarwal (Contributor) commented

@arhimondr It'll break -- we require partition keys to be deterministic (not just for the commit protocol, but for general task retries as well)

shixuan-fan (Contributor) commented

@arhimondr Just curious, is it possible to amortize these renames by committing as pages arrive from the table writer, rather than committing all files after receiving everything from the table writer?

arhimondr (Member) commented

@shixuan-fan That's what we do in conventional Presto. On Spark, however, the results from the upstream stage are delivered all at once, when the upstream stage has finished =\

wenleix and others added 5 commits April 27, 2020 15:23
  • Minor variable renames
  • Page sink commit mechanism is a general connector capability and is not restricted to partition commit. It can be used to commit not only lifespans or physical partitions, but any page sink write. (Co-authored-by: Andrii Rosa <andriirosa@fb.com>)
  • Tasks in Spark are often retried and run speculatively, so the commit protocol is required for table writes to avoid data corruption. (Co-authored-by: Andrii Rosa <andriirosa@fb.com>)
@arhimondr arhimondr merged commit 2bfddd0 into prestodb:master Apr 27, 2020
wenleix (Contributor, Author) commented Apr 30, 2020

@shixuan-fan:

@arhimondr Just curious, is it possible to amortize these renames by committing as pages arrive from the table writer, rather than committing all files after receiving everything from the table writer?

We also did this in Presto to reduce query latency and coordinator pressure. In Spark, however:

  • Each query has its own driver (coordinator).
  • Latency is not that sensitive (Presto-on-Spark aims at queries that take more than an hour to run; otherwise, the user should use Presto-on-Presto).

@caithagoras caithagoras mentioned this pull request May 4, 2020