fix: rename to put_multipart

wjones127 committed Jul 8, 2022
1 parent 178d929 commit eb4da24

Showing 8 changed files with 63 additions and 62 deletions.
20 changes: 10 additions & 10 deletions src/aws.rs
@@ -2,18 +2,18 @@
 //!
 //! ## Multi-part uploads
 //!
-//! Multi-part uploads can be initiated with the [ObjectStore::upload] method.
+//! Multi-part uploads can be initiated with the [ObjectStore::put_multipart] method.
 //! Data passed to the writer is automatically buffered to meet the minimum size
 //! requirements for a part. Multiple parts are uploaded concurrently.
 //!
 //! If the writer fails for any reason, you may have parts uploaded to AWS but not
-//! used that you may be charged for. Use the [ObjectStore::cleanup_upload] method
+//! used that you may be charged for. Use the [ObjectStore::cleanup_multipart] method
 //! to abort the upload and drop those unneeded parts. In addition, you may wish to
 //! consider implementing automatic clean up of unused parts that are older than one
 //! week.
 use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
 use crate::util::format_http_range;
-use crate::UploadId;
+use crate::MultipartId;
 use crate::{
     collect_bytes,
     path::{Path, DELIMITER},
@@ -132,13 +132,13 @@ enum Error {
     },

     #[snafu(display(
-        "Unable to cleanup upload data. Bucket: {}, Location: {}, Error: {} ({:?})",
+        "Unable to cleanup multipart data. Bucket: {}, Location: {}, Error: {} ({:?})",
         bucket,
         path,
         source,
         source,
     ))]
-    UnableToCleanupUploadData {
+    UnableToCleanupMultipartData {
         source: rusoto_core::RusotoError<rusoto_s3::AbortMultipartUploadError>,
         bucket: String,
         path: String,
@@ -287,10 +287,10 @@ impl ObjectStore for AmazonS3 {
         Ok(())
     }

-    async fn upload(
+    async fn put_multipart(
         &self,
         location: &Path,
-    ) -> Result<(UploadId, Box<dyn AsyncWrite + Unpin + Send>)> {
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
         let bucket_name = self.bucket_name.clone();

         let request_factory = move || rusoto_s3::CreateMultipartUploadRequest {
@@ -325,7 +325,7 @@ impl ObjectStore for AmazonS3 {
         Ok((upload_id, Box::new(CloudMultiPartUpload::new(inner, 8))))
     }

-    async fn cleanup_upload(&self, location: &Path, upload_id: &UploadId) -> Result<()> {
+    async fn cleanup_multipart(&self, location: &Path, upload_id: &MultipartId) -> Result<()> {
         let request_factory = move || rusoto_s3::AbortMultipartUploadRequest {
             bucket: self.bucket_name.clone(),
             key: location.to_string(),
@@ -340,7 +340,7 @@ impl ObjectStore for AmazonS3 {
                 async move { s3.abort_multipart_upload(request_factory()).await }
             })
             .await
-            .context(UnableToCleanupUploadDataSnafu {
+            .context(UnableToCleanupMultipartDataSnafu {
                 bucket: &self.bucket_name,
                 path: location.as_ref(),
             })?;
@@ -888,7 +888,7 @@ struct S3MultiPartUpload {
 }

 impl CloudMultiPartUploadImpl for S3MultiPartUpload {
-    fn upload_part(
+    fn put_multipart_part(
         &self,
         buf: Vec<u8>,
         part_idx: usize,
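
A minimal usage sketch of the renamed S3 API (illustrative only, not part of this commit): write through the returned AsyncWrite, call shutdown() to complete the upload, and on failure abort with cleanup_multipart so already-uploaded parts are not kept and billed. The write_or_abort helper is hypothetical, and store construction is omitted.

use object_store::{path::Path, ObjectStore};
use tokio::io::AsyncWriteExt;

type BoxError = Box<dyn std::error::Error + Send + Sync>;

// Hypothetical helper: `store` could be an `AmazonS3` built elsewhere.
async fn write_or_abort(
    store: &dyn ObjectStore,
    location: &Path,
    data: &[u8],
) -> Result<(), BoxError> {
    let (multipart_id, mut writer) = store.put_multipart(location).await?;
    let write_result: Result<(), std::io::Error> = async {
        writer.write_all(data).await?; // parts are buffered and uploaded concurrently
        writer.shutdown().await // drives poll_shutdown to completion, finalizing the upload
    }
    .await;
    if let Err(e) = write_result {
        // Abort the multipart upload so already-uploaded parts are dropped.
        store.cleanup_multipart(location, &multipart_id).await?;
        return Err(e.into());
    }
    Ok(())
}
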
27 changes: 7 additions & 20 deletions src/azure.rs
@@ -2,18 +2,18 @@
 //!
 //! ## Streaming uploads
 //!
-//! [ObjectStore::upload] will upload data in blocks and write a blob from those
+//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those
 //! blocks. Data is buffered internally to make blocks of at least 5MB and blocks
 //! are uploaded concurrently.
 //!
-//! [ObjectStore::cleanup_upload] is a no-op, since Azure Blob Store doesn't provide
+//! [ObjectStore::cleanup_multipart] is a no-op, since Azure Blob Store doesn't provide
 //! a way to drop old blocks. Instead unused blocks are automatically cleaned up
 //! after 7 days.
 use crate::{
     multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
     path::{Path, DELIMITER},
     util::format_prefix,
-    GetResult, ListResult, ObjectMeta, ObjectStore, Result, UploadId,
+    GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
 };
 use async_trait::async_trait;
 use azure_core::{prelude::*, HttpClient};
@@ -105,19 +105,6 @@ enum Error {
         path: String,
     },

-    #[snafu(display(
-        "Unable to upload data. Bucket: {}, Location: {}, Error: {} ({:?})",
-        container,
-        path,
-        source,
-        source,
-    ))]
-    UnableToUploadData {
-        source: Box<dyn std::error::Error + Send + Sync>,
-        container: String,
-        path: String,
-    },
-
     #[snafu(display(
         "Unable to list data. Bucket: {}, Error: {} ({:?})",
         container,
@@ -225,18 +212,18 @@ impl ObjectStore for MicrosoftAzure {
         Ok(())
     }

-    async fn upload(
+    async fn put_multipart(
         &self,
         location: &Path,
-    ) -> Result<(UploadId, Box<dyn AsyncWrite + Unpin + Send>)> {
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
         let inner = AzureMultiPartUpload {
             container_client: Arc::clone(&self.container_client),
             location: location.to_owned(),
         };
         Ok((String::new(), Box::new(CloudMultiPartUpload::new(inner, 8))))
     }

-    async fn cleanup_upload(&self, _location: &Path, _upload_id: &UploadId) -> Result<()> {
+    async fn cleanup_multipart(&self, _location: &Path, _upload_id: &MultipartId) -> Result<()> {
         // There is no way to drop blocks that have been uploaded. Instead, they simply
         // expire in 7 days.
         Ok(())
@@ -590,7 +577,7 @@ impl AzureMultiPartUpload {
 }

 impl CloudMultiPartUploadImpl for AzureMultiPartUpload {
-    fn upload_part(
+    fn put_multipart_part(
        &self,
        buf: Vec<u8>,
        part_idx: usize,
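
Illustrative only (not part of this commit): on Azure the abort path degenerates to dropping the writer, since cleanup_multipart is a no-op and uncommitted blocks expire after 7 days; calling it anyway keeps code uniform across backends. The start_and_abort helper below is hypothetical.

use object_store::{path::Path, ObjectStore};

// Hypothetical sketch: begin an upload, then abandon it.
async fn start_and_abort(store: &dyn ObjectStore, location: &Path) -> object_store::Result<()> {
    let (multipart_id, writer) = store.put_multipart(location).await?;
    drop(writer); // no blob is committed until the writer is shut down
    // No-op on Azure; on S3 this aborts the multipart upload and drops parts.
    store.cleanup_multipart(location, &multipart_id).await
}
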
8 changes: 4 additions & 4 deletions src/gcp.rs
@@ -20,7 +20,7 @@ use crate::{
     path::{Path, DELIMITER},
     token::TokenCache,
     util::format_prefix,
-    GetResult, ListResult, ObjectMeta, ObjectStore, Result, UploadId,
+    GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
 };

 #[derive(Debug, Snafu)]
@@ -379,16 +379,16 @@ impl ObjectStore for GoogleCloudStorage {
         self.put_request(location, bytes).await
     }

-    async fn upload(
+    async fn put_multipart(
         &self,
         _location: &Path,
-    ) -> Result<(UploadId, Box<dyn AsyncWrite + Unpin + Send>)> {
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
         // TODO: cloud_storage does not provide any bindings for multi-part upload.
         // But GCS does support this.
         Err(super::Error::NotImplemented)
     }

-    async fn cleanup_upload(&self, _location: &Path, _upload_id: &UploadId) -> Result<()> {
+    async fn cleanup_multipart(&self, _location: &Path, _upload_id: &MultipartId) -> Result<()> {
         Err(super::Error::NotImplemented)
     }
28 changes: 17 additions & 11 deletions src/lib.rs
@@ -41,6 +41,7 @@ mod oauth;
 #[cfg(feature = "gcp")]
 mod token;

+#[cfg(any(feature = "azure", feature = "aws"))]
 mod multipart;
 mod util;
@@ -60,7 +61,7 @@ use tokio::io::AsyncWrite;
 pub type DynObjectStore = dyn ObjectStore;

 /// Id type for multi-part uploads.
-pub type UploadId = String;
+pub type MultipartId = String;

 /// Universal API to multiple object store services.
 #[async_trait]
@@ -70,19 +71,24 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {

     /// Get a multi-part upload that allows writing data in chunks
     ///
+    /// Most cloud-based uploads will buffer and upload parts in parallel.
+    ///
+    /// To complete the upload, [AsyncWrite::poll_shutdown] must be called
+    /// to completion.
+    ///
     /// For some object stores (S3, GCS, and local in particular), if the
-    /// writer fails or panics, you must call [ObjectStore::cleanup_upload]
+    /// writer fails or panics, you must call [ObjectStore::cleanup_multipart]
     /// to clean up partially written data.
-    async fn upload(
+    async fn put_multipart(
         &self,
         location: &Path,
-    ) -> Result<(UploadId, Box<dyn AsyncWrite + Unpin + Send>)>;
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)>;

     /// Cleanup an aborted upload.
-    async fn cleanup_upload(&self, location: &Path, upload_id: &UploadId) -> Result<()>;
+    ///
+    /// See documentation for individual stores for exact behavior, as capabilities
+    /// vary by object store.
+    async fn cleanup_multipart(&self, location: &Path, upload_id: &MultipartId) -> Result<()>;

     /// Return the bytes that are stored at the specified location.
     async fn get(&self, location: &Path) -> Result<GetResult>;
@@ -508,7 +514,7 @@ mod tests {
         // Can write to storage
         let data = get_vec_of_bytes(5_000_000, 10);
         let bytes_expected = data.concat();
-        let (_, mut writer) = storage.upload(&location).await?;
+        let (_, mut writer) = storage.put_multipart(&location).await?;
         for chunk in &data {
             writer.write_all(chunk).await?;
         }
@@ -519,7 +525,7 @@
         // Can overwrite some storage
         let data = get_vec_of_bytes(5_000_000, 5);
         let bytes_expected = data.concat();
-        let (_, mut writer) = storage.upload(&location).await?;
+        let (_, mut writer) = storage.put_multipart(&location).await?;
         for chunk in &data {
             writer.write_all(chunk).await?;
         }
@@ -529,9 +535,9 @@

         // We can abort an empty write
         let location = Path::from("test_dir/test_abort_upload.txt");
-        let (upload_id, writer) = storage.upload(&location).await?;
+        let (upload_id, writer) = storage.put_multipart(&location).await?;
         drop(writer);
-        storage.cleanup_upload(&location, &upload_id).await?;
+        storage.cleanup_multipart(&location, &upload_id).await?;
         let get_res = storage.get(&location).await;
         assert!(get_res.is_err());
         assert!(matches!(
@@ -540,14 +546,14 @@ mod tests {
         ));

         // We can abort an in-progress write
-        let (upload_id, mut writer) = storage.upload(&location).await?;
+        let (upload_id, mut writer) = storage.put_multipart(&location).await?;
         if let Some(chunk) = data.get(0) {
-            writer.write_all(chunk).await?;
+            let _ = writer.write(chunk).await?;
         }
         drop(writer);

-        storage.cleanup_upload(&location, &upload_id).await?;
+        storage.cleanup_multipart(&location, &upload_id).await?;
         let get_res = storage.get(&location).await;
         assert!(get_res.is_err());
         assert!(matches!(
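
The updated test flow above can be exercised end to end with the crate's InMemory store, which needs no credentials. A runnable sketch, assuming this crate version's API (including GetResult::bytes for readback) and tokio with its rt, macros, and io-util features:

use object_store::{memory::InMemory, path::Path, ObjectStore};
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let storage = InMemory::new();
    let location = Path::from("test_dir/example.txt");

    // Write in chunks, then shut the writer down to finalize the object.
    let (multipart_id, mut writer) = storage.put_multipart(&location).await?;
    writer.write_all(b"hello ").await?;
    writer.write_all(b"multipart").await?;
    writer.shutdown().await?;

    let bytes = storage.get(&location).await?.bytes().await?;
    assert_eq!(bytes.as_ref(), &b"hello multipart"[..]);

    // cleanup_multipart is safe to call even where it is a no-op.
    storage.cleanup_multipart(&location, &multipart_id).await?;
    Ok(())
}
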
8 changes: 4 additions & 4 deletions src/local.rs
@@ -2,7 +2,7 @@
 use crate::{
     maybe_spawn_blocking,
     path::{filesystem_path_to_url, Path},
-    GetResult, ListResult, ObjectMeta, ObjectStore, Result, UploadId,
+    GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -231,10 +231,10 @@ impl ObjectStore for LocalFileSystem {
         .await
     }

-    async fn upload(
+    async fn put_multipart(
         &self,
         location: &Path,
-    ) -> Result<(UploadId, Box<dyn AsyncWrite + Unpin + Send>)> {
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
         let path = self.config.path_to_filesystem(location)?;

         let file = open_writable_file(&path)?;
@@ -248,7 +248,7 @@ impl ObjectStore for LocalFileSystem {
         ))
     }

-    async fn cleanup_upload(&self, location: &Path, _upload_id: &UploadId) -> Result<()> {
+    async fn cleanup_multipart(&self, location: &Path, _upload_id: &MultipartId) -> Result<()> {
         // Clean up partial write
         self.delete(location)
             .map(|res| match res {
8 changes: 4 additions & 4 deletions src/memory.rs
@@ -1,5 +1,5 @@
 //! An in-memory object store implementation
-use crate::UploadId;
+use crate::MultipartId;
 use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -72,10 +72,10 @@ impl ObjectStore for InMemory {
         Ok(())
     }

-    async fn upload(
+    async fn put_multipart(
         &self,
         location: &Path,
-    ) -> Result<(UploadId, Box<dyn AsyncWrite + Unpin + Send>)> {
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
         Ok((
             String::new(),
             Box::new(InMemoryUpload {
@@ -86,7 +86,7 @@ impl ObjectStore for InMemory {
         ))
     }

-    async fn cleanup_upload(&self, _location: &Path, _upload_id: &UploadId) -> Result<()> {
+    async fn cleanup_multipart(&self, _location: &Path, _upload_id: &MultipartId) -> Result<()> {
         // Nothing to clean up
         Ok(())
     }
14 changes: 11 additions & 3 deletions src/multipart.rs
@@ -9,7 +9,11 @@ type BoxedTryFuture<T> = Pin<Box<dyn Future<Output = Result<T, io::Error>> + Send>>;
 // Lifetimes are difficult to manage, so not using AsyncTrait
 pub(crate) trait CloudMultiPartUploadImpl {
     /// Upload a single part
-    fn upload_part(&self, buf: Vec<u8>, part_idx: usize) -> BoxedTryFuture<(usize, UploadPart)>;
+    fn put_multipart_part(
+        &self,
+        buf: Vec<u8>,
+        part_idx: usize,
+    ) -> BoxedTryFuture<(usize, UploadPart)>;

     /// Complete the upload with the provided parts
     fn complete(&self, completed_parts: Vec<Option<UploadPart>>) -> BoxedTryFuture<()>;
@@ -98,7 +102,9 @@ where
             self.current_buffer.extend_from_slice(buf);

             let out_buffer = std::mem::take(&mut self.current_buffer);
-            let task = self.inner.upload_part(out_buffer, self.current_part_idx);
+            let task = self
+                .inner
+                .put_multipart_part(out_buffer, self.current_part_idx);
             self.tasks.push(task);
             self.current_part_idx += 1;

@@ -125,7 +131,9 @@ where
         // If current_buffer is not empty, see if it can be submitted
         if !self.current_buffer.is_empty() && self.tasks.len() < self.max_concurrency {
             let out_buffer: Vec<u8> = std::mem::take(&mut self.current_buffer);
-            let task = self.inner.upload_part(out_buffer, self.current_part_idx);
+            let task = self
+                .inner
+                .put_multipart_part(out_buffer, self.current_part_idx);
             self.tasks.push(task);
         }
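
To make the renamed trait method's role concrete, here is a free-standing, hypothetical sketch of the strategy CloudMultiPartUpload implements in poll_write: buffer until a minimum part size is reached, submit numbered parts, and cap how many uploads are in flight. MIN_PART_SIZE, MAX_CONCURRENCY, and upload_chunked are illustrative names, not this crate's API.

use futures::stream::{FuturesUnordered, StreamExt};

const MIN_PART_SIZE: usize = 5 * 1024 * 1024; // typical cloud minimum for non-final parts
const MAX_CONCURRENCY: usize = 8; // mirrors CloudMultiPartUpload::new(inner, 8)

// Hypothetical free-standing version of the buffering logic above.
async fn upload_chunked<F, Fut>(data: &[u8], put_part: F) -> std::io::Result<()>
where
    F: Fn(Vec<u8>, usize) -> Fut,
    Fut: std::future::Future<Output = std::io::Result<()>>,
{
    let mut in_flight = FuturesUnordered::new();
    let mut buffer: Vec<u8> = Vec::new();
    let mut part_idx = 0;

    for chunk in data.chunks(64 * 1024) {
        buffer.extend_from_slice(chunk);
        if buffer.len() >= MIN_PART_SIZE {
            // Backpressure: wait for a completed part before starting another.
            while in_flight.len() >= MAX_CONCURRENCY {
                in_flight.next().await.transpose()?;
            }
            in_flight.push(put_part(std::mem::take(&mut buffer), part_idx));
            part_idx += 1;
        }
    }
    // Submit the final (possibly undersized) part, then drain everything.
    if !buffer.is_empty() {
        in_flight.push(put_part(buffer, part_idx));
    }
    while let Some(result) = in_flight.next().await {
        result?;
    }
    Ok(())
}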