Skip to content
This repository has been archived by the owner on Jul 27, 2022. It is now read-only.

Commit

Permalink
feat: enforce upload not readable until shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Jul 17, 2022
1 parent c9ead3d commit 070edfb
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 34 deletions.
9 changes: 9 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,15 @@ mod tests {
for chunk in &data {
writer.write_all(chunk).await?;
}

// Object should not yet exist in store
let meta_res = storage.head(&location).await;
assert!(meta_res.is_err());
assert!(matches!(
meta_res.unwrap_err(),
crate::Error::NotFound { .. }
));

writer.shutdown().await?;
let bytes_written = storage.get(&location).await?.bytes().await?;
assert_eq!(bytes_expected, bytes_written);
Expand Down
189 changes: 155 additions & 34 deletions src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use futures::future::BoxFuture;
use futures::FutureExt;
use futures::{stream::BoxStream, StreamExt};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::collections::VecDeque;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::{collections::BTreeSet, convert::TryFrom, io};
use std::{collections::VecDeque, path::PathBuf};
use tokio::io::AsyncWrite;
use url::Url;
use walkdir::{DirEntry, WalkDir};
Expand Down Expand Up @@ -235,27 +235,46 @@ impl ObjectStore for LocalFileSystem {
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let path = self.config.path_to_filesystem(location)?;
let dest = self.config.path_to_filesystem(location)?;

// Generate an id in case of concurrent writes
let mut multipart_id = 1;

let file = open_writable_file(&path)?;
// Will write to a temporary path
let staging_path = loop {
let staging_path = get_upload_stage_path(&dest, &multipart_id.to_string());

match std::fs::metadata(&staging_path) {
Err(err) if err.kind() == io::ErrorKind::NotFound => break staging_path,
Err(err) => return Err(Error::UnableToCopyDataToFile { source: err }.into()),
Ok(_) => multipart_id += 1,
}
};
let multipart_id = multipart_id.to_string();

let file = open_writable_file(&staging_path)?;

Ok((
String::new(),
multipart_id.clone(),
Box::new(LocalUpload {
inner_write: None,
inner_state: LocalUploadState::Idle,
multipart_id,
dest,
file: Arc::new(file),
}),
))
}

async fn abort_multipart(&self, location: &Path, _multipart_id: &MultipartId) -> Result<()> {
// Clean up partial write
self.delete(location)
.map(|res| match res {
Err(super::Error::NotFound { path: _, source: _ }) => Ok(()),
res => res,
})
.await
/// Abort an in-progress multipart upload by deleting its staged file.
///
/// Uploads are staged at `<dest>#<multipart_id>` (see `get_upload_stage_path`),
/// so the final object at `location` is never touched here — nothing has been
/// renamed into place yet.
///
/// Returns an error if the staged file cannot be removed.
/// NOTE(review): this includes the file-not-found case — confirm callers do
/// not rely on aborting an already-cleaned-up upload being a no-op.
async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
    let dest = self.config.path_to_filesystem(location)?;
    let staging_path: PathBuf = get_upload_stage_path(&dest, multipart_id);

    // Deletion is blocking I/O; run it off the async runtime when one exists.
    maybe_spawn_blocking(move || {
        std::fs::remove_file(&staging_path)
            .context(UnableToDeleteFileSnafu { path: staging_path })?;
        Ok(())
    })
    .await
}

async fn get(&self, location: &Path) -> Result<GetResult> {
Expand Down Expand Up @@ -332,19 +351,23 @@ impl ObjectStore for LocalFileSystem {
// Don't include the root directory itself
.min_depth(1);

let s =
walkdir.into_iter().flat_map(move |result_dir_entry| {
match convert_walkdir_result(result_dir_entry) {
Err(e) => Some(Err(e)),
Ok(None) => None,
Ok(entry @ Some(_)) => entry
.filter(|dir_entry| dir_entry.file_type().is_file())
.map(|entry| {
let location = config.filesystem_to_path(entry.path())?;
convert_entry(entry, location)
}),
}
});
let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
match convert_walkdir_result(result_dir_entry) {
Err(e) => Some(Err(e)),
Ok(None) => None,
Ok(entry @ Some(_)) => entry
.filter(|dir_entry| {
dir_entry.file_type().is_file()
// Ignore file names with # in them, since they might be in-progress uploads.
// They would be rejected anyways by filesystem_to_path below.
&& !dir_entry.file_name().to_string_lossy().contains('#')
})
.map(|entry| {
let location = config.filesystem_to_path(entry.path())?;
convert_entry(entry, location)
}),
}
});

// If no tokio context, return iterator directly as no
// need to perform chunked spawn_blocking reads
Expand Down Expand Up @@ -394,6 +417,13 @@ impl ObjectStore for LocalFileSystem {

for entry_res in walkdir.into_iter().map(convert_walkdir_result) {
if let Some(entry) = entry_res? {
if entry.file_type().is_file()
// Ignore file names with # in them, since they might be in-progress uploads.
// They would be rejected anyways by filesystem_to_path below.
&& entry.file_name().to_string_lossy().contains('#')
{
continue;
}
let is_directory = entry.file_type().is_dir();
let entry_location = config.filesystem_to_path(entry.path())?;

Expand Down Expand Up @@ -469,8 +499,22 @@ impl ObjectStore for LocalFileSystem {
}
}

/// Build the staging path for an in-progress multipart upload.
///
/// The staged file sits beside the destination with a `#<multipart_id>`
/// suffix appended to the full file name (e.g. `data.bin#1`). Because `#`
/// never appears in valid object paths, staged files are filtered out of
/// listings until they are renamed into place.
fn get_upload_stage_path(dest: &std::path::Path, multipart_id: &MultipartId) -> PathBuf {
    let mut os_path = dest.as_os_str().to_owned();
    os_path.push("#");
    os_path.push(multipart_id);
    PathBuf::from(os_path)
}

/// State machine for a [`LocalUpload`]'s background blocking task.
enum LocalUploadState {
    /// No background operation is in flight.
    Idle,
    /// A `spawn_blocking` write of one buffer is in flight; resolves to the
    /// number of bytes written.
    Writing(BoxFuture<'static, Result<usize, io::Error>>),
    /// A `spawn_blocking` rename of the staging file to its final
    /// destination is in flight.
    Committing(BoxFuture<'static, Result<(), io::Error>>),
}

struct LocalUpload {
inner_write: Option<BoxFuture<'static, Result<usize, io::Error>>>,
inner_state: LocalUploadState,
dest: PathBuf,
multipart_id: MultipartId,
file: Arc<std::fs::File>,
}

Expand All @@ -484,19 +528,31 @@ impl AsyncWrite for LocalUpload {
if let Ok(runtime) = tokio::runtime::Handle::try_current() {
let data: Vec<u8> = buf.to_vec();
let data_len = data.len();
let inner_write = self.inner_write.get_or_insert_with(move || {
Box::pin(

if let LocalUploadState::Idle = &mut self.inner_state {
self.inner_state = LocalUploadState::Writing(Box::pin(
runtime
.spawn_blocking(move || (&*file).write_all(&data))
.map(move |res| match res {
Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)),
Ok(res) => res.map(move |_| data_len),
}),
)
});
));
}
let inner_write = match &mut self.inner_state {
LocalUploadState::Writing(write) => write,
LocalUploadState::Committing(_) => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Tried to write to file where a commit is already in progress.",
)));
}
LocalUploadState::Idle => unreachable!(),
};

match inner_write.poll_unpin(cx) {
Poll::Ready(res) => {
self.inner_write = None;
self.inner_state = LocalUploadState::Idle;
Poll::Ready(res)
}
Poll::Pending => Poll::Pending,
Expand All @@ -515,10 +571,46 @@ impl AsyncWrite for LocalUpload {
}

/// Commit the upload: rename the staging file to its final destination.
///
/// On the first poll (state `Idle`) a blocking rename task is spawned; the
/// state moves to `Committing` until that task resolves. Polling while a
/// write is still in flight is an error — callers must flush writes before
/// shutting down. Outside a tokio runtime the rename runs synchronously.
fn poll_shutdown(
    mut self: Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
    let staging_path = get_upload_stage_path(&self.dest, &self.multipart_id);
    let dest = self.dest.clone();

    if let Ok(runtime) = tokio::runtime::Handle::try_current() {
        if let LocalUploadState::Idle = &mut self.inner_state {
            // First poll: kick off the rename on the blocking pool.
            self.inner_state = LocalUploadState::Committing(Box::pin(
                runtime
                    .spawn_blocking(move || std::fs::rename(&staging_path, &dest))
                    .map(move |res| match res {
                        // Task panicked or was cancelled.
                        Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)),
                        Ok(res) => res,
                    }),
            ));
        };

        let inner_fut = match &mut self.inner_state {
            LocalUploadState::Writing(_) => {
                return Poll::Ready(Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "Tried to commit a file where a write is in progress.",
                )));
            }
            LocalUploadState::Committing(fut) => fut,
            // Idle was replaced by Committing just above.
            LocalUploadState::Idle => unreachable!(),
        };

        match inner_fut.poll_unpin(cx) {
            Poll::Ready(res) => {
                self.inner_state = LocalUploadState::Idle;
                // BUGFIX: propagate the rename result. The previous code
                // matched `Poll::Ready(_)` and returned `Ok(())`, silently
                // reporting success even when the commit rename failed.
                Poll::Ready(res)
            }
            Poll::Pending => Poll::Pending,
        }
    } else {
        // No runtime: perform the rename synchronously, propagating errors.
        std::fs::rename(&staging_path, &self.dest)?;
        Poll::Ready(Ok(()))
    }
}
}

Expand Down Expand Up @@ -626,6 +718,7 @@ mod tests {
Error as ObjectStoreError, ObjectStore,
};
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;

#[tokio::test]
async fn file_test() {
Expand Down Expand Up @@ -841,4 +934,32 @@ mod tests {
err
);
}

// In-progress multipart uploads stage their data in `#`-suffixed files;
// both `list` and `list_with_delimiter` filter such names out, so partial
// uploads must never appear as objects.
#[tokio::test]
async fn list_hides_incomplete_uploads() {
    let root = TempDir::new().unwrap();
    let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
    let location = Path::from("some_file");

    // Start an upload and write data without shutting the writer down,
    // leaving a staged (uncommitted) file behind.
    let data = Bytes::from("arbitrary data");
    let (multipart_id, mut writer) = integration.put_multipart(&location).await.unwrap();
    writer.write_all(&data).await.unwrap();

    // A concurrent upload to the same location must get a distinct id
    // (and therefore a distinct staging file).
    let (multipart_id_2, mut writer_2) = integration.put_multipart(&location).await.unwrap();
    assert_ne!(multipart_id, multipart_id_2);
    writer_2.write_all(&data).await.unwrap();

    // Flat listing sees no objects despite two staged files on disk.
    let list = flatten_list_stream(&integration, None).await.unwrap();
    assert_eq!(list.len(), 0);

    // Delimited listing likewise reports no objects.
    let list: Vec<Path> = integration
        .list_with_delimiter(None)
        .await
        .unwrap()
        .objects
        .into_iter()
        .map(|entry| entry.location)
        .collect();
    assert_eq!(list.len(), 0);
}
}

0 comments on commit 070edfb

Please sign in to comment.