diff --git a/Cargo.lock b/Cargo.lock index 46da4ff676..249e4cc668 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1223,6 +1223,16 @@ dependencies = [ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rs_tracing" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rustc-demangle" version = "0.1.14" @@ -1256,6 +1266,7 @@ dependencies = [ "regex 1.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "retry 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rs_tracing 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "same-file 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "scopeguard 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2046,6 +2057,7 @@ dependencies = [ "checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5" "checksum reqwest 0.9.16 (registry+https://github.com/rust-lang/crates.io-index)" = "ddcfd2c13c6af0f9c45a1086be3b9c68af79e4430b42790759e2d34cce2a6c60" "checksum retry 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7681e3e8e753c2803a449cae112100710092ea4cae1aa580c613e26dc845f50b" +"checksum rs_tracing 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "92b10cbf58a21be5d96a9c0336bd06b50f581dcb735f66ff69781a12336ac03b" "checksum rustc-demangle 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "ccc78bfd5acd7bf3e89cffcf899e5cb1a52d6fafa8dec2739ad70c9577a57288" "checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" "checksum ryu 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "b96a9549dc8d48f2c283938303c4b5a77aa29bfbc5b54b084fb1630408899a8f" diff --git a/Cargo.toml b/Cargo.toml index 50940959b3..8aaacf8920 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,6 +59,10 @@ xz2 = "0.1.3" version = "0.5" default-features = false +[dependencies.rs_tracing] +version = "1.0.1" +features = ["rs_tracing"] + [target."cfg(windows)".dependencies] cc = "1" winreg = "0.6" diff --git a/README.md b/README.md index 07e8664b61..7d663fc503 100644 --- a/README.md +++ b/README.md @@ -591,6 +591,7 @@ Command | Description ## Environment variables + - `RUSTUP_HOME` (default: `~/.rustup` or `%USERPROFILE%/.rustup`) Sets the root rustup folder, used for storing installed toolchains and configuration options. @@ -611,6 +612,21 @@ Command | Description - `RUSTUP_UPDATE_ROOT` (default `https://static.rust-lang.org/rustup`) Sets the root URL for downloading self-updates. +- `RUSTUP_IO_THREADS` *unstable* (defaults to reported cpu count). Sets the + number of threads to perform close IO in. Set to `disabled` to force + single-threaded IO for troubleshooting, or an arbitrary number to + override automatic detection. + +- `RUSTUP_TRACE_DIR` *unstable* (default: no tracing) + Enables tracing and determines the directory that traces will be + written too. Traces are of the form PID.trace. 
Traces can be read + by the Catapult project [tracing viewer][tv]. + + [tv]: (https://github.com/catapult-project/catapult/blob/master/tracing/README.md) + +- `RUSTUP_UNPACK_RAM` *unstable* (default 400M, min 100M) + Caps the amount of RAM rustup will use for IO tasks while unpacking. + ## Other installation methods The primary installation method, as described at https://rustup.rs, differs by platform: @@ -698,6 +714,9 @@ yet validate signatures of downloads. [s]: https://github.com/rust-lang/rustup.rs/issues?q=is%3Aopen+is%3Aissue+label%3Asecurity +File modes on installation honor umask as of 1.18.4, use umask if +very tight controls are desired. + ## FAQ ### Is this an official Rust project? diff --git a/src/cli/main.rs b/src/cli/main.rs index 47b4f17626..d31dd69689 100644 --- a/src/cli/main.rs +++ b/src/cli/main.rs @@ -30,9 +30,12 @@ mod term2; use crate::errors::*; use rustup::env_var::RUST_RECURSION_COUNT_MAX; + use std::env; use std::path::PathBuf; +use rs_tracing::*; + fn main() { if let Err(ref e) = run_rustup() { common::report_error(e); @@ -41,6 +44,17 @@ fn main() { } fn run_rustup() -> Result<()> { + if let Ok(dir) = env::var("RUSTUP_TRACE_DIR") { + open_trace_file!(dir)?; + } + let result = run_rustup_inner(); + if let Ok(_) = env::var("RUSTUP_TRACE_DIR") { + close_trace_file!(); + } + result +} + +fn run_rustup_inner() -> Result<()> { // Guard against infinite proxy recursion. This mostly happens due to // bugs in rustup. do_recursion_guard()?; diff --git a/src/diskio/immediate.rs b/src/diskio/immediate.rs new file mode 100644 index 0000000000..0b5801f0df --- /dev/null +++ b/src/diskio/immediate.rs @@ -0,0 +1,27 @@ +/// Immediate IO model: performs IO in the current thread. +/// +/// Use for diagnosing bugs or working around any unexpected issues with the +/// threaded code paths. +use super::{perform, Executor, Item}; + +pub struct ImmediateUnpacker {} +impl ImmediateUnpacker { + pub fn new<'a>() -> ImmediateUnpacker { + ImmediateUnpacker {} + } +} + +impl Executor for ImmediateUnpacker { + fn dispatch(&mut self, mut item: Item) -> Box> { + perform(&mut item); + Box::new(Some(item).into_iter()) + } + + fn join(&mut self) -> Box> { + Box::new(None.into_iter()) + } + + fn completed(&mut self) -> Box> { + Box::new(None.into_iter()) + } +} diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs new file mode 100644 index 0000000000..20b3e0b3ed --- /dev/null +++ b/src/diskio/mod.rs @@ -0,0 +1,216 @@ +/// Disk IO abstraction for rustup. +/// +/// This exists to facilitate high performance extraction even though OS's are +/// imperfect beasts. For detailed design notes see the module source. +// +// When performing IO we have a choice: +// - perform some IO in this thread +// - dispatch some or all IO to another thead +// known tradeoffs: +// NFS: network latency incurred on create, chmod, close calls +// WSLv1: Defender latency incurred on close calls; mutex shared with create calls +// Windows: Defender latency incurred on close calls +// Unix: limited open file count +// Defender : CPU limited, so more service points than cores brings no gain. +// Some machines: IO limited, more service points than cores brings more efficient +// Hello world footprint ~350MB, so around 400MB to install is considered ok. +// IO utilisation. +// All systems: dispatching to a thread has some overhead. +// Basic idea then is a locally measured congestion control problem. +// Underlying system has two +// dimensions - how much work we have queued, and how much work we execute +// at once. 
Queued work is both memory footprint, and unless each executor +// is performing complex logic, potentially concurrent work. +// Single core machines - thread anyway, they probably don't have SSDs? +// How many service points? Blocking latency due to networks and disks +// is independent of CPU: more threads will garner more throughput up +// to actual resource service capapbility. +// so: +// a) measure time around each IO op from dispatch to completion. +// b) create more threads than CPUs - 2x for now (because threadpool +// doesn't allow creating dynamically), with very shallow stacks +// (say 1MB) +// c) keep adding work while the P95? P80? of completion stays the same +// when pNN starts to increase either (i) we've saturated the system +// or (ii) other work coming in has saturated the system or (iii) this +// sort of work is a lot harder to complete. We use NN<100 to avoid +// having jitter throttle us inappropriately. We use a high NN to +// avoid making the system perform poorly for the user / other users +// on shared components. Perhaps time-to-completion should be scaled by size. +// d) if we have a lot of (iii) we should respond to it the same as (i), so +// lets reduce this to (i) and (ii). Being unable to tell the difference +// between load we created and anothers, we have to throttle back when +// the system saturates. Our most throttled position will be one service +// worker: dispatch IO, extract the next text, wait for IO completion, +// repeat. +// e) scaling up and down: TCP's lessons here are pretty good. So exponential +// up - single thread and measure. two, 4 etc. When Pnn goes bad back off +// for a time and then try again with linear increase (it could be case (ii) +// - lots of room to experiment here; working with a time based approach is important +// as that is the only way we can detect saturation: we are not facing +// loss or errors in this model. +// f) data gathering: record (name, bytes, start, duration) +// write to disk afterwards as a csv file? +pub mod immediate; +pub mod threaded; + +use crate::utils::notifications::Notification; + +use std::env; +use std::fs::OpenOptions; +use std::io::{self, Write}; +use std::path::{Path, PathBuf}; + +use time::precise_time_s; + +#[derive(Debug)] +pub enum Kind { + Directory, + File(Vec), +} + +#[derive(Debug)] +pub struct Item { + /// The path to operate on + pub full_path: PathBuf, + /// The operation to perform + pub kind: Kind, + /// When the operation started + pub start: f64, + /// When the operation ended + pub finish: f64, + /// The length of the file, for files (for stats) + pub size: Option, + /// The result of the operation + pub result: io::Result<()>, + /// The mode to apply + pub mode: u32, +} + +impl Item { + pub fn make_dir(full_path: PathBuf, mode: u32) -> Self { + Item { + full_path, + kind: Kind::Directory, + start: 0.0, + finish: 0.0, + size: None, + result: Ok(()), + mode, + } + } + + pub fn write_file(full_path: PathBuf, content: Vec, mode: u32) -> Self { + let len = content.len(); + Item { + full_path, + kind: Kind::File(content), + start: 0.0, + finish: 0.0, + size: Some(len), + result: Ok(()), + mode, + } + } +} + +/// Trait object for performing IO. At this point the overhead +/// of trait invocation is not a bottleneck, but if it becomes +/// one we could consider an enum variant based approach instead. +pub trait Executor { + /// Perform a single operation. 
+ /// During overload situations previously queued items may + /// need to be completed before the item is accepted: + /// consume the returned iterator. + fn execute(&mut self, mut item: Item) -> Box> { + item.start = precise_time_s(); + self.dispatch(item) + } + + /// Actually dispatch a operation. + /// This is called by the default execute() implementation and + /// should not be called directly. + fn dispatch(&mut self, item: Item) -> Box>; + + /// Wrap up any pending operations and iterate over them. + /// All operations submitted before the join will have been + /// returned either through ready/complete or join once join + /// returns. + fn join(&mut self) -> Box>; + + /// Iterate over completed items. + fn completed(&mut self) -> Box>; +} + +/// Trivial single threaded IO to be used from executors. +/// (Crazy sophisticated ones can obviously ignore this) +pub fn perform(item: &mut Item) { + // directories: make them, TODO: register with the dir existence cache. + // Files, write them. + item.result = match item.kind { + Kind::Directory => create_dir(&item.full_path), + Kind::File(ref contents) => write_file(&item.full_path, &contents, item.mode), + }; + item.finish = precise_time_s(); +} + +#[allow(unused_variables)] +pub fn write_file, C: AsRef<[u8]>>( + path: P, + contents: C, + mode: u32, +) -> io::Result<()> { + let mut opts = OpenOptions::new(); + #[cfg(unix)] + { + use std::os::unix::fs::OpenOptionsExt; + opts.mode(mode); + } + let path = path.as_ref(); + let path_display = format!("{}", path.display()); + let mut f = { + trace_scoped!("creat", "name": path_display); + opts.write(true).create(true).truncate(true).open(path)? + }; + let contents = contents.as_ref(); + let len = contents.len(); + { + trace_scoped!("write", "name": path_display, "len": len); + f.write_all(contents)?; + } + { + trace_scoped!("close", "name:": path_display); + drop(f); + } + Ok(()) +} + +pub fn create_dir>(path: P) -> io::Result<()> { + let path = path.as_ref(); + let path_display = format!("{}", path.display()); + trace_scoped!("create_dir", "name": path_display); + std::fs::create_dir(path) +} + +/// Get the executor for disk IO. +pub fn get_executor<'a>( + notify_handler: Option<&'a dyn Fn(Notification<'_>)>, +) -> Box { + // If this gets lots of use, consider exposing via the config file. + if let Ok(thread_str) = env::var("RUSTUP_IO_THREADS") { + if thread_str == "disabled" { + Box::new(immediate::ImmediateUnpacker::new()) + } else { + if let Ok(thread_count) = thread_str.parse::() { + Box::new(threaded::Threaded::new_with_threads( + notify_handler, + thread_count, + )) + } else { + Box::new(threaded::Threaded::new(notify_handler)) + } + } + } else { + Box::new(threaded::Threaded::new(notify_handler)) + } +} diff --git a/src/diskio/threaded.rs b/src/diskio/threaded.rs new file mode 100644 index 0000000000..1feb6f3a6a --- /dev/null +++ b/src/diskio/threaded.rs @@ -0,0 +1,247 @@ +/// Threaded IO model: A pool of threads is used so that syscall latencies +/// due to (nonexhaustive list) Network file systems, virus scanners, and +/// operating system design, do not cause rustup to be significantly slower +/// than desired. In particular the docs workload with 20K files requires +/// very low latency per file, which even a few ms per syscall per file +/// will cause minutes of wall clock time. 
+use super::{perform, Executor, Item}; +use crate::utils::notifications::Notification; + +use std::cell::Cell; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::Arc; + +enum Task { + Request(Item), + // Used to synchronise in the join method. + Sentinel, +} + +impl Default for Task { + fn default() -> Self { + Task::Sentinel + } +} + +pub struct Threaded<'a> { + n_files: Arc, + pool: threadpool::ThreadPool, + notify_handler: Option<&'a dyn Fn(Notification<'_>)>, + rx: Receiver, + tx: Sender, +} + +impl<'a> Threaded<'a> { + pub fn new(notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Self { + // Defaults to hardware thread count threads; this is suitable for + // our needs as IO bound operations tend to show up as write latencies + // rather than close latencies, so we don't need to look at + // more threads to get more IO dispatched at this stage in the process. + let pool = threadpool::Builder::new() + .thread_name("CloseHandle".into()) + .thread_stack_size(1_048_576) + .build(); + let (tx, rx) = channel(); + Threaded { + n_files: Arc::new(AtomicUsize::new(0)), + pool, + notify_handler, + rx, + tx, + } + } + + pub fn new_with_threads( + notify_handler: Option<&'a dyn Fn(Notification<'_>)>, + thread_count: usize, + ) -> Self { + // Defaults to hardware thread count threads; this is suitable for + // our needs as IO bound operations tend to show up as write latencies + // rather than close latencies, so we don't need to look at + // more threads to get more IO dispatched at this stage in the process. + let pool = threadpool::Builder::new() + .thread_name("CloseHandle".into()) + .num_threads(thread_count) + .thread_stack_size(1_048_576) + .build(); + let (tx, rx) = channel(); + Threaded { + n_files: Arc::new(AtomicUsize::new(0)), + pool, + notify_handler, + rx, + tx, + } + } + + fn submit(&mut self, mut item: Item) { + let tx = self.tx.clone(); + self.n_files.fetch_add(1, Ordering::Relaxed); + let n_files = self.n_files.clone(); + self.pool.execute(move || { + perform(&mut item); + n_files.fetch_sub(1, Ordering::Relaxed); + tx.send(Task::Request(item)) + .expect("receiver should be listening"); + }); + } +} + +impl<'a> Executor for Threaded<'a> { + fn dispatch(&mut self, item: Item) -> Box> { + // Yield any completed work before accepting new work - keep memory + // pressure under control + // - return an iterator that runs until we can submit and then submits + // as its last action + Box::new(SubmitIterator { + executor: self, + item: Cell::new(Task::Request(item)), + }) + } + + fn join(&mut self) -> Box> { + // Some explanation is in order. Even though the tar we are reading from (if + // any) will have had its FileWithProgress download tracking + // completed before we hit drop, that is not true if we are unwinding due to a + // failure, where the logical ownership of the progress bar is + // ambiguous, and as the tracker itself is abstracted out behind + // notifications etc we cannot just query for that. So: we assume no + // more reads of the underlying tar will take place: either the + // error unwinding will stop reads, or we completed; either way, we + // notify finished to the tracker to force a reset to zero; we set + // the units to files, show our progress, and set our units back + // afterwards. 
The largest archives today - rust docs - have ~20k + // items, and the download tracker's progress is confounded with + // actual handling of data today, we synthesis a data buffer and + // pretend to have bytes to deliver. + self.notify_handler + .map(|handler| handler(Notification::DownloadFinished)); + self.notify_handler + .map(|handler| handler(Notification::DownloadPushUnits("iops"))); + let mut prev_files = self.n_files.load(Ordering::Relaxed); + self.notify_handler.map(|handler| { + handler(Notification::DownloadContentLengthReceived( + prev_files as u64, + )) + }); + if prev_files > 50 { + println!("{} deferred IO operations", prev_files); + } + let buf: Vec = vec![0; prev_files]; + // Cheap wrap-around correctness check - we have 20k files, more than + // 32K means we subtracted from 0 somewhere. + assert!(32767 > prev_files); + let mut current_files = prev_files; + while current_files != 0 { + use std::thread::sleep; + sleep(std::time::Duration::from_millis(100)); + prev_files = current_files; + current_files = self.n_files.load(Ordering::Relaxed); + let step_count = prev_files - current_files; + self.notify_handler + .map(|handler| handler(Notification::DownloadDataReceived(&buf[0..step_count]))); + } + self.pool.join(); + self.notify_handler + .map(|handler| handler(Notification::DownloadFinished)); + self.notify_handler + .map(|handler| handler(Notification::DownloadPopUnits)); + // close the feedback channel so that blocking reads on it can + // complete. send is atomic, and we know the threads completed from the + // pool join, so this is race-free. It is possible that try_iter is safe + // but the documentation is not clear: it says it will not wait, but not + // whether a put done by another thread on a NUMA machine before (say) + // the mutex in the thread pool is entirely synchronised; since this is + // largely hidden from the clients, digging into check whether we can + // make this tidier (e.g. remove the Marker variant) is left for another + // day. I *have* checked that insertion is barried and ordered such that + // sending the marker cannot come in before markers sent from other + // threads we just joined. + self.tx + .send(Task::Sentinel) + .expect("must still be listening"); + Box::new(JoinIterator { + iter: self.rx.iter(), + consume_sentinel: false, + }) + } + + fn completed(&mut self) -> Box> { + Box::new(JoinIterator { + iter: self.rx.try_iter(), + consume_sentinel: true, + }) + } +} + +impl<'a> Drop for Threaded<'a> { + fn drop(&mut self) { + // We are not permitted to fail - consume but do not handle the items. + self.join().for_each(drop); + } +} + +struct JoinIterator> { + iter: T, + consume_sentinel: bool, +} + +impl> Iterator for JoinIterator { + type Item = Item; + + fn next(&mut self) -> Option { + let task_o = self.iter.next(); + match task_o { + None => None, + Some(task) => match task { + Task::Sentinel => { + if self.consume_sentinel { + self.next() + } else { + None + } + } + Task::Request(item) => Some(item), + }, + } + } +} + +struct SubmitIterator<'a, 'b: 'a> { + executor: &'a mut Threaded<'b>, + item: Cell, +} + +impl<'a, 'b> Iterator for SubmitIterator<'a, 'b> { + type Item = Item; + + fn next(&mut self) -> Option { + // The number here is arbitrary; just a number to stop exhausting fd's on linux + // and still allow rapid decompression to generate work to dispatch + // This function could perhaps be tuned: e.g. 
it may wait in rx.iter() + // unnecessarily blocking if many items complete at once but threads do + // not pick up work quickly for some reason, until another thread + // actually completes; however, results are presently ok. + let threshold = 5; + if self.executor.pool.queued_count() < threshold { + if let Task::Request(item) = self.item.take() { + self.executor.submit(item); + }; + None + } else { + for task in self.executor.rx.iter() { + if let Task::Request(item) = task { + return Some(item); + } + if self.executor.pool.queued_count() < threshold { + if let Task::Request(item) = self.item.take() { + self.executor.submit(item); + }; + return None; + } + } + unreachable!(); + } + } +} diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index eac4e5187d..0d59452193 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -2,19 +2,24 @@ //! for installing from a directory or tarball to an installation //! prefix, represented by a `Components` instance. +use crate::diskio::{get_executor, Executor, Item, Kind}; use crate::dist::component::components::*; use crate::dist::component::transaction::*; - use crate::dist::temp; use crate::errors::*; use crate::utils::notifications::Notification; use crate::utils::utils; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; +use std::env; use std::fmt; -use std::io::Read; +use std::io::{self, ErrorKind as IOErrorKind, Read}; +use std::iter::FromIterator; +use std::mem; use std::path::{Path, PathBuf}; +use tar::EntryType; + /// The current metadata revision used by rust-installer pub const INSTALLER_VERSION: &str = "3"; pub const VERSION_FILE: &str = "rust-installer-version"; @@ -118,8 +123,6 @@ impl Package for DirectoryPackage { } _ => return Err(ErrorKind::CorruptComponent(name.to_owned()).into()), } - - set_file_perms(&target.prefix().path().join(path), &src_path)?; } let tx = builder.finish()?; @@ -132,65 +135,6 @@ impl Package for DirectoryPackage { } } -// On Unix we need to set up the file permissions correctly so -// binaries are executable and directories readable. This shouldn't be -// necessary: the source files *should* have the right permissions, -// but due to rust-lang/rust#25479 they don't. 
-#[cfg(unix)] -fn set_file_perms(dest_path: &Path, src_path: &Path) -> Result<()> { - use std::fs::{self, Metadata}; - use std::os::unix::fs::PermissionsExt; - use walkdir::WalkDir; - - // Compute whether this entry needs the X bit - fn needs_x(meta: &Metadata) -> bool { - meta.is_dir() || // Directories need it - meta.permissions().mode() & 0o700 == 0o700 // If it is rwx for the user, it gets the X bit - } - - // By convention, anything in the bin/ directory of the package is a binary - let is_bin = if let Some(p) = src_path.parent() { - p.ends_with("bin") - } else { - false - }; - - let is_dir = utils::is_directory(dest_path); - - if is_dir { - // Walk the directory setting everything - for entry in WalkDir::new(dest_path) { - let entry = entry.chain_err(|| ErrorKind::ComponentDirPermissionsFailed)?; - let meta = entry - .metadata() - .chain_err(|| ErrorKind::ComponentDirPermissionsFailed)?; - - let mut perm = meta.permissions(); - perm.set_mode(if needs_x(&meta) { 0o755 } else { 0o644 }); - fs::set_permissions(entry.path(), perm) - .chain_err(|| ErrorKind::ComponentFilePermissionsFailed)?; - } - } else { - let meta = - fs::metadata(dest_path).chain_err(|| ErrorKind::ComponentFilePermissionsFailed)?; - let mut perm = meta.permissions(); - perm.set_mode(if is_bin || needs_x(&meta) { - 0o755 - } else { - 0o644 - }); - fs::set_permissions(dest_path, perm) - .chain_err(|| ErrorKind::ComponentFilePermissionsFailed)?; - } - - Ok(()) -} - -#[cfg(windows)] -fn set_file_perms(_dest_path: &Path, _src_path: &Path) -> Result<()> { - Ok(()) -} - #[derive(Debug)] pub struct TarPackage<'a>(DirectoryPackage, temp::Dir<'a>); @@ -214,109 +158,114 @@ impl<'a> TarPackage<'a> { } } -#[cfg(windows)] -mod unpacker { - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::Arc; - use threadpool; +struct MemoryBudget { + limit: usize, + used: usize, +} - use crate::utils::notifications::Notification; +// Probably this should live in diskio but ¯\_(ツ)_/¯ +impl MemoryBudget { + fn new(max_file_size: usize) -> MemoryBudget { + const DEFAULT_UNPACK_RAM: usize = 400 * 1024 * 1024; + let unpack_ram = if let Ok(budget_str) = env::var("RUSTUP_UNPACK_RAM") { + if let Ok(budget) = budget_str.parse::() { + budget + } else { + DEFAULT_UNPACK_RAM + } + } else { + DEFAULT_UNPACK_RAM + }; + if max_file_size > unpack_ram { + panic!("RUSTUP_UNPACK_RAM must be larger than {}", max_file_size); + } + MemoryBudget { + limit: unpack_ram, + used: 0, + } + } + fn reclaim(&mut self, op: &Item) { + match &op.kind { + Kind::Directory => {} + Kind::File(content) => self.used -= content.len(), + }; + } - pub struct Unpacker<'a> { - n_files: Arc, - pool: threadpool::ThreadPool, - notify_handler: Option<&'a dyn Fn(Notification<'_>)>, + fn claim(&mut self, op: &Item) { + match &op.kind { + Kind::Directory => {} + Kind::File(content) => self.used += content.len(), + }; } - impl<'a> Unpacker<'a> { - pub fn new(notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Self { - // Defaults to hardware thread count threads; this is suitable for - // our needs as IO bound operations tend to show up as write latencies - // rather than close latencies, so we don't need to look at - // more threads to get more IO dispatched at this stage in the process. 
- let pool = threadpool::Builder::new() - .thread_name("CloseHandle".into()) - .build(); - Unpacker { - n_files: Arc::new(AtomicUsize::new(0)), - pool: pool, - notify_handler: notify_handler, - } - } + fn available(&self) -> usize { + self.limit - self.used + } +} - pub fn handle(&mut self, unpacked: tar::Unpacked) { - if let tar::Unpacked::File(f) = unpacked { - self.n_files.fetch_add(1, Ordering::Relaxed); - let n_files = self.n_files.clone(); - self.pool.execute(move || { - drop(f); - n_files.fetch_sub(1, Ordering::Relaxed); - }); +/// Handle the async result of io operations +/// Replaces op.result with Ok(()) +fn filter_result(op: &mut Item) -> io::Result<()> { + let result = mem::replace(&mut op.result, Ok(())); + match result { + Ok(_) => Ok(()), + Err(e) => match e.kind() { + IOErrorKind::AlreadyExists => { + // mkdir of e.g. ~/.rustup already existing is just fine; + // for others it would be better to know whether it is + // expected to exist or not -so put a flag in the state. + if let Kind::Directory = op.kind { + Ok(()) + } else { + Err(e) + } } - } + _ => Err(e), + }, } +} - impl<'a> Drop for Unpacker<'a> { - fn drop(&mut self) { - // Some explanation is in order. Even though the tar we are reading from (if - // any) will have had its FileWithProgress download tracking - // completed before we hit drop, that is not true if we are unwinding due to a - // failure, where the logical ownership of the progress bar is - // ambiguous, and as the tracker itself is abstracted out behind - // notifications etc we cannot just query for that. So: we assume no - // more reads of the underlying tar will take place: either the - // error unwinding will stop reads, or we completed; either way, we - // notify finished to the tracker to force a reset to zero; we set - // the units to files, show our progress, and set our units back - // afterwards. The largest archives today - rust docs - have ~20k - // items, and the download tracker's progress is confounded with - // actual handling of data today, we synthesis a data buffer and - // pretend to have bytes to deliver. - self.notify_handler - .map(|handler| handler(Notification::DownloadFinished)); - self.notify_handler - .map(|handler| handler(Notification::DownloadPushUnits("handles"))); - let mut prev_files = self.n_files.load(Ordering::Relaxed); - self.notify_handler.map(|handler| { - handler(Notification::DownloadContentLengthReceived( - prev_files as u64, - )) - }); - if prev_files > 50 { - println!("Closing {} deferred file handles", prev_files); - } - let buf: Vec = vec![0; prev_files]; - assert!(32767 > prev_files); - let mut current_files = prev_files; - while current_files != 0 { - use std::thread::sleep; - sleep(std::time::Duration::from_millis(100)); - prev_files = current_files; - current_files = self.n_files.load(Ordering::Relaxed); - let step_count = prev_files - current_files; - self.notify_handler.map(|handler| { - handler(Notification::DownloadDataReceived(&buf[0..step_count])) - }); +/// Dequeue the children of directories queued up waiting for the directory to +/// be created. +/// +/// Currently the volume of queued items does not count as backpressure against +/// the main tar extraction process. 
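The budget accounting above is spread across the extraction loop below (claim when an entry is read, reclaim whenever the executor hands an `Item` back, including items re-dispatched through `trigger_children`), so the invariant is easy to lose track of. A small test-style sketch of the intended accounting (not part of this patch; it assumes `RUSTUP_UNPACK_RAM` is unset so the 400M default applies):

```rust
#[cfg(test)]
mod memory_budget_sketch {
    use super::{Item, MemoryBudget};

    // Sketch: claim() on dispatch and reclaim() on completion must balance,
    // so available() returns to the configured limit once all IO is done.
    #[test]
    fn claim_and_reclaim_balance() {
        let mut budget = MemoryBudget::new(100_000_000);
        let item = Item::write_file("demo/file.txt".into(), vec![0u8; 4096], 0o644);
        let before = budget.available();
        budget.claim(&item);
        assert_eq!(budget.available(), before - 4096);
        budget.reclaim(&item);
        assert_eq!(budget.available(), before);
    }
}
```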
+fn trigger_children( + io_executor: &mut dyn Executor, + directories: &mut HashMap, + budget: &mut MemoryBudget, + item: Item, +) -> Result { + let mut result = 0; + if let Kind::Directory = item.kind { + let mut pending = Vec::new(); + directories + .entry(item.full_path) + .and_modify(|status| match status { + DirStatus::Exists => unreachable!(), + DirStatus::Pending(pending_inner) => { + pending.append(pending_inner); + *status = DirStatus::Exists; + } + }) + .or_insert_with(|| unreachable!()); + result += pending.len(); + for pending_item in pending.into_iter() { + for mut item in Vec::from_iter(io_executor.execute(pending_item)) { + // TODO capture metrics + budget.reclaim(&item); + filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; + result += trigger_children(io_executor, directories, budget, item)?; } - self.pool.join(); - self.notify_handler - .map(|handler| handler(Notification::DownloadFinished)); - self.notify_handler - .map(|handler| handler(Notification::DownloadPopUnits)); } - } + }; + Ok(result) } -#[cfg(not(windows))] -mod unpacker { - use crate::utils::notifications::Notification; - pub struct Unpacker {} - impl Unpacker { - pub fn new<'a>(_notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker { - Unpacker {} - } - pub fn handle(&mut self, _unpacked: tar::Unpacked) {} - } +/// What is the status of this directory ? +enum DirStatus { + Exists, + Pending(Vec), } fn unpack_without_first_dir<'a, R: Read>( @@ -324,45 +273,158 @@ fn unpack_without_first_dir<'a, R: Read>( path: &Path, notify_handler: Option<&'a dyn Fn(Notification<'_>)>, ) -> Result<()> { - let mut unpacker = unpacker::Unpacker::new(notify_handler); + let mut io_executor: Box = get_executor(notify_handler); let entries = archive .entries() .chain_err(|| ErrorKind::ExtractingPackage)?; - let mut checked_parents: HashSet = HashSet::new(); + const MAX_FILE_SIZE: u64 = 100_000_000; + let mut budget = MemoryBudget::new(MAX_FILE_SIZE as usize); + + let mut directories: HashMap = HashMap::new(); + // Path is presumed to exist. Call it a precondition. + directories.insert(path.to_owned(), DirStatus::Exists); + + 'entries: for entry in entries { + // drain completed results to keep memory pressure low and respond + // rapidly to completed events even if we couldn't submit work (because + // our unpacked item is pending dequeue) + for mut item in Vec::from_iter(io_executor.completed()) { + // TODO capture metrics + budget.reclaim(&item); + filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; + trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?; + } - for entry in entries { let mut entry = entry.chain_err(|| ErrorKind::ExtractingPackage)?; let relpath = { let path = entry.path(); let path = path.chain_err(|| ErrorKind::ExtractingPackage)?; path.into_owned() }; + // Reject path components that are not normal (.|..|/| etc) + for part in relpath.components() { + match part { + std::path::Component::Normal(_) => {} + _ => return Err(ErrorKind::BadPath(relpath).into()), + } + } let mut components = relpath.components(); - // Throw away the first path component + // Throw away the first path component: our root was supplied. components.next(); let full_path = path.join(&components.as_path()); + if full_path == path { + // The tmp dir code makes the root dir for us. 
+ continue; + } - // Create the full path to the entry if it does not exist already - if let Some(parent) = full_path.parent() { - if !checked_parents.contains(parent) { - checked_parents.insert(parent.to_owned()); - // It would be nice to optimise this stat out, but the tar could be like so: - // a/deep/file.txt - // a/file.txt - // which would require tracking the segments rather than a simple hash. - // Until profile shows that one stat per dir is a problem (vs one stat per file) - // leave till later. - - if !parent.exists() { - std::fs::create_dir_all(&parent).chain_err(|| ErrorKind::ExtractingPackage)? + let size = entry.header().size()?; + if size > MAX_FILE_SIZE { + return Err(format!("File too big {} {}", relpath.display(), size).into()); + } + while size > budget.available() as u64 { + for mut item in Vec::from_iter(io_executor.completed()) { + // TODO capture metrics + budget.reclaim(&item); + filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; + trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?; + } + } + // Bail out if we get hard links, device nodes or any other unusual content + // - it is most likely an attack, as rusts cross-platform nature precludes + // such artifacts + let kind = entry.header().entry_type(); + // https://github.com/rust-lang/rustup.rs/issues/1140 and before that + // https://github.com/rust-lang/rust/issues/25479 + // tl;dr: code got convoluted and we *may* have damaged tarballs out + // there. + // However the mandate we have is very simple: unpack as the current + // user with modes matching the tar contents. No documented tars with + // bad modes are in the bug tracker : the previous permission splatting + // code was inherited from interactions with sudo that are best + // addressed outside of rustup (run with an appropriate effective uid). + // THAT SAID: If regressions turn up immediately post release this code - + // https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=a8549057f0827bf3a068d8917256765a + // is a translation of the prior helper function into an in-iterator + // application. + let tar_mode = entry.header().mode().ok().unwrap(); + // That said, the tarballs that are shipped way back have single-user + // permissions: + // -rwx------ rustbuild/rustbuild ..... release/test-release.sh + // so we should normalise the mode to match the previous behaviour users + // may be expecting where the above file would end up with mode 0o755 + let u_mode = tar_mode & 0o700; + let g_mode = (u_mode & 0o0500) >> 3; + let o_mode = g_mode >> 3; + let mode = u_mode | g_mode | o_mode; + + let mut item = match kind { + EntryType::Directory => { + directories.insert(full_path.to_owned(), DirStatus::Pending(Vec::new())); + Item::make_dir(full_path, mode) + } + EntryType::Regular => { + let mut v = Vec::with_capacity(size as usize); + entry.read_to_end(&mut v)?; + Item::write_file(full_path, v, mode) + } + _ => return Err(ErrorKind::UnsupportedKind(format!("{:?}", kind)).into()), + }; + budget.claim(&item); + + let item = loop { + // Create the full path to the entry if it does not exist already + if let Some(parent) = item.full_path.to_owned().parent() { + match directories.get_mut(parent) { + None => { + // Tar has item before containing directory + // Complain about this so we can see if these exist. 
+ eprintln!( + "Unexpected: missing parent '{}' for '{}'", + parent.display(), + entry.path()?.display() + ); + directories.insert(parent.to_owned(), DirStatus::Pending(vec![item])); + item = Item::make_dir(parent.to_owned(), 0o755); + // Check the parent's parent + continue; + } + Some(DirStatus::Exists) => { + break item; + } + Some(DirStatus::Pending(pending)) => { + // Parent dir is being made, take next item from tar + pending.push(item); + continue 'entries; + } } + } else { + // We should never see a path with no parent. + panic!(); } + }; + + for mut item in Vec::from_iter(io_executor.execute(item)) { + // TODO capture metrics + budget.reclaim(&item); + filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; + trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?; + } + } + + loop { + let mut triggered = 0; + for mut item in Vec::from_iter(io_executor.join()) { + // handle final IOs + // TODO capture metrics + budget.reclaim(&item); + filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; + triggered += trigger_children(&mut *io_executor, &mut directories, &mut budget, item)?; + } + if triggered == 0 { + // None of the IO submitted before the prior join triggered any new + // submissions + break; } - entry.set_preserve_mtime(false); - entry - .unpack(&full_path) - .map(|unpacked| unpacker.handle(unpacked)) - .chain_err(|| ErrorKind::ExtractingPackage)?; } Ok(()) diff --git a/src/errors.rs b/src/errors.rs index dfd09d8d41..3779bb6b3f 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -24,6 +24,7 @@ error_chain! { Temp(temp::Error); Io(io::Error); Open(opener::OpenError); + Thread(std::sync::mpsc::RecvError); } errors { @@ -325,6 +326,14 @@ error_chain! { NoExeName { description("couldn't determine self executable name") } + UnsupportedKind(v: String) { + description("unsupported tar entry") + display("tar entry kind '{}' is not supported", v) + } + BadPath(v: PathBuf) { + description("bad path in tar") + display("tar path '{}' is not supported", v.display()) + } } } diff --git a/src/lib.rs b/src/lib.rs index 74dc045fd3..e2b97d04ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,9 @@ pub use crate::notifications::*; pub use crate::toolchain::*; pub use crate::utils::{notify, toml_utils}; +#[macro_use] +extern crate rs_tracing; + // A list of all binaries which Rustup will proxy. pub static TOOLS: &[&str] = &[ "rustc", @@ -50,6 +53,7 @@ fn component_for_bin(binary: &str) -> Option<&'static str> { pub mod command; mod config; +pub mod diskio; pub mod dist; pub mod env_var; pub mod errors; diff --git a/tests/dist_install.rs b/tests/dist_install.rs index 50deb4aa3d..89b1963880 100644 --- a/tests/dist_install.rs +++ b/tests/dist_install.rs @@ -291,106 +291,6 @@ fn component_bad_version() { } } -// Directories should be 0755, normal files 0644, files that come -// from the bin/ directory 0755. 
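The removed expectation above (fixed 0755/0644 modes regardless of the tarball) is what the mode normalisation in `unpack_without_first_dir` replaces: modes now derive from each tar header, widened from the user bits. A worked sketch of that arithmetic (not part of the patch; the `normalise_mode` helper name is illustrative only):

```rust
// Sketch of the per-entry mode normalisation done inline in
// unpack_without_first_dir: keep the user bits, then mirror the user's
// read and execute bits (never write) down to group and other.
fn normalise_mode(tar_mode: u32) -> u32 {
    let u_mode = tar_mode & 0o700;
    let g_mode = (u_mode & 0o0500) >> 3;
    let o_mode = g_mode >> 3;
    u_mode | g_mode | o_mode
}

fn main() {
    // Old single-user release tarballs (-rwx------) come out executable for all:
    assert_eq!(normalise_mode(0o700), 0o755);
    // Plain files keep the familiar 0644:
    assert_eq!(normalise_mode(0o644), 0o644);
    assert_eq!(normalise_mode(0o600), 0o644);
    // Group/other write bits in the tar are never propagated:
    assert_eq!(normalise_mode(0o777), 0o755);
}
```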
-#[test] -#[cfg(unix)] -fn unix_permissions() { - use std::fs; - use std::os::unix::fs::PermissionsExt; - - let pkgdir = TempDir::new("rustup").unwrap(); - - let mock = MockInstallerBuilder { - components: vec![MockComponentBuilder { - name: "mycomponent".to_string(), - files: vec![ - MockFile::new("bin/foo", b"foo"), - MockFile::new("lib/bar", b"bar"), - MockFile::new("lib/foobar", b"foobar").executable(true), - MockFile::new_dir( - "doc/stuff", - &[ - ("doc1", b"", false), - ("morestuff/doc2", b"", false), - ("morestuff/tool", b"", true), - ], - ), - ], - }], - }; - - mock.build(pkgdir.path()); - - let instdir = TempDir::new("rustup").unwrap(); - let prefix = InstallPrefix::from(instdir.path().to_owned()); - - let tmpdir = TempDir::new("rustup").unwrap(); - let tmpcfg = temp::Cfg::new( - tmpdir.path().to_owned(), - DEFAULT_DIST_SERVER, - Box::new(|_| ()), - ); - let notify = |_: Notification<'_>| (); - let tx = Transaction::new(prefix.clone(), &tmpcfg, ¬ify); - - let components = Components::open(prefix.clone()).unwrap(); - - let pkg = DirectoryPackage::new(pkgdir.path().to_owned(), true).unwrap(); - - let tx = pkg.install(&components, "mycomponent", None, tx).unwrap(); - tx.commit(); - - let m = 0o777 - & fs::metadata(instdir.path().join("bin/foo")) - .unwrap() - .permissions() - .mode(); - assert_eq!(m, 0o755); - let m = 0o777 - & fs::metadata(instdir.path().join("lib/bar")) - .unwrap() - .permissions() - .mode(); - assert_eq!(m, 0o644); - let m = 0o777 - & fs::metadata(instdir.path().join("lib/foobar")) - .unwrap() - .permissions() - .mode(); - assert_eq!(m, 0o755); - let m = 0o777 - & fs::metadata(instdir.path().join("doc/stuff/")) - .unwrap() - .permissions() - .mode(); - assert_eq!(m, 0o755); - let m = 0o777 - & fs::metadata(instdir.path().join("doc/stuff/doc1")) - .unwrap() - .permissions() - .mode(); - assert_eq!(m, 0o644); - let m = 0o777 - & fs::metadata(instdir.path().join("doc/stuff/morestuff")) - .unwrap() - .permissions() - .mode(); - assert_eq!(m, 0o755); - let m = 0o777 - & fs::metadata(instdir.path().join("doc/stuff/morestuff/doc2")) - .unwrap() - .permissions() - .mode(); - assert_eq!(m, 0o644); - let m = 0o777 - & fs::metadata(instdir.path().join("doc/stuff/morestuff/tool")) - .unwrap() - .permissions() - .mode(); - assert_eq!(m, 0o755); -} - // Installing to a prefix that doesn't exist creates it automatically #[test] fn install_to_prefix_that_does_not_exist() {