diff --git a/src/cli/rustup_mode.rs b/src/cli/rustup_mode.rs index eb6026e733..fe9666e345 100644 --- a/src/cli/rustup_mode.rs +++ b/src/cli/rustup_mode.rs @@ -1012,7 +1012,7 @@ async fn update( info!("cleaning up downloads & tmp directories"); utils::delete_dir_contents_following_links(&cfg.download_dir); - cfg.tmp_cx.clean(); + dl_cfg.tmp_cx.clean(); } Ok(exit_code) diff --git a/src/config.rs b/src/config.rs index bbba13a075..3beb36a208 100644 --- a/src/config.rs +++ b/src/config.rs @@ -12,7 +12,7 @@ use crate::{ cli::{common, self_update::SelfUpdateMode}, dist::{ self, AutoInstallMode, DistOptions, PartialToolchainDesc, Profile, TargetTriple, - ToolchainDesc, temp, + ToolchainDesc, }, errors::RustupError, fallback_settings::FallbackSettings, @@ -239,9 +239,9 @@ pub(crate) struct Cfg<'a> { pub toolchains_dir: PathBuf, update_hash_dir: PathBuf, pub download_dir: PathBuf, - pub tmp_cx: temp::Context, pub toolchain_override: Option, env_override: Option, + pub(crate) dist_root_server: String, pub dist_root_url: String, pub quiet: bool, pub current_dir: PathBuf, @@ -300,8 +300,7 @@ impl<'a> Cfg<'a> { }; let dist_root_server = dist_root_server(process)?; - let tmp_cx = temp::Context::new(rustup_dir.join("tmp"), dist_root_server.as_str()); - let dist_root = dist_root_server + "/dist"; + let dist_root = dist_root_server.clone() + "/dist"; let cfg = Self { profile_override: None, @@ -311,9 +310,9 @@ impl<'a> Cfg<'a> { toolchains_dir, update_hash_dir, download_dir, - tmp_cx, toolchain_override: None, env_override, + dist_root_server, dist_root_url: dist_root, quiet, current_dir, @@ -954,9 +953,9 @@ impl Debug for Cfg<'_> { toolchains_dir, update_hash_dir, download_dir, - tmp_cx, toolchain_override, env_override, + dist_root_server, dist_root_url, quiet, current_dir, @@ -971,9 +970,9 @@ impl Debug for Cfg<'_> { .field("toolchains_dir", toolchains_dir) .field("update_hash_dir", update_hash_dir) .field("download_dir", download_dir) - .field("tmp_cx", tmp_cx) .field("toolchain_override", toolchain_override) .field("env_override", env_override) + .field("dist_root_server", dist_root_server) .field("dist_root_url", dist_root_url) .field("quiet", quiet) .field("current_dir", current_dir) diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs index bfb09ebb44..549101e0dc 100644 --- a/src/diskio/mod.rs +++ b/src/diskio/mod.rs @@ -290,7 +290,7 @@ impl IncrementalFileState { /// Trait object for performing IO. At this point the overhead /// of trait invocation is not a bottleneck, but if it becomes /// one we could consider an enum variant based approach instead. -pub(crate) trait Executor { +pub(crate) trait Executor: Send { /// Perform a single operation. /// During overload situations previously queued items may /// need to be completed before the item is accepted: diff --git a/src/dist/component/components.rs b/src/dist/component/components.rs index 5c2231c61a..a4240c591a 100644 --- a/src/dist/component/components.rs +++ b/src/dist/component/components.rs @@ -54,7 +54,7 @@ impl Components { Ok(None) } } - fn write_version(&self, tx: &mut Transaction<'_>) -> Result<()> { + fn write_version(&self, tx: &mut Transaction) -> Result<()> { tx.modify_file(self.prefix.rel_manifest_file(VERSION_FILE))?; utils::write_file( VERSION_FILE, @@ -78,7 +78,7 @@ impl Components { }) .collect()) } - pub(crate) fn add<'a>(&self, name: &str, tx: Transaction<'a>) -> ComponentBuilder<'a> { + pub(crate) fn add(&self, name: &str, tx: Transaction) -> ComponentBuilder { ComponentBuilder { components: self.clone(), name: name.to_owned(), @@ -95,14 +95,14 @@ impl Components { } } -pub(crate) struct ComponentBuilder<'a> { +pub(crate) struct ComponentBuilder { components: Components, name: String, parts: Vec, - tx: Transaction<'a>, + tx: Transaction, } -impl<'a> ComponentBuilder<'a> { +impl ComponentBuilder { pub(crate) fn copy_file(&mut self, path: PathBuf, src: &Path) -> Result<()> { self.parts.push(ComponentPart { kind: ComponentPartKind::File, @@ -131,7 +131,7 @@ impl<'a> ComponentBuilder<'a> { }); self.tx.move_dir(&self.name, path, src) } - pub(crate) fn finish(mut self) -> Result> { + pub(crate) fn finish(mut self) -> Result { // Write component manifest let path = self.components.rel_component_manifest(&self.name); let abs_path = self.components.prefix.abs_path(&path); @@ -254,7 +254,7 @@ impl Component { } Ok(result) } - pub fn uninstall<'a>(&self, mut tx: Transaction<'a>) -> Result> { + pub fn uninstall(&self, mut tx: Transaction) -> Result { // Update components file let path = self.components.rel_components_file(); let abs_path = self.components.prefix.abs_path(&path); diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index 0cc5855fe6..dc545d4b62 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -96,13 +96,13 @@ impl> DirectoryPackage

{ } } - pub fn install<'a>( + pub fn install( &self, target: &Components, name: &str, short_name: Option<&str>, - tx: Transaction<'a>, - ) -> Result> { + tx: Transaction, + ) -> Result { let actual_name = if self.components.contains(name) { name } else if let Some(n) = short_name { diff --git a/src/dist/component/transaction.rs b/src/dist/component/transaction.rs index be23d6dbab..386a2ea82b 100644 --- a/src/dist/component/transaction.rs +++ b/src/dist/component/transaction.rs @@ -11,6 +11,7 @@ use std::fs::File; use std::path::{Path, PathBuf}; +use std::sync::Arc; use anyhow::{Context, Result, anyhow}; use tracing::{error, info}; @@ -33,16 +34,20 @@ use crate::utils; /// /// All operations that create files will fail if the destination /// already exists. -pub struct Transaction<'a> { +pub struct Transaction { prefix: InstallPrefix, changes: Vec, - tmp_cx: &'a temp::Context, + tmp_cx: Arc, committed: bool, pub(super) permit_copy_rename: bool, } -impl<'a> Transaction<'a> { - pub fn new(prefix: InstallPrefix, tmp_cx: &'a temp::Context, permit_copy_rename: bool) -> Self { +impl Transaction { + pub fn new( + prefix: InstallPrefix, + tmp_cx: Arc, + permit_copy_rename: bool, + ) -> Self { Transaction { prefix, changes: Vec::new(), @@ -189,14 +194,14 @@ impl<'a> Transaction<'a> { Ok(()) } - pub(crate) fn temp(&self) -> &'a temp::Context { - self.tmp_cx + pub(crate) fn temp(&self) -> &temp::Context { + &self.tmp_cx } } /// If a Transaction is dropped without being committed, the changes /// are automatically rolled back. -impl Drop for Transaction<'_> { +impl Drop for Transaction { fn drop(&mut self) { if !self.committed { info!("rolling back changes"); diff --git a/src/dist/download.rs b/src/dist/download.rs index 6342d4cdd3..9373964ea9 100644 --- a/src/dist/download.rs +++ b/src/dist/download.rs @@ -3,7 +3,7 @@ use std::fs; use std::io::Read; use std::ops; use std::path::{Path, PathBuf}; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use anyhow::{Context, Result, anyhow}; @@ -23,7 +23,7 @@ use crate::utils; const UPDATE_HASH_LEN: usize = 20; pub struct DownloadCfg<'a> { - pub tmp_cx: &'a temp::Context, + pub tmp_cx: Arc, pub download_dir: &'a PathBuf, pub(super) tracker: DownloadTracker, pub(super) permit_copy_rename: bool, @@ -34,7 +34,10 @@ impl<'a> DownloadCfg<'a> { /// construct a download configuration pub(crate) fn new(cfg: &'a Cfg<'a>) -> Self { DownloadCfg { - tmp_cx: &cfg.tmp_cx, + tmp_cx: Arc::new(temp::Context::new( + cfg.rustup_dir.join("tmp"), + cfg.dist_root_server.as_str(), + )), download_dir: &cfg.download_dir, tracker: DownloadTracker::new(!cfg.quiet, cfg.process), permit_copy_rename: cfg.process.permit_copy_rename(), diff --git a/src/dist/manifestation.rs b/src/dist/manifestation.rs index 0aea681419..ff65834c39 100644 --- a/src/dist/manifestation.rs +++ b/src/dist/manifestation.rs @@ -4,19 +4,25 @@ #[cfg(test)] mod tests; +use std::collections::VecDeque; use std::path::Path; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll, ready}; +use std::vec; -use anyhow::{Context, Result, anyhow, bail}; +use anyhow::{Context as _, Result, anyhow, bail}; +use futures_util::Stream; use futures_util::stream::{FuturesUnordered, StreamExt}; +use tokio::task::{JoinHandle, spawn_blocking}; use tracing::{info, warn}; -use crate::diskio::{IO_CHUNK_SIZE, get_executor, unpack_ram}; +use crate::diskio::{Executor, IO_CHUNK_SIZE, get_executor, unpack_ram}; use crate::dist::component::{Components, DirectoryPackage, Transaction}; use crate::dist::config::Config; use crate::dist::download::{DownloadCfg, DownloadStatus, File}; use crate::dist::manifest::{Component, CompressionKind, HashedBinary, Manifest}; use crate::dist::prefix::InstallPrefix; -#[cfg(test)] use crate::dist::temp; use crate::dist::{DEFAULT_DIST_SERVER, Profile, TargetTriple}; use crate::errors::RustupError; @@ -101,7 +107,7 @@ impl Manifestation { /// It is *not* safe to run two updates concurrently. See /// https://github.com/rust-lang/rustup/issues/988 for the details. pub async fn update( - &self, + self, new_manifest: Manifest, changes: Changes, force_update: bool, @@ -110,14 +116,13 @@ impl Manifestation { implicit_modify: bool, ) -> Result { // Some vars we're going to need a few times - let tmp_cx = download_cfg.tmp_cx; let prefix = self.installation.prefix(); let rel_installed_manifest_path = prefix.rel_manifest_file(DIST_MANIFEST); let installed_manifest_path = prefix.path().join(&rel_installed_manifest_path); // Create the lists of components needed for installation let config = self.read_config()?; - let mut update = Update::new(self, &new_manifest, &changes, &config)?; + let mut update = Update::new(&self, &new_manifest, &changes, &config)?; if update.nothing_changes() { return Ok(UpdateStatus::Unchanged); @@ -170,7 +175,11 @@ impl Manifestation { .unwrap_or(DEFAULT_MAX_RETRIES); // Begin transaction - let mut tx = Transaction::new(prefix.clone(), tmp_cx, download_cfg.permit_copy_rename); + let mut tx = Transaction::new( + prefix.clone(), + download_cfg.tmp_cx.clone(), + download_cfg.permit_copy_rename, + ); // If the previous installation was from a v1 manifest we need // to uninstall it first. @@ -208,33 +217,44 @@ impl Manifestation { } info!("downloading component(s)"); - let mut downloads = FuturesUnordered::new(); - let mut component_iter = components.into_iter(); - let mut cleanup_downloads = vec![]; - loop { - if downloads.is_empty() && component_iter.len() == 0 { - break; - } + let mut tx = if !components.is_empty() { + let mut stream = InstallEvents::new(components.into_iter(), Arc::new(self)); + let mut transaction = Some(tx); + let tx = loop { + // Refill downloads when there's capacity + // Must live outside of `InstallEvents` because we can't write the type of future + while stream.components.len() > 0 && stream.downloads.len() < concurrent_downloads { + if let Some(bin) = stream.components.next() { + stream.downloads.push(bin.download(max_retries)); + } + } - let installable = downloads.next().await.transpose()?; - while component_iter.len() > 0 && downloads.len() < concurrent_downloads { - if let Some(bin) = component_iter.next() { - downloads.push(bin.download(max_retries)); + // Trigger another installation if no other installation is in progress, as evidenced + // by whether `transaction` is `Some` (not held by another installation task). + stream.try_install(&mut transaction); + match stream.next().await { + // Completed an installation, yielding the transaction back + Some(Ok(tx)) => match stream.is_done() { + true => break tx, + false => transaction = Some(tx), + }, + Some(Err(e)) => return Err(e), + // A download completed, so we can trigger another one + None => {} } - } + }; - if let Some((bin, downloaded)) = installable { - cleanup_downloads.push(&bin.binary.hash); - tx = bin.install(downloaded, tx, self)?; - } - } + download_cfg.clean(&stream.cleanup_downloads)?; + drop(stream); + tx + } else { + tx + }; // Install new distribution manifest let new_manifest_str = new_manifest.clone().stringify()?; tx.modify_file(rel_installed_manifest_path)?; utils::write_file("manifest", &installed_manifest_path, &new_manifest_str)?; - download_cfg.clean(&cleanup_downloads)?; - drop(downloads); // Write configuration. // @@ -262,7 +282,7 @@ impl Manifestation { pub(crate) fn uninstall( &self, manifest: &Manifest, - tmp_cx: &temp::Context, + tmp_cx: Arc, permit_copy_rename: bool, ) -> Result<()> { let prefix = self.installation.prefix(); @@ -287,12 +307,12 @@ impl Manifestation { Ok(()) } - fn uninstall_component<'a>( + fn uninstall_component( &self, component: Component, manifest: &Manifest, - mut tx: Transaction<'a>, - ) -> Result> { + mut tx: Transaction, + ) -> Result { // For historical reasons, the rust-installer component // names are not the same as the dist manifest component // names. Some are just the component name some are the @@ -390,7 +410,7 @@ impl Manifestation { info!("installing component rust"); // Begin transaction - let mut tx = Transaction::new(prefix, dl_cfg.tmp_cx, dl_cfg.permit_copy_rename); + let mut tx = Transaction::new(prefix, dl_cfg.tmp_cx.clone(), dl_cfg.permit_copy_rename); // Uninstall components let components = self.installation.list()?; @@ -421,11 +441,11 @@ impl Manifestation { // doesn't have a configuration or manifest-derived list of // component/target pairs. Uninstall it using the installer's // component list before upgrading. - fn maybe_handle_v2_upgrade<'a>( + fn maybe_handle_v2_upgrade( &self, config: &Option, - mut tx: Transaction<'a>, - ) -> Result> { + mut tx: Transaction, + ) -> Result { let installed_components = self.installation.list()?; let looks_like_v1 = config.is_none() && !installed_components.is_empty(); @@ -441,6 +461,89 @@ impl Manifestation { } } +struct InstallEvents<'a, F> { + manifestation: Arc, + components: vec::IntoIter>, + cleanup_downloads: Vec<&'a str>, + install_queue: VecDeque, + installing: Option>>, + downloads: FuturesUnordered, +} + +impl<'a, F> InstallEvents<'a, F> { + fn new( + components: vec::IntoIter>, + manifestation: Arc, + ) -> Self { + Self { + manifestation, + cleanup_downloads: Vec::with_capacity(components.len()), + components, + install_queue: VecDeque::new(), + installing: None, + downloads: FuturesUnordered::new(), + } + } + + fn try_install(&mut self, tx: &mut Option) { + let Some(installable) = self.install_queue.pop_front() else { + return; + }; + + if let Some(tx) = tx.take() { + let manifestation = self.manifestation.clone(); + self.installing = Some(spawn_blocking(|| installable.install(tx, manifestation))); + } else { + self.install_queue.push_front(installable); + } + } + + fn is_done(&self) -> bool { + self.components.len() == 0 && self.downloads.is_empty() && self.install_queue.is_empty() + } +} + +impl<'a, F: Future>> Stream for InstallEvents<'a, F> { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut(); + + // First, see if any of the downloads is complete; if so, yield `None` + // to the caller so it can trigger another download. + match Pin::new(&mut this.downloads).poll_next(cx) { + Poll::Ready(Some(Ok((installable, hash)))) => { + this.cleanup_downloads.push(hash); + this.install_queue.push_back(installable); + return Poll::Ready(None); + } + Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), + Poll::Ready(None) | Poll::Pending => {} + } + + let Some(handle) = &mut this.installing else { + return match self.install_queue.is_empty() { + // Nothing to do, yield control to the runtime + true => Poll::Pending, + // Can try to start the next installation + false => Poll::Ready(None), + }; + }; + + match ready!(Pin::new(handle).poll(cx)) { + Ok(Ok(tx)) => { + // Current `handle` must not be polled again + this.installing = None; + Poll::Ready(Some(Ok(tx))) + } + Ok(Err(e)) => Poll::Ready(Some(Err(e))), + Err(e) => Poll::Ready(Some(Err(anyhow!( + "internal error during installation: {e}" + )))), + } + } +} + #[derive(Debug, Default)] struct Update { components_to_uninstall: Vec, @@ -641,11 +744,11 @@ impl<'a> ComponentBinary<'a> { })) } - async fn download(self, max_retries: usize) -> Result<(Self, File)> { + async fn download(self, max_retries: usize) -> Result<(ComponentInstall, &'a str)> { use tokio_retry::{RetryIf, strategy::FixedInterval}; let url = self.download_cfg.url(&self.binary.url)?; - let downloaded_file = RetryIf::spawn( + let installer = RetryIf::spawn( FixedInterval::from_millis(0).take(max_retries), || { self.download_cfg @@ -668,36 +771,53 @@ impl<'a> ComponentBinary<'a> { RustupError::ComponentDownloadFailed(self.manifest.name(&self.component)) })?; - Ok((self, downloaded_file)) + let install = ComponentInstall { + status: self.status, + compression: self.binary.compression, + installer, + short_name: self.manifest.short_name(&self.component).to_owned(), + component: self.component, + temp_dir: self.download_cfg.tmp_cx.new_directory()?, + io_executor: get_executor( + unpack_ram(IO_CHUNK_SIZE, self.download_cfg.process.unpack_ram()?), + self.download_cfg.process.io_thread_count()?, + ), + }; + + Ok((install, &self.binary.hash)) } +} - fn install<'t>( - self, - installer_file: File, - tx: Transaction<'t>, - manifestation: &Manifestation, - ) -> Result> { +struct ComponentInstall { + component: Component, + status: DownloadStatus, + compression: CompressionKind, + installer: File, + short_name: String, + temp_dir: temp::Dir, + io_executor: Box, +} + +impl ComponentInstall { + fn install(self, tx: Transaction, manifestation: Arc) -> Result { // For historical reasons, the rust-installer component // names are not the same as the dist manifest component // names. Some are just the component name some are the // component name plus the target triple. let pkg_name = self.component.name_in_manifest(); let short_pkg_name = self.component.short_name_in_manifest(); - let short_name = self.manifest.short_name(&self.component); - - let temp_dir = self.download_cfg.tmp_cx.new_directory()?; - let io_executor = get_executor( - unpack_ram(IO_CHUNK_SIZE, self.download_cfg.process.unpack_ram()?), - self.download_cfg.process.io_thread_count()?, - ); - let reader = self.status.unpack(utils::buffered(&installer_file)?); - let package = - DirectoryPackage::compressed(reader, self.binary.compression, temp_dir, io_executor)?; + let reader = self.status.unpack(utils::buffered(&self.installer)?); + let package = DirectoryPackage::compressed( + reader, + self.compression, + self.temp_dir, + self.io_executor, + )?; // If the package doesn't contain the component that the // manifest says it does then somebody must be playing a joke on us. if !package.contains(&pkg_name, Some(short_pkg_name)) { - return Err(RustupError::CorruptComponent(short_name.to_owned()).into()); + return Err(RustupError::CorruptComponent(self.short_name).into()); } self.status.installing(); diff --git a/src/dist/manifestation/tests.rs b/src/dist/manifestation/tests.rs index 7d902881ff..e595295dd6 100644 --- a/src/dist/manifestation/tests.rs +++ b/src/dist/manifestation/tests.rs @@ -410,7 +410,7 @@ struct TestContext { prefix: InstallPrefix, download_dir: PathBuf, tp: TestProcess, - tmp_cx: temp::Context, + tmp_cx: Arc, _tempdirs: Vec, } @@ -450,10 +450,7 @@ impl TestContext { ); let prefix_tempdir = tempfile::Builder::new().prefix("rustup").tempdir().unwrap(); - let work_tempdir = tempfile::Builder::new().prefix("rustup").tempdir().unwrap(); - let tmp_cx = temp::Context::new(work_tempdir.path().to_owned(), DEFAULT_DIST_SERVER); - let toolchain = ToolchainDesc::from_str("nightly-x86_64-apple-darwin").unwrap(); let prefix = InstallPrefix::from(prefix_tempdir.path()); let tp = TestProcess::new(env::current_dir().unwrap(), &["rustup"], env, ""); @@ -464,7 +461,10 @@ impl TestContext { download_dir: prefix.path().join("downloads"), prefix, tp, - tmp_cx, + tmp_cx: Arc::new(temp::Context::new( + work_tempdir.path().to_owned(), + DEFAULT_DIST_SERVER, + )), _tempdirs: vec![prefix_tempdir, work_tempdir], } } @@ -480,7 +480,7 @@ impl TestContext { force: bool, ) -> Result { let dl_cfg = DownloadCfg { - tmp_cx: &self.tmp_cx, + tmp_cx: self.tmp_cx.clone(), download_dir: &self.download_dir, tracker: DownloadTracker::new(false, &self.tp.process), permit_copy_rename: self.tp.process.permit_copy_rename(), @@ -527,7 +527,7 @@ impl TestContext { manifestation.uninstall( &manifest, - &self.tmp_cx, + self.tmp_cx.clone(), self.tp.process.permit_copy_rename(), )?; diff --git a/src/test/dist.rs b/src/test/dist.rs index e6b13ad431..d24462918a 100644 --- a/src/test/dist.rs +++ b/src/test/dist.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::fs::{self, File}; use std::io::{self, Read, Write}; use std::path::{Path, PathBuf}; -use std::sync::{LazyLock, Mutex}; +use std::sync::{Arc, LazyLock, Mutex}; use url::Url; @@ -29,7 +29,7 @@ pub struct DistContext { pub inst_dir: tempfile::TempDir, pub prefix: InstallPrefix, _tmp_dir: tempfile::TempDir, - pub cx: temp::Context, + pub cx: Arc, pub tp: TestProcess, } @@ -48,23 +48,26 @@ impl DistContext { pkg_dir, inst_dir, prefix, - cx: temp::Context::new(tmp_dir.path().to_owned(), DEFAULT_DIST_SERVER), + cx: Arc::new(temp::Context::new( + tmp_dir.path().to_owned(), + DEFAULT_DIST_SERVER, + )), tp: TestProcess::default(), _tmp_dir: tmp_dir, }) } - pub fn start(&self) -> anyhow::Result<(Transaction<'_>, Components, DirectoryPackage<&Path>)> { + pub fn start(&self) -> anyhow::Result<(Transaction, Components, DirectoryPackage<&Path>)> { let tx = self.transaction(); let components = Components::open(self.prefix.clone())?; let pkg = DirectoryPackage::new(self.pkg_dir.path(), true)?; Ok((tx, components, pkg)) } - pub fn transaction(&self) -> Transaction<'_> { + pub fn transaction(&self) -> Transaction { Transaction::new( self.prefix.clone(), - &self.cx, + self.cx.clone(), self.tp.process.permit_copy_rename(), ) } diff --git a/tests/suite/dist_install.rs b/tests/suite/dist_install.rs index e22a3691d4..892f6c7cd7 100644 --- a/tests/suite/dist_install.rs +++ b/tests/suite/dist_install.rs @@ -169,7 +169,7 @@ fn uninstall() { // Now uninstall let mut tx = Transaction::new( cx.prefix.clone(), - &cx.cx, + cx.cx.clone(), cx.tp.process.permit_copy_rename(), ); for component in components.list().unwrap() {