From 00979ae8af581ffeb37786f7da71b7a80022f826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kat=20March=C3=A1n?= Date: Thu, 18 May 2023 17:18:11 -0700 Subject: [PATCH] feat(scripts): run lifecycle scripts in correct dependency order (#263) Ref: https://github.com/orogene/orogene/issues/220 --- crates/node-maintainer/src/linkers/hoisted.rs | 256 ++++++---------- .../node-maintainer/src/linkers/isolated.rs | 258 ++++++----------- crates/node-maintainer/src/linkers/mod.rs | 274 +++++++++++++++++- 3 files changed, 433 insertions(+), 355 deletions(-) diff --git a/crates/node-maintainer/src/linkers/hoisted.rs b/crates/node-maintainer/src/linkers/hoisted.rs index 09c46863..5ffcab3b 100644 --- a/crates/node-maintainer/src/linkers/hoisted.rs +++ b/crates/node-maintainer/src/linkers/hoisted.rs @@ -1,11 +1,13 @@ +use std::collections::HashSet; use std::ffi::OsStr; -use std::io::{BufRead, BufReader}; +use std::path::PathBuf; use std::sync::atomic::AtomicUsize; use std::sync::{atomic, Arc}; +use futures::lock::Mutex; use futures::{StreamExt, TryStreamExt}; use oro_common::BuildManifest; -use oro_script::OroScript; +use petgraph::stable_graph::NodeIndex; use unicase::UniCase; use walkdir::WalkDir; @@ -15,13 +17,23 @@ use crate::{META_FILE_NAME, STORE_DIR_NAME}; use super::LinkerOptions; -pub(crate) struct HoistedLinker(pub(crate) LinkerOptions); +pub(crate) struct HoistedLinker { + pub(crate) pending_rebuild: Arc>>, + pub(crate) opts: LinkerOptions, +} impl HoistedLinker { + pub fn new(opts: LinkerOptions) -> Self { + Self { + pending_rebuild: Arc::new(Mutex::new(HashSet::new())), + opts, + } + } + pub async fn prune(&self, graph: &Graph) -> Result { let start = std::time::Instant::now(); - let prefix = self.0.root.join("node_modules"); + let prefix = self.opts.root.join("node_modules"); if !prefix.exists() { tracing::debug!( @@ -31,7 +43,7 @@ impl HoistedLinker { return Ok(0); } - if self.0.actual_tree.is_none() + if self.opts.actual_tree.is_none() || async_std::path::Path::new(&prefix.join(STORE_DIR_NAME)) .exists() .await @@ -109,7 +121,7 @@ impl HoistedLinker { UniCase::from(entry_subpath_path.to_string_lossy().replace('\\', "/")); let actual = self - .0 + .opts .actual_tree .as_ref() .and_then(|tree| tree.packages.get(&entry_subpath)); @@ -122,7 +134,7 @@ impl HoistedLinker { // delete them later. if ideal.is_some() && self - .0 + .opts .actual_tree .as_ref() .map(|tree| tree.packages.contains_key(&entry_subpath)) @@ -151,13 +163,13 @@ impl HoistedLinker { { continue; } else if entry.file_type().is_dir() { - if let Some(pb) = &self.0.on_prune_progress { + if let Some(pb) = &self.opts.on_prune_progress { pb(entry_path); } tracing::trace!("Pruning extraneous directory: {}", entry.path().display()); async_std::fs::remove_dir_all(entry.path()).await?; } else { - if let Some(pb) = &self.0.on_prune_progress { + if let Some(pb) = &self.opts.on_prune_progress { pb(entry_path); } tracing::trace!("Pruning extraneous file: {}", entry.path().display()); @@ -184,32 +196,46 @@ impl HoistedLinker { tracing::debug!("Extracting node_modules/..."); let start = std::time::Instant::now(); - let root = &self.0.root; + let root = &self.opts.root; let stream = futures::stream::iter(graph.inner.node_indices()); let concurrent_count = Arc::new(AtomicUsize::new(0)); let actually_extracted = Arc::new(AtomicUsize::new(0)); + let pending_rebuild = self.pending_rebuild.clone(); let total = graph.inner.node_count(); let total_completed = Arc::new(AtomicUsize::new(0)); let node_modules = root.join("node_modules"); std::fs::create_dir_all(&node_modules)?; - let prefer_copy = self.0.prefer_copy - || match self.0.cache.as_deref() { + let prefer_copy = self.opts.prefer_copy + || match self.opts.cache.as_deref() { Some(cache) => super::supports_reflink(cache, &node_modules), None => false, }; - let validate = self.0.validate; + let validate = self.opts.validate; stream - .map(|idx| Ok((idx, concurrent_count.clone(), total_completed.clone(), actually_extracted.clone()))) + .map(|idx| { + Ok(( + idx, + concurrent_count.clone(), + total_completed.clone(), + actually_extracted.clone(), + pending_rebuild.clone(), + )) + }) .try_for_each_concurrent( - self.0.concurrency, - move |(child_idx, concurrent_count, total_completed, actually_extracted)| async move { + self.opts.concurrency, + move |( + child_idx, + concurrent_count, + total_completed, + actually_extracted, + pending_rebuild, + )| async move { if child_idx == graph.root { return Ok(()); } concurrent_count.fetch_add(1, atomic::Ordering::SeqCst); - let subdir = - graph + let subdir = graph .node_path(child_idx) .iter() .map(|x| x.to_string()) @@ -225,9 +251,27 @@ impl HoistedLinker { .extract_to_dir(&target_dir, prefer_copy, validate) .await?; actually_extracted.fetch_add(1, atomic::Ordering::SeqCst); + let target_dir = target_dir.clone(); + let build_mani = async_std::task::spawn_blocking(move || { + BuildManifest::from_path(target_dir.join("package.json")).map_err(|e| { + NodeMaintainerError::BuildManifestReadError( + target_dir.join("package.json"), + e, + ) + }) + }) + .await?; + if build_mani.scripts.contains_key("preinstall") + || build_mani.scripts.contains_key("install") + || build_mani.scripts.contains_key("postinstall") + || build_mani.scripts.contains_key("prepare") + || !build_mani.bin.is_empty() + { + pending_rebuild.lock().await.insert(child_idx); + } } - if let Some(on_extract) = &self.0.on_extract_progress { + if let Some(on_extract) = &self.opts.on_extract_progress { on_extract(&graph[child_idx].package); } @@ -247,19 +291,18 @@ impl HoistedLinker { node_modules.join(META_FILE_NAME), graph.to_kdl()?.to_string(), )?; - let actually_extracted = actually_extracted.load(atomic::Ordering::SeqCst); + let extracted_count = actually_extracted.load(atomic::Ordering::SeqCst); + tracing::debug!( - "Extracted {actually_extracted} package{} in {}ms.", - if actually_extracted == 1 { "" } else { "s" }, + "Extracted {extracted_count} package{} in {}ms.", + if extracted_count == 1 { "" } else { "s" }, start.elapsed().as_millis(), ); - Ok(actually_extracted) + Ok(extracted_count) } - async fn link_bins(&self, graph: &Graph) -> Result { - tracing::debug!("Linking bins..."); - let start = std::time::Instant::now(); - let root = &self.0.root; + pub async fn link_bins(&self, graph: &Graph) -> Result { + let root = &self.opts.root; let linked = Arc::new(AtomicUsize::new(0)); let bin_file_name = Some(OsStr::new(".bin")); let nm_file_name = Some(OsStr::new("node_modules")); @@ -275,9 +318,9 @@ impl HoistedLinker { async_std::fs::remove_dir_all(entry.path()).await?; } } - futures::stream::iter(graph.inner.node_indices()) + futures::stream::iter(self.pending_rebuild.lock().await.iter().copied()) .map(|idx| Ok((idx, linked.clone()))) - .try_for_each_concurrent(self.0.concurrency, move |(idx, linked)| async move { + .try_for_each_concurrent(self.opts.concurrency, move |(idx, linked)| async move { if idx == graph.root { return Ok(()); } @@ -339,152 +382,19 @@ impl HoistedLinker { }) .await?; let linked = linked.load(atomic::Ordering::SeqCst); - tracing::debug!( - "Linked {linked} package bins in {}ms.", - start.elapsed().as_millis() - ); Ok(linked) } - pub async fn rebuild( - &self, - graph: &Graph, - ignore_scripts: bool, - ) -> Result<(), NodeMaintainerError> { - tracing::debug!("Running lifecycle scripts..."); - let start = std::time::Instant::now(); - if !ignore_scripts { - self.run_scripts(graph, "preinstall").await?; - } - self.link_bins(graph).await?; - if !ignore_scripts { - self.run_scripts(graph, "install").await?; - self.run_scripts(graph, "postinstall").await?; - } - tracing::debug!( - "Ran lifecycle scripts in {}ms.", - start.elapsed().as_millis() - ); - Ok(()) - } - - async fn run_scripts(&self, graph: &Graph, event: &str) -> Result<(), NodeMaintainerError> { - tracing::debug!("Running {event} lifecycle scripts"); - let start = std::time::Instant::now(); - let root = &self.0.root; - futures::stream::iter(graph.inner.node_indices()) - .map(Ok) - .try_for_each_concurrent(self.0.script_concurrency, move |idx| async move { - let package_dir = if idx == graph.root { - root.clone() - } else { - let subdir = graph - .node_path(idx) - .iter() - .map(|x| x.to_string()) - .collect::>() - .join("/node_modules/"); - root.join("node_modules").join(subdir) - }; - - let is_optional = graph.is_optional(idx); - - let build_mani = BuildManifest::from_path(package_dir.join("package.json")) - .map_err(|e| { - NodeMaintainerError::BuildManifestReadError( - package_dir.join("package.json"), - e, - ) - })?; - - let name = graph[idx].package.name().to_string(); - if build_mani.scripts.contains_key(event) { - let package_dir = package_dir.clone(); - let root = root.clone(); - let event = event.to_owned(); - let event_clone = event.clone(); - let span = tracing::info_span!("script"); - let _span_enter = span.enter(); - if let Some(on_script_start) = &self.0.on_script_start { - on_script_start(&graph[idx].package, &event); - } - std::mem::drop(_span_enter); - let mut script = match async_std::task::spawn_blocking(move || { - OroScript::new(package_dir, event_clone)? - .workspace_path(root) - .spawn() - }) - .await - { - Ok(script) => script, - Err(e) if is_optional => { - let e: NodeMaintainerError = e.into(); - tracing::debug!("Error in optional dependency script: {}", e); - return Ok(()); - } - Err(e) => return Err(e.into()), - }; - let stdout = script.stdout.take(); - let stderr = script.stderr.take(); - let stdout_name = name.clone(); - let stderr_name = name.clone(); - let stdout_on_line = self.0.on_script_line.clone(); - let stderr_on_line = self.0.on_script_line.clone(); - let stdout_span = span; - let stderr_span = stdout_span.clone(); - let event_clone = event.clone(); - let join = futures::try_join!( - async_std::task::spawn_blocking(move || { - let _enter = stdout_span.enter(); - if let Some(stdout) = stdout { - for line in BufReader::new(stdout).lines() { - let line = line?; - tracing::debug!("stdout::{stdout_name}::{event}: {}", line); - if let Some(on_script_line) = &stdout_on_line { - on_script_line(&line); - } - } - } - Ok::<_, NodeMaintainerError>(()) - }), - async_std::task::spawn_blocking(move || { - let _enter = stderr_span.enter(); - if let Some(stderr) = stderr { - for line in BufReader::new(stderr).lines() { - let line = line?; - tracing::debug!( - "stderr::{stderr_name}::{event_clone}: {}", - line - ); - if let Some(on_script_line) = &stderr_on_line { - on_script_line(&line); - } - } - } - Ok::<_, NodeMaintainerError>(()) - }), - async_std::task::spawn_blocking(move || { - script.wait()?; - Ok::<_, NodeMaintainerError>(()) - }), - ); - match join { - Ok(_) => {} - Err(e) if is_optional => { - tracing::debug!("Error in optional dependency script: {}", e); - return Ok(()); - } - Err(e) => return Err(e), - } - } - - Ok::<_, NodeMaintainerError>(()) - }) - .await?; - tracing::debug!( - "Ran lifecycle scripts for {event} in {}ms.", - start.elapsed().as_millis() - ); - Ok(()) + pub fn package_dir(&self, graph: &Graph, idx: NodeIndex) -> (PathBuf, PathBuf) { + let subdir = graph + .node_path(idx) + .iter() + .map(|x| x.to_string()) + .collect::>() + .join("/node_modules/"); + ( + self.opts.root.join("node_modules").join(subdir), + self.opts.root.clone(), + ) } } diff --git a/crates/node-maintainer/src/linkers/isolated.rs b/crates/node-maintainer/src/linkers/isolated.rs index 0a437d6e..ecb358e4 100644 --- a/crates/node-maintainer/src/linkers/isolated.rs +++ b/crates/node-maintainer/src/linkers/isolated.rs @@ -1,16 +1,14 @@ use std::{ collections::{HashMap, HashSet}, - io::{BufRead, BufReader}, - path::Path, + path::{Path, PathBuf}, sync::{ atomic::{self, AtomicUsize}, Arc, }, }; -use futures::{StreamExt, TryStreamExt}; +use futures::{lock::Mutex, StreamExt, TryStreamExt}; use oro_common::BuildManifest; -use oro_script::OroScript; use petgraph::{stable_graph::NodeIndex, visit::EdgeRef, Direction}; use ssri::Integrity; @@ -18,13 +16,23 @@ use crate::{graph::Graph, NodeMaintainerError, META_FILE_NAME, STORE_DIR_NAME}; use super::LinkerOptions; -pub(crate) struct IsolatedLinker(pub(crate) LinkerOptions); +pub(crate) struct IsolatedLinker { + pub(crate) pending_rebuild: Arc>>, + pub(crate) opts: LinkerOptions, +} impl IsolatedLinker { + pub fn new(opts: LinkerOptions) -> Self { + Self { + pending_rebuild: Arc::new(Mutex::new(HashSet::new())), + opts, + } + } + pub async fn prune(&self, graph: &Graph) -> Result { let start = std::time::Instant::now(); - let prefix = self.0.root.join("node_modules"); + let prefix = self.opts.root.join("node_modules"); if !prefix.exists() { tracing::debug!( @@ -36,7 +44,7 @@ impl IsolatedLinker { let store = prefix.join(STORE_DIR_NAME); - if self.0.actual_tree.is_none() || !async_std::path::Path::new(&store).exists().await { + if self.opts.actual_tree.is_none() || !async_std::path::Path::new(&store).exists().await { // If there's no actual tree previously calculated, we can't trust // *anything* inside node_modules, so everything is immediately // extraneous and we wipe it all. Sorry. @@ -78,7 +86,7 @@ impl IsolatedLinker { let prefix_ref = &prefix; futures::stream::iter(indices) .map(Ok) - .try_for_each_concurrent(self.0.concurrency, move |idx| async move { + .try_for_each_concurrent(self.opts.concurrency, move |idx| async move { let pkg = &graph[idx].package; let pkg_nm = if idx == graph.root { @@ -164,7 +172,7 @@ impl IsolatedLinker { async_std::fs::read_dir(&store) .await? .map(|entry| Ok((entry, pruned.clone()))) - .try_for_each_concurrent(self.0.concurrency, move |(entry, pruned)| async move { + .try_for_each_concurrent(self.opts.concurrency, move |(entry, pruned)| async move { let entry = entry?; let _path = entry.path(); let path: &Path = _path.as_ref(); @@ -233,27 +241,42 @@ impl IsolatedLinker { tracing::debug!("Applying node_modules/..."); let start = std::time::Instant::now(); - let root = &self.0.root; + let root = &self.opts.root; let store = root.join("node_modules").join(STORE_DIR_NAME); let store_ref = &store; let stream = futures::stream::iter(graph.inner.node_indices()); let concurrent_count = Arc::new(AtomicUsize::new(0)); + let pending_rebuild = self.pending_rebuild.clone(); let actually_extracted = Arc::new(AtomicUsize::new(0)); let total = graph.inner.node_count(); let total_completed = Arc::new(AtomicUsize::new(0)); let node_modules = root.join("node_modules"); std::fs::create_dir_all(&node_modules)?; - let prefer_copy = self.0.prefer_copy - || match self.0.cache.as_deref() { + let prefer_copy = self.opts.prefer_copy + || match self.opts.cache.as_deref() { Some(cache) => super::supports_reflink(cache, &node_modules), None => false, }; - let validate = self.0.validate; + let validate = self.opts.validate; stream - .map(|idx| Ok((idx, concurrent_count.clone(), total_completed.clone(), actually_extracted.clone()))) + .map(|idx| { + Ok(( + idx, + concurrent_count.clone(), + total_completed.clone(), + actually_extracted.clone(), + pending_rebuild.clone(), + )) + }) .try_for_each_concurrent( - self.0.concurrency, - move |(child_idx, concurrent_count, total_completed, actually_extracted)| async move { + self.opts.concurrency, + move |( + child_idx, + concurrent_count, + total_completed, + actually_extracted, + pending_rebuild, + )| async move { if child_idx == graph.root { link_deps(graph, child_idx, store_ref, &root.join("node_modules")).await?; return Ok(()); @@ -265,7 +288,10 @@ impl IsolatedLinker { // Actual package contents are extracted to // `node_modules/.oro-store/-/node_modules/` - let target_dir = store_ref.join(package_dir_name(graph, child_idx)).join("node_modules").join(pkg.name()); + let target_dir = store_ref + .join(package_dir_name(graph, child_idx)) + .join("node_modules") + .join(pkg.name()); let start = std::time::Instant::now(); @@ -275,11 +301,35 @@ impl IsolatedLinker { .extract_to_dir(&target_dir, prefer_copy, validate) .await?; actually_extracted.fetch_add(1, atomic::Ordering::SeqCst); + let target_dir = target_dir.clone(); + let build_mani = async_std::task::spawn_blocking(move || { + BuildManifest::from_path(target_dir.join("package.json")).map_err(|e| { + NodeMaintainerError::BuildManifestReadError( + target_dir.join("package.json"), + e, + ) + }) + }) + .await?; + if build_mani.scripts.contains_key("preinstall") + || build_mani.scripts.contains_key("install") + || build_mani.scripts.contains_key("postinstall") + || build_mani.scripts.contains_key("prepare") + || !build_mani.bin.is_empty() + { + pending_rebuild.lock().await.insert(child_idx); + } } - link_deps(graph, child_idx, store_ref, &target_dir.join("node_modules")).await?; + link_deps( + graph, + child_idx, + store_ref, + &target_dir.join("node_modules"), + ) + .await?; - if let Some(on_extract) = &self.0.on_extract_progress { + if let Some(on_extract) = &self.opts.on_extract_progress { on_extract(&graph[child_idx].package); } @@ -300,27 +350,25 @@ impl IsolatedLinker { node_modules.join(META_FILE_NAME), graph.to_kdl()?.to_string(), )?; - let actually_extracted = actually_extracted.load(atomic::Ordering::SeqCst); + let extracted_count = actually_extracted.load(atomic::Ordering::SeqCst); tracing::debug!( - "Extracted {actually_extracted} package{} in {}ms.", - if actually_extracted == 1 { "" } else { "s" }, + "Extracted {extracted_count} package{} in {}ms.", + if extracted_count == 1 { "" } else { "s" }, start.elapsed().as_millis(), ); - Ok(actually_extracted) + Ok(extracted_count) } - async fn link_bins(&self, graph: &Graph) -> Result { - tracing::debug!("Linking bins..."); - let start = std::time::Instant::now(); - let root = &self.0.root; + pub async fn link_bins(&self, graph: &Graph) -> Result { + let root = &self.opts.root; let store = root.join("node_modules").join(STORE_DIR_NAME); let store_ref = &store; let linked = Arc::new(AtomicUsize::new(0)); - futures::stream::iter(graph.inner.node_indices()) + futures::stream::iter(self.pending_rebuild.lock().await.iter().copied()) .map(|idx| Ok((idx, linked.clone()))) - .try_for_each_concurrent(self.0.concurrency, move |(idx, linked)| async move { + .try_for_each_concurrent(self.opts.concurrency, move |(idx, linked)| async move { if idx == graph.root { let added = link_dep_bins( graph, @@ -349,150 +397,20 @@ impl IsolatedLinker { .await?; let linked = linked.load(atomic::Ordering::SeqCst); - tracing::debug!( - "Linked {linked} package bins in {}ms.", - start.elapsed().as_millis() - ); Ok(linked) } - pub async fn rebuild( - &self, - graph: &Graph, - ignore_scripts: bool, - ) -> Result<(), NodeMaintainerError> { - tracing::debug!("Running lifecycle scripts..."); - let start = std::time::Instant::now(); - if !ignore_scripts { - self.run_scripts(graph, "preinstall").await?; - } - self.link_bins(graph).await?; - if !ignore_scripts { - self.run_scripts(graph, "install").await?; - self.run_scripts(graph, "postinstall").await?; - } - tracing::debug!( - "Ran lifecycle scripts in {}ms.", - start.elapsed().as_millis() - ); - Ok(()) - } - - async fn run_scripts(&self, graph: &Graph, event: &str) -> Result<(), NodeMaintainerError> { - tracing::debug!("Running {event} lifecycle scripts"); - let start = std::time::Instant::now(); - let root = &self.0.root; - let store = root.join("node_modules").join(STORE_DIR_NAME); - let store_ref = &store; - futures::stream::iter(graph.inner.node_indices()) - .map(Ok) - .try_for_each_concurrent(self.0.script_concurrency, move |idx| async move { - let pkg_dir = if idx == graph.root { - root.clone() - } else { - let pkg = &graph[idx].package; - store_ref - .join(package_dir_name(graph, idx)) - .join("node_modules") - .join(pkg.name()) - }; - - let is_optional = graph.is_optional(idx); - - let build_mani = - BuildManifest::from_path(pkg_dir.join("package.json")).map_err(|e| { - NodeMaintainerError::BuildManifestReadError(pkg_dir.join("package.json"), e) - })?; - - let name = graph[idx].package.name().to_string(); - if build_mani.scripts.contains_key(event) { - let package_dir = pkg_dir.clone(); - let package_dir_clone = package_dir.clone(); - let event = event.to_owned(); - let event_clone = event.clone(); - let span = tracing::info_span!("script"); - let _span_enter = span.enter(); - if let Some(on_script_start) = &self.0.on_script_start { - on_script_start(&graph[idx].package, &event); - } - std::mem::drop(_span_enter); - let mut script = match async_std::task::spawn_blocking(move || { - OroScript::new(package_dir, event_clone)? - .workspace_path(package_dir_clone) - .spawn() - }) - .await - { - Ok(script) => script, - Err(e) if is_optional => { - let e: NodeMaintainerError = e.into(); - tracing::debug!("Error in optional dependency script: {}", e); - return Ok(()); - } - Err(e) => return Err(e.into()), - }; - let stdout = script.stdout.take(); - let stderr = script.stderr.take(); - let stdout_name = name.clone(); - let stderr_name = name.clone(); - let stdout_on_line = self.0.on_script_line.clone(); - let stderr_on_line = self.0.on_script_line.clone(); - let stdout_span = span; - let stderr_span = stdout_span.clone(); - let event_clone = event.clone(); - let join = futures::try_join!( - async_std::task::spawn_blocking(move || { - let _enter = stdout_span.enter(); - if let Some(stdout) = stdout { - for line in BufReader::new(stdout).lines() { - let line = line?; - tracing::debug!("stdout::{stdout_name}::{event}: {}", line); - if let Some(on_script_line) = &stdout_on_line { - on_script_line(&line); - } - } - } - Ok::<_, NodeMaintainerError>(()) - }), - async_std::task::spawn_blocking(move || { - let _enter = stderr_span.enter(); - if let Some(stderr) = stderr { - for line in BufReader::new(stderr).lines() { - let line = line?; - tracing::debug!( - "stderr::{stderr_name}::{event_clone}: {}", - line - ); - if let Some(on_script_line) = &stderr_on_line { - on_script_line(&line); - } - } - } - Ok::<_, NodeMaintainerError>(()) - }), - async_std::task::spawn_blocking(move || { - script.wait()?; - Ok::<_, NodeMaintainerError>(()) - }), - ); - match join { - Ok(_) => {} - Err(e) if is_optional => { - tracing::debug!("Error in optional dependency script: {}", e); - return Ok(()); - } - Err(e) => return Err(e), - } - } - - Ok::<_, NodeMaintainerError>(()) - }) - .await?; - tracing::debug!( - "Ran lifecycle scripts for {event} in {}ms.", - start.elapsed().as_millis() - ); - Ok(()) + pub fn package_dir(&self, graph: &Graph, idx: NodeIndex) -> (PathBuf, PathBuf) { + let pkg = &graph[idx].package; + let dir = self + .opts + .root + .join("node_modules") + .join(STORE_DIR_NAME) + .join(package_dir_name(graph, idx)) + .join("node_modules") + .join(pkg.name()); + (dir.clone(), dir) } } diff --git a/crates/node-maintainer/src/linkers/mod.rs b/crates/node-maintainer/src/linkers/mod.rs index 188e3f44..99318f89 100644 --- a/crates/node-maintainer/src/linkers/mod.rs +++ b/crates/node-maintainer/src/linkers/mod.rs @@ -1,15 +1,25 @@ #[cfg(not(target_arch = "wasm32"))] -mod hoisted; -#[cfg(not(target_arch = "wasm32"))] -mod isolated; - +use std::io::{BufRead, BufReader}; #[cfg(not(target_arch = "wasm32"))] use std::path::{Path, PathBuf}; +#[cfg(not(target_arch = "wasm32"))] +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; +#[cfg(not(target_arch = "wasm32"))] +use futures::{lock::Mutex, StreamExt, TryStreamExt}; #[cfg(not(target_arch = "wasm32"))] use hoisted::HoistedLinker; #[cfg(not(target_arch = "wasm32"))] use isolated::IsolatedLinker; +#[cfg(not(target_arch = "wasm32"))] +use oro_common::BuildManifest; +#[cfg(not(target_arch = "wasm32"))] +use oro_script::OroScript; +#[cfg(not(target_arch = "wasm32"))] +use petgraph::stable_graph::NodeIndex; #[cfg(not(target_arch = "wasm32"))] use crate::{ @@ -17,6 +27,11 @@ use crate::{ ScriptStartHandler, }; +#[cfg(not(target_arch = "wasm32"))] +mod hoisted; +#[cfg(not(target_arch = "wasm32"))] +mod isolated; + #[cfg(not(target_arch = "wasm32"))] pub(crate) struct LinkerOptions { pub(crate) concurrency: usize, @@ -44,12 +59,12 @@ pub(crate) enum Linker { impl Linker { #[cfg(not(target_arch = "wasm32"))] pub fn isolated(opts: LinkerOptions) -> Self { - Self::Isolated(IsolatedLinker(opts)) + Self::Isolated(IsolatedLinker::new(opts)) } #[cfg(not(target_arch = "wasm32"))] pub fn hoisted(opts: LinkerOptions) -> Self { - Self::Hoisted(HoistedLinker(opts)) + Self::Hoisted(HoistedLinker::new(opts)) } #[allow(dead_code)] @@ -88,16 +103,251 @@ impl Linker { #[cfg(not(target_arch = "wasm32"))] pub async fn rebuild( &self, - #[allow(dead_code)] graph: &Graph, - #[allow(dead_code)] ignore_scripts: bool, + graph: &Graph, + ignore_scripts: bool, ) -> Result<(), NodeMaintainerError> { - match self { + tracing::debug!("Running lifecycle scripts..."); + let start = std::time::Instant::now(); + if !ignore_scripts { + self.run_scripts(graph, "preinstall").await?; + } + self.link_bins(graph).await?; + if !ignore_scripts { + self.run_scripts(graph, "install").await?; + self.run_scripts(graph, "postinstall").await?; + } + tracing::debug!( + "Ran lifecycle scripts in {}ms.", + start.elapsed().as_millis() + ); + Ok(()) + } + + #[cfg(not(target_arch = "wasm32"))] + async fn link_bins( + &self, + #[allow(dead_code)] graph: &Graph, + ) -> Result { + tracing::debug!("Linking bins..."); + let start = std::time::Instant::now(); + let linked = match self { #[cfg(not(target_arch = "wasm32"))] - Self::Isolated(isolated) => isolated.rebuild(graph, ignore_scripts).await, + Self::Isolated(isolated) => isolated.link_bins(graph).await, #[cfg(not(target_arch = "wasm32"))] - Self::Hoisted(hoisted) => hoisted.rebuild(graph, ignore_scripts).await, - Self::Null => Ok(()), + Self::Hoisted(hoisted) => hoisted.link_bins(graph).await, + Self::Null => Ok(0), + }?; + tracing::debug!( + "Linked {linked} package bins in {}ms.", + start.elapsed().as_millis() + ); + Ok(linked) + } + + #[cfg(not(target_arch = "wasm32"))] + pub async fn run_scripts(&self, graph: &Graph, event: &str) -> Result<(), NodeMaintainerError> { + let (pending_rebuild, opts) = match self { + Self::Isolated(isolated) => (&isolated.pending_rebuild, &isolated.opts), + Self::Hoisted(hoisted) => (&hoisted.pending_rebuild, &hoisted.opts), + Self::Null => return Ok(()), + }; + let pending = pending_rebuild + .lock() + .await + .iter() + .copied() + .collect::>(); + // Map of package to the set of packages that need to run before it can run. + let dependencies = pending + .iter() + .map(|idx| { + let mut deps = HashSet::new(); + for dep in &pending { + if dep != idx + && petgraph::algo::has_path_connecting(&graph.inner, *idx, *dep, None) + { + deps.insert(*dep); + } + } + (*idx, deps) + }) + .collect::>(); + // Map of package to the set of packages that depend on it completing. + let dependents = Arc::new( + pending + .iter() + .map(|idx| { + let mut deps = HashSet::new(); + for dep in &pending { + if dep != idx + && petgraph::algo::has_path_connecting(&graph.inner, *dep, *idx, None) + { + deps.insert(*dep); + } + } + (*idx, deps) + }) + .collect::>(), + ); + let (sender, receiver) = futures::channel::mpsc::unbounded(); + let remaining = Arc::new(Mutex::new(HashMap::new())); + + for (dep, requires) in dependencies.into_iter() { + if requires.is_empty() { + sender.unbounded_send((dep, remaining.clone(), dependents.clone()))?; + } else { + remaining.lock().await.insert(dep, requires); + } + } + + if remaining.lock().await.is_empty() { + sender.close_channel(); } + + let sender_ref = &sender; + + receiver + .map(Ok) + .try_for_each_concurrent( + opts.script_concurrency, + move |(idx, remaining_arc, dependents)| async move { + let ret = self.run_dep_script(graph, idx, event, opts).await; + + let mut remaining = remaining_arc.lock().await; + + if let Some(dependents_set) = dependents.get(&idx) { + for dependent in dependents_set { + if let Some(remaining_deps) = remaining.get_mut(dependent) { + remaining_deps.remove(&idx); + if remaining_deps.is_empty() { + remaining.remove(dependent); + sender_ref.unbounded_send(( + *dependent, + remaining_arc.clone(), + dependents.clone(), + ))?; + } + }; + } + } + + if remaining.is_empty() { + sender_ref.close_channel(); + } + + ret + }, + ) + .await?; + + Ok(()) + } + + #[cfg(not(target_arch = "wasm32"))] + async fn run_dep_script( + &self, + graph: &Graph, + idx: NodeIndex, + event: &str, + opts: &LinkerOptions, + ) -> Result<(), NodeMaintainerError> { + let root = &opts.root; + let (package_dir, workspace_path) = if idx == graph.root { + (root.clone(), root.clone()) + } else { + match self { + Self::Isolated(isolated) => isolated.package_dir(graph, idx), + Self::Hoisted(hoisted) => hoisted.package_dir(graph, idx), + Self::Null => unreachable!("Null linker should not run scripts."), + } + }; + + let is_optional = graph.is_optional(idx); + + let build_mani = + BuildManifest::from_path(package_dir.join("package.json")).map_err(|e| { + NodeMaintainerError::BuildManifestReadError(workspace_path.join("package.json"), e) + })?; + + let name = graph[idx].package.name().to_string(); + if build_mani.scripts.contains_key(event) { + let package_dir = package_dir.clone(); + let root = root.clone(); + let event = event.to_owned(); + let event_clone = event.clone(); + let span = tracing::info_span!("script"); + let _span_enter = span.enter(); + if let Some(on_script_start) = &opts.on_script_start { + on_script_start(&graph[idx].package, &event); + } + std::mem::drop(_span_enter); + let mut script = match async_std::task::spawn_blocking(move || { + OroScript::new(package_dir, event_clone)? + .workspace_path(root) + .spawn() + }) + .await + { + Ok(script) => script, + Err(e) if is_optional => { + let e: NodeMaintainerError = e.into(); + tracing::debug!("Error in optional dependency script: {}", e); + return Ok(()); + } + Err(e) => return Err(e.into()), + }; + let stdout = script.stdout.take(); + let stderr = script.stderr.take(); + let stdout_name = name.clone(); + let stderr_name = name.clone(); + let stdout_on_line = opts.on_script_line.clone(); + let stderr_on_line = opts.on_script_line.clone(); + let stdout_span = span; + let stderr_span = stdout_span.clone(); + let event_clone = event.clone(); + let join = futures::try_join!( + async_std::task::spawn_blocking(move || { + let _enter = stdout_span.enter(); + if let Some(stdout) = stdout { + for line in BufReader::new(stdout).lines() { + let line = line?; + tracing::debug!("stdout::{stdout_name}::{event}: {line}"); + if let Some(on_script_line) = &stdout_on_line { + on_script_line(&line); + } + } + } + Ok::<_, NodeMaintainerError>(()) + }), + async_std::task::spawn_blocking(move || { + let _enter = stderr_span.enter(); + if let Some(stderr) = stderr { + for line in BufReader::new(stderr).lines() { + let line = line?; + tracing::debug!("stderr::{stderr_name}::{event_clone}: {line}"); + if let Some(on_script_line) = &stderr_on_line { + on_script_line(&line); + } + } + } + Ok::<_, NodeMaintainerError>(()) + }), + async_std::task::spawn_blocking(move || { + script.wait()?; + Ok::<_, NodeMaintainerError>(()) + }), + ); + match join { + Ok(_) => {} + Err(e) if is_optional => { + tracing::debug!("Error in optional dependency script: {}", e); + return Ok(()); + } + Err(e) => return Err(e), + } + } + + Ok(()) } }