Skip to content

Commit

Permalink
feat(cow): prefer CoW on systems that support it. Also, fall back to …
Browse files Browse the repository at this point in the history
…copy when hard links fail.
  • Loading branch information
zkat committed Mar 7, 2023
1 parent c1d832f commit 0e29632
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 29 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Expand Up @@ -78,10 +78,12 @@ maplit = "1.0.2"
miette = "5.5.0"
node-semver = "2.1.0"
nom = "7.1.3"
once_cell = "1.17.1"
percent-encoding = "2.1.0"
pretty_assertions = "1.3.0"
proc-macro2 = "1.0.18"
quote = "1.0.7"
reflink = "0.1.3"
reqwest = "0.11.14"
reqwest-middleware = "0.2.0"
serde = "1.0.152"
Expand Down
2 changes: 2 additions & 0 deletions crates/nassun/Cargo.toml
Expand Up @@ -37,6 +37,8 @@ backon = { workspace = true }
cacache = { workspace = true }
flate2 = { workspace = true }
io_tee = { workspace = true }
once_cell = { workspace = true }
reflink = { workspace = true }
tar = { workspace = true }
tempfile = { workspace = true }
which = { workspace = true }
Expand Down
30 changes: 27 additions & 3 deletions crates/nassun/src/client.rs
Expand Up @@ -25,6 +25,7 @@ pub struct NassunOpts {
base_dir: Option<PathBuf>,
default_tag: Option<String>,
registries: HashMap<Option<String>, Url>,
prefer_copy: bool,
}

impl NassunOpts {
Expand All @@ -37,6 +38,17 @@ impl NassunOpts {
self
}

/// When extracting tarballs, prefer to copy files to their destination as
/// separate, standalone files instead of hard linking them. Full copies
/// will still happen when hard linking fails. Furthermore, on filesystems
/// that support Copy-on-Write (zfs, btrfs, APFS (macOS), etc), this
/// option will use that feature for all copies.
#[cfg(not(target_arch = "wasm32"))]
pub fn prefer_copy(mut self, prefer_copy: bool) -> Self {
self.prefer_copy = prefer_copy;
self
}

pub fn registry(mut self, registry: Url) -> Self {
self.registries.insert(None, registry);
self
Expand Down Expand Up @@ -84,6 +96,10 @@ impl NassunOpts {
.unwrap_or_else(|| std::env::current_dir().expect("failed to get cwd.")),
default_tag: self.default_tag.unwrap_or_else(|| "latest".into()),
},
#[cfg(not(target_arch = "wasm32"))]
prefer_copy: self.prefer_copy,
#[cfg(target_arch = "wasm32")]
prefer_copy: false,
npm_fetcher: Arc::new(NpmFetcher::new(
#[allow(clippy::redundant_clone)]
client.clone(),
Expand All @@ -101,6 +117,7 @@ impl NassunOpts {
#[derive(Clone)]
pub struct Nassun {
cache: Arc<Option<PathBuf>>,
prefer_copy: bool,
resolver: PackageResolver,
npm_fetcher: Arc<dyn PackageFetcher>,
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -199,7 +216,7 @@ impl Nassun {
let fetcher = self.pick_fetcher(&spec);
let name = fetcher.name(&spec, &self.resolver.base_dir).await?;
self.resolver
.resolve(name, spec, fetcher, self.cache.clone())
.resolve(name, spec, fetcher, self.cache.clone(), self.prefer_copy)
.await
}

Expand All @@ -214,8 +231,14 @@ impl Nassun {
resolved: PackageResolution,
) -> Package {
let fetcher = self.pick_fetcher(&from);
self.resolver
.resolve_from(name, from, resolved, fetcher, self.cache.clone())
self.resolver.resolve_from(
name,
from,
resolved,
fetcher,
self.cache.clone(),
self.prefer_copy,
)
}

/// Creates a "resolved" package from a plain [`oro_common::Manifest`].
Expand All @@ -227,6 +250,7 @@ impl Nassun {
from: PackageSpec::Dir {
path: PathBuf::from("."),
},
prefer_copy: false,
name: manifest.name.clone().unwrap_or_else(|| "dummy".to_string()),
resolved: PackageResolution::Dir {
name: manifest.name.clone().unwrap_or_else(|| "dummy".to_string()),
Expand Down
2 changes: 1 addition & 1 deletion crates/nassun/src/fetch/git.rs
Expand Up @@ -78,7 +78,7 @@ impl GitFetcher {
async fn fetch_tarball(&self, dir: &Path, tarball: &Url) -> Result<()> {
let tarball = self.client.stream_external(tarball).await?;
Tarball::new_unchecked(tarball)
.extract_from_tarball_data(dir, None)
.extract_from_tarball_data(dir, None, false)
.await?;
Ok(())
}
Expand Down
23 changes: 16 additions & 7 deletions crates/nassun/src/package.rs
Expand Up @@ -22,6 +22,8 @@ pub struct Package {
pub(crate) fetcher: Arc<dyn PackageFetcher>,
pub(crate) base_dir: PathBuf,
pub(crate) cache: Arc<Option<PathBuf>>,
#[cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
pub(crate) prefer_copy: bool,
}

impl Package {
Expand Down Expand Up @@ -169,33 +171,40 @@ impl Package {
// If extracting from the cache failed for some reason
// (bad data, etc), then go ahead and do a network
// extract.
match self.extract_from_cache(dir, cache, entry).await {
match self
.extract_from_cache(dir, cache, entry, self.prefer_copy)
.await
{
Ok(_) => return Ok(sri),
Err(e) => {
tracing::debug!("extract_from_cache failed: {}", e);
return self
.tarball_checked(sri)
.await?
.extract_from_tarball_data(dir, self.cache.as_deref())
.extract_from_tarball_data(
dir,
self.cache.as_deref(),
self.prefer_copy,
)
.await;
}
}
} else {
return self
.tarball_checked(sri.clone())
.await?
.extract_from_tarball_data(dir, self.cache.as_deref())
.extract_from_tarball_data(dir, self.cache.as_deref(), self.prefer_copy)
.await;
}
}
self.tarball_checked(sri.clone())
.await?
.extract_from_tarball_data(dir, self.cache.as_deref())
.extract_from_tarball_data(dir, self.cache.as_deref(), self.prefer_copy)
.await
} else {
self.tarball_unchecked()
.await?
.extract_from_tarball_data(dir, self.cache.as_deref())
.extract_from_tarball_data(dir, self.cache.as_deref(), self.prefer_copy)
.await
}
}
Expand All @@ -206,6 +215,7 @@ impl Package {
dir: &Path,
cache: &Path,
entry: cacache::Metadata,
prefer_copy: bool,
) -> Result<()> {
let dir = PathBuf::from(dir);
let cache = PathBuf::from(cache);
Expand All @@ -231,8 +241,7 @@ impl Package {
created.insert(parent);
}

cacache::hard_link_hash_unchecked_sync(&cache, &sri, &path)
.map_err(move |e| NassunError::ExtractCacheError(e, Some(path)))?;
crate::tarball::extract_from_cache(&cache, &sri, &path, prefer_copy)?;
}
Ok::<_, NassunError>(())
})
Expand Down
4 changes: 4 additions & 0 deletions crates/nassun/src/resolver.rs
Expand Up @@ -109,13 +109,15 @@ impl PackageResolver {
resolved: PackageResolution,
fetcher: Arc<dyn PackageFetcher>,
cache: Arc<Option<PathBuf>>,
prefer_copy: bool,
) -> Package {
Package {
name,
from,
resolved,
fetcher,
cache,
prefer_copy,
base_dir: self.base_dir.clone(),
}
}
Expand All @@ -126,6 +128,7 @@ impl PackageResolver {
wanted: PackageSpec,
fetcher: Arc<dyn PackageFetcher>,
cache: Arc<Option<PathBuf>>,
prefer_copy: bool,
) -> Result<Package, NassunError> {
let packument = fetcher.corgi_packument(&wanted, &self.base_dir).await?;
let resolved = self.get_resolution(&name, &wanted, &packument)?;
Expand All @@ -136,6 +139,7 @@ impl PackageResolver {
fetcher,
base_dir: self.base_dir.clone(),
cache,
prefer_copy,
})
}

Expand Down
77 changes: 59 additions & 18 deletions crates/nassun/src/tarball.rs
Expand Up @@ -25,6 +25,10 @@ use crate::TarballStream;

const MAX_IN_MEMORY_TARBALL_SIZE: usize = 1024 * 1024 * 5;

#[cfg(not(target_arch = "wasm32"))]
pub(crate) static SUPPORTS_REFLINK: once_cell::sync::Lazy<bool> =
once_cell::sync::Lazy::new(supports_reflink);

pub struct Tarball {
checker: Option<IntegrityChecker>,
reader: TarballStream,
Expand Down Expand Up @@ -57,13 +61,14 @@ impl Tarball {
mut self,
dir: &Path,
cache: Option<&Path>,
prefer_copy: bool,
) -> Result<Integrity> {
let integrity = self.integrity.take();
let temp = self.into_temp().await?;
let dir = PathBuf::from(dir);
let cache = cache.map(PathBuf::from);
async_std::task::spawn_blocking(move || {
temp.extract_to_dir(&dir, integrity, cache.as_deref())
temp.extract_to_dir(&dir, integrity, cache.as_deref(), prefer_copy)
})
.await
}
Expand Down Expand Up @@ -189,6 +194,7 @@ impl TempTarball {
dir: &Path,
tarball_integrity: Option<Integrity>,
cache: Option<&Path>,
prefer_copy: bool,
) -> Result<Integrity> {
let mut file_index = serde_json::Map::new();
let mut drain_buf = [0u8; 1024 * 8];
Expand Down Expand Up @@ -254,23 +260,7 @@ impl TempTarball {
.commit()
.map_err(|e| NassunError::ExtractCacheError(e, Some(path.clone())))?;

// HACK: This is horrible, but on wsl2 (at least), this
// was sometimes crashing with an ENOENT (?!), which
// really REALLY shouldn't happen. So we just retry a few
// times and hope the problem goes away.
let op = || {
cacache::hard_link_hash_unchecked_sync(cache, &sri, &path)
.map_err(|e| NassunError::ExtractCacheError(e, Some(path.clone())))
};
op.retry(&ConstantBuilder::default().with_delay(Duration::from_millis(50)))
.notify(|err, wait| {
tracing::debug!(
"Error hard linking from cache: {}. Retrying after {}ms",
err,
wait.as_micros() / 1000
)
})
.call()?;
extract_from_cache(cache, &sri, &path, prefer_copy)?;

file_index.insert(
entry_subpath.to_string_lossy().into(),
Expand Down Expand Up @@ -368,3 +358,54 @@ fn strip_one(path: &Path) -> Option<&Path> {
pub(crate) fn tarball_key(integrity: &Integrity) -> String {
format!("nassun::package::{integrity}")
}

#[cfg(not(target_arch = "wasm32"))]
fn supports_reflink() -> bool {
let tempdir = match tempfile::tempdir() {
Ok(t) => t,
Err(_) => return false,
};
match std::fs::write(tempdir.path().join("a"), "a") {
Ok(_) => {}
Err(_) => return false,
};
let supports_reflink = reflink::reflink(tempdir.path().join("a"), tempdir.path().join("b"))
.map(|_| true)
.unwrap_or(false);

if supports_reflink {
tracing::info!("Filesystem supports reflinks, extracted data will use copy-on-write reflinks instead of hard links or full copies (unless cache in in a separate disk).")
}

supports_reflink
}

pub(crate) fn extract_from_cache(
cache: &Path,
sri: &Integrity,
to: &Path,
prefer_copy: bool,
) -> Result<()> {
if *SUPPORTS_REFLINK || prefer_copy {
cacache::copy_hash_unchecked_sync(cache, sri, to)
.map_err(|e| NassunError::ExtractCacheError(e, Some(PathBuf::from(to))))?;
} else {
// HACK: This is horrible, but on wsl2 (at least), this
// was sometimes crashing with an ENOENT (?!), which
// really REALLY shouldn't happen. So we just retry a few
// times and hope the problem goes away.
let op = || cacache::hard_link_hash_unchecked_sync(cache, sri, to);
op.retry(&ConstantBuilder::default().with_delay(Duration::from_millis(50)))
.notify(|err, wait| {
tracing::debug!(
"Error hard linking from cache: {}. Retrying after {}ms",
err,
wait.as_micros() / 1000
)
})
.call()
.or_else(|_| cacache::copy_hash_unchecked_sync(cache, sri, to))
.map_err(|e| NassunError::ExtractCacheError(e, Some(PathBuf::from(to))))?;
}
Ok(())
}

0 comments on commit 0e29632

Please sign in to comment.