Enable fully threaded IO for installs #1876

Merged
merged 7 commits into from Jun 15, 2019
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -59,6 +59,10 @@ xz2 = "0.1.3"
version = "0.5"
default-features = false

[dependencies.rs_tracing]
version = "1.0.1"
features = ["rs_tracing"]

[target."cfg(windows)".dependencies]
cc = "1"
winreg = "0.6"
19 changes: 19 additions & 0 deletions README.md
@@ -591,6 +591,7 @@ Command | Description

## Environment variables


- `RUSTUP_HOME` (default: `~/.rustup` or `%USERPROFILE%/.rustup`)
Sets the root rustup folder, used for storing installed
toolchains and configuration options.
@@ -611,6 +612,21 @@ Command | Description
- `RUSTUP_UPDATE_ROOT` (default `https://static.rust-lang.org/rustup`)
Sets the root URL for downloading self-updates.

- `RUSTUP_IO_THREADS` *unstable* (default: reported CPU count)
Sets the number of threads used to perform close IO. Set to `disabled`
to force single-threaded IO for troubleshooting, or to an arbitrary
number to override automatic detection.

- `RUSTUP_TRACE_DIR` *unstable* (default: no tracing)
Enables tracing and sets the directory that traces are written to.
Trace files are named `PID.trace` and can be read by the Catapult
project [tracing viewer][tv].

[tv]: https://github.com/catapult-project/catapult/blob/master/tracing/README.md

- `RUSTUP_UNPACK_RAM` *unstable* (default 400M, min 100M)
Caps the amount of RAM rustup will use for IO tasks while unpacking.
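
The `RUSTUP_TRACE_DIR` entry describes one trace file per process, named after the PID. In the PR the file is opened by the `open_trace_file!` macro from `rs_tracing`; the sketch below only illustrates the documented naming using the standard library. The helper names `trace_file_path` and `trace_file_from_env` are hypothetical and do not appear in rustup.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical helper: builds `<dir>/<pid>.trace`, the form the README documents.
fn trace_file_path(dir: &Path, pid: u32) -> PathBuf {
    dir.join(format!("{}.trace", pid))
}

/// Hypothetical helper: returns the trace path for the current process,
/// or `None` when `RUSTUP_TRACE_DIR` is unset (tracing disabled).
fn trace_file_from_env() -> Option<PathBuf> {
    std::env::var_os("RUSTUP_TRACE_DIR")
        .map(|dir| trace_file_path(Path::new(&dir), std::process::id()))
}
```

With `RUSTUP_TRACE_DIR=/tmp/traces` and a PID of 1234, `trace_file_from_env()` would yield `/tmp/traces/1234.trace`.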

## Other installation methods

The primary installation method, as described at https://rustup.rs, differs by platform:
@@ -698,6 +714,9 @@ yet validate signatures of downloads.

[s]: https://github.com/rust-lang/rustup.rs/issues?q=is%3Aopen+is%3Aissue+label%3Asecurity

File modes on installation honor umask as of 1.18.4; use umask if
very tight controls are desired.

## FAQ

### Is this an official Rust project?
14 changes: 14 additions & 0 deletions src/cli/main.rs
@@ -30,9 +30,12 @@ mod term2;

use crate::errors::*;
use rustup::env_var::RUST_RECURSION_COUNT_MAX;

use std::env;
use std::path::PathBuf;

use rs_tracing::*;

fn main() {
if let Err(ref e) = run_rustup() {
common::report_error(e);
@@ -41,6 +44,17 @@ fn main() {
}

fn run_rustup() -> Result<()> {
if let Ok(dir) = env::var("RUSTUP_TRACE_DIR") {
open_trace_file!(dir)?;
}
let result = run_rustup_inner();
if env::var("RUSTUP_TRACE_DIR").is_ok() {
close_trace_file!();
}
result
}
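
`run_rustup` opens the trace file, runs the real entry point, closes the trace file, and only then returns the saved result. An alternative, which is not what the PR does, is a drop guard so that the close step also runs on early returns and unwinding panics. The names `CloseOnDrop` and `run_with_cleanup` below are hypothetical, and a plain closure stands in for `close_trace_file!()`.

```rust
/// Hypothetical guard: runs its closing action when it goes out of scope.
struct CloseOnDrop<F: FnMut()> {
    close: F,
}

impl<F: FnMut()> Drop for CloseOnDrop<F> {
    fn drop(&mut self) {
        (self.close)();
    }
}

/// Runs `body` and returns its value. When `enabled` is true, `close` is
/// called after `body` finishes, whether it returned `Ok`, `Err`, or unwound.
fn run_with_cleanup<T>(enabled: bool, close: impl FnMut(), body: impl FnOnce() -> T) -> T {
    // The guard lives until the end of this function, i.e. after `body()`.
    let _guard = if enabled {
        Some(CloseOnDrop { close })
    } else {
        None
    };
    body()
}
```

The explicit open/run/close sequence in the PR is simpler to read; the guard is mainly useful if more exit paths are added later.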

fn run_rustup_inner() -> Result<()> {
// Guard against infinite proxy recursion. This mostly happens due to
// bugs in rustup.
do_recursion_guard()?;
27 changes: 27 additions & 0 deletions src/diskio/immediate.rs
@@ -0,0 +1,27 @@
//! Immediate IO model: performs IO in the current thread.
//!
//! Use for diagnosing bugs or working around any unexpected issues with the
//! threaded code paths.
use super::{perform, Executor, Item};

pub struct ImmediateUnpacker {}
impl ImmediateUnpacker {
pub fn new() -> ImmediateUnpacker {
ImmediateUnpacker {}
}
}

impl Executor for ImmediateUnpacker {
fn dispatch(&mut self, mut item: Item) -> Box<dyn '_ + Iterator<Item = Item>> {
perform(&mut item);
Box::new(Some(item).into_iter())
}

fn join(&mut self) -> Box<dyn Iterator<Item = Item>> {
Box::new(None.into_iter())
}

fn completed(&mut self) -> Box<dyn Iterator<Item = Item>> {
Box::new(None.into_iter())
}
}
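
The immediate executor hands each item straight back from `dispatch` and has nothing left for `join`. A minimal, self-contained sketch of how a caller might drive such an executor follows. It assumes simplified stand-ins for the PR's `Item` and `Executor` (no timing, no `completed`, no real disk IO), and `run_all` is a hypothetical helper. In the PR itself callers go through `execute`, which stamps the start time before calling `dispatch`.

```rust
/// Simplified stand-in for the PR's `Item`: just a name and a result.
#[derive(Debug)]
struct Item {
    name: String,
    result: Result<(), String>,
}

/// Reduced version of the PR's `Executor` trait.
trait Executor {
    /// Accept one item; may hand back any items that are already finished.
    fn dispatch(&mut self, item: Item) -> Box<dyn Iterator<Item = Item> + '_>;
    /// Finish all pending work and hand back the remaining items.
    fn join(&mut self) -> Box<dyn Iterator<Item = Item> + '_>;
}

/// Performs the (simulated) operation on the calling thread.
struct Immediate;

impl Executor for Immediate {
    fn dispatch(&mut self, mut item: Item) -> Box<dyn Iterator<Item = Item> + '_> {
        // Stand-in for real IO: names containing "bad" fail.
        item.result = if item.name.contains("bad") {
            Err(format!("cannot write {}", item.name))
        } else {
            Ok(())
        };
        Box::new(Some(item).into_iter())
    }

    fn join(&mut self) -> Box<dyn Iterator<Item = Item> + '_> {
        // Nothing is ever queued, so there is nothing to wait for.
        Box::new(None::<Item>.into_iter())
    }
}

/// Hypothetical driver: submits every item, drains whatever each call
/// returns, then joins to collect anything still outstanding.
fn run_all(executor: &mut dyn Executor, items: Vec<Item>) -> Vec<Item> {
    let mut done = Vec::new();
    for item in items {
        done.extend(executor.dispatch(item));
    }
    done.extend(executor.join());
    done
}
```

Because the driver always drains the returned iterators, the same loop would work unchanged with an executor that queues work and returns completed items later.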
216 changes: 216 additions & 0 deletions src/diskio/mod.rs
@@ -0,0 +1,216 @@
//! Disk IO abstraction for rustup.
//!
//! This exists to facilitate high performance extraction even though OSes are
//! imperfect beasts. For detailed design notes see the module source.
//
// When performing IO we have a choice:
// - perform some IO in this thread
// - dispatch some or all IO to another thread
// known tradeoffs:
// NFS: network latency incurred on create, chmod, close calls
// WSLv1: Defender latency incurred on close calls; mutex shared with create calls
// Windows: Defender latency incurred on close calls
// Unix: limited open file count
// Defender : CPU limited, so more service points than cores brings no gain.
// Some machines: IO limited, more service points than cores brings more efficient
// IO utilisation.
// Hello world footprint ~350MB, so around 400MB to install is considered ok.
// All systems: dispatching to a thread has some overhead.
// Basic idea then is a locally measured congestion control problem.
// Underlying system has two
// dimensions - how much work we have queued, and how much work we execute
// at once. Queued work is both memory footprint, and unless each executor
// is performing complex logic, potentially concurrent work.
// Single core machines - thread anyway, they probably don't have SSDs?
// How many service points? Blocking latency due to networks and disks
// is independent of CPU: more threads will garner more throughput up
// to actual resource service capability.
// so:
// a) measure time around each IO op from dispatch to completion.
// b) create more threads than CPUs - 2x for now (because threadpool
// doesn't allow creating dynamically), with very shallow stacks
// (say 1MB)
// c) keep adding work while the P95? P80? of completion stays the same
// when pNN starts to increase either (i) we've saturated the system
// or (ii) other work coming in has saturated the system or (iii) this
// sort of work is a lot harder to complete. We use NN<100 to avoid
// having jitter throttle us inappropriately. We use a high NN to
// avoid making the system perform poorly for the user / other users
// on shared components. Perhaps time-to-completion should be scaled by size.
// d) if we have a lot of (iii) we should respond to it the same as (i), so
// let's reduce this to (i) and (ii). Being unable to tell the difference
// between load we created and another's, we have to throttle back when
// the system saturates. Our most throttled position will be one service
// worker: dispatch IO, extract the next text, wait for IO completion,
// repeat.
// e) scaling up and down: TCP's lessons here are pretty good. So exponential
// up - single thread and measure, then 2, 4, etc. When pNN goes bad, back off
// for a time and then try again with linear increase (it could be case (ii))
// - lots of room to experiment here; working with a time based approach is important
// as that is the only way we can detect saturation: we are not facing
// loss or errors in this model.
// f) data gathering: record (name, bytes, start, duration)
// write to disk afterwards as a csv file?
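
The design notes above describe a plan rather than code in this diff: measure completion latency, watch a high percentile, grow exponentially while it holds steady, and switch to linear growth after a back-off. A rough sketch of that idea follows. Everything in it is an assumption made for illustration: the names `percentile`, `Throttle` and `TOLERANCE`, the nearest-rank percentile method, the 25% tolerance, and halving as the back-off step. It also omits the "back off for a time" hold period.

```rust
/// Assumed threshold: latency more than 25% above baseline counts as saturation.
const TOLERANCE: f64 = 1.25;

/// Nearest-rank percentile of `samples` (durations in seconds); `p` is 0..=100.
fn percentile(samples: &[f64], p: f64) -> Option<f64> {
    if samples.is_empty() {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_by(|a, b| a.total_cmp(b));
    let rank = ((p / 100.0) * sorted.len() as f64).ceil() as usize;
    Some(sorted[rank.clamp(1, sorted.len()) - 1])
}

/// Hypothetical controller for the number of concurrent IO workers.
struct Throttle {
    workers: usize,
    max_workers: usize,
    slow_start: bool,
    baseline: Option<f64>,
}

impl Throttle {
    fn new(max_workers: usize) -> Self {
        Throttle {
            workers: 1,
            max_workers: max_workers.max(1),
            slow_start: true,
            baseline: None,
        }
    }

    /// Feed the latest pNN latency; returns the new worker count.
    fn observe(&mut self, pnn: f64) -> usize {
        // The first observation becomes the baseline to compare against.
        let baseline = *self.baseline.get_or_insert(pnn);
        if pnn > baseline * TOLERANCE {
            // Saturated: back off, and grow only linearly from now on.
            self.workers = (self.workers / 2).max(1);
            self.slow_start = false;
        } else if self.slow_start {
            // Exponential growth: 1, 2, 4, ...
            self.workers = (self.workers * 2).min(self.max_workers);
        } else {
            // Linear growth after a back-off.
            self.workers = (self.workers + 1).min(self.max_workers);
        }
        self.workers
    }
}
```

A caller would collect `finish - start` for recently completed items, compute something like `percentile(&durations, 95.0)`, and pass it to `observe` to decide how many workers to keep busy.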
pub mod immediate;
pub mod threaded;

use crate::utils::notifications::Notification;

use std::env;
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

use time::precise_time_s;

#[derive(Debug)]
pub enum Kind {
Directory,
File(Vec<u8>),
}

#[derive(Debug)]
pub struct Item {
/// The path to operate on
pub full_path: PathBuf,
/// The operation to perform
pub kind: Kind,
/// When the operation started
pub start: f64,
/// When the operation ended
pub finish: f64,
/// The length of the file, for files (for stats)
pub size: Option<usize>,
/// The result of the operation
pub result: io::Result<()>,
/// The mode to apply
pub mode: u32,
}

impl Item {
pub fn make_dir(full_path: PathBuf, mode: u32) -> Self {
Item {
full_path,
kind: Kind::Directory,
start: 0.0,
finish: 0.0,
size: None,
result: Ok(()),
mode,
}
}

pub fn write_file(full_path: PathBuf, content: Vec<u8>, mode: u32) -> Self {
let len = content.len();
Item {
full_path,
kind: Kind::File(content),
start: 0.0,
finish: 0.0,
size: Some(len),
result: Ok(()),
mode,
}
}
}

/// Trait object for performing IO. At this point the overhead
/// of trait invocation is not a bottleneck, but if it becomes
/// one we could consider an enum variant based approach instead.
pub trait Executor {
/// Perform a single operation.
/// During overload situations previously queued items may
/// need to be completed before the item is accepted:
/// consume the returned iterator.
fn execute(&mut self, mut item: Item) -> Box<dyn '_ + Iterator<Item = Item>> {
item.start = precise_time_s();
self.dispatch(item)
}

/// Actually dispatch an operation.
/// This is called by the default execute() implementation and
/// should not be called directly.
fn dispatch(&mut self, item: Item) -> Box<dyn '_ + Iterator<Item = Item>>;

/// Wrap up any pending operations and iterate over them.
/// All operations submitted before the join will have been
/// returned through the iterators from execute, completed, or
/// join once join returns.
fn join(&mut self) -> Box<dyn '_ + Iterator<Item = Item>>;

/// Iterate over completed items.
fn completed(&mut self) -> Box<dyn '_ + Iterator<Item = Item>>;
}

/// Trivial single threaded IO to be used from executors.
/// (Crazy sophisticated ones can obviously ignore this)
pub fn perform(item: &mut Item) {
// directories: make them, TODO: register with the dir existence cache.
// Files, write them.
item.result = match item.kind {
Kind::Directory => create_dir(&item.full_path),
Kind::File(ref contents) => write_file(&item.full_path, &contents, item.mode),
};
item.finish = precise_time_s();
}

#[allow(unused_variables)]
Review comment (Contributor): Rather than unused variables (which I assume is mode on Windows) could we not check mode against 0644/0755 and return an error if the mode is bad as part of our tar verification plans?

Reply (Contributor Author): Latest rust can do attributes on parameters. Yay. But no, wrong later. push back here. I can call it _mode instead if you prefer to have it narrow; alex didn't like that in tar-rs.

Reply (Contributor): I'm fine for it to stay like this for now.

pub fn write_file<P: AsRef<Path>, C: AsRef<[u8]>>(
path: P,
contents: C,
mode: u32,
) -> io::Result<()> {
let mut opts = OpenOptions::new();
#[cfg(unix)]
{
use std::os::unix::fs::OpenOptionsExt;
opts.mode(mode);
}
let path = path.as_ref();
let path_display = format!("{}", path.display());
let mut f = {
trace_scoped!("creat", "name": path_display);
opts.write(true).create(true).truncate(true).open(path)?
};
let contents = contents.as_ref();
let len = contents.len();
{
trace_scoped!("write", "name": path_display, "len": len);
f.write_all(contents)?;
}
{
trace_scoped!("close", "name": path_display);
drop(f);
}
Ok(())
}
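
The review thread on `#[allow(unused_variables)]` turns on how narrowly the lint should be silenced, since `mode` is only used on Unix. One option not taken in the PR is `cfg_attr`, which applies the `allow` only on targets where the parameter really is unused. The sketch below is a standard-library-only variant without the tracing macros; `write_file_with_mode` is a hypothetical name.

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// Creates (or truncates) `path` and writes `contents` to it.
/// The lint is silenced only on non-Unix targets, where `mode` is unused.
#[cfg_attr(not(unix), allow(unused_variables))]
fn write_file_with_mode(path: &Path, contents: &[u8], mode: u32) -> io::Result<()> {
    let mut opts = OpenOptions::new();
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        // The process umask is applied on top of the requested mode.
        opts.mode(mode);
    }
    let mut file = opts.write(true).create(true).truncate(true).open(path)?;
    // The file is closed when `file` is dropped at the end of the function.
    file.write_all(contents)
}
```

On Unix the requested mode only takes effect when the file is newly created, and the resulting permissions are the mode with the umask bits removed, which matches the README note about honoring umask.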

pub fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref();
let path_display = format!("{}", path.display());
trace_scoped!("create_dir", "name": path_display);
std::fs::create_dir(path)
}

/// Get the executor for disk IO.
pub fn get_executor<'a>(
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
) -> Box<dyn Executor + 'a> {
// If this gets lots of use, consider exposing via the config file.
if let Ok(thread_str) = env::var("RUSTUP_IO_THREADS") {
if thread_str == "disabled" {
Box::new(immediate::ImmediateUnpacker::new())
} else {
if let Ok(thread_count) = thread_str.parse::<usize>() {
Box::new(threaded::Threaded::new_with_threads(
notify_handler,
thread_count,
))
} else {
Box::new(threaded::Threaded::new(notify_handler))
}
}
} else {
Box::new(threaded::Threaded::new(notify_handler))
}
}
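
The branching in `get_executor` reduces to three outcomes: `disabled` selects the immediate executor, a parseable number selects a threaded executor with that many threads, and anything else (including an unset variable) selects the threaded default. A small sketch that separates this decision from executor construction, so it can be checked in isolation, follows. `IoMode`, `io_mode_from` and `io_mode_from_env` are hypothetical names, not part of the PR.

```rust
/// Hypothetical description of how disk IO should be performed.
#[derive(Debug, PartialEq)]
enum IoMode {
    /// Single-threaded IO on the calling thread.
    Immediate,
    /// Threaded IO; `None` means "use the default thread count".
    Threaded(Option<usize>),
}

/// Interprets the value of `RUSTUP_IO_THREADS` (or its absence).
fn io_mode_from(value: Option<&str>) -> IoMode {
    match value {
        Some("disabled") => IoMode::Immediate,
        // Unparseable values fall back to the default, as in `get_executor`.
        Some(s) => IoMode::Threaded(s.parse::<usize>().ok()),
        None => IoMode::Threaded(None),
    }
}

/// Reads the environment and interprets it with `io_mode_from`.
fn io_mode_from_env() -> IoMode {
    io_mode_from(std::env::var("RUSTUP_IO_THREADS").ok().as_deref())
}
```

For example, `io_mode_from(Some("8"))` gives `IoMode::Threaded(Some(8))`, while `io_mode_from(Some("lots"))` falls back to `IoMode::Threaded(None)`.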