Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement sharding #98

Merged
merged 1 commit into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integration-tests/basic/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ in {

${lib.optionalString (config.storage == "local") ''
with subtest("Check that all chunks are actually deleted after GC"):
files = server.succeed("find /var/lib/atticd/storage -type f")
files = server.succeed("find /var/lib/atticd/storage -type f ! -name 'VERSION'")
print(f"Remaining files: {files}")
assert files.strip() == ""
assert files.strip() == "", "Some files remain after GC: " + files
''}

with subtest("Check that we can include the upload info in the payload"):
Expand Down
107 changes: 101 additions & 6 deletions server/src/storage/local.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! Local file storage.

use std::ffi::OsStr;
use std::os::unix::ffi::OsStrExt;
use std::path::Path;
use std::path::PathBuf;

use async_trait::async_trait;
Expand Down Expand Up @@ -30,17 +33,95 @@ pub struct LocalRemoteFile {
pub name: String,
}

async fn read_version(storage_path: &Path) -> ServerResult<u32> {
let version_path = storage_path.join("VERSION");
let v = match fs::read_to_string(&version_path).await {
Ok(version) => version
.trim()
.parse()
.map_err(|_| ErrorKind::StorageError(anyhow::anyhow!("Invalid version file")))?,
Err(e) if e.kind() == io::ErrorKind::NotFound => 0,
Err(e) => {
return Err(ErrorKind::StorageError(anyhow::anyhow!(
"Failed to read version file: {}",
e
))
.into());
}
};
Ok(v)
}

async fn write_version(storage_path: &Path, version: u32) -> ServerResult<()> {
let version_path = storage_path.join("VERSION");
fs::write(&version_path, format!("{}", version))
.await
.map_err(ServerError::storage_error)?;
Ok(())
}

async fn upgrade_0_to_1(storage_path: &Path) -> ServerResult<()> {
let mut files = fs::read_dir(storage_path)
.await
.map_err(ServerError::storage_error)?;
// move all files to subdirectory using the first two characters of the filename
while let Some(file) = files
.next_entry()
.await
.map_err(ServerError::storage_error)?
{
if file
.file_type()
.await
.map_err(ServerError::storage_error)?
.is_file()
{
let name = file.file_name();
let name_bytes = name.as_os_str().as_bytes();
let parents = storage_path
.join(OsStr::from_bytes(&name_bytes[0..1]))
.join(OsStr::from_bytes(&name_bytes[0..2]));
let new_path = parents.join(name);
fs::create_dir_all(&parents).await.map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!("Failed to create directory {}", e))
})?;
fs::rename(&file.path(), &new_path).await.map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!(
"Failed to move file {} to {}: {}",
file.path().display(),
new_path.display(),
e
))
})?;
}
}

Ok(())
}

impl LocalBackend {
pub async fn new(config: LocalStorageConfig) -> ServerResult<Self> {
fs::create_dir_all(&config.path)
.await
.map_err(ServerError::storage_error)?;
fs::create_dir_all(&config.path).await.map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!(
"Failed to create storage directory {}: {}",
config.path.display(),
e
))
})?;

let version = read_version(&config.path).await?;
if version == 0 {
upgrade_0_to_1(&config.path).await?;
}
write_version(&config.path, 1).await?;

Ok(Self { config })
}

fn get_path(&self, p: &str) -> PathBuf {
self.config.path.join(p)
let level1 = &p[0..1];
let level2 = &p[0..2];
self.config.path.join(level1).join(level2).join(p)
}
}

Expand All @@ -51,9 +132,23 @@ impl StorageBackend for LocalBackend {
name: String,
mut stream: &mut (dyn AsyncRead + Unpin + Send),
) -> ServerResult<RemoteFile> {
let mut file = File::create(self.get_path(&name))
let path = self.get_path(&name);
fs::create_dir_all(path.parent().unwrap())
.await
.map_err(ServerError::storage_error)?;
.map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!(
"Failed to create directory {}: {}",
path.parent().unwrap().display(),
e
))
})?;
let mut file = File::create(self.get_path(&name)).await.map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!(
"Failed to create file {}: {}",
self.get_path(&name).display(),
e
))
})?;

io::copy(&mut stream, &mut file)
.await
Expand Down