
S3 streaming upload with multipart upload api #3712

Merged

merged 1 commit into trinodb:master from s3-streaming-upload-with-multipart-upload-api on Dec 4, 2020

Conversation

@chhsiao90 (Member)

Implements S3 streaming upload with the multipart upload API.

The original implementation uploads the file only after the entire query result has been fetched and stored in a local file, which can be a bottleneck if the query result is slow to produce or very large.

This is a draft; please let me know if you have any suggestions, thanks!

@cla-bot cla-bot bot added the cla-signed label May 13, 2020
@sopel39 (Member)

sopel39 commented May 13, 2020

Hi @chhsiao90

Have you eliminated the staging of S3 files? Will files be uploaded before the query finishes? One benefit of staging is that if a query fails in the middle of execution, partial data won't be uploaded to S3.

@chhsiao90 (Member Author)

This pull request leverages the Multipart Upload API.

A query result is split into several parts, and each part is uploaded to S3 as it is produced.
If the query succeeds, we complete the multipart upload and S3 concatenates the parts into one file. If the query fails, we abort the multipart upload and all uploaded parts are discarded.

Have you eliminated the staging of S3 files?

Yes

Will files be uploaded before the query finishes?

Yes, but the data will be uploaded to S3 as parts of a multipart upload (5MB parts by default), and the upload can be aborted if the query fails.
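For illustration, here is a minimal sketch of that lifecycle using the AWS SDK for Java v1 (not the PR's actual code; the bucket, key, and in-memory parts list are placeholders):

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

public final class MultipartLifecycleSketch
{
    private MultipartLifecycleSketch() {}

    public static void uploadInParts(AmazonS3 s3, String bucket, String key, List<byte[]> parts)
    {
        // 1. Start the multipart upload; nothing becomes visible at `key` yet
        String uploadId = s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key))
                .getUploadId();
        List<PartETag> etags = new ArrayList<>();
        try {
            // 2. Upload each buffered part as soon as it is full
            //    (every part except the last must be at least 5MB)
            int partNumber = 1;
            for (byte[] part : parts) {
                etags.add(s3.uploadPart(new UploadPartRequest()
                        .withBucketName(bucket)
                        .withKey(key)
                        .withUploadId(uploadId)
                        .withPartNumber(partNumber++)
                        .withInputStream(new ByteArrayInputStream(part))
                        .withPartSize(part.length))
                        .getPartETag());
            }
            // 3a. Query succeeded: complete, and S3 concatenates the parts into one object
            s3.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, key, uploadId, etags));
        }
        catch (RuntimeException e) {
            // 3b. Query failed: abort, and S3 discards all uploaded parts
            s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId));
            throw e;
        }
    }
}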

@sopel39 (Member)

sopel39 commented May 13, 2020

Yes, but the data will be uploaded to S3 as parts of a multipart upload (5MB parts by default), and the upload can be aborted if the query fails.

Will some files be committed to S3 before the query finishes?

@chhsiao90 (Member Author)

I'm not sure what "committed" means here.

If it means the file is visible or accessible through the S3 API before the query finishes, then the answer is no: nobody can see or access the data before the query has finished.

The part data is saved in parts storage (like a staging area inside S3).

@electrum (Member)

electrum commented May 14, 2020

Thanks for working on this feature. I had started on it locally but didn't get as far as you. Some initial comments:

  • I'd prefer to move the benchmark to a different PR, since it distracts from this change. I'm not sure if we need it at all, since the biggest benefit of this feature is not requiring potentially huge amounts of local storage for the temporary files. Performance benefits are secondary.

  • This type of change is inherently risky. The old S3 upload code has been working for years. We expect the new code to be better, but we should allow users to switch back to the old code in case there are problems. For this, we can rename PrestoS3OutputStream to PrestoS3StagingOutputStream and introduce a new PrestoS3StreamingOutputStream, with a config property hive.s3.streaming.enabled to switch between them.

  • Part size is tricky. S3 has a maximum of 10,000 parts, so the part size determines the maximum file size that can be written, thus we don't want to make it too small. The part size also determines the memory usage used by the writer, so we don't want it too large, either. If we pick 16MB, that allows a 160GB file, which is huge for a single writer, and probably fine as a default. We had an idea of increasing the buffer size based on the number of parts written, e.g., double the size each time you consume half the remaining parts, but that might not be needed. We can start with a fixed buffer size configured with hive.s3.streaming.part-size.

  • Upload concurrency impacts memory usage. We should have a fixed maximum number of concurrent uploads per writer. I'd be fine if we simplify this and only allow a single background upload. In other words, fill the buffer, start the upload, fill the next buffer, then block until the previous upload completes. I suspect that if the uploads take longer than writing, there won't be much benefit to multiple background uploads (and might be worse since they'll compete with each other). See the sketch below for this pattern.
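A minimal sketch of that single-background-upload idea, using hypothetical names (flushBuffer, uploadPart) rather than the PR's actual classes:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SingleBackgroundUploadSketch
{
    private final ExecutorService uploadExecutor = Executors.newSingleThreadExecutor();
    private Future<?> inProgressUpload;

    // Called whenever the write buffer is full: wait for the previous part to
    // finish uploading, then hand the full buffer to the background thread,
    // so at most one upload is in flight per writer.
    void flushBuffer(byte[] fullBuffer)
            throws Exception
    {
        if (inProgressUpload != null) {
            inProgressUpload.get();
        }
        inProgressUpload = uploadExecutor.submit(() -> uploadPart(fullBuffer));
    }

    private void uploadPart(byte[] part)
    {
        // placeholder for the actual S3 UploadPart call
    }
}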

@chhsiao90 (Member Author)

chhsiao90 commented May 15, 2020

Thanks for reviewing the pull request.

All comments look good to me, and I have already updated the pull request.

For item 3 (part size):

Can I reuse hive.s3.multipart.min-part-size for hive.s3.streaming.part-size, since they are the same configuration?

For item 4 (upload concurrency):

I will implement a limited number of threads per writer. I will start with a single thread per writer. Should it be configurable?

@chhsiao90 (Member Author)

BTW, the streaming upload should work with the config below.

connector.name=hive-hadoop2
hive.metastore=file
hive.metastore.catalog.dir=s3://bucket/path
hive.metastore.user=test
hive.s3.aws-access-key=
hive.s3.aws-secret-key=
hive.s3.streaming.enabled=true

After Presto starts, we can run a few statements to upload files to S3.

CREATE SCHEMA hive.test;
CREATE TABLE hive.test.orders AS SELECT * FROM tpch.tiny.orders;

@sopel39 (Member)

sopel39 commented May 15, 2020

I suspect that if the uploads take longer than writing, there won't be much benefit to multiple background uploads (and might be worse since they'll compete with each other).

For large instances, you need multiple uploads per node to utilize the full S3 bandwidth capacity.

@electrum (Member)

@chhsiao90 apologies for the delay on this.

Can I reuse hive.s3.multipart.min-part-size for hive.s3.streaming.part-size, since they are the same configuration?

I think it's best to have a separate configuration since these are different upload methods. It's clearer to have all the old multipart configs under hive.s3.multipart and the new streaming configs under hive.s3.streaming. Also, they're actually different configs, as hive.s3.multipart.min-part-size controls the minimum part size (the AWS transfer manager may choose a larger value) whereas hive.s3.streaming.part-size is a fixed part size.

I will start with a single thread per writer. Should it be configurable?

If we can keep the implementation simpler by only supporting a single writer, then I don't see a reason to make it configurable.

@sopel39 yes, multiple uploads are needed, but we achieve that by having multiple writers. We would only need multiple uploads per writer in the case where a writer can write faster than a single uploader, but multiple uploaders can keep up with the writer.

@electrum (Member) left a comment

Overall this looks good. Nice work!

@chhsiao90 (Member Author)

Thanks @electrum for the review. All comments are addressed.

I have one question. If we only allow one in-progress upload per writer, should we just remove the executor and make the part upload synchronous?

BTW, we cannot guarantee that a multipart upload will always be completed or aborted (for example, because of a network issue or a killed process). Should we document configuring an AbortIncompleteMultipartUpload lifecycle policy, or maybe create the policy automatically?

@electrum (Member)

Doing the upload in the background seems useful, since we can do it in parallel with writing the next part. An alternative is to simply add more concurrent writers, but this has the downside of higher memory usage, so I think doing it in the background is good.

@chhsiao90 chhsiao90 changed the title WIP: S3 streaming upload with multipart upload api S3 streaming upload with multipart upload api Jul 3, 2020
@electrum
Copy link
Member

@chhsiao90 My apologies, I forgot to follow up on this, and will do so this week.

@rtjarvis

I'd love to see this make it in. It would accelerate our Hive CTAS queries when we have slow (and cheap) EBS storage.

@martint martint requested a review from electrum October 26, 2020 22:04
@electrum (Member) left a comment

@chhsiao90 my sincere apologies for not following up. This looks great. I had a number of minor, mostly stylistic comments.

I'd like to get this merged for the next release. If you don't have time to address these, I'm happy to do it. Please squash the commits when updating the PR, so that we have a single commit titled

Implement S3 streaming upload

``hive.s3.streaming.enabled`` Use the S3 multipart upload API to upload files in a streaming fashion,
without creating a staging file in the local file system.

``hive.s3.streaming.part-size`` The part size for S3 streaming upload. Defaults to ``5MB``.
Member

I'm thinking we should make this larger by default, say 16MB or 32MB, since this controls the maximum file size (part size * 10,000 max parts). Using 16MB would increase the max from 50GB to 160GB, while still being acceptable from a memory standpoint (writers can use substantial memory for buffers).

@@ -65,6 +65,8 @@
private PrestoS3AclType s3AclType = PrestoS3AclType.PRIVATE;
private boolean skipGlacierObjects;
private boolean requesterPaysEnabled;
private boolean s3StreamingUploadEnabled;
Member

Should we enable this by default? Are you using this change in production already?

Member Author

I prefer not to enable this feature by default.

We are already using this feature in production, but without heavy load.

There is also a concern that users should create an AbortIncompleteMultipartUpload lifecycle policy, so they don't get charged for uploads that are neither completed nor aborted (which might happen if the server crashes during an upload).
Our company creates the policy automatically when the S3FileSystem is initialized.
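As an illustration only (not the code used in our deployment), creating such a lifecycle rule with the AWS SDK for Java v1 could look roughly like this; the rule id and the number of days are arbitrary choices:

import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortIncompleteMultipartUpload;
import com.amazonaws.services.s3.model.BucketLifecycleConfiguration;
import com.amazonaws.services.s3.model.lifecycle.LifecycleFilter;
import com.amazonaws.services.s3.model.lifecycle.LifecyclePrefixPredicate;

final class AbortIncompleteUploadsPolicySketch
{
    private AbortIncompleteUploadsPolicySketch() {}

    // Adds a bucket lifecycle rule that aborts multipart uploads left incomplete
    // for more than `days` days, so orphaned parts are not billed forever.
    // Note: setBucketLifecycleConfiguration replaces any existing lifecycle rules.
    static void ensureAbortPolicy(AmazonS3 s3, String bucket, int days)
    {
        BucketLifecycleConfiguration.Rule rule = new BucketLifecycleConfiguration.Rule()
                .withId("abort-incomplete-multipart-uploads")
                .withFilter(new LifecycleFilter(new LifecyclePrefixPredicate("")))
                .withAbortIncompleteMultipartUpload(
                        new AbortIncompleteMultipartUpload().withDaysAfterInitiation(days))
                .withStatus(BucketLifecycleConfiguration.ENABLED);

        s3.setBucketLifecycleConfiguration(bucket, new BucketLifecycleConfiguration(List.of(rule)));
    }
}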

Member

We already use multipart uploads via TransferManager, so the policy would be needed in either case. It's unfortunate that they don't have a default policy that aborts after some number of days. It seems ridiculous that the default is to keep them around forever.

I'm ok with not enabling by default for now. We can do that in a future release, after we have confirmation that it is working well for multiple users.

One thing I just realized is that we have hive.s3.multipart.min-file-size for the existing behavior. We could add similar but simpler logic for the streaming approach to not use multipart if there would only be one part (i.e., if we are flushing at finish and no parts have been uploaded yet). This can be a follow-up.
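A rough sketch of that follow-up idea, with hypothetical parameter names (uploadId, partETags, buffer) rather than the actual fields:

import java.io.ByteArrayInputStream;
import java.util.List;
import java.util.Optional;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;

final class SinglePartFallbackSketch
{
    private SinglePartFallbackSketch() {}

    // Called when the stream is finished. If no multipart upload was ever started
    // (the whole result fit in the first buffer), use a plain PutObject; otherwise
    // complete the multipart upload. Assumes the final part has already been
    // uploaded and its ETag added to partETags.
    static void finish(
            AmazonS3 s3,
            String bucket,
            String key,
            Optional<String> uploadId,
            List<PartETag> partETags,
            byte[] buffer,
            int bufferedBytes)
    {
        if (!uploadId.isPresent()) {
            ObjectMetadata metadata = new ObjectMetadata();
            metadata.setContentLength(bufferedBytes);
            s3.putObject(new PutObjectRequest(
                    bucket, key, new ByteArrayInputStream(buffer, 0, bufferedBytes), metadata));
            return;
        }
        s3.completeMultipartUpload(
                new CompleteMultipartUploadRequest(bucket, key, uploadId.get(), partETags));
    }
}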

Member

I created a PR to implement this: #6201

It also improves the abort logic. I realized your original implementation of aborting immediately was good:

  • it ensures we clean up even if the output stream is not closed for some reason (bugs elsewhere)
  • since close() is called after a failure, we need to ensure that we don't finish the upload

I added a failed flag to track the fact that a previous part upload failed, since the abort might fail but the finish could succeed.
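A skeleton of that failure handling, with hypothetical names (abortUpload, completeUpload) standing in for the real methods:

import java.io.IOException;
import java.io.OutputStream;

abstract class FailureAwareOutputStreamSketch
        extends OutputStream
{
    private boolean closed;
    private volatile boolean failed; // set when any background part upload fails

    @Override
    public void close()
            throws IOException
    {
        if (closed) {
            return;
        }
        closed = true;

        if (failed) {
            // a part upload already failed: never complete the object,
            // even though close() is still called after the query failure
            abortUpload();
            return;
        }
        completeUpload();
    }

    // called from the upload thread when a part upload throws
    protected void onPartUploadFailure()
    {
        failed = true;
        // abort immediately so parts are cleaned up even if close() never runs
        abortUpload();
    }

    protected abstract void abortUpload();

    protected abstract void completeUpload()
            throws IOException;
}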

@chhsiao90 (Member Author)

Thanks @electrum for reviewing; all of the comments are reasonable to me and have been addressed.
The commits have been squashed into a single commit.

@electrum electrum merged commit f681708 into trinodb:master Dec 4, 2020
@electrum (Member)

electrum commented Dec 4, 2020

Thanks!

@chhsiao90 chhsiao90 deleted the s3-streaming-upload-with-multipart-upload-api branch December 4, 2020 03:21
@electrum electrum added this to the 348 milestone Dec 9, 2020
@electrum electrum mentioned this pull request Dec 9, 2020