Permalink
Branch: master
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
318 lines (298 sloc) 19.1 KB
use std::time::Duration;
use futures::sync::oneshot::spawn;
use futures::{Async, Future, IntoFuture, Poll};
use tokio::runtime::Runtime;
use super::client::{SignAndDispatchError, TimeoutFuture};
use super::credential::CredentialsError;
use super::request::{HttpDispatchError, HttpResponse};
lazy_static! {
static ref FALLBACK_RUNTIME: Runtime = Runtime::new().unwrap();
}
/// Future that is returned from all rusoto service APIs.
///
/// ## Mocking
///
/// To mock service traits, you can use [`RusotoFuture::from_future`] to create
/// [`RusotoFuture`] instance. You can also use the [`From`] implementation on
/// the [`Result`] value.
///
/// ```rust,ignore
/// # // TODO: remove ignore when the cyclic dependency issue has been fixed
/// # // https://github.com/rusoto/rusoto/pull/1141#issuecomment-421865362
/// # extern crate futures;
/// # extern crate rusoto_core;
/// # extern crate rusoto_s3;
/// # extern crate tokio_timer;
/// #
/// use std::time::{Duration, Instant};
/// use futures::prelude::*;
/// use rusoto_core::RusotoFuture;
/// use rusoto_s3::*;
/// use tokio_timer::Delay;
///
/// pub struct S3Mock;
///
/// impl S3 for S3Mock {
/// fn abort_multipart_upload(
/// &self,
/// _input: AbortMultipartUploadRequest,
/// ) -> RusotoFuture<AbortMultipartUploadOutput, AbortMultipartUploadError> {
/// unimplemented!();
/// }
///
/// // ...
///
/// fn put_object(&self, input: PutObjectRequest) -> RusotoFuture<PutObjectOutput, PutObjectError> {
/// if input.bucket == "foo" {
/// let deadline = Instant::now() + Duration::from_secs(3);
/// let output = PutObjectOutput {
/// ..Default::default()
/// };
/// RusotoFuture::from_future(
/// Delay::new(deadline)
/// .map_err(|e| PutObjectError::Unknown(e.to_string()))
/// .map(|_| output)
/// )
/// } else {
/// Err(PutObjectError::Validation("Invalid bucket".to_string())).into()
/// }
/// }
///
/// // ...
/// # fn complete_multipart_upload(&self, _: CompleteMultipartUploadRequest) -> RusotoFuture<CompleteMultipartUploadOutput, CompleteMultipartUploadError> { unimplemented!() }
/// # fn copy_object(&self, _: CopyObjectRequest) -> RusotoFuture<CopyObjectOutput, CopyObjectError> { unimplemented!() }
/// # fn create_bucket(&self, _: CreateBucketRequest) -> RusotoFuture<CreateBucketOutput, CreateBucketError> { unimplemented!() }
/// # fn create_multipart_upload(&self, _: CreateMultipartUploadRequest) -> RusotoFuture<CreateMultipartUploadOutput, CreateMultipartUploadError> { unimplemented!() }
/// # fn delete_bucket(&self, _: DeleteBucketRequest) -> RusotoFuture<(), DeleteBucketError> { unimplemented!() }
/// # fn delete_bucket_analytics_configuration(&self, _: DeleteBucketAnalyticsConfigurationRequest) -> RusotoFuture<(), DeleteBucketAnalyticsConfigurationError> { unimplemented!() }
/// # fn delete_bucket_cors(&self, _: DeleteBucketCorsRequest) -> RusotoFuture<(), DeleteBucketCorsError> { unimplemented!() }
/// # fn delete_bucket_encryption(&self, _: DeleteBucketEncryptionRequest) -> RusotoFuture<(), DeleteBucketEncryptionError> { unimplemented!() }
/// # fn delete_bucket_inventory_configuration(&self, _: DeleteBucketInventoryConfigurationRequest) -> RusotoFuture<(), DeleteBucketInventoryConfigurationError> { unimplemented!() }
/// # fn delete_bucket_lifecycle(&self, _: DeleteBucketLifecycleRequest) -> RusotoFuture<(), DeleteBucketLifecycleError> { unimplemented!() }
/// # fn delete_bucket_metrics_configuration(&self, _: DeleteBucketMetricsConfigurationRequest) -> RusotoFuture<(), DeleteBucketMetricsConfigurationError> { unimplemented!() }
/// # fn delete_bucket_policy(&self, _: DeleteBucketPolicyRequest) -> RusotoFuture<(), DeleteBucketPolicyError> { unimplemented!() }
/// # fn delete_bucket_replication(&self, _: DeleteBucketReplicationRequest) -> RusotoFuture<(), DeleteBucketReplicationError> { unimplemented!() }
/// # fn delete_bucket_tagging(&self, _: DeleteBucketTaggingRequest) -> RusotoFuture<(), DeleteBucketTaggingError> { unimplemented!() }
/// # fn delete_bucket_website(&self, _: DeleteBucketWebsiteRequest) -> RusotoFuture<(), DeleteBucketWebsiteError> { unimplemented!() }
/// # fn delete_object(&self, _: DeleteObjectRequest) -> RusotoFuture<DeleteObjectOutput, DeleteObjectError> { unimplemented!() }
/// # fn delete_object_tagging(&self, _: DeleteObjectTaggingRequest) -> RusotoFuture<DeleteObjectTaggingOutput, DeleteObjectTaggingError> { unimplemented!() }
/// # fn delete_objects(&self, _: DeleteObjectsRequest) -> RusotoFuture<DeleteObjectsOutput, DeleteObjectsError> { unimplemented!() }
/// # fn get_bucket_accelerate_configuration(&self, _: GetBucketAccelerateConfigurationRequest) -> RusotoFuture<GetBucketAccelerateConfigurationOutput, GetBucketAccelerateConfigurationError> { unimplemented!() }
/// # fn get_bucket_acl(&self, _: GetBucketAclRequest) -> RusotoFuture<GetBucketAclOutput, GetBucketAclError> { unimplemented!() }
/// # fn get_bucket_analytics_configuration(&self, _: GetBucketAnalyticsConfigurationRequest) -> RusotoFuture<GetBucketAnalyticsConfigurationOutput, GetBucketAnalyticsConfigurationError> { unimplemented!() }
/// # fn get_bucket_cors(&self, _: GetBucketCorsRequest) -> RusotoFuture<GetBucketCorsOutput, GetBucketCorsError> { unimplemented!() }
/// # fn get_bucket_encryption(&self, _: GetBucketEncryptionRequest) -> RusotoFuture<GetBucketEncryptionOutput, GetBucketEncryptionError> { unimplemented!() }
/// # fn get_bucket_inventory_configuration(&self, _: GetBucketInventoryConfigurationRequest) -> RusotoFuture<GetBucketInventoryConfigurationOutput, GetBucketInventoryConfigurationError> { unimplemented!() }
/// # fn get_bucket_lifecycle(&self, _: GetBucketLifecycleRequest) -> RusotoFuture<GetBucketLifecycleOutput, GetBucketLifecycleError> { unimplemented!() }
/// # fn get_bucket_lifecycle_configuration(&self, _: GetBucketLifecycleConfigurationRequest) -> RusotoFuture<GetBucketLifecycleConfigurationOutput, GetBucketLifecycleConfigurationError> { unimplemented!() }
/// # fn get_bucket_location(&self, _: GetBucketLocationRequest) -> RusotoFuture<GetBucketLocationOutput, GetBucketLocationError> { unimplemented!() }
/// # fn get_bucket_logging(&self, _: GetBucketLoggingRequest) -> RusotoFuture<GetBucketLoggingOutput, GetBucketLoggingError> { unimplemented!() }
/// # fn get_bucket_metrics_configuration(&self, _: GetBucketMetricsConfigurationRequest) -> RusotoFuture<GetBucketMetricsConfigurationOutput, GetBucketMetricsConfigurationError> { unimplemented!() }
/// # fn get_bucket_notification(&self, _: GetBucketNotificationConfigurationRequest) -> RusotoFuture<NotificationConfigurationDeprecated, GetBucketNotificationError> { unimplemented!() }
/// # fn get_bucket_notification_configuration(&self, _: GetBucketNotificationConfigurationRequest) -> RusotoFuture<NotificationConfiguration, GetBucketNotificationConfigurationError> { unimplemented!() }
/// # fn get_bucket_policy(&self, _: GetBucketPolicyRequest) -> RusotoFuture<GetBucketPolicyOutput, GetBucketPolicyError> { unimplemented!() }
/// # fn get_bucket_replication(&self, _: GetBucketReplicationRequest) -> RusotoFuture<GetBucketReplicationOutput, GetBucketReplicationError> { unimplemented!() }
/// # fn get_bucket_request_payment(&self, _: GetBucketRequestPaymentRequest) -> RusotoFuture<GetBucketRequestPaymentOutput, GetBucketRequestPaymentError> { unimplemented!() }
/// # fn get_bucket_tagging(&self, _: GetBucketTaggingRequest) -> RusotoFuture<GetBucketTaggingOutput, GetBucketTaggingError> { unimplemented!() }
/// # fn get_bucket_versioning(&self, _: GetBucketVersioningRequest) -> RusotoFuture<GetBucketVersioningOutput, GetBucketVersioningError> { unimplemented!() }
/// # fn get_bucket_website(&self, _: GetBucketWebsiteRequest) -> RusotoFuture<GetBucketWebsiteOutput, GetBucketWebsiteError> { unimplemented!() }
/// # fn get_object(&self, _: GetObjectRequest) -> RusotoFuture<GetObjectOutput, GetObjectError> { unimplemented!() }
/// # fn get_object_acl(&self, _: GetObjectAclRequest) -> RusotoFuture<GetObjectAclOutput, GetObjectAclError> { unimplemented!() }
/// # fn get_object_tagging(&self, _: GetObjectTaggingRequest) -> RusotoFuture<GetObjectTaggingOutput, GetObjectTaggingError> { unimplemented!() }
/// # fn get_object_torrent(&self, _: GetObjectTorrentRequest) -> RusotoFuture<GetObjectTorrentOutput, GetObjectTorrentError> { unimplemented!() }
/// # fn head_bucket(&self, _: HeadBucketRequest) -> RusotoFuture<(), HeadBucketError> { unimplemented!() }
/// # fn head_object(&self, _: HeadObjectRequest) -> RusotoFuture<HeadObjectOutput, HeadObjectError> { unimplemented!() }
/// # fn list_bucket_analytics_configurations(&self, _: ListBucketAnalyticsConfigurationsRequest) -> RusotoFuture<ListBucketAnalyticsConfigurationsOutput, ListBucketAnalyticsConfigurationsError> { unimplemented!() }
/// # fn list_bucket_inventory_configurations(&self, _: ListBucketInventoryConfigurationsRequest) -> RusotoFuture<ListBucketInventoryConfigurationsOutput, ListBucketInventoryConfigurationsError> { unimplemented!() }
/// # fn list_bucket_metrics_configurations(&self, _: ListBucketMetricsConfigurationsRequest) -> RusotoFuture<ListBucketMetricsConfigurationsOutput, ListBucketMetricsConfigurationsError> { unimplemented!() }
/// # fn list_buckets(&self) -> RusotoFuture<ListBucketsOutput, ListBucketsError> { unimplemented!() }
/// # fn list_multipart_uploads(&self, _: ListMultipartUploadsRequest) -> RusotoFuture<ListMultipartUploadsOutput, ListMultipartUploadsError> { unimplemented!() }
/// # fn list_object_versions(&self, _: ListObjectVersionsRequest) -> RusotoFuture<ListObjectVersionsOutput, ListObjectVersionsError> { unimplemented!() }
/// # fn list_objects(&self, _: ListObjectsRequest) -> RusotoFuture<ListObjectsOutput, ListObjectsError> { unimplemented!() }
/// # fn list_objects_v2(&self, _: ListObjectsV2Request) -> RusotoFuture<ListObjectsV2Output, ListObjectsV2Error> { unimplemented!() }
/// # fn list_parts(&self, _: ListPartsRequest) -> RusotoFuture<ListPartsOutput, ListPartsError> { unimplemented!() }
/// # fn put_bucket_accelerate_configuration(&self, _: PutBucketAccelerateConfigurationRequest) -> RusotoFuture<(), PutBucketAccelerateConfigurationError> { unimplemented!() }
/// # fn put_bucket_acl(&self, _: PutBucketAclRequest) -> RusotoFuture<(), PutBucketAclError> { unimplemented!() }
/// # fn put_bucket_analytics_configuration(&self, _: PutBucketAnalyticsConfigurationRequest) -> RusotoFuture<(), PutBucketAnalyticsConfigurationError> { unimplemented!() }
/// # fn put_bucket_cors(&self, _: PutBucketCorsRequest) -> RusotoFuture<(), PutBucketCorsError> { unimplemented!() }
/// # fn put_bucket_encryption(&self, _: PutBucketEncryptionRequest) -> RusotoFuture<(), PutBucketEncryptionError> { unimplemented!() }
/// # fn put_bucket_inventory_configuration(&self, _: PutBucketInventoryConfigurationRequest) -> RusotoFuture<(), PutBucketInventoryConfigurationError> { unimplemented!() }
/// # fn put_bucket_lifecycle(&self, _: PutBucketLifecycleRequest) -> RusotoFuture<(), PutBucketLifecycleError> { unimplemented!() }
/// # fn put_bucket_lifecycle_configuration(&self, _: PutBucketLifecycleConfigurationRequest) -> RusotoFuture<(), PutBucketLifecycleConfigurationError> { unimplemented!() }
/// # fn put_bucket_logging(&self, _: PutBucketLoggingRequest) -> RusotoFuture<(), PutBucketLoggingError> { unimplemented!() }
/// # fn put_bucket_metrics_configuration(&self, _: PutBucketMetricsConfigurationRequest) -> RusotoFuture<(), PutBucketMetricsConfigurationError> { unimplemented!() }
/// # fn put_bucket_notification(&self, _: PutBucketNotificationRequest) -> RusotoFuture<(), PutBucketNotificationError> { unimplemented!() }
/// # fn put_bucket_notification_configuration(&self, _: PutBucketNotificationConfigurationRequest) -> RusotoFuture<(), PutBucketNotificationConfigurationError> { unimplemented!() }
/// # fn put_bucket_policy(&self, _: PutBucketPolicyRequest) -> RusotoFuture<(), PutBucketPolicyError> { unimplemented!() }
/// # fn put_bucket_replication(&self, _: PutBucketReplicationRequest) -> RusotoFuture<(), PutBucketReplicationError> { unimplemented!() }
/// # fn put_bucket_request_payment(&self, _: PutBucketRequestPaymentRequest) -> RusotoFuture<(), PutBucketRequestPaymentError> { unimplemented!() }
/// # fn put_bucket_tagging(&self, _: PutBucketTaggingRequest) -> RusotoFuture<(), PutBucketTaggingError> { unimplemented!() }
/// # fn put_bucket_versioning(&self, _: PutBucketVersioningRequest) -> RusotoFuture<(), PutBucketVersioningError> { unimplemented!() }
/// # fn put_bucket_website(&self, _: PutBucketWebsiteRequest) -> RusotoFuture<(), PutBucketWebsiteError> { unimplemented!() }
/// # fn put_object_acl(&self, _: PutObjectAclRequest) -> RusotoFuture<PutObjectAclOutput, PutObjectAclError> { unimplemented!() }
/// # fn put_object_tagging(&self, _: PutObjectTaggingRequest) -> RusotoFuture<PutObjectTaggingOutput, PutObjectTaggingError> { unimplemented!() }
/// # fn restore_object(&self, _: RestoreObjectRequest) -> RusotoFuture<RestoreObjectOutput, RestoreObjectError> { unimplemented!() }
/// # fn select_object_content(&self, _: SelectObjectContentRequest) -> RusotoFuture<SelectObjectContentOutput, SelectObjectContentError> { unimplemented!() }
/// # fn upload_part(&self, _: UploadPartRequest) -> RusotoFuture<UploadPartOutput, UploadPartError> { unimplemented!() }
/// # fn upload_part_copy(&self, _: UploadPartCopyRequest) -> RusotoFuture<UploadPartCopyOutput, UploadPartCopyError> { unimplemented!() }
/// }
/// ```
pub struct RusotoFuture<T, E> {
state: Option<RusotoFutureState<T, E>>,
}
pub fn new<T, E>(
future: Box<TimeoutFuture<Item = HttpResponse, Error = SignAndDispatchError> + Send>,
handler: fn(HttpResponse) -> Box<Future<Item = T, Error = E> + Send>,
) -> RusotoFuture<T, E> {
RusotoFuture {
state: Some(RusotoFutureState::SignAndDispatch { future, handler }),
}
}
impl<T, E> RusotoFuture<T, E> {
/// Set the timeout on the future to the provided duration.
///
/// Unlike `set_timeout` this method can be easily chained:
///
/// ```rust,ignore
/// # // TODO: remove ignore when the cyclic dependency issue has been fixed
/// # // https://github.com/rusoto/rusoto/pull/1141#issuecomment-421865362
/// # extern crate rusoto_core;
/// # extern crate rusoto_s3;
/// #
/// # use std::time::Duration;
/// # use rusoto_core::Region;
/// # use rusoto_s3::{S3, S3Client};
/// #
/// # let s3 = S3Client::new(Region::default());
/// #
/// let future = s3.list_buckets()
/// .with_timeout(Duration::from_secs(10));
/// ```
///
/// This is only guaranteed to take effect when called before the future
/// is polled for the first time.
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.set_timeout(timeout);
self
}
/// Set the timeout on the future to the provided duration.
///
/// This is only guaranteed to take effect when called before the future
/// is polled for the first time.
pub fn set_timeout(&mut self, timeout: Duration) {
if let Some(RusotoFutureState::SignAndDispatch { ref mut future, .. }) = self.state {
future.set_timeout(timeout);
}
}
/// Clear the timeout on the future.
///
/// This is only guaranteed to take effect when called before the future
/// is polled for the first time.
pub fn clear_timeout(&mut self) {
if let Some(RusotoFutureState::SignAndDispatch { ref mut future, .. }) = self.state {
future.clear_timeout();
}
}
/// Blocks the current thread until the future has resolved.
///
/// This is meant to provide a simple way for non-async consumers
/// to work with rusoto.
pub fn sync(self) -> Result<T, E>
where
T: Send + 'static,
E: From<CredentialsError> + From<HttpDispatchError> + Send + 'static,
{
spawn(self, &FALLBACK_RUNTIME.executor()).wait()
}
/// Wraps the provided future, mainly to mock the service response.
///
/// ## Caution
///
/// This is not intended to be used outside of the test case.
/// In production, [`Box`] is recommended.
pub fn from_future<F>(fut: F) -> Self
where
F: IntoFuture<Item = T, Error = E>,
F::Future: Send + 'static,
{
let fut = fut.into_future();
RusotoFuture {
state: Some(RusotoFutureState::RunningResponseHandler(Box::new(fut))),
}
}
}
impl<T, E> Future for RusotoFuture<T, E>
where
E: From<CredentialsError> + From<HttpDispatchError>,
{
type Item = T;
type Error = E;
fn poll(&mut self) -> Poll<T, E> {
match self.state.take().unwrap() {
RusotoFutureState::SignAndDispatch {
mut future,
handler,
} => match future.poll() {
Err(SignAndDispatchError::Credentials(err)) => Err(err.into()),
Err(SignAndDispatchError::Dispatch(err)) => Err(err.into()),
Ok(Async::Ready(response)) => {
self.state = Some(RusotoFutureState::RunningResponseHandler(handler(response)));
self.poll()
}
Ok(Async::NotReady) => {
self.state = Some(RusotoFutureState::SignAndDispatch { future, handler });
Ok(Async::NotReady)
}
},
RusotoFutureState::RunningResponseHandler(mut future) => match future.poll()? {
Async::Ready(value) => Ok(Async::Ready(value)),
Async::NotReady => {
self.state = Some(RusotoFutureState::RunningResponseHandler(future));
Ok(Async::NotReady)
}
},
}
}
}
enum RusotoFutureState<T, E> {
SignAndDispatch {
future: Box<TimeoutFuture<Item = HttpResponse, Error = SignAndDispatchError> + Send>,
handler: fn(HttpResponse) -> Box<Future<Item = T, Error = E> + Send>,
},
RunningResponseHandler(Box<Future<Item = T, Error = E> + Send>),
}
impl<T: Send + 'static, E: Send + 'static> From<Result<T, E>> for RusotoFuture<T, E> {
fn from(value: Result<T, E>) -> Self {
RusotoFuture::from_future(value)
}
}
#[test]
fn rusoto_future_is_send() {
fn is_send<T: Send>() {}
is_send::<RusotoFuture<(), ()>>();
}
#[test]
fn rusoto_future_from_ok() {
use std::error::Error;
let fut: RusotoFuture<i32, Box<Error + Send + Sync>> = RusotoFuture::from(Ok(42));
assert_eq!(fut.sync().unwrap(), 42);
}
#[test]
fn rusoto_future_from_err() {
use std::error::Error;
let fut: RusotoFuture<i32, Box<Error + Send + Sync>> =
RusotoFuture::from("ab".parse::<i32>().map_err(|e| e.into()));
assert!(fut.sync().is_err());
}
#[test]
fn rusoto_future_from_delay() {
use std::error::Error;
use std::time::{Duration, Instant};
use tokio_timer::Delay;
let deadline = Instant::now() + Duration::from_millis(500);
let delay = Delay::new(deadline);
let fut: RusotoFuture<i32, Box<Error + Send + Sync>> =
RusotoFuture::from_future(delay.map_err(From::from).map(|_| 42));
assert_eq!(fut.sync().unwrap(), 42);
assert!(deadline <= Instant::now());
}