This repository has been archived by the owner on Jul 27, 2022. It is now read-only.

feat: Add stream upload (multi-part upload) #20

Merged
merged 19 commits on Jul 18, 2022

Conversation

Contributor

@wjones127 wjones127 commented Jun 2, 2022

Implements stream / multi-part upload for all object stores except GCP. Exposed as two new methods on the ObjectStore trait:

    /// Get a multi-part upload that allows writing data in chunks
    async fn put_multipart(
        &self,
        location: &Path,
    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)>;

    /// Clean up an aborted upload.
    async fn cleanup_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()>;
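
For example, a caller might use this roughly as follows (a sketch only: the import paths, the Path construction, and the assumption that the boxed writer is tokio's AsyncWrite, so tokio's AsyncWriteExt provides write_all/shutdown, are all illustrative rather than confirmed details of this PR):

use object_store::{path::Path, ObjectStore};
use tokio::io::AsyncWriteExt;

async fn upload_in_chunks(
    store: &dyn ObjectStore,
    chunks: Vec<bytes::Bytes>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let location = Path::from("data/large-file.parquet");
    let (_id, mut writer) = store.put_multipart(&location).await?;
    for chunk in chunks {
        // Writes are buffered behind the AsyncWrite impl and uploaded as parts.
        writer.write_all(&chunk).await?;
    }
    // Completing the multi-part upload happens on shutdown.
    writer.shutdown().await?;
    Ok(())
}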

Follow-up work:

  • Need to expose concurrency options for cloud multi-part uploads and set tested default values.

Collaborator

@tustvold tustvold left a comment

Awesome that you're working on this 🎉 This will open up all sorts of cool things, like doing streaming parquet writes to object storage.

That being said I'm not sure that BoxStream will give us the optimal interface. The main challenge with streams is that they are pull-based, which can complicate things like offloading CPU bound work to a separate threadpool.

One option might be to use futures::Sink i.e. something like

type ChunkWriter<'a> = Pin<Box<dyn Sink<Bytes, Error=Error>>>

async fn put_chunked(&self, location: &Path) -> Result<ChunkWriter<'_>>

But this has two issues:

  • It isn't clear how to handle cancellation
  • It still forces data to be produced serially

I wonder if instead we might do something like

#[async_trait]
trait ChunkWriter {
  /// Write a chunk at index `idx`
  ///
  /// Takes a shared reference so can be performed in parallel
  async fn put(&self, idx: usize, data: Bytes) -> Result<()>;
  
  /// Abort the multipart upload
  ///
  /// Takes a mutable reference to ensure no inflight requests
  async fn abort(&mut self) -> Result<()>

  /// Finish the multipart upload
  ///
  /// Takes a mutable reference to ensure no inflight requests  
  async fn finish(&mut self) -> Result<()>
}

#[async_trait]
trait ObjectStore {
  async fn put_chunked(&self) -> Result<Box<dyn ChunkWriter + '_>>
  ...
}

We could then perhaps impl Sink for ChunkWriter as a usability enhancement for simple use-cases. This would let people do

store.put_chunked(location).await?.forward(stream).await?

What do you think?

src/azure.rs Outdated
@@ -103,6 +105,14 @@ impl ObjectStore for MicrosoftAzure {
Ok(())
}

async fn upload(
Collaborator

I think put_chunked or put_multi_part might be a better name

@wjones127
Contributor Author

The main challenge with streams is that they are pull-based, which can complicate things like offloading CPU bound work to a separate threadpool.

I was starting to think that about streams. It seems like Sink is much more appropriate! (AsyncWrite is auto-implemented for Sink, so that's cool 😄.)

I initially liked the idea of the ChunkWriter trait you propose. But the more I thought about it, it leaves the user with the complexity of handling not only the chunk indices but also the buffer sizes. In AWS S3, chunks have to be at least 5 MB (except for the final one). A nice API would combine bytes into appropriately sized chunks, and even increase that minimum size for later chunks. If you only use 5 MB chunks, you are limited to about 50 GB with the maximum of 10k parts, which isn't terrible but could be better.
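
For illustration, the kind of buffering such an API could do on the caller's behalf might look something like this (a sketch only; PartBuffer and MIN_PART_SIZE are illustrative names, not anything in this PR):

use bytes::{Bytes, BytesMut};

/// S3's minimum size for all but the final part.
const MIN_PART_SIZE: usize = 5 * 1024 * 1024;

/// Accumulates caller-provided bytes and only emits a part once it is large enough.
#[derive(Default)]
struct PartBuffer {
    current: BytesMut,
}

impl PartBuffer {
    /// Returns a complete part once at least MIN_PART_SIZE bytes have accumulated.
    fn push(&mut self, data: &[u8]) -> Option<Bytes> {
        self.current.extend_from_slice(data);
        (self.current.len() >= MIN_PART_SIZE).then(|| self.current.split().freeze())
    }

    /// The final (possibly smaller) part, if any data remains.
    fn finish(&mut self) -> Option<Bytes> {
        (!self.current.is_empty()).then(|| self.current.split().freeze())
    }
}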

It still forces data to be produced serially

Apologies if this is a naive question, but are there file format serializers that don't do this?

I was expecting that something like the parquet writer would produce buffers serially, and the upload would receive them and initiate concurrent upload calls as they came in. (Possibly using something like StreamExt::for_each_concurrent().)
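
Something along those lines, sketched with the try_ variant so errors surface (put_part here is a hypothetical store-specific per-part request, not an API from this PR):

use bytes::Bytes;
use futures::{stream, TryStreamExt};

// Hypothetical per-part upload request; a real implementation would issue the
// store-specific UploadPart call and record the returned part metadata.
async fn put_part(idx: usize, data: Bytes) -> Result<(), std::io::Error> {
    let _ = (idx, data);
    Ok(())
}

// Buffers arrive serially from the serializer but are uploaded with bounded concurrency.
async fn upload_parts(parts: Vec<Bytes>) -> Result<(), std::io::Error> {
    stream::iter(parts.into_iter().enumerate().map(Ok::<_, std::io::Error>))
        .try_for_each_concurrent(8, |(idx, data)| put_part(idx, data))
        .await
}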

So far, I'm more inclined to go with just Sink (plus some additional trait to handle aborting):

#[async_trait]
trait AbortableUpload {
  async fn abort(&mut self) -> Result<()>;
}

type ChunkWriter<'a> = Pin<Box<dyn Sink<Bytes, Error=Error> + AbortableUpload>>

async fn put_chunked(&self, location: &Path) -> Result<ChunkWriter<'_>>

@tustvold
Collaborator

tustvold commented Jun 3, 2022

In AWS S3, chunks have to be at least 5 MB

Aah yes, yeah that would be annoying for users to have to worry about

Apologies if this is a naive question, but are there file format serializers that don't do this?

You could theoretically encode parquet row groups separately, potentially in parallel, or chunks of a CSV or ndjson file, etc... Although tbh uploading them out of order is probably of only limited benefit, especially if internally the ChunkWriter can handle multiple concurrent uploads.

Possibly using something like StreamExt::for_each_concurrent()

I suspect FuturesUnordered or similar might be easier to use in a push-based context.
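
For illustration, a push-based shape with FuturesUnordered might look roughly like this (again just a sketch; put_part is the same kind of hypothetical per-part request as above):

use bytes::Bytes;
use futures::stream::{FuturesUnordered, StreamExt};

// Hypothetical per-part upload request standing in for a store-specific call.
async fn put_part(idx: usize, data: Bytes) -> Result<(), std::io::Error> {
    let _ = (idx, data);
    Ok(())
}

// The producer pushes one future per part; completions are drained as they
// arrive, capping the number of parts in flight.
async fn drive_uploads(parts: Vec<Bytes>) -> Result<(), std::io::Error> {
    let mut in_flight = FuturesUnordered::new();
    for (idx, data) in parts.into_iter().enumerate() {
        in_flight.push(put_part(idx, data));
        if in_flight.len() >= 8 {
            // Wait for one upload to finish before starting another.
            in_flight.next().await.transpose()?;
        }
    }
    // Drain whatever is still in flight.
    while let Some(result) = in_flight.next().await {
        result?;
    }
    Ok(())
}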

So far, I'm more inclined to go with just Sink

Sounds good to me 👍

plus some additional trait to handle aborting

Thinking a bit more on this:

  • We need to handle uploads aborted by panics, process shutdown, etc...
  • Methods like Stream::forward will drop the sink on error
  • We may want to be able to resume uploads

As such I wonder if we need multipart uploads to be a higher level concept on the trait, maybe something like

type MultipartWriter<'a> = Pin<Box<dyn Sink<Bytes, Error=Error>>>

pub struct MultipartId(String);

pub struct MultiPart {
  id: MultipartId,
  pub parts: Vec<Part>,
}

pub struct Part {
  pub size: usize,
}

#[async_trait]
trait ObjectStore {

  async fn create_multipart(&self, location: &Path) -> Result<MultipartId>;

  async fn write_multipart(&self, id: &MultipartId) -> Result<MultipartWriter<'a>>;  

  async fn abort_multipart(&self, id: &MultipartId) -> Result<()>;

  async fn list_multipart(&self) -> Result<Vec<MultipartId>>;

  async fn get_multipart(&self, id: &MultipartId) -> Result<MultiPart>;
}

What do you think?

@wjones127
Contributor Author

I've been away from this for a bit, but got back into it this weekend.

I decided to go ahead with the implementation to get a better sense of what the low-level details look like. I realized that AsyncWrite is easier to implement and probably more aligned with what we actually want than Sink, so I implemented that.

I learned Azure does not provide a way to clean up parts ("blocks", as they call them). There is no unique ID associated with a particular upload, other than the object location; blocks just expire after 7 days. So abort does nothing for now.

Thinking a bit more on this:

  1. We need to handle uploads aborted by panics, process shutdown, etc...
  2. Methods like Stream::forward will drop the sink on error
  3. We may want to be able to resume uploads
  1. I'm not sure what we can do about this in our error handling. Are you suggesting that we just need to make sure we allow the user to get the MultipartId, in case they need to abort manually after a crash? Or is there more we can do? (See the sketch after this list.)
  2. I see, so that will take ownership of the Sink. We won't use that anymore since I implemented AsyncWrite instead, but it points at a real problem: if a function consumes the AsyncWrite (as parquet's ArrowWriter consumes its Writer, and an async version probably would too), then with the existing design we couldn't abort in case of an error. I will refactor the interface to make sure we still have some handle to abort from.
  3. That would be cool. The Azure case makes this very difficult though: without a way to clean up past parts, there's no obvious way to differentiate blocks that belong to the upload being resumed from blocks left behind by previously failed attempts.
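
A sketch of the kind of error handling this enables, keeping the MultipartId around so a failed upload can be cleaned up (illustrative only; it assumes the put_multipart/cleanup_multipart signatures from the PR description and tokio's AsyncWriteExt for the writer):

use bytes::Bytes;
use futures::{Stream, StreamExt};
use object_store::{path::Path, ObjectStore};
use tokio::io::AsyncWriteExt;

// Write a stream of buffers; if anything fails, use the retained MultipartId to
// clean up the parts that were already uploaded.
async fn upload_or_cleanup(
    store: &dyn ObjectStore,
    location: &Path,
    mut buffers: impl Stream<Item = Bytes> + Unpin,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let (id, mut writer) = store.put_multipart(location).await?;

    let write_result: Result<(), std::io::Error> = async {
        while let Some(buf) = buffers.next().await {
            writer.write_all(&buf).await?;
        }
        writer.shutdown().await
    }
    .await;

    if let Err(e) = write_result {
        // Best effort: remove the orphaned parts before surfacing the error.
        store.cleanup_multipart(location, &id).await?;
        return Err(e.into());
    }
    Ok(())
}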

@tustvold
Collaborator

Are you suggesting that we just need to make sure we allow the user to get the MultipartId in case they need to abort manually after a crash?

Pretty much, I just remember being stung in the past by S3 bills for aborted multipart uploads which were just sitting around eating up storage space.

They just expire in 7 days

Yeah, tbh this seems like a pretty sensible policy. https://aws.amazon.com/blogs/aws-cloud-financial-management/discovering-and-deleting-incomplete-multipart-uploads-to-lower-amazon-s3-costs/ describes how you can now achieve the same with S3, so perhaps we don't need to add explicit support for this after all...

That would be cool. The Azure case makes this very difficult though

I see, yeah let's do the simple thing that is supported by all use-cases and we can extend as needed later

Collaborator

@tustvold tustvold left a comment

This is looking really nice, mostly just minor nits 👍

Resolved review threads on src/aws.rs, src/azure.rs, src/lib.rs, and src/multipart.rs.
src/local.rs Outdated
buf: &[u8],
) -> std::task::Poll<Result<usize, io::Error>> {
loop {
match &mut self.state {
Collaborator

I think this could be simplified with maybe_spawn_blocking and BoxFuture instead of JoinHandle

Contributor Author

The main reason I didn't use maybe_spawn_blocking was that it forces the wrong error type for this function, and I don't see any easy way to make that generic. But I agree BoxFuture would be better.

Further resolved review threads on src/local.rs.
@wjones127
Contributor Author

I think put_chunked or put_multi_part might be a better name

"multi-part" seems to be an S3-specific concept, though it is also a decent description. Right now the difference between put() and (what is currently called) upload() is that the former uploads a single buffer while the latter can upload a stream of buffers. But IIUC we could have put() take a stream of buffers too. Then the difference would be that the upload() method supports concurrent uploads for higher throughput on some object stores. So maybe put_parallel() or put_concurrent()? I will think about this some more...

@wjones127 wjones127 force-pushed the feat/async-writer branch 2 times, most recently from 2e67be7 to eb4da24, on July 8, 2022 02:46
@alamb
Contributor

alamb commented Jul 8, 2022

@wjones127 and @tustvold --

Given we plan to donate the object_store crate's code into the arrow-rs repo soon (TM) -- see apache/arrow-rs#2030 -- what is the plan for this PR? Will we merge it here prior to the donation, or do you plan to retarget the changes to arrow-rs once it is there?

@wjones127
Contributor Author

How soon are we planning to move the crate? I will add the GCS implementation to the PR this weekend, but I can also hold off on that if we want to get this in sooner. I think this should be ready to merge in the next week, but feel free to let me know if you believe otherwise @tustvold 😄

@alamb
Contributor

alamb commented Jul 8, 2022

How soon are we planning to move the crate?

I was imagining sometime next week -- I think we can have a race to see what gets merged first, and the worst case is I'll port this PR's code if needed.

@wjones127
Contributor Author

We won't be able to test GCS XML multipart uploads in CI until it's added upstream: fsouza/fake-gcs-server#852

But I can test manually for now.

@wjones127
Contributor Author

wjones127 commented Jul 9, 2022

GCS tests pass against real GCS, but are very slow (they take 2 minutes!). I'll be looking into why that is; my local internet upload bandwidth is very low.

#[derive(Debug)]
pub struct GoogleCloudStorage {
client: Arc<GoogleCloudStorageClient>,
Contributor Author

I needed a reference to the underlying token function and client from the multipart writer struct, so I moved the request data into another struct and referenced it with an Arc.

@wjones127 wjones127 marked this pull request as ready for review July 9, 2022 21:53
@wjones127 wjones127 requested a review from tustvold July 12, 2022 18:06
Collaborator

@tustvold tustvold left a comment

This is looking extremely cool, just some relatively minor suggestions

Cargo.toml Outdated
@@ -27,6 +27,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock"] }
futures = "0.3"
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1.0", default-features = false, optional = true }
serde-xml-rs = { version = "0.5.1", default-features = false, optional = true }
Collaborator

I know this is currently a dependency of azure_storage, but I am a bit apprehensive about its maintenance: it doesn't seem to be particularly actively receiving contributions, and it has some pretty long-standing issues (RReverser/serde-xml-rs#135).

The same actually holds for xml-rs (netvl/xml-rs#219), which is used by rusoto...

https://github.com/tafia/quick-xml is possibly a more future-proof option? Perhaps something to bear in mind as we work on #18

Contributor Author

Good point. quick-xml does look like it might be a better choice. I will try switching to that.

Contributor Author

It's a little better, but there is one issue they will not fix that we have to work around: tafia/quick-xml#350

Resolved review threads on src/aws.rs, src/lib.rs, and src/multipart.rs.
buf: &[u8],
) -> std::task::Poll<Result<usize, io::Error>> {
// Poll current tasks
self.as_mut().poll_tasks(cx)?;
Collaborator

I had no idea how this was working; it turns out Rust 1.36 added some magic to make this work:

https://doc.rust-lang.org/src/core/task/poll.rs.html#245
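
For reference, a minimal illustration of what that impl allows (the function below is made up for the example): the ? operator on a Poll<Result<T, E>> propagates Poll::Ready(Err(e)) early and otherwise yields a plain Poll<T>:

use std::task::Poll;

// `inner?` returns early with Poll::Ready(Err(e)) if the inner poll failed;
// otherwise it yields a plain Poll<usize> to keep working with.
fn poll_step(inner: Poll<Result<usize, std::io::Error>>) -> Poll<Result<usize, std::io::Error>> {
    let ready: Poll<usize> = inner?;
    match ready {
        Poll::Pending => Poll::Pending,
        Poll::Ready(n) => Poll::Ready(Ok(n + 1)),
    }
}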

Resolved review thread on src/throttle.rs.
src/throttle.rs Outdated
Waiting,
}

struct ThrottledUpload {
Collaborator

As far as I can tell this simply delays the first write or flush, and I'm not really sure that is particularly meaningful. Since it cannot delay the underlying API calls (they're hidden behind the AsyncWrite abstraction), I would personally be inclined to just not support these APIs in the ThrottledStore.

Contributor Author

Is ThrottledStore mostly used just for testing?

Collaborator

Solely used for testing

Contributor Author

I've removed the ThrottledStore implementation.

Resolved review threads on src/multipart.rs.
Collaborator

@tustvold tustvold left a comment

Awesome 👍

Labels: automerge (Put in the merge queue)

3 participants