From 9fa59ae37b06899b9ea7b92bdc062c4c81f09c7e Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Wed, 5 Jun 2019 18:19:49 +1200 Subject: [PATCH] WIPWIP --- src/diskio/immediate.rs | 4 +++- src/diskio/mod.rs | 32 +++++++++++++++++++++++++++++--- src/diskio/threaded.rs | 8 +++++--- src/dist/component/package.rs | 3 ++- 4 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/diskio/immediate.rs b/src/diskio/immediate.rs index ed53763988..318789f84a 100644 --- a/src/diskio/immediate.rs +++ b/src/diskio/immediate.rs @@ -21,5 +21,7 @@ impl Executor for ImmediateUnpacker { true } - fn join(&mut self) {} + fn join(&mut self, marker: mut Item) -> Option { + marker + } } diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs index b1197f2434..92b9a0e66c 100644 --- a/src/diskio/mod.rs +++ b/src/diskio/mod.rs @@ -54,10 +54,12 @@ pub mod immediate; pub mod threaded; +use std::env; use std::fs::OpenOptions; use std::io::{self, Write}; use std::path::{Path, PathBuf}; +use lazy_static::lazy_static; use time::precise_time_s; #[derive(Debug)] @@ -129,9 +131,11 @@ pub trait Executor { /// always be ready for work if they have no in-progress work. fn ready(&mut self) -> bool; - // Wrap up any pending operations and close the transmit channel - // so that rx.iter() can be used (and thus a race-free termination). - fn join(&mut self); + // Wrap up any pending operations and send marker back when done + // (or return it imnmediately). + // This permits blocking rx.iter() calls to avoid races with slow + // IO. + fn join(&mut self, mut marker: Item) -> Option; } /// Trivial single threaded IO to be used from executors. @@ -183,3 +187,25 @@ pub fn create_dir>(path: P) -> io::Result<()> { trace_scoped!("create_dir", "name": path_display); std::fs::create_dir(path) } + +/// Get the executor for disk IO. +pub fn get_executor() -> &'static dyn Executor { + lazy_static! { + static ref EXECUTOR: Box = + // If this gets lots of use, consider exposing via the config file. + if let Ok(thread_str) = env::var("RUSTUP_IO_THREADS") { + if thread_str == "disabled" { + Box::new(immediate::ImmediateUnpacker::new()) + } else { + if let Ok(thread_count) = thread_str.parse::() { + Box::new(threaded::Threaded::new_with_threads(tx, notify_handler, thread_count)) + } else { + Box::new(threaded::Threaded::new(tx, notify_handler)) + } + } + } else { + Box::new(threaded::Threaded::new(tx, notify_handler)) + }; + } + &EXECUTOR +} diff --git a/src/diskio/threaded.rs b/src/diskio/threaded.rs index 59e5c246b3..d94e7c7653 100644 --- a/src/diskio/threaded.rs +++ b/src/diskio/threaded.rs @@ -79,7 +79,8 @@ impl<'a> Executor for Threaded<'a> { self.pool.queued_count() < 5 } - fn join(&mut self) { + + fn join(&mut self, marker: mut Item) { // Some explanation is in order. Even though the tar we are reading from (if // any) will have had its FileWithProgress download tracking // completed before we hit drop, that is not true if we are unwinding due to a @@ -125,8 +126,9 @@ impl<'a> Executor for Threaded<'a> { self.notify_handler .map(|handler| handler(Notification::DownloadPopUnits)); // close the feedback channel so that blocking reads on it can - // complete. - self.tx = None; + // complete. send is atomic, and we know the threads completed from the + // pool join, so this is race-free. + self.tx.send(marker); } } diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index 33b36d13e2..1da604299e 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -340,7 +340,8 @@ fn unpack_without_first_dir<'a, R: Read>( filter_result(prev_item).chain_err(|| ErrorKind::ExtractingPackage)?; } } - io_executor.join(); + let marker = Item {} + io_executor.join(marker); // And drain final results for item in rx.try_iter() { // TODO capture metrics, add directories to created cache