Skip to content
This repository has been archived by the owner on Jul 27, 2022. It is now read-only.

Commit

Permalink
refactor: Try AsyncWrite
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Jun 18, 2022
1 parent 6fd56ed commit c5134a2
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 33 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ rusoto_core = { version = "0.48.0", optional = true, default-features = false, f
rusoto_credential = { version = "0.48.0", optional = true, default-features = false }
rusoto_s3 = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
snafu = "0.7"
tokio = { version = "1.18", features = ["sync", "macros", "parking_lot", "rt-multi-thread", "time"] }
tokio = { version = "1.18", features = ["sync", "macros", "parking_lot", "rt-multi-thread", "time", "fs"] }
tracing = { version = "0.1" }
reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls"] }
# Filesystem integration
Expand Down
4 changes: 2 additions & 2 deletions src/aws.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! An object store implementation for S3
use crate::MultiPartUpload;
use crate::util::format_http_range;
use crate::{
collect_bytes,
Expand Down Expand Up @@ -232,9 +233,8 @@ impl ObjectStore for AmazonS3 {

async fn upload(
&self,
_stream: BoxStream<'static, Result<Bytes>>,
_location: &Path,
) -> Result<()> {
) -> Result<Box<dyn MultiPartUpload>> {
todo!()
}

Expand Down
5 changes: 2 additions & 3 deletions src/azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::{
path::{Path, DELIMITER},
util::format_prefix,
GetResult, ListResult, ObjectMeta, ObjectStore, Result,
GetResult, ListResult, ObjectMeta, ObjectStore, Result, MultiPartUpload,
};
use async_trait::async_trait;
use azure_core::{prelude::*, HttpClient};
Expand Down Expand Up @@ -107,9 +107,8 @@ impl ObjectStore for MicrosoftAzure {

async fn upload(
&self,
_stream: BoxStream<'static, Result<Bytes>>,
_location: &Path,
) -> Result<()> {
) -> Result<Box<dyn MultiPartUpload>> {
todo!()
}

Expand Down
7 changes: 3 additions & 4 deletions src/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
use crate::{
path::{Path, DELIMITER},
util::format_prefix,
GetResult, ListResult, ObjectMeta, ObjectStore, Result,
GetResult, ListResult, ObjectMeta, ObjectStore, Result, MultiPartUpload,
};
use async_trait::async_trait;
use bytes::Bytes;
use cloud_storage::{Client, Object};
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use snafu::{ResultExt, Snafu};
use std::ops::Range;
use std::pin::Pin;
Expand Down Expand Up @@ -139,9 +139,8 @@ impl ObjectStore for GoogleCloudStorage {

async fn upload(
&self,
_stream: BoxStream<'static, Result<Bytes>>,
_location: &Path,
) -> Result<()> {
) -> Result<Box<dyn MultiPartUpload>> {
todo!()
}

Expand Down
19 changes: 14 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::util::{collect_bytes, maybe_spawn_blocking};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use tokio::io::AsyncWrite;
use futures::{stream::BoxStream, StreamExt};
use snafu::Snafu;
use std::fmt::{Debug, Formatter};
Expand All @@ -55,14 +56,11 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// Save the provided bytes to the specified location.
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()>;

/// Upload a stream of bytes as a multi-part upload
///
/// This is typically implemented with a multi-part upload.
/// Get a multi-part upload that allows writing data in chunks
async fn upload(
&self,
stream: BoxStream<'static, Result<Bytes>>,
location: &Path,
) -> Result<()>;
) -> Result<Box<dyn MultiPartUpload>>;

/// Return the bytes that are stored at the specified location.
async fn get(&self, location: &Path) -> Result<GetResult>;
Expand Down Expand Up @@ -116,6 +114,17 @@ pub struct ObjectMeta {
pub size: usize,
}

/// Multi-part upload
#[async_trait]
pub trait MultiPartUpload: AsyncWrite {
/// Abort the multipart upload
///
/// On some services, if you fail to call this and do not
/// close the sink, parts will linger in the object store
/// and will be billed.
async fn abort(&mut self) -> Result<()>;
}

/// Result for a get request
///
/// This special cases the case of a local file, as some systems may
Expand Down
54 changes: 40 additions & 14 deletions src/local.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
//! An object store implementation for a local filesystem
use crate::MultiPartUpload;
use crate::{
maybe_spawn_blocking, path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result,
};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use futures::{stream::BoxStream, StreamExt};
use percent_encoding::percent_decode;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use std::collections::VecDeque;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::{collections::BTreeSet, convert::TryFrom, io};
use url::Url;
Expand Down Expand Up @@ -225,23 +228,13 @@ impl ObjectStore for LocalFileSystem {

async fn upload(
&self,
mut stream: BoxStream<'static, Result<Bytes>>,
location: &Path,
) -> Result<()> {
) -> Result<Box<dyn MultiPartUpload>> {
let path = self.config.path_to_filesystem(location)?;

let mut file = open_writable_file(&path)?;
let file = open_writable_file(&path)?;

while let Some(data) = stream.try_next().await? {
// TODO: Spawn blocking?
// maybe_spawn_blocking(move || {
file.write_all(&data).context(UnableToCopyDataToFileSnafu)?
// .map_err(|err| err.into())
// })
// .await?;
}

Ok(())
Ok(Box::new(LocalUpload { file: tokio::fs::File::from_std(file) }))
}

async fn get(&self, location: &Path) -> Result<GetResult> {
Expand Down Expand Up @@ -413,6 +406,39 @@ impl ObjectStore for LocalFileSystem {
}
}

struct LocalUpload {
file: tokio::fs::File,
}

#[async_trait]
impl MultiPartUpload for LocalUpload {
async fn abort(&mut self) -> Result<()> {
Ok(())
}
}

impl AsyncWrite for LocalUpload {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8]
) -> std::task::Poll<Result<usize, io::Error>> {
Pin::new(&mut self.file).poll_write(cx, buf)
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>
) -> std::task::Poll<Result<(), io::Error>> {
Pin::new(&mut self.file).poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>
) -> std::task::Poll<Result<(), io::Error>> {
Pin::new(&mut self.file).poll_shutdown(cx)
}
}

fn open_file(path: &std::path::PathBuf) -> Result<File> {
let file = File::open(path).map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
Expand Down
4 changes: 2 additions & 2 deletions src/memory.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! An in-memory object store implementation
use crate::MultiPartUpload;
use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -61,9 +62,8 @@ impl ObjectStore for InMemory {

async fn upload(
&self,
_stream: BoxStream<'static, Result<Bytes>>,
_location: &Path,
) -> Result<()> {
) -> Result<Box<dyn MultiPartUpload>> {
todo!()
}

Expand Down
4 changes: 2 additions & 2 deletions src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
sync::{Arc, Mutex},
};

use crate::MultiPartUpload;
use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -136,9 +137,8 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {

async fn upload(
&self,
_stream: BoxStream<'static, Result<Bytes>>,
_location: &Path,
) -> Result<()> {
) -> Result<Box<dyn MultiPartUpload>> {
todo!()
}

Expand Down

0 comments on commit c5134a2

Please sign in to comment.