Skip to content
This repository has been archived by the owner on Jul 27, 2022. It is now read-only.

Commit

Permalink
feat: remove throttle implementation of multipart
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Jul 17, 2022
1 parent a135395 commit cac4ad4
Showing 1 changed file with 5 additions and 98 deletions.
103 changes: 5 additions & 98 deletions src/throttle.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
//! A throttling object store wrapper
use futures::future::BoxFuture;
use parking_lot::Mutex;
use std::io;
use std::ops::Range;
use std::pin::Pin;
use std::task::Poll;
use std::{convert::TryInto, sync::Arc};

use crate::MultipartId;
use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::Future;
use futures::{stream::BoxStream, StreamExt};
use std::time::Duration;
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -141,21 +136,13 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {

async fn put_multipart(
&self,
location: &Path,
_location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let (upload_id, inner) = self.inner.put_multipart(location).await?;
Ok((
upload_id,
Box::new(ThrottledUpload {
inner,
config: Arc::clone(&self.config),
state: ThrottledUploadState::Idle,
}),
))
Err(super::Error::NotImplemented)
}

async fn cleanup_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
self.inner.cleanup_multipart(location, multipart_id).await
async fn cleanup_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
Err(super::Error::NotImplemented)
}

async fn get(&self, location: &Path) -> Result<GetResult> {
Expand Down Expand Up @@ -260,93 +247,14 @@ fn usize_to_u32_saturate(x: usize) -> u32 {
x.try_into().unwrap_or(u32::MAX)
}

enum ThrottledUploadState {
Idle,
Sleeping(BoxFuture<'static, ()>),
Waiting,
}

struct ThrottledUpload {
inner: Box<dyn AsyncWrite + Unpin + Send>,
config: Arc<Mutex<ThrottleConfig>>,
state: ThrottledUploadState,
}

impl ThrottledUpload {
fn config(&self) -> ThrottleConfig {
*self.config.lock()
}

fn poll_waiting<F>(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
inner_call: F,
) -> Poll<Result<(), io::Error>>
where
F: Fn(
&mut Box<dyn AsyncWrite + Unpin + Send>,
&mut std::task::Context<'_>,
) -> Poll<Result<(), io::Error>>,
{
loop {
match &mut self.state {
ThrottledUploadState::Idle => {
// If idle, begin sleeping
let wait = self.config().wait_put_per_call;
self.state = ThrottledUploadState::Sleeping(Box::pin(sleep(wait)))
}
ThrottledUploadState::Sleeping(fut) => {
// If sleep is done, move to waiting for inner
match Pin::new(fut).poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(_) => {
self.state = ThrottledUploadState::Waiting;
}
}
}
ThrottledUploadState::Waiting => {
return inner_call(&mut self.inner, cx);
}
}
}
}
}

impl AsyncWrite for ThrottledUpload {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, io::Error>> {
self.poll_waiting(cx, |inner, cx| {
Pin::new(inner).poll_write(cx, buf).map_ok(|_| ())
})
.map_ok(|_| buf.len())
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
self.poll_waiting(cx, |inner, cx| Pin::new(inner).poll_flush(cx))
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
self.poll_waiting(cx, |inner, cx| Pin::new(inner).poll_shutdown(cx))
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{
memory::InMemory,
tests::{
copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list, rename_and_copy, stream_get,
put_get_delete_list, rename_and_copy,
},
};
use bytes::Bytes;
Expand Down Expand Up @@ -380,7 +288,6 @@ mod tests {
list_with_delimiter(&store).await.unwrap();
rename_and_copy(&store).await.unwrap();
copy_if_not_exists(&store).await.unwrap();
stream_get(&store).await.unwrap();
}

#[tokio::test]
Expand Down

0 comments on commit cac4ad4

Please sign in to comment.