Skip to content

Commit

Permalink
feat(scripts): run lifecycle scripts in correct dependency order (#263)
Browse files Browse the repository at this point in the history
Ref: #220
  • Loading branch information
zkat committed May 19, 2023
1 parent 4f0fbba commit 00979ae
Show file tree
Hide file tree
Showing 3 changed files with 433 additions and 355 deletions.
256 changes: 83 additions & 173 deletions crates/node-maintainer/src/linkers/hoisted.rs
@@ -1,11 +1,13 @@
use std::collections::HashSet;
use std::ffi::OsStr;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::{atomic, Arc};

use futures::lock::Mutex;
use futures::{StreamExt, TryStreamExt};
use oro_common::BuildManifest;
use oro_script::OroScript;
use petgraph::stable_graph::NodeIndex;
use unicase::UniCase;
use walkdir::WalkDir;

Expand All @@ -15,13 +17,23 @@ use crate::{META_FILE_NAME, STORE_DIR_NAME};

use super::LinkerOptions;

pub(crate) struct HoistedLinker(pub(crate) LinkerOptions);
pub(crate) struct HoistedLinker {
pub(crate) pending_rebuild: Arc<Mutex<HashSet<NodeIndex>>>,
pub(crate) opts: LinkerOptions,
}

impl HoistedLinker {
pub fn new(opts: LinkerOptions) -> Self {
Self {
pending_rebuild: Arc::new(Mutex::new(HashSet::new())),
opts,
}
}

pub async fn prune(&self, graph: &Graph) -> Result<usize, NodeMaintainerError> {
let start = std::time::Instant::now();

let prefix = self.0.root.join("node_modules");
let prefix = self.opts.root.join("node_modules");

if !prefix.exists() {
tracing::debug!(
Expand All @@ -31,7 +43,7 @@ impl HoistedLinker {
return Ok(0);
}

if self.0.actual_tree.is_none()
if self.opts.actual_tree.is_none()
|| async_std::path::Path::new(&prefix.join(STORE_DIR_NAME))
.exists()
.await
Expand Down Expand Up @@ -109,7 +121,7 @@ impl HoistedLinker {
UniCase::from(entry_subpath_path.to_string_lossy().replace('\\', "/"));

let actual = self
.0
.opts
.actual_tree
.as_ref()
.and_then(|tree| tree.packages.get(&entry_subpath));
Expand All @@ -122,7 +134,7 @@ impl HoistedLinker {
// delete them later.
if ideal.is_some()
&& self
.0
.opts
.actual_tree
.as_ref()
.map(|tree| tree.packages.contains_key(&entry_subpath))
Expand Down Expand Up @@ -151,13 +163,13 @@ impl HoistedLinker {
{
continue;
} else if entry.file_type().is_dir() {
if let Some(pb) = &self.0.on_prune_progress {
if let Some(pb) = &self.opts.on_prune_progress {
pb(entry_path);
}
tracing::trace!("Pruning extraneous directory: {}", entry.path().display());
async_std::fs::remove_dir_all(entry.path()).await?;
} else {
if let Some(pb) = &self.0.on_prune_progress {
if let Some(pb) = &self.opts.on_prune_progress {
pb(entry_path);
}
tracing::trace!("Pruning extraneous file: {}", entry.path().display());
Expand All @@ -184,32 +196,46 @@ impl HoistedLinker {
tracing::debug!("Extracting node_modules/...");
let start = std::time::Instant::now();

let root = &self.0.root;
let root = &self.opts.root;
let stream = futures::stream::iter(graph.inner.node_indices());
let concurrent_count = Arc::new(AtomicUsize::new(0));
let actually_extracted = Arc::new(AtomicUsize::new(0));
let pending_rebuild = self.pending_rebuild.clone();
let total = graph.inner.node_count();
let total_completed = Arc::new(AtomicUsize::new(0));
let node_modules = root.join("node_modules");
std::fs::create_dir_all(&node_modules)?;
let prefer_copy = self.0.prefer_copy
|| match self.0.cache.as_deref() {
let prefer_copy = self.opts.prefer_copy
|| match self.opts.cache.as_deref() {
Some(cache) => super::supports_reflink(cache, &node_modules),
None => false,
};
let validate = self.0.validate;
let validate = self.opts.validate;
stream
.map(|idx| Ok((idx, concurrent_count.clone(), total_completed.clone(), actually_extracted.clone())))
.map(|idx| {
Ok((
idx,
concurrent_count.clone(),
total_completed.clone(),
actually_extracted.clone(),
pending_rebuild.clone(),
))
})
.try_for_each_concurrent(
self.0.concurrency,
move |(child_idx, concurrent_count, total_completed, actually_extracted)| async move {
self.opts.concurrency,
move |(
child_idx,
concurrent_count,
total_completed,
actually_extracted,
pending_rebuild,
)| async move {
if child_idx == graph.root {
return Ok(());
}

concurrent_count.fetch_add(1, atomic::Ordering::SeqCst);
let subdir =
graph
let subdir = graph
.node_path(child_idx)
.iter()
.map(|x| x.to_string())
Expand All @@ -225,9 +251,27 @@ impl HoistedLinker {
.extract_to_dir(&target_dir, prefer_copy, validate)
.await?;
actually_extracted.fetch_add(1, atomic::Ordering::SeqCst);
let target_dir = target_dir.clone();
let build_mani = async_std::task::spawn_blocking(move || {
BuildManifest::from_path(target_dir.join("package.json")).map_err(|e| {
NodeMaintainerError::BuildManifestReadError(
target_dir.join("package.json"),
e,
)
})
})
.await?;
if build_mani.scripts.contains_key("preinstall")
|| build_mani.scripts.contains_key("install")
|| build_mani.scripts.contains_key("postinstall")
|| build_mani.scripts.contains_key("prepare")
|| !build_mani.bin.is_empty()
{
pending_rebuild.lock().await.insert(child_idx);
}
}

if let Some(on_extract) = &self.0.on_extract_progress {
if let Some(on_extract) = &self.opts.on_extract_progress {
on_extract(&graph[child_idx].package);
}

Expand All @@ -247,19 +291,18 @@ impl HoistedLinker {
node_modules.join(META_FILE_NAME),
graph.to_kdl()?.to_string(),
)?;
let actually_extracted = actually_extracted.load(atomic::Ordering::SeqCst);
let extracted_count = actually_extracted.load(atomic::Ordering::SeqCst);

tracing::debug!(
"Extracted {actually_extracted} package{} in {}ms.",
if actually_extracted == 1 { "" } else { "s" },
"Extracted {extracted_count} package{} in {}ms.",
if extracted_count == 1 { "" } else { "s" },
start.elapsed().as_millis(),
);
Ok(actually_extracted)
Ok(extracted_count)
}

async fn link_bins(&self, graph: &Graph) -> Result<usize, NodeMaintainerError> {
tracing::debug!("Linking bins...");
let start = std::time::Instant::now();
let root = &self.0.root;
pub async fn link_bins(&self, graph: &Graph) -> Result<usize, NodeMaintainerError> {
let root = &self.opts.root;
let linked = Arc::new(AtomicUsize::new(0));
let bin_file_name = Some(OsStr::new(".bin"));
let nm_file_name = Some(OsStr::new("node_modules"));
Expand All @@ -275,9 +318,9 @@ impl HoistedLinker {
async_std::fs::remove_dir_all(entry.path()).await?;
}
}
futures::stream::iter(graph.inner.node_indices())
futures::stream::iter(self.pending_rebuild.lock().await.iter().copied())
.map(|idx| Ok((idx, linked.clone())))
.try_for_each_concurrent(self.0.concurrency, move |(idx, linked)| async move {
.try_for_each_concurrent(self.opts.concurrency, move |(idx, linked)| async move {
if idx == graph.root {
return Ok(());
}
Expand Down Expand Up @@ -339,152 +382,19 @@ impl HoistedLinker {
})
.await?;
let linked = linked.load(atomic::Ordering::SeqCst);
tracing::debug!(
"Linked {linked} package bins in {}ms.",
start.elapsed().as_millis()
);
Ok(linked)
}

pub async fn rebuild(
&self,
graph: &Graph,
ignore_scripts: bool,
) -> Result<(), NodeMaintainerError> {
tracing::debug!("Running lifecycle scripts...");
let start = std::time::Instant::now();
if !ignore_scripts {
self.run_scripts(graph, "preinstall").await?;
}
self.link_bins(graph).await?;
if !ignore_scripts {
self.run_scripts(graph, "install").await?;
self.run_scripts(graph, "postinstall").await?;
}
tracing::debug!(
"Ran lifecycle scripts in {}ms.",
start.elapsed().as_millis()
);
Ok(())
}

async fn run_scripts(&self, graph: &Graph, event: &str) -> Result<(), NodeMaintainerError> {
tracing::debug!("Running {event} lifecycle scripts");
let start = std::time::Instant::now();
let root = &self.0.root;
futures::stream::iter(graph.inner.node_indices())
.map(Ok)
.try_for_each_concurrent(self.0.script_concurrency, move |idx| async move {
let package_dir = if idx == graph.root {
root.clone()
} else {
let subdir = graph
.node_path(idx)
.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join("/node_modules/");
root.join("node_modules").join(subdir)
};

let is_optional = graph.is_optional(idx);

let build_mani = BuildManifest::from_path(package_dir.join("package.json"))
.map_err(|e| {
NodeMaintainerError::BuildManifestReadError(
package_dir.join("package.json"),
e,
)
})?;

let name = graph[idx].package.name().to_string();
if build_mani.scripts.contains_key(event) {
let package_dir = package_dir.clone();
let root = root.clone();
let event = event.to_owned();
let event_clone = event.clone();
let span = tracing::info_span!("script");
let _span_enter = span.enter();
if let Some(on_script_start) = &self.0.on_script_start {
on_script_start(&graph[idx].package, &event);
}
std::mem::drop(_span_enter);
let mut script = match async_std::task::spawn_blocking(move || {
OroScript::new(package_dir, event_clone)?
.workspace_path(root)
.spawn()
})
.await
{
Ok(script) => script,
Err(e) if is_optional => {
let e: NodeMaintainerError = e.into();
tracing::debug!("Error in optional dependency script: {}", e);
return Ok(());
}
Err(e) => return Err(e.into()),
};
let stdout = script.stdout.take();
let stderr = script.stderr.take();
let stdout_name = name.clone();
let stderr_name = name.clone();
let stdout_on_line = self.0.on_script_line.clone();
let stderr_on_line = self.0.on_script_line.clone();
let stdout_span = span;
let stderr_span = stdout_span.clone();
let event_clone = event.clone();
let join = futures::try_join!(
async_std::task::spawn_blocking(move || {
let _enter = stdout_span.enter();
if let Some(stdout) = stdout {
for line in BufReader::new(stdout).lines() {
let line = line?;
tracing::debug!("stdout::{stdout_name}::{event}: {}", line);
if let Some(on_script_line) = &stdout_on_line {
on_script_line(&line);
}
}
}
Ok::<_, NodeMaintainerError>(())
}),
async_std::task::spawn_blocking(move || {
let _enter = stderr_span.enter();
if let Some(stderr) = stderr {
for line in BufReader::new(stderr).lines() {
let line = line?;
tracing::debug!(
"stderr::{stderr_name}::{event_clone}: {}",
line
);
if let Some(on_script_line) = &stderr_on_line {
on_script_line(&line);
}
}
}
Ok::<_, NodeMaintainerError>(())
}),
async_std::task::spawn_blocking(move || {
script.wait()?;
Ok::<_, NodeMaintainerError>(())
}),
);
match join {
Ok(_) => {}
Err(e) if is_optional => {
tracing::debug!("Error in optional dependency script: {}", e);
return Ok(());
}
Err(e) => return Err(e),
}
}

Ok::<_, NodeMaintainerError>(())
})
.await?;
tracing::debug!(
"Ran lifecycle scripts for {event} in {}ms.",
start.elapsed().as_millis()
);
Ok(())
pub fn package_dir(&self, graph: &Graph, idx: NodeIndex) -> (PathBuf, PathBuf) {
let subdir = graph
.node_path(idx)
.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join("/node_modules/");
(
self.opts.root.join("node_modules").join(subdir),
self.opts.root.clone(),
)
}
}

0 comments on commit 00979ae

Please sign in to comment.