Skip to content

Commit

Permalink
Make ObjectStore::copy Atomic and Automatically Create Parent Directo…
Browse files Browse the repository at this point in the history
…ries (apache#4758) (apache#4760) (apache#4759)

* Make LocalFileSystem::copy atomic (apache#4758)

* Create sub-directories for copy (apache#4760)

* Fix HttpStore

* Clippy

* Tweak error propagation

* Add doc
  • Loading branch information
tustvold authored Sep 2, 2023
1 parent 4927c1e commit 6e28c03
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 67 deletions.
50 changes: 28 additions & 22 deletions object_store/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,31 +256,37 @@ impl Client {
}

pub async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
let from = self.path_url(from);
let to = self.path_url(to);
let method = Method::from_bytes(b"COPY").unwrap();

let mut builder = self
.client
.request(method, from)
.header("Destination", to.as_str());
let mut retry = false;
loop {
let method = Method::from_bytes(b"COPY").unwrap();

if !overwrite {
builder = builder.header("Overwrite", "F");
}
let mut builder = self
.client
.request(method, self.path_url(from))
.header("Destination", self.path_url(to).as_str());

match builder.send_retry(&self.retry_config).await {
Ok(_) => Ok(()),
Err(e)
if !overwrite
&& matches!(e.status(), Some(StatusCode::PRECONDITION_FAILED)) =>
{
Err(crate::Error::AlreadyExists {
path: to.to_string(),
source: Box::new(e),
})
if !overwrite {
builder = builder.header("Overwrite", "F");
}
Err(source) => Err(Error::Request { source }.into()),

return match builder.send_retry(&self.retry_config).await {
Ok(_) => Ok(()),
Err(source) => Err(match source.status() {
Some(StatusCode::PRECONDITION_FAILED) if !overwrite => {
crate::Error::AlreadyExists {
path: to.to_string(),
source: Box::new(source),
}
}
// Some implementations return 404 instead of 409
Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
retry = true;
self.create_parent_directories(to).await?;
continue;
}
_ => Error::Request { source }.into(),
}),
};
}
}
}
Expand Down
20 changes: 18 additions & 2 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1105,8 +1105,24 @@ mod tests {
files.sort_unstable();
assert_eq!(files, vec![emoji_file.clone(), dst.clone()]);

let dst2 = Path::from("new/nested/foo.parquet");
storage.copy(&emoji_file, &dst2).await.unwrap();
let mut files = flatten_list_stream(storage, None).await.unwrap();
files.sort_unstable();
assert_eq!(files, vec![emoji_file.clone(), dst.clone(), dst2.clone()]);

let dst3 = Path::from("new/nested2/bar.parquet");
storage.rename(&dst, &dst3).await.unwrap();
let mut files = flatten_list_stream(storage, None).await.unwrap();
files.sort_unstable();
assert_eq!(files, vec![emoji_file.clone(), dst2.clone(), dst3.clone()]);

let err = storage.head(&dst).await.unwrap_err();
assert!(matches!(err, Error::NotFound { .. }));

storage.delete(&emoji_file).await.unwrap();
storage.delete(&dst).await.unwrap();
storage.delete(&dst3).await.unwrap();
storage.delete(&dst2).await.unwrap();
let files = flatten_list_stream(storage, Some(&emoji_prefix))
.await
.unwrap();
Expand Down Expand Up @@ -1605,7 +1621,7 @@ mod tests {
pub(crate) async fn copy_if_not_exists(storage: &DynObjectStore) {
// Create two objects
let path1 = Path::from("test1");
let path2 = Path::from("test2");
let path2 = Path::from("not_exists_nested/test2");
let contents1 = Bytes::from("cats");
let contents2 = Bytes::from("dogs");

Expand Down
112 changes: 69 additions & 43 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use futures::{stream::BoxStream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use snafu::{ensure, ResultExt, Snafu};
use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions};
use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::ops::Range;
Expand Down Expand Up @@ -78,10 +78,10 @@ pub(crate) enum Error {
path: PathBuf,
},

#[snafu(display("Unable to create file {}: {}", path.display(), err))]
#[snafu(display("Unable to create file {}: {}", path.display(), source))]
UnableToCreateFile {
source: io::Error,
path: PathBuf,
err: io::Error,
},

#[snafu(display("Unable to delete file {}: {}", path.display(), source))]
Expand Down Expand Up @@ -336,12 +336,13 @@ impl ObjectStore for LocalFileSystem {
// If the file was successfully opened, return it wrapped in a boxed `AsyncWrite` trait object.
Ok(file) => return Ok(Box::new(file)),
// If the error is that the file was not found, attempt to create the file and any necessary parent directories.
Err(err) if err.kind() == ErrorKind::NotFound => {
Err(source) if source.kind() == ErrorKind::NotFound => {
// Get the path to the parent directory of the file.
let parent = path
.parent()
// If the parent directory does not exist, return a `UnableToCreateFileSnafu` error.
.context(UnableToCreateFileSnafu { path: &path, err })?;
let parent =
path.parent().ok_or_else(|| Error::UnableToCreateFile {
path: path.to_path_buf(),
source,
})?;

// Create the parent directory and any necessary ancestors.
tokio::fs::create_dir_all(parent)
Expand Down Expand Up @@ -584,20 +585,42 @@ impl ObjectStore for LocalFileSystem {
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let from = self.config.path_to_filesystem(from)?;
let to = self.config.path_to_filesystem(to)?;

maybe_spawn_blocking(move || {
std::fs::copy(&from, &to).context(UnableToCopyFileSnafu { from, to })?;
Ok(())
let mut id = 0;
// In order to make this atomic we:
//
// - hard link to a hidden temporary file
// - atomically rename this temporary file into place
//
// This is necessary because hard_link returns an error if the destination already exists
maybe_spawn_blocking(move || loop {
let staged = staged_upload_path(&to, &id.to_string());
match std::fs::hard_link(&from, &staged) {
Ok(_) => {
return std::fs::rename(&staged, &to).map_err(|source| {
Error::UnableToCopyFile { from, to, source }.into()
})
}
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => id += 1,
ErrorKind::NotFound => create_parent_dirs(&to, source)?,
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
},
}
})
.await
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
let from = self.config.path_to_filesystem(from)?;
let to = self.config.path_to_filesystem(to)?;
maybe_spawn_blocking(move || {
std::fs::rename(&from, &to).context(UnableToCopyFileSnafu { from, to })?;
Ok(())
maybe_spawn_blocking(move || loop {
match std::fs::rename(&from, &to) {
Ok(_) => return Ok(()),
Err(source) => match source.kind() {
ErrorKind::NotFound => create_parent_dirs(&to, source)?,
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
},
}
})
.await
}
Expand All @@ -606,25 +629,37 @@ impl ObjectStore for LocalFileSystem {
let from = self.config.path_to_filesystem(from)?;
let to = self.config.path_to_filesystem(to)?;

maybe_spawn_blocking(move || {
std::fs::hard_link(&from, &to).map_err(|err| match err.kind() {
io::ErrorKind::AlreadyExists => Error::AlreadyExists {
path: to.to_str().unwrap().to_string(),
source: err,
}
.into(),
_ => Error::UnableToCopyFile {
from,
to,
source: err,
}
.into(),
})
maybe_spawn_blocking(move || loop {
match std::fs::hard_link(&from, &to) {
Ok(_) => return Ok(()),
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => {
return Err(Error::AlreadyExists {
path: to.to_str().unwrap().to_string(),
source,
}
.into())
}
ErrorKind::NotFound => create_parent_dirs(&to, source)?,
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
},
}
})
.await
}
}

/// Creates the parent directories of `path` or returns an error based on `source` if no parent
fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
let parent = path.parent().ok_or_else(|| Error::UnableToCreateFile {
path: path.to_path_buf(),
source,
})?;

std::fs::create_dir_all(parent).context(UnableToCreateDirSnafu { path: parent })?;
Ok(())
}

/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `suffix`
///
/// Creates any directories if necessary
Expand All @@ -636,20 +671,11 @@ fn new_staged_upload(base: &std::path::Path) -> Result<(File, String)> {
let mut options = OpenOptions::new();
match options.read(true).write(true).create_new(true).open(&path) {
Ok(f) => return Ok((f, suffix)),
Err(e) if e.kind() == ErrorKind::AlreadyExists => {
multipart_id += 1;
}
Err(err) if err.kind() == ErrorKind::NotFound => {
let parent = path
.parent()
.context(UnableToCreateFileSnafu { path: &path, err })?;

std::fs::create_dir_all(parent)
.context(UnableToCreateDirSnafu { path: parent })?;

continue;
}
Err(source) => return Err(Error::UnableToOpenFile { source, path }.into()),
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => multipart_id += 1,
ErrorKind::NotFound => create_parent_dirs(&path, source)?,
_ => return Err(Error::UnableToOpenFile { source, path }.into()),
},
}
}
}
Expand Down

0 comments on commit 6e28c03

Please sign in to comment.