Skip to content

Commit

Permalink
Enable threaded closing on all platforms.
Browse files Browse the repository at this point in the history
Set RUSTUP_CLOSE_THREADS=disabled to force single threaded IO, or
to a specific number if desired for testing/tuning.

This may improve rust-lang#1867, but has no impact on rust-lang#1866 due to the coarse
lock around the fd-handle table inside WSL.
  • Loading branch information
rbtcollins committed May 29, 2019
1 parent a540515 commit 5281b53
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 8 deletions.
5 changes: 5 additions & 0 deletions README.md
Expand Up @@ -611,6 +611,11 @@ Command | Description
- `RUSTUP_UPDATE_ROOT` (default `https://static.rust-lang.org/rustup`)
Sets the root URL for downloading self-updates.

- `RUSTUP_CLOSE_THREADS` (defaults to reported cpu count). Sets the
number of threads to perform close IO in. Set to `disabled` to force
single-threaded IO for troubleshooting, or an arbitrary number to
override automatic detection.

## Other installation methods

The primary installation method, as described at https://rustup.rs, differs by platform:
Expand Down
57 changes: 49 additions & 8 deletions src/dist/component/package.rs
Expand Up @@ -11,6 +11,7 @@ use crate::utils::notifications::Notification;
use crate::utils::utils;

use std::collections::HashSet;
use std::env;
use std::fmt;
use std::io::Read;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -214,8 +215,11 @@ impl<'a> TarPackage<'a> {
}
}

#[cfg(windows)]
mod unpacker {
trait Unpacker {
fn handle(&mut self, unpacked: tar::Unpacked);
}

mod threadedunpacker {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use threadpool;
Expand All @@ -239,12 +243,33 @@ mod unpacker {
.build();
Unpacker {
n_files: Arc::new(AtomicUsize::new(0)),
pool: pool,
notify_handler: notify_handler,
pool,
notify_handler,
}
}

pub fn handle(&mut self, unpacked: tar::Unpacked) {
pub fn new_with_threads(
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
thread_count: usize,
) -> Self {
// Defaults to hardware thread count threads; this is suitable for
// our needs as IO bound operations tend to show up as write latencies
// rather than close latencies, so we don't need to look at
// more threads to get more IO dispatched at this stage in the process.
let pool = threadpool::Builder::new()
.thread_name("CloseHandle".into())
.num_threads(thread_count)
.build();
Unpacker {
n_files: Arc::new(AtomicUsize::new(0)),
pool,
notify_handler,
}
}
}

impl<'a> super::Unpacker for Unpacker<'a> {
fn handle(&mut self, unpacked: tar::Unpacked) {
if let tar::Unpacked::File(f) = unpacked {
self.n_files.fetch_add(1, Ordering::Relaxed);
let n_files = self.n_files.clone();
Expand Down Expand Up @@ -307,15 +332,16 @@ mod unpacker {
}
}

#[cfg(not(windows))]
mod unpacker {
use crate::utils::notifications::Notification;
pub struct Unpacker {}
impl Unpacker {
pub fn new<'a>(_notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker {
Unpacker {}
}
pub fn handle(&mut self, _unpacked: tar::Unpacked) {}
}
impl super::Unpacker for Unpacker {
fn handle(&mut self, _unpacked: tar::Unpacked) {}
}
}

Expand All @@ -324,7 +350,22 @@ fn unpack_without_first_dir<'a, R: Read>(
path: &Path,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Result<()> {
let mut unpacker = unpacker::Unpacker::new(notify_handler);
let mut unpacker : Box<dyn Unpacker> =
// If this gets lots of use, consider exposing via the config file.
if let Ok(thread_str) = env::var("RUSTUP_CLOSE_THREADS") {
if thread_str == "disabled" {
Box::new(unpacker::Unpacker::new(notify_handler))
} else {
if let Ok(thread_count) = thread_str.parse::<usize>() {
Box::new(threadedunpacker::Unpacker::new_with_threads(notify_handler, thread_count))
} else {
Box::new(threadedunpacker::Unpacker::new(notify_handler))
}
}
} else {
Box::new(threadedunpacker::Unpacker::new(notify_handler))
}
;
let entries = archive
.entries()
.chain_err(|| ErrorKind::ExtractingPackage)?;
Expand Down

0 comments on commit 5281b53

Please sign in to comment.