Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions src/s3/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,24 @@ pub const MAX_PART_SIZE: u64 = 5_368_709_120; // 5 GiB
/// ensuring the object does not exceed 5 TiB.
pub const MAX_OBJECT_SIZE: u64 = 5_497_558_138_880; // 5 TiB

enum BodyIterator {
Segmented(crate::s3::segmented_bytes::SegmentedBytesIntoIterator),
FromVec(std::vec::IntoIter<Bytes>),
Empty(std::iter::Empty<Bytes>),
}

impl Iterator for BodyIterator {
type Item = Bytes;

fn next(&mut self) -> Option<Self::Item> {
match self {
Self::Segmented(iter) => iter.next(),
Self::FromVec(iter) => iter.next(),
Self::Empty(iter) => iter.next(),
}
}
}

/// Maximum number of parts allowed in a multipart upload.
///
/// Multipart uploads are limited to a total of 10,000 parts. If the object
Expand Down Expand Up @@ -540,14 +558,21 @@ impl MinioClient {
}

if (*method == Method::PUT) || (*method == Method::POST) {
//TODO: why-oh-why first collect into a vector and then iterate to a stream?
let bytes_vec: Vec<Bytes> = match body.as_ref() {
Some(v) => v.iter().collect(),
None => Vec::new(),
let iter = match body {
Some(v) => {
// Try to unwrap the Arc if we're the sole owner (zero-cost).
// Otherwise, collect into a Vec to avoid cloning the SegmentedBytes structure.
match Arc::try_unwrap(v) {
Ok(segmented) => BodyIterator::Segmented(segmented.into_iter()),
Err(arc) => {
let vec: Vec<Bytes> = arc.iter().collect();
BodyIterator::FromVec(vec.into_iter())
}
}
}
None => BodyIterator::Empty(std::iter::empty()),
};
let stream = futures_util::stream::iter(
bytes_vec.into_iter().map(|b| -> Result<_, Error> { Ok(b) }),
);
let stream = futures_util::stream::iter(iter.map(|b| -> Result<_, Error> { Ok(b) }));
req = req.body(Body::wrap_stream(stream));
}

Expand Down