From eebded17861d9dca4bff262c955430765d32e3e9 Mon Sep 17 00:00:00 2001 From: Zhaofeng Li Date: Tue, 7 Dec 2021 23:13:31 -0800 Subject: [PATCH] Build each node individually Now nodes that take a long time to build won't bottleneck the deployment of other nodes in the same chunk. Fixes #47. --- src/job.rs | 24 +------- src/nix/deployment/limits.rs | 4 -- src/nix/deployment/mod.rs | 115 ++++++++++++++++------------------- src/nix/hive/eval.nix | 20 ++---- src/nix/hive/mod.rs | 32 ++++++---- src/nix/mod.rs | 5 +- src/nix/profile.rs | 91 ++++++++------------------- src/nix/store.rs | 22 +++++-- src/progress/mod.rs | 9 --- src/util.rs | 11 +++- 10 files changed, 136 insertions(+), 197 deletions(-) diff --git a/src/job.rs b/src/job.rs index 66da643..6dcfbac 100644 --- a/src/job.rs +++ b/src/job.rs @@ -13,7 +13,7 @@ use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::time; use uuid::Uuid; -use crate::nix::{NixResult, NixError, NodeName, ProfileMap}; +use crate::nix::{NixResult, NixError, NodeName}; use crate::progress::{Sender as ProgressSender, Message as ProgressMessage, Line, LineStyle}; pub type Sender = UnboundedSender; @@ -208,9 +208,6 @@ pub enum EventPayload { /// The job wants to transition to a new state. NewState(JobState), - /// The job built a set of system profiles. - ProfilesBuilt(ProfileMap), - /// The child process printed a line to stdout. ChildStdout(String), @@ -348,19 +345,6 @@ impl JobMonitor { self.print_job_stats(); } } - EventPayload::ProfilesBuilt(profiles) => { - if let Some(sender) = &self.progress { - for (name, profile) in profiles.iter() { - let text = format!("Built {:?}", profile.as_path()); - let line = Line::new(message.job_id, text) - .label(name.as_str().to_string()) - .one_off() - .style(LineStyle::Success); - let pm = self.get_print_message(message.job_id, line); - sender.send(pm).unwrap(); - } - } - } EventPayload::ChildStdout(m) | EventPayload::ChildStderr(m) | EventPayload::Message(m) => { if let Some(sender) = &self.progress { let metadata = &self.jobs[&message.job_id]; @@ -598,11 +582,6 @@ impl JobHandleInner { self.send_payload(EventPayload::Failure(error.to_string())) } - /// Sends a set of built profiles. - pub fn profiles_built(&self, profiles: ProfileMap) -> NixResult<()> { - self.send_payload(EventPayload::ProfilesBuilt(profiles)) - } - /// Runs a closure, automatically updating the job monitor based on the result. async fn run_internal(self: Arc, f: U, report_running: bool) -> NixResult where U: FnOnce(Arc) -> F, @@ -788,7 +767,6 @@ impl Display for EventPayload { EventPayload::Noop(m) => write!(f, " noop) {}", m)?, EventPayload::Failure(e) => write!(f, " failure) {}", e)?, EventPayload::ShutdownMonitor => write!(f, "shutdown)")?, - EventPayload::ProfilesBuilt(pm) => write!(f, " built) {:?}", pm)?, } Ok(()) diff --git a/src/nix/deployment/limits.rs b/src/nix/deployment/limits.rs index b099271..d68d2fa 100644 --- a/src/nix/deployment/limits.rs +++ b/src/nix/deployment/limits.rs @@ -14,9 +14,6 @@ pub struct ParallelismLimit { /// Limit of concurrent evaluation processes. pub evaluation: Semaphore, - /// Limit of concurrent build processes. - pub build: Semaphore, - /// Limit of concurrent apply processes. pub apply: Semaphore, } @@ -25,7 +22,6 @@ impl Default for ParallelismLimit { fn default() -> Self { Self { evaluation: Semaphore::new(1), - build: Semaphore::new(2), apply: Semaphore::new(10), } } diff --git a/src/nix/deployment/mod.rs b/src/nix/deployment/mod.rs index 3ac6505..ec9abfa 100644 --- a/src/nix/deployment/mod.rs +++ b/src/nix/deployment/mod.rs @@ -29,8 +29,7 @@ use super::{ NixError, NixResult, Profile, - ProfileMap, - StoreDerivation, + ProfileDerivation, CopyDirection, key::{Key, UploadAt as UploadKeyAt}, }; @@ -54,6 +53,9 @@ pub struct Deployment { /// Deployment options. options: Options, + /// Options passed to Nix invocations. + nix_options: Vec, + /// Handle to send messages to the ProgressOutput. progress: Option, @@ -102,12 +104,13 @@ impl Deployment { Self { hive, goal, + options: Options::default(), + nix_options: Vec::new(), progress, nodes: targets.keys().cloned().collect(), targets, parallelism_limit: ParallelismLimit::default(), evaluation_node_limit: EvaluationNodeLimit::default(), - options: Options::default(), executed: false, } } @@ -129,6 +132,9 @@ impl Deployment { monitor.set_label_width(width); } + let nix_options = self.hive.nix_options().await?; + self.nix_options = nix_options; + if self.goal == Goal::UploadKeys { // Just upload keys let targets = mem::take(&mut self.targets); @@ -218,45 +224,24 @@ impl Deployment { } let nodes: Vec = chunk.keys().cloned().collect(); - let profiles = self.clone().build_nodes(parent.clone(), nodes.clone()).await?; - - if self.goal == Goal::Build { - return Ok(()); - } + let profile_drvs = self.clone().evaluate_nodes(parent.clone(), nodes.clone()).await?; let mut futures = Vec::new(); - for (name, profile) in profiles.iter() { + for (name, profile_drv) in profile_drvs.iter() { let target = chunk.remove(name).unwrap(); - futures.push(self.clone().deploy_node(parent.clone(), target, profile.clone())); + futures.push(self.clone().deploy_node(parent.clone(), target, profile_drv.clone())); } join_all(futures).await .into_iter().collect::>>()?; - // Create GC root - if self.options.create_gc_roots { - let job = parent.create_job(JobType::CreateGcRoots, nodes.clone())?; - let arc_self = self.clone(); - job.run_waiting(|job| async move { - if let Some(dir) = arc_self.hive.context_dir() { - job.state(JobState::Running)?; - let base = dir.join(".gcroots"); - - profiles.create_gc_roots(&base).await?; - } else { - job.noop("No context directory to create GC roots in".to_string())?; - } - Ok(()) - }).await?; - } - Ok(()) } - /// Evaluates a set of nodes, returning a store derivation. + /// Evaluates a set of nodes, returning their corresponding store derivations. async fn evaluate_nodes(self: DeploymentHandle, parent: JobHandle, nodes: Vec) - -> NixResult> + -> NixResult> { let job = parent.create_job(JobType::Evaluate, nodes.clone())?; @@ -272,33 +257,6 @@ impl Deployment { }).await } - /// Builds a set of nodes, returning a set of profiles. - async fn build_nodes(self: DeploymentHandle, parent: JobHandle, nodes: Vec) - -> NixResult - { - let job = parent.create_job(JobType::Build, nodes.clone())?; - - job.run_waiting(|job| async move { - let derivation = self.clone().evaluate_nodes(job.clone(), nodes.clone()).await?; - - // Wait for build limit - let permit = self.parallelism_limit.apply.acquire().await.unwrap(); - job.state(JobState::Running)?; - - // FIXME: Remote builder? - let nix_options = self.hive.nix_options().await.unwrap(); - let mut builder = host::local(nix_options); - builder.set_job(Some(job.clone())); - - let map = derivation.realize(&mut *builder).await?; - - job.profiles_built(map.clone())?; - - drop(permit); - Ok(map) - }).await - } - /// Only uploads keys to a node. async fn upload_keys_to_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode) -> NixResult<()> { let nodes = vec![target.name.clone()]; @@ -315,20 +273,36 @@ impl Deployment { }).await } - /// Pushes and optionally activates a system profile on a given node. + /// Builds, pushes, and optionally activates a system profile on a node. /// /// This will also upload keys to the node. - async fn deploy_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode, profile: Profile) + async fn deploy_node(self: DeploymentHandle, parent: JobHandle, mut target: TargetNode, profile_drv: ProfileDerivation) -> NixResult<()> { - if self.goal == Goal::Build { - unreachable!(); - } - let nodes = vec![target.name.clone()]; + let target_name = target.name.clone(); let permit = self.parallelism_limit.apply.acquire().await.unwrap(); + // Build system profile + let build_job = parent.create_job(JobType::Build, nodes.clone())?; + let arc_self = self.clone(); + let profile: Profile = build_job.run(|job| async move { + // FIXME: Remote builder? + let mut builder = host::local(arc_self.nix_options.clone()); + builder.set_job(Some(job.clone())); + + let profile = profile_drv.realize(&mut *builder).await?; + + job.success_with_message(format!("Built {:?}", profile.as_path()))?; + Ok(profile) + }).await?; + + if self.goal == Goal::Build { + return Ok(()); + } + + // Push closure to remote let push_job = parent.create_job(JobType::Push, nodes.clone())?; let push_profile = profile.clone(); let arc_self = self.clone(); @@ -437,6 +411,23 @@ impl Deployment { }).await?; } + // Create GC root + if self.options.create_gc_roots { + let job = parent.create_job(JobType::CreateGcRoots, nodes.clone())?; + let arc_self = self.clone(); + job.run_waiting(|job| async move { + if let Some(dir) = arc_self.hive.context_dir() { + job.state(JobState::Running)?; + let path = dir.join(".gcroots").join(format!("node-{}", &*target_name)); + + profile.create_gc_root(&path).await?; + } else { + job.noop("No context directory to create GC roots in".to_string())?; + } + Ok(()) + }).await?; + } + drop(permit); Ok(()) diff --git a/src/nix/hive/eval.nix b/src/nix/hive/eval.nix index fdf04ad..8660bbf 100644 --- a/src/nix/hive/eval.nix +++ b/src/nix/hive/eval.nix @@ -460,19 +460,11 @@ let deploymentConfigJsonSelected = names: toJSON (listToAttrs (map (name: { inherit name; value = nodes.${name}.config.deployment; }) names)); - buildAll = buildSelected nodeNames; - buildSelected = names: let - # Change in the order of the names should not cause a derivation to be created - selected = lib.attrsets.filterAttrs (name: _: elem name names) toplevel; - in derivation rec { - name = "colmena-${hive.meta.name}"; - system = currentSystem; - json = toJSON (lib.attrsets.mapAttrs (k: v: toString v) selected); - builder = pkgs.writeScript "${name}.sh" '' - #!/bin/sh - echo "$json" > $out - ''; - }; + evalAll = evalSelected nodeNames; + evalSelected = names: let + selected = lib.filterAttrs (name: _: elem name names) toplevel; + drvs = lib.mapAttrs (k: v: v.drvPath) selected; + in drvs; introspect = function: function { inherit pkgs lib nodes; @@ -481,7 +473,7 @@ in { inherit nodes toplevel deploymentConfigJson deploymentConfigJsonSelected - buildAll buildSelected introspect; + evalAll evalSelected introspect; meta = hive.meta; diff --git a/src/nix/hive/mod.rs b/src/nix/hive/mod.rs index 3afb9e5..3c89f5c 100644 --- a/src/nix/hive/mod.rs +++ b/src/nix/hive/mod.rs @@ -11,12 +11,12 @@ use validator::Validate; use super::{ Flake, - StoreDerivation, NixResult, NodeName, NodeConfig, NodeFilter, - ProfileMap, + ProfileDerivation, + StorePath, }; use super::deployment::TargetNode; use super::NixCommand; @@ -250,20 +250,24 @@ impl Hive { /// Evaluation may take up a lot of memory, so we make it possible /// to split up the evaluation process into chunks and run them /// concurrently with other processes (e.g., build and apply). - pub async fn eval_selected(&self, nodes: &[NodeName], job: Option) -> NixResult> { + pub async fn eval_selected(&self, nodes: &[NodeName], job: Option) -> NixResult> { let nodes_expr = SerializedNixExpresssion::new(nodes)?; - let expr = format!("hive.buildSelected {}", nodes_expr.expression()); + let expr = format!("hive.evalSelected {}", nodes_expr.expression()); - let command = self.nix_instantiate(&expr).instantiate_with_builders().await?; + let command = self.nix_instantiate(&expr) + .eval_with_builders().await?; let mut execution = CommandExecution::new(command); execution.set_job(job); - - let path = execution.capture_store_path().await?; - let drv = path.into_derivation() - .expect("The result should be a store derivation"); - - Ok(drv) + execution.set_hide_stdout(true); + + execution + .capture_json::>().await? + .into_iter().map(|(name, path)| { + let path = path.into_derivation()?; + Ok((name, path)) + }) + .collect() } /// Evaluates an expression using values from the configuration @@ -374,8 +378,10 @@ impl<'hive> NixInstantiate<'hive> { fn eval(self) -> Command { let mut command = self.instantiate(); - command.arg("--eval").arg("--json") - .arg("--read-write-mode"); // For cases involving IFD + command.arg("--eval").arg("--json").arg("--strict") + // Ensures the derivations are instantiated + // Required for system profile evaluation and IFD + .arg("--read-write-mode"); command } diff --git a/src/nix/mod.rs b/src/nix/mod.rs index 767f18f..6bb7fc7 100644 --- a/src/nix/mod.rs +++ b/src/nix/mod.rs @@ -30,7 +30,7 @@ pub mod key; pub use key::Key; pub mod profile; -pub use profile::{Profile, ProfileMap}; +pub use profile::{Profile, ProfileDerivation}; pub mod deployment; pub use deployment::Goal; @@ -78,6 +78,9 @@ pub enum NixError { #[snafu(display("Failed to upload keys: {}", error))] KeyError { error: key::KeyError }, + #[snafu(display("Store path {:?} is not a derivation", store_path))] + NotADerivation { store_path: StorePath }, + #[snafu(display("Invalid NixOS system profile"))] InvalidProfile, diff --git a/src/nix/profile.rs b/src/nix/profile.rs index 0abfb2b..4d25de9 100644 --- a/src/nix/profile.rs +++ b/src/nix/profile.rs @@ -1,7 +1,4 @@ -use std::collections::HashMap; use std::convert::TryFrom; -use std::fs; -use std::ops::{Deref, DerefMut}; use std::path::Path; use std::process::Stdio; @@ -11,10 +8,12 @@ use super::{ Goal, NixResult, NixError, - NodeName, StorePath, + StoreDerivation, }; +pub type ProfileDerivation = StoreDerivation; + /// A NixOS system profile. #[derive(Clone, Debug)] pub struct Profile(StorePath); @@ -61,80 +60,40 @@ impl Profile { pub fn as_path(&self) -> &Path { self.0.as_path() } -} - -/// A map of names to their associated NixOS system profiles. -#[derive(Debug, Clone)] -pub struct ProfileMap(HashMap); -impl Deref for ProfileMap { - type Target = HashMap; + /// Create a GC root for this profile. + pub async fn create_gc_root(&self, path: &Path) -> NixResult<()> { + let mut command = Command::new("nix-store"); + command.args(&["--no-build-output", "--indirect", "--add-root", path.to_str().unwrap()]); + command.args(&["--realise", self.as_path().to_str().unwrap()]); + command.stdout(Stdio::null()); - fn deref(&self) -> &Self::Target { - &self.0 - } -} + let status = command.status().await?; + if !status.success() { + return Err(status.into()); + } -impl DerefMut for ProfileMap { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + Ok(()) } } -impl TryFrom> for ProfileMap { +impl TryFrom> for Profile { type Error = NixError; fn try_from(paths: Vec) -> NixResult { - match paths.len() { - 0 => Err(NixError::BadOutput { - output: String::from("Build produced no outputs"), - }), - l if l > 1 => Err(NixError::BadOutput { - output: String::from("Build produced multiple outputs"), - }), - _ => { - // We expect a JSON file containing a - // HashMap - - let path = paths[0].as_path(); - let json: String = fs::read_to_string(path)?; - let mut raw_map: HashMap = serde_json::from_str(&json).map_err(|_| NixError::BadOutput { - output: String::from("The returned profile map is invalid"), - })?; - - let mut checked_map = HashMap::new(); - for (node, profile) in raw_map.drain() { - let profile = Profile::from_store_path(profile)?; - checked_map.insert(node, profile); - } - - Ok(Self(checked_map)) - } + if paths.is_empty() { + return Err(NixError::BadOutput { + output: String::from("There is no store path"), + }); } - } -} -impl ProfileMap { - /// Create GC roots for all profiles in the map. - /// - /// The created links will be located at `{base}/node-{node_name}`. - pub async fn create_gc_roots(&self, base: &Path) -> NixResult<()> { - // This will actually try to build all profiles, but since they - // already exist only the GC roots will be created. - for (node, profile) in self.0.iter() { - let path = base.join(format!("node-{}", node.as_str())); - - let mut command = Command::new("nix-store"); - command.args(&["--no-build-output", "--indirect", "--add-root", path.to_str().unwrap()]); - command.args(&["--realise", profile.as_path().to_str().unwrap()]); - command.stdout(Stdio::null()); - - let status = command.status().await?; - if !status.success() { - return Err(status.into()); - } + if paths.len() > 1 { + return Err(NixError::BadOutput { + output: String::from("Build resulted in more than 1 store path"), + }); } - Ok(()) + let path = paths.into_iter().next().unwrap(); + Self::from_store_path(path) } } diff --git a/src/nix/store.rs b/src/nix/store.rs index eea62f0..e1675a3 100644 --- a/src/nix/store.rs +++ b/src/nix/store.rs @@ -5,8 +5,9 @@ use std::ops::Deref; use std::fmt; use serde::{Serialize, Deserialize}; +use tokio::process::Command; -use super::{Host, NixResult, NixError}; +use super::{Host, NixCommand, NixResult, NixError}; /// A Nix store path. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -27,12 +28,24 @@ impl StorePath { } } + /// Returns the immediate dependencies of the store path. + pub async fn references(&self) -> NixResult> { + let references = Command::new("nix-store") + .args(&["--query", "--references"]) + .arg(&self.0) + .capture_output().await? + .trim_end().split('\n') + .map(|p| StorePath(PathBuf::from(p))).collect(); + + Ok(references) + } + /// Converts the store path into a store derivation. - pub fn into_derivation>>(self) -> Option> { + pub fn into_derivation>>(self) -> NixResult> { if self.is_derivation() { - Some(StoreDerivation::::from_store_path_unchecked(self)) + Ok(StoreDerivation::::from_store_path_unchecked(self)) } else { - None + Err(NixError::NotADerivation { store_path: self }) } } } @@ -64,6 +77,7 @@ impl From for PathBuf { } /// A store derivation (.drv) that will result in a T when built. +#[derive(Debug, Clone)] pub struct StoreDerivation>>{ path: StorePath, _target: PhantomData, diff --git a/src/progress/mod.rs b/src/progress/mod.rs index 74f0ae9..e6da509 100644 --- a/src/progress/mod.rs +++ b/src/progress/mod.rs @@ -135,15 +135,6 @@ impl Line { } } - /// Builder-like interface to set the line as an one-off output. - /// - /// For SpinnerOutput, this will create a new bar that immediately - /// finishes with the style (success or failure). - pub fn one_off(mut self) -> Self { - self.one_off = true; - self - } - /// Builder-like interface to set the line as noisy. pub fn noisy(mut self) -> Self { self.noisy = true; diff --git a/src/util.rs b/src/util.rs index cff8d45..2c54dc2 100644 --- a/src/util.rs +++ b/src/util.rs @@ -14,6 +14,7 @@ use super::job::JobHandle; pub struct CommandExecution { command: Command, job: Option, + hide_stdout: bool, stdout: Option, stderr: Option, } @@ -23,6 +24,7 @@ impl CommandExecution { Self { command, job: None, + hide_stdout: false, stdout: None, stderr: None, } @@ -33,6 +35,11 @@ impl CommandExecution { self.job = job; } + /// Sets whether to hide stdout. + pub fn set_hide_stdout(&mut self, hide_stdout: bool) { + self.hide_stdout = hide_stdout; + } + /// Returns logs from the last invocation. pub fn get_logs(&self) -> (Option<&String>, Option<&String>) { (self.stdout.as_ref(), self.stderr.as_ref()) @@ -52,8 +59,10 @@ impl CommandExecution { let stdout = BufReader::new(child.stdout.take().unwrap()); let stderr = BufReader::new(child.stderr.take().unwrap()); + let stdout_job = if self.hide_stdout { None } else { self.job.clone() }; + let futures = join3( - capture_stream(stdout, self.job.clone(), false), + capture_stream(stdout, stdout_job, false), capture_stream(stderr, self.job.clone(), true), child.wait(), );