From 833d8e7854499a02ba8ed02241fbffbcb56020c2 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Sun, 26 May 2019 21:36:43 +1200 Subject: [PATCH] Enable threaded closing on all platforms. Set RUSTUP_CLOSE_THREADS=disabled to force single threaded IO, or to a specific number if desired for testing/tuning. This may improve #1867, but has no impact on #1866 due to the coarse lock around the fd-handle table inside WSL. --- README.md | 5 +++ src/dist/component/package.rs | 57 ++++++++++++++++++++++++++++++----- 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 07e8664b61..1aeb7d8916 100644 --- a/README.md +++ b/README.md @@ -611,6 +611,11 @@ Command | Description - `RUSTUP_UPDATE_ROOT` (default `https://static.rust-lang.org/rustup`) Sets the root URL for downloading self-updates. +- `RUSTUP_CLOSE_THREADS` (defaults to reported cpu count). Sets the + number of threads to perform close IO in. Set to `disabled` to force + single-threaded IO for troubleshooting, or an arbitrary number to + override automatic detection. + ## Other installation methods The primary installation method, as described at https://rustup.rs, differs by platform: diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index eac4e5187d..be5e562d56 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -11,6 +11,7 @@ use crate::utils::notifications::Notification; use crate::utils::utils; use std::collections::HashSet; +use std::env; use std::fmt; use std::io::Read; use std::path::{Path, PathBuf}; @@ -214,8 +215,11 @@ impl<'a> TarPackage<'a> { } } -#[cfg(windows)] -mod unpacker { +trait Unpacker { + fn handle(&mut self, unpacked: tar::Unpacked); +} + +mod threadedunpacker { use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use threadpool; @@ -239,12 +243,33 @@ mod unpacker { .build(); Unpacker { n_files: Arc::new(AtomicUsize::new(0)), - pool: pool, - notify_handler: notify_handler, + pool, + notify_handler, } } - pub fn handle(&mut self, unpacked: tar::Unpacked) { + pub fn new_with_threads( + notify_handler: Option<&'a dyn Fn(Notification<'_>)>, + thread_count: usize, + ) -> Self { + // Defaults to hardware thread count threads; this is suitable for + // our needs as IO bound operations tend to show up as write latencies + // rather than close latencies, so we don't need to look at + // more threads to get more IO dispatched at this stage in the process. + let pool = threadpool::Builder::new() + .thread_name("CloseHandle".into()) + .num_threads(thread_count) + .build(); + Unpacker { + n_files: Arc::new(AtomicUsize::new(0)), + pool, + notify_handler, + } + } + } + + impl<'a> super::Unpacker for Unpacker<'a> { + fn handle(&mut self, unpacked: tar::Unpacked) { if let tar::Unpacked::File(f) = unpacked { self.n_files.fetch_add(1, Ordering::Relaxed); let n_files = self.n_files.clone(); @@ -307,7 +332,6 @@ mod unpacker { } } -#[cfg(not(windows))] mod unpacker { use crate::utils::notifications::Notification; pub struct Unpacker {} @@ -315,7 +339,9 @@ mod unpacker { pub fn new<'a>(_notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker { Unpacker {} } - pub fn handle(&mut self, _unpacked: tar::Unpacked) {} + } + impl super::Unpacker for Unpacker { + fn handle(&mut self, _unpacked: tar::Unpacked) {} } } @@ -324,7 +350,22 @@ fn unpack_without_first_dir<'a, R: Read>( path: &Path, notify_handler: Option<&'a dyn Fn(Notification<'_>)>, ) -> Result<()> { - let mut unpacker = unpacker::Unpacker::new(notify_handler); + let mut unpacker : Box = + // If this gets lots of use, consider exposing via the config file. + if let Ok(thread_str) = env::var("RUSTUP_CLOSE_THREADS") { + if thread_str == "disabled" { + Box::new(unpacker::Unpacker::new(notify_handler)) + } else { + if let Ok(thread_count) = thread_str.parse::() { + Box::new(threadedunpacker::Unpacker::new_with_threads(notify_handler, thread_count)) + } else { + Box::new(threadedunpacker::Unpacker::new(notify_handler)) + } + } + } else { + Box::new(threadedunpacker::Unpacker::new(notify_handler)) + } + ; let entries = archive .entries() .chain_err(|| ErrorKind::ExtractingPackage)?;