This repository has been archived by the owner on Jul 27, 2022. It is now read-only.

Commit

refactor: Try AsyncWrite
wjones127 committed Jun 18, 2022
1 parent c46ba68 commit 5a27004
Showing 8 changed files with 75 additions and 46 deletions.
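In short, `ObjectStore::upload` no longer consumes a `BoxStream` of `Bytes`; it now hands back a writer (`Box<dyn MultiPartUpload>`, which is `AsyncWrite + Unpin`) that the caller pushes bytes into. A minimal sketch of the new call pattern, mirroring the updated tests below; the crate path `object_store` and the chunked payload are assumptions for illustration:

    use object_store::{path::Path, ObjectStore}; // crate name assumed
    use tokio::io::AsyncWriteExt; // write_all / shutdown come from the "io-util" feature

    // Push a sequence of buffers through the new upload writer, then close it.
    async fn upload_chunks(
        store: &dyn ObjectStore,
        location: &Path,
        chunks: Vec<Vec<u8>>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut writer = store.upload(location).await?;
        for chunk in chunks {
            writer.write_all(&chunk).await?;
        }
        writer.shutdown().await?; // flush and close the sink
        Ok(())
    }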
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -37,7 +37,7 @@ rusoto_core = { version = "0.48.0", optional = true, default-features = false, f
rusoto_credential = { version = "0.48.0", optional = true, default-features = false }
rusoto_s3 = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
snafu = "0.7"
-tokio = { version = "1.18", features = ["sync", "macros", "parking_lot", "rt-multi-thread", "time"] }
+tokio = { version = "1.18", features = ["sync", "macros", "parking_lot", "rt-multi-thread", "time", "fs", "io-util"] }
tracing = { version = "0.1" }
reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls"] }
# Filesystem integration
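The two tokio features added above line up with what the rest of the diff uses: "fs" brings in tokio::fs (e.g. `File::from_std`, used by the new local writer) and "io-util" brings in the `AsyncWriteExt` helpers used by the tests. A tiny standalone illustration of the pieces those features unlock:

    use tokio::io::AsyncWriteExt; // enabled by the "io-util" feature

    // "fs" enables tokio::fs; from_std wraps an already-open std file for async use.
    async fn write_async(std_file: std::fs::File) -> std::io::Result<()> {
        let mut file = tokio::fs::File::from_std(std_file);
        file.write_all(b"hello").await?;
        file.shutdown().await?;
        Ok(())
    }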
4 changes: 2 additions & 2 deletions src/aws.rs
@@ -1,4 +1,5 @@
//! An object store implementation for S3
+use crate::MultiPartUpload;
use crate::util::format_http_range;
use crate::{
collect_bytes,
@@ -246,9 +247,8 @@ impl ObjectStore for AmazonS3 {

async fn upload(
&self,
-_stream: BoxStream<'static, Result<Bytes>>,
_location: &Path,
-) -> Result<()> {
+) -> Result<Box<dyn MultiPartUpload>> {
todo!()
}

5 changes: 2 additions & 3 deletions src/azure.rs
@@ -2,7 +2,7 @@
use crate::{
path::{Path, DELIMITER},
util::format_prefix,
-GetResult, ListResult, ObjectMeta, ObjectStore, Result,
+GetResult, ListResult, ObjectMeta, ObjectStore, Result, MultiPartUpload,
};
use async_trait::async_trait;
use azure_core::{prelude::*, HttpClient};
@@ -200,9 +200,8 @@ impl ObjectStore for MicrosoftAzure {

async fn upload(
&self,
-_stream: BoxStream<'static, Result<Bytes>>,
_location: &Path,
-) -> Result<()> {
+) -> Result<Box<dyn MultiPartUpload>> {
todo!()
}

7 changes: 3 additions & 4 deletions src/gcp.rs
@@ -2,12 +2,12 @@
use crate::{
path::{Path, DELIMITER},
util::format_prefix,
-GetResult, ListResult, ObjectMeta, ObjectStore, Result,
+GetResult, ListResult, ObjectMeta, ObjectStore, Result, MultiPartUpload,
};
use async_trait::async_trait;
use bytes::Bytes;
use cloud_storage::{Client, Object};
-use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
+use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use snafu::{ResultExt, Snafu};
use std::ops::Range;
use std::pin::Pin;
@@ -158,9 +158,8 @@ impl ObjectStore for GoogleCloudStorage {

async fn upload(
&self,
-_stream: BoxStream<'static, Result<Bytes>>,
_location: &Path,
-) -> Result<()> {
+) -> Result<Box<dyn MultiPartUpload>> {
todo!()
}

41 changes: 23 additions & 18 deletions src/lib.rs
@@ -40,6 +40,7 @@ use crate::util::{collect_bytes, maybe_spawn_blocking};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
+use tokio::io::AsyncWrite;
use futures::{stream::BoxStream, StreamExt};
use snafu::Snafu;
use std::fmt::{Debug, Formatter};
@@ -55,14 +56,11 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// Save the provided bytes to the specified location.
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()>;

-/// Upload a stream of bytes as a multi-part upload
-///
-/// This is typically implemented with a multi-part upload.
+/// Get a multi-part upload that allows writing data in chunks
async fn upload(
&self,
-stream: BoxStream<'static, Result<Bytes>>,
location: &Path,
-) -> Result<()>;
+) -> Result<Box<dyn MultiPartUpload>>;

/// Return the bytes that are stored at the specified location.
async fn get(&self, location: &Path) -> Result<GetResult>;
@@ -145,6 +143,17 @@ pub struct ObjectMeta {
pub size: usize,
}

+/// Multi-part upload
+#[async_trait]
+pub trait MultiPartUpload: AsyncWrite + Unpin {
+/// Abort the multipart upload
+///
+/// On some services, if you fail to call this and do not
+/// close the sink, parts will linger in the object store
+/// and will be billed.
+async fn abort(&mut self) -> Result<()>;
+}

/// Result for a get request
///
/// This special cases the case of a local file, as some systems may
@@ -311,7 +320,7 @@ mod test_util {
mod tests {
use super::*;
use crate::test_util::flatten_list_stream;
-use futures::TryStreamExt;
+use tokio::io::AsyncWriteExt;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
@@ -498,24 +507,20 @@ mod tests {
// Can write to storage
let data = get_byte_stream(5_000, 10);
let bytes_expected = data.concat();
-let data_stream = Box::pin(
-futures::stream::iter(data)
-.map(Ok)
-.map_err(|source| super::Error::UpstreamError { source }),
-);
-storage.upload(data_stream, &location).await?;
+let mut writer = storage.upload(&location).await?;
+for chunk in data {
+writer.write_all(&chunk).await?;
+}
let bytes_written = storage.get(&location).await?.bytes().await?;
assert_eq!(bytes_expected, bytes_written);

// Can overwrite some storage
let data = get_byte_stream(5_000, 5);
let bytes_expected = data.concat();
-let data_stream = Box::pin(
-futures::stream::iter(data)
-.map(Ok)
-.map_err(|source| super::Error::UpstreamError { source }),
-);
-storage.upload(data_stream, &location).await?;
+let mut writer = storage.upload(&location).await?;
+for chunk in data {
+writer.write_all(&chunk).await?;
+}
let bytes_written = storage.get(&location).await?.bytes().await?;
assert_eq!(bytes_expected, bytes_written);

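The `abort` method on the new trait exists so callers can clean up a failed multi-part upload instead of leaving billed parts behind. Nothing in this commit calls it yet (the remote stores are still `todo!()`); a hedged sketch of the intended shape, again assuming the crate is consumed as `object_store`:

    use object_store::{path::Path, MultiPartUpload, ObjectStore}; // crate name assumed
    use tokio::io::AsyncWriteExt;

    // Write chunks; on any write error, try to abort so partially uploaded
    // parts are not left lingering on stores that keep (and bill for) them.
    async fn upload_or_abort(
        store: &dyn ObjectStore,
        location: &Path,
        chunks: &[Vec<u8>],
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut writer = store.upload(location).await?;
        for chunk in chunks {
            if let Err(err) = writer.write_all(chunk).await {
                writer.abort().await?; // best-effort cleanup
                return Err(err.into());
            }
        }
        writer.shutdown().await?; // close the sink to complete the upload
        Ok(())
    }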
54 changes: 40 additions & 14 deletions src/local.rs
@@ -1,17 +1,20 @@
//! An object store implementation for a local filesystem
+use crate::MultiPartUpload;
use crate::{
maybe_spawn_blocking,
path::{filesystem_path_to_url, Path},
GetResult, ListResult, ObjectMeta, ObjectStore, Result,
};
use async_trait::async_trait;
use bytes::Bytes;
-use futures::{stream::BoxStream, StreamExt, TryStreamExt};
+use futures::{stream::BoxStream, StreamExt};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
+use tokio::io::AsyncWrite;
use std::collections::VecDeque;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::Range;
+use std::pin::Pin;
use std::sync::Arc;
use std::{collections::BTreeSet, convert::TryFrom, io};
use url::Url;
@@ -228,23 +231,13 @@ impl ObjectStore for LocalFileSystem {

async fn upload(
&self,
-mut stream: BoxStream<'static, Result<Bytes>>,
location: &Path,
-) -> Result<()> {
+) -> Result<Box<dyn MultiPartUpload>> {
let path = self.config.path_to_filesystem(location)?;

-let mut file = open_writable_file(&path)?;
+let file = open_writable_file(&path)?;

-while let Some(data) = stream.try_next().await? {
-// TODO: Spawn blocking?
-// maybe_spawn_blocking(move || {
-file.write_all(&data).context(UnableToCopyDataToFileSnafu)?
-// .map_err(|err| err.into())
-// })
-// .await?;
-}
-
-Ok(())
+Ok(Box::new(LocalUpload { file: tokio::fs::File::from_std(file) }))
}

async fn get(&self, location: &Path) -> Result<GetResult> {
@@ -459,6 +452,39 @@ impl ObjectStore for LocalFileSystem {
}
}

+struct LocalUpload {
+file: tokio::fs::File,
+}
+
+#[async_trait]
+impl MultiPartUpload for LocalUpload {
+async fn abort(&mut self) -> Result<()> {
+Ok(())
+}
+}
+
+impl AsyncWrite for LocalUpload {
+fn poll_write(
+mut self: Pin<&mut Self>,
+cx: &mut std::task::Context<'_>,
+buf: &[u8]
+) -> std::task::Poll<Result<usize, io::Error>> {
+Pin::new(&mut self.file).poll_write(cx, buf)
+}
+fn poll_flush(
+mut self: Pin<&mut Self>,
+cx: &mut std::task::Context<'_>
+) -> std::task::Poll<Result<(), io::Error>> {
+Pin::new(&mut self.file).poll_flush(cx)
+}
+fn poll_shutdown(
+mut self: Pin<&mut Self>,
+cx: &mut std::task::Context<'_>
+) -> std::task::Poll<Result<(), io::Error>> {
+Pin::new(&mut self.file).poll_shutdown(cx)
+}
+}

fn open_file(path: &std::path::PathBuf) -> Result<File> {
let file = File::open(path).map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
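A note on the `AsyncWrite` forwarding above: it compiles because `tokio::fs::File` is `Unpin`, so `LocalUpload` is `Unpin` too and `Pin::new(&mut self.file)` is sound on every poll. The same pattern works for any wrapper around an `Unpin` writer, sketched generically here (the names are illustrative, not part of the crate):

    use std::io;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use tokio::io::AsyncWrite;

    // Forward AsyncWrite to an inner Unpin writer, as LocalUpload does with tokio::fs::File.
    struct Passthrough<W: AsyncWrite + Unpin> {
        inner: W,
    }

    impl<W: AsyncWrite + Unpin> AsyncWrite for Passthrough<W> {
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            Pin::new(&mut self.inner).poll_write(cx, buf)
        }

        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Pin::new(&mut self.inner).poll_flush(cx)
        }

        fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Pin::new(&mut self.inner).poll_shutdown(cx)
        }
    }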
4 changes: 2 additions & 2 deletions src/memory.rs
@@ -1,4 +1,5 @@
//! An in-memory object store implementation
+use crate::MultiPartUpload;
use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
use async_trait::async_trait;
use bytes::Bytes;
@@ -68,9 +69,8 @@ impl ObjectStore for InMemory {

async fn upload(
&self,
-_stream: BoxStream<'static, Result<Bytes>>,
_location: &Path,
-) -> Result<()> {
+) -> Result<Box<dyn MultiPartUpload>> {
todo!()
}

4 changes: 2 additions & 2 deletions src/throttle.rs
@@ -5,6 +5,7 @@ use std::{
sync::{Arc, Mutex},
};

+use crate::MultiPartUpload;
use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
use async_trait::async_trait;
use bytes::Bytes;
@@ -136,9 +137,8 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {

async fn upload(
&self,
-_stream: BoxStream<'static, Result<Bytes>>,
_location: &Path,
-) -> Result<()> {
+) -> Result<Box<dyn MultiPartUpload>> {
todo!()
}

