From a6e91769378d407f0bf580bb489f3d80dd7890bb Mon Sep 17 00:00:00 2001
From: Robert Collins
Date: Mon, 15 Mar 2021 06:32:29 +0100
Subject: [PATCH] Stream large files during unpacking

Fixes #2632, #2145, #2564

Files over 16M are now written incrementally in chunks rather than buffered in
memory in one full linear buffer. This chunk size is not configurable.

For threaded unpacking, the entire memory buffer will be used to buffer chunks
and a single worker thread will dispatch IO operations from the buffer, so
minimal performance impact should be anticipated (file size/16M round trips at
worst, and most network file systems will latency hide linear writes). For
immediate unpacking, each chunk is dispatched directly to disk, which may
impact performance as less latency hiding is possible - but for immediate
unpacking clarity of behaviour is the priority.
---
 src/diskio/immediate.rs       | 203 +++++++++++++++++++++++++++++++++++++++--
 src/diskio/mod.rs             | 184 ++++++++++++++++++++++++++--
 src/diskio/test.rs            |  79 +++++++++++++
 src/diskio/threaded.rs        |  44 ++++---
 src/dist/component/package.rs | 217 ++++++++++++++++++++++------------
 src/errors.rs                 |   4 +
 6 files changed, 617 insertions(+), 114 deletions(-)
 create mode 100644 src/diskio/test.rs

diff --git a/src/diskio/immediate.rs b/src/diskio/immediate.rs
index 8c154fc3ff..a83d0162d6 100644
--- a/src/diskio/immediate.rs
+++ b/src/diskio/immediate.rs
@@ -2,27 +2,208 @@ /// /// Use for diagnosing bugs or working around any unexpected issues with the /// threaded code paths. -use super::{perform, Executor, Item}; +use std::{ + fmt::Debug, + fs::{File, OpenOptions}, + io::{self, Write}, + path::Path, + sync::{Arc, Mutex}, + time::Instant, +}; + +use super::{CompletedIO, Executor, Item}; + +#[derive(Debug)] +pub struct _IncrementalFileState { + completed_chunks: Vec, + err: Option>, + item: Option, + finished: bool, +} + +pub(super) type IncrementalFileState = Arc>>; + +#[derive(Default, Debug)] +pub struct ImmediateUnpacker { + incremental_state: IncrementalFileState, +} -#[derive(Default)] -pub struct ImmediateUnpacker {} impl ImmediateUnpacker { pub fn new() -> Self { - Self {} + Self { + ..Default::default() + } + } + + fn deque(&self) -> Box> { + let mut guard = self.incremental_state.lock().unwrap(); + // incremental file in progress + if let Some(ref mut state) = *guard { + // Case 1: pending errors + if state.finished { + let mut item = state.item.take().unwrap(); + if state.err.is_some() { + let err = state.err.take().unwrap(); + item.result = err; + } + item.finish = item + .start + .map(|s| Instant::now().saturating_duration_since(s)); + if state.finished { + *guard = None; + } + Box::new(Some(CompletedIO::Item(item)).into_iter()) + } else { + // Case 2: pending chunks (which might be empty) + let mut completed_chunks = vec![]; + completed_chunks.append(&mut state.completed_chunks); + Box::new( + completed_chunks + .into_iter() + .map(|size| CompletedIO::Chunk(size)), + ) + } + } else { + Box::new(None.into_iter()) + } } } impl Executor for ImmediateUnpacker { - fn dispatch(&self, mut item: Item) -> Box + '_> { - perform(&mut item); - Box::new(Some(item).into_iter()) + fn dispatch(&self, mut item: Item) -> Box + '_> { + item.result = match &mut item.kind { + super::Kind::Directory => super::create_dir(&item.full_path), + super::Kind::File(ref contents) => { + super::write_file(&item.full_path, &contents, item.mode) + } + super::Kind::IncrementalFile(_incremental_file) => { + return { + // If there is a pending error, return it, otherwise stash the + Item for
eventual return when the file is finished. + let mut guard = self.incremental_state.lock().unwrap(); + if let Some(ref mut state) = *guard { + if state.err.is_some() { + let err = state.err.take().unwrap(); + item.result = err; + item.finish = item + .start + .map(|s| Instant::now().saturating_duration_since(s)); + *guard = None; + Box::new(Some(CompletedIO::Item(item)).into_iter()) + } else { + state.item = Some(item); + Box::new(None.into_iter()) + } + } else { + unreachable!(); + } + }; + } + }; + item.finish = item + .start + .map(|s| Instant::now().saturating_duration_since(s)); + Box::new(Some(CompletedIO::Item(item)).into_iter()) + } + + fn join(&mut self) -> Box> { + self.deque() } - fn join(&mut self) -> Box> { - Box::new(None.into_iter()) + fn completed(&self) -> Box> { + self.deque() } - fn completed(&self) -> Box> { - Box::new(None.into_iter()) + fn incremental_file_state(&self) -> super::IncrementalFileState { + let mut state = self.incremental_state.lock().unwrap(); + if let Some(_) = *state { + unreachable!(); + } else { + *state = Some(_IncrementalFileState { + completed_chunks: vec![], + err: None, + item: None, + finished: false, + }); + super::IncrementalFileState::Immediate(self.incremental_state.clone()) + } + } +} + +/// The non-shared state for writing a file incrementally +#[derive(Debug)] +pub(super) struct IncrementalFileWriter { + state: IncrementalFileState, + file: Option, + path_display: String, +} + +impl IncrementalFileWriter { + #[allow(unused_variables)] + pub fn new>( + path: P, + mode: u32, + state: IncrementalFileState, + ) -> std::result::Result { + let mut opts = OpenOptions::new(); + #[cfg(unix)] + { + use std::os::unix::fs::OpenOptionsExt; + opts.mode(mode); + } + let path = path.as_ref(); + let path_display = format!("{}", path.display()); + let file = Some({ + trace_scoped!("creat", "name": path_display); + opts.write(true).create(true).truncate(true).open(path)? + }); + Ok(IncrementalFileWriter { + file, + state, + path_display, + }) + } + + pub fn chunk_submit(&mut self, chunk: Vec) -> bool { + if (self.state.lock().unwrap()).is_none() { + return false; + } + match self.write(chunk) { + Ok(v) => v, + Err(e) => { + let mut state = self.state.lock().unwrap(); + if let Some(ref mut state) = *state { + state.err.replace(Err(e)); + state.finished = true; + false + } else { + false + } + } + } + } + + fn write(&mut self, chunk: Vec) -> std::result::Result { + let mut state = self.state.lock().unwrap(); + if let Some(ref mut state) = *state { + if let Some(ref mut file) = (&mut self.file).as_mut() { + // Length 0 vector is used for clean EOF signalling. + if chunk.len() == 0 { + trace_scoped!("close", "name:": self.path_display); + drop(std::mem::take(&mut self.file)); + state.finished = true; + } else { + trace_scoped!("write_segment", "name": self.path_display, "len": chunk.len()); + file.write_all(&chunk)?; + + state.completed_chunks.push(chunk.len()); + } + Ok(true) + } else { + Ok(false) + } + } else { + unreachable!(); + } } } diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs index cc2ae9e65e..06e3d05328 100644 --- a/src/diskio/mod.rs +++ b/src/diskio/mod.rs @@ -52,23 +52,74 @@ // f) data gathering: record (name, bytes, start, duration) // write to disk afterwards as a csv file? 
pub mod immediate; +#[cfg(test)] +mod test; pub mod threaded; -use std::fs::OpenOptions; use std::io::{self, Write}; use std::path::{Path, PathBuf}; +use std::sync::mpsc::Receiver; use std::time::{Duration, Instant}; +use std::{fmt::Debug, fs::OpenOptions}; use crate::errors::{Result, ResultExt}; use crate::process; use crate::utils::notifications::Notification; +/// Carries the implementation specific channel data into the executor. +#[derive(Debug)] +pub enum IncrementalFile { + ImmediateReceiver, + ThreadedReceiver(Receiver>), +} + +// The basic idea is that in single threaded mode we get this pattern: +// package budget io-layer +// +<-claim-> +// +-submit--------+ | write +// +-complete------+ +// + +// .. loop .. +// In thread mode with lots of memory we want the following: +// +<-claim-> +// +-submit--------+ +// +<-claim-> +// +-submit--------+ +// .. loop .. | writes +// +-complete------+ +// + +// +-complete------+ +// + +// In thread mode with limited memory we want the following: +// +<-claim-> +// +-submit--------+ +// +<-claim-> +// +-submit--------+ +// .. loop up to budget .. | writes +// +-complete------+ +// + +// +<-claim-> +// +-submit--------+ +// .. loop etc .. +// +// lastly we want pending IOs such as directory creation to be able to complete in the same way, so a chunk completion +// needs to be able to report back in the same fashion; folding it into the same enum will make the driver code easier to write. +// +// The implementation is done via a pair of MPSC channels. One to send data to write. In +// the immediate model, acknowledgements are sent after doing the write immediately. In the threaded model, +// acknowledgements are sent after the write completes in the thread pool handler. In the packages code the inner that +// handles iops and continues processing incremental mode files handles the connection between the acks and the budget. +// Error reporting is passed through the regular completion port, to avoid creating a new special case. + +/// What kind of IO operation to perform #[derive(Debug)] pub enum Kind { Directory, File(Vec), + IncrementalFile(IncrementalFile), } +/// The details of the IO operation #[derive(Debug)] pub struct Item { /// The path to operate on @@ -81,12 +132,20 @@ pub struct Item { pub finish: Option, /// The length of the file, for files (for stats) pub size: Option, - /// The result of the operation + /// The result of the operation (could now be factored into CompletedIO...) pub result: io::Result<()>, /// The mode to apply pub mode: u32, } +#[derive(Debug)] +pub enum CompletedIO { + /// A submitted Item has completed + Item(Item), + /// An IncrementalFile has completed a single chunk + Chunk(usize), +} + impl Item { pub fn make_dir(full_path: PathBuf, mode: u32) -> Self { Self { @@ -112,6 +171,61 @@ impl Item { mode, } } + + pub fn write_file_segmented<'a>( + full_path: PathBuf, + mode: u32, + state: IncrementalFileState, + ) -> Result<(Self, Box) -> bool + 'a>)> { + let (chunk_submit, content_callback) = state.incremental_file_channel(&full_path, mode)?; + let result = Self { + full_path, + kind: Kind::IncrementalFile(content_callback), + start: None, + finish: None, + size: None, + result: Ok(()), + mode, + }; + Ok((result, Box::new(chunk_submit))) + } +} + +// This could be a boxed trait object perhaps... but since we're looking at +// rewriting this all into an aio layer anyway, and not looking at plugging +// different backends in at this time, it can keep. +/// Implementation specific state for incremental file writes. 
This effectively +/// just allows the immediate codepath to get access to the Arc referenced state +/// without holding a lifetime reference to the executor, as the threaded code +/// path is all message passing. +pub enum IncrementalFileState { + Threaded, + Immediate(immediate::IncrementalFileState), +} + +impl IncrementalFileState { + /// Get a channel for submitting incremental file chunks to the executor + fn incremental_file_channel( + &self, + path: &Path, + mode: u32, + ) -> Result<(Box) -> bool>, IncrementalFile)> { + use std::sync::mpsc::channel; + match self { + &IncrementalFileState::Threaded => { + let (tx, rx) = channel::>(); + let content_callback = IncrementalFile::ThreadedReceiver(rx); + let chunk_submit = move |chunk: Vec| tx.send(chunk).is_ok(); + Ok((Box::new(chunk_submit), content_callback)) + } + &IncrementalFileState::Immediate(ref state) => { + let content_callback = IncrementalFile::ImmediateReceiver; + let mut writer = immediate::IncrementalFileWriter::new(path, mode, state.clone())?; + let chunk_submit = move |chunk: Vec| writer.chunk_submit(chunk); + Ok((Box::new(chunk_submit), content_callback)) + } + } + } } /// Trait object for performing IO. At this point the overhead @@ -122,7 +236,7 @@ pub trait Executor { /// During overload situations previously queued items may /// need to be completed before the item is accepted: /// consume the returned iterator. - fn execute(&self, mut item: Item) -> Box + '_> { + fn execute(&self, mut item: Item) -> Box + '_> { item.start = Some(Instant::now()); self.dispatch(item) } @@ -130,26 +244,35 @@ pub trait Executor { /// Actually dispatch a operation. /// This is called by the default execute() implementation and /// should not be called directly. - fn dispatch(&self, item: Item) -> Box + '_>; + fn dispatch(&self, item: Item) -> Box + '_>; /// Wrap up any pending operations and iterate over them. /// All operations submitted before the join will have been /// returned either through ready/complete or join once join /// returns. - fn join(&mut self) -> Box + '_>; + fn join(&mut self) -> Box + '_>; /// Iterate over completed items. - fn completed(&self) -> Box + '_>; + fn completed(&self) -> Box + '_>; + + /// Get any state needed for incremental file processing + fn incremental_file_state(&self) -> IncrementalFileState; } /// Trivial single threaded IO to be used from executors. /// (Crazy sophisticated ones can obviously ignore this) -pub fn perform(item: &mut Item) { +pub fn perform(item: &mut Item, chunk_complete_callback: F) { // directories: make them, TODO: register with the dir existence cache. // Files, write them. 
- item.result = match item.kind { + item.result = match &mut item.kind { Kind::Directory => create_dir(&item.full_path), Kind::File(ref contents) => write_file(&item.full_path, &contents, item.mode), + Kind::IncrementalFile(incremental_file) => write_file_incremental( + &item.full_path, + incremental_file, + item.mode, + chunk_complete_callback, + ), }; item.finish = item .start @@ -187,6 +310,51 @@ pub fn write_file, C: AsRef<[u8]>>( Ok(()) } +#[allow(unused_variables)] +pub fn write_file_incremental, F: Fn(usize)>( + path: P, + content_callback: &mut IncrementalFile, + mode: u32, + chunk_complete_callback: F, +) -> io::Result<()> { + let mut opts = OpenOptions::new(); + #[cfg(unix)] + { + use std::os::unix::fs::OpenOptionsExt; + opts.mode(mode); + } + let path = path.as_ref(); + let path_display = format!("{}", path.display()); + let mut f = { + trace_scoped!("creat", "name": path_display); + opts.write(true).create(true).truncate(true).open(path)? + }; + if let IncrementalFile::ThreadedReceiver(recv) = content_callback { + loop { + // We unwrap here because the documented only reason for recv to fail is a close by the sender, which is reading + // from the tar file: a failed read there will propogate the error in the main thread directly. + let contents = recv.recv().unwrap(); + let len = contents.len(); + // Length 0 vector is used for clean EOF signalling. + if len == 0 { + break; + } + { + trace_scoped!("write_segment", "name": path_display, "len": len); + f.write_all(&contents)?; + chunk_complete_callback(len); + } + } + } else { + unreachable!(); + } + { + trace_scoped!("close", "name:": path_display); + drop(f); + } + Ok(()) +} + pub fn create_dir>(path: P) -> io::Result<()> { let path = path.as_ref(); let path_display = format!("{}", path.display()); diff --git a/src/diskio/test.rs b/src/diskio/test.rs new file mode 100644 index 0000000000..7cba480f7b --- /dev/null +++ b/src/diskio/test.rs @@ -0,0 +1,79 @@ +use std::collections::HashMap; + +use crate::errors::Result; +use crate::test::test_dir; + +use super::{get_executor, Executor, Item}; +use crate::currentprocess; + +fn test_incremental_file(io_threads: &str) -> Result<()> { + let work_dir = test_dir()?; + let mut vars = HashMap::new(); + vars.insert("RUSTUP_IO_THREADS".to_string(), io_threads.to_string()); + let tp = Box::new(currentprocess::TestProcess { + vars, + ..Default::default() + }); + currentprocess::with(tp.clone(), || -> Result<()> { + let mut written = 0; + let mut file_finished = false; + let mut io_executor: Box = get_executor(None)?; + let (item, mut sender) = Item::write_file_segmented( + work_dir.path().join("scratch"), + 0o666, + io_executor.incremental_file_state(), + )?; + for _ in io_executor.execute(item).collect::>() { + // The file should be open and incomplete, and no completed chunks + unreachable!(); + } + let mut chunk: Vec = vec![]; + chunk.extend(b"0123456789"); + // We should be able to submit more than one chunk + sender(chunk.clone()); + sender(chunk.clone()); + loop { + for work in io_executor.completed().collect::>() { + match work { + super::CompletedIO::Chunk(size) => written += size, + super::CompletedIO::Item(item) => unreachable!(format!("{:?}", item)), + } + } + if written == 20 { + break; + } + } + // sending a zero length chunk closes the file + sender(vec![]); + loop { + for work in io_executor.completed().collect::>() { + match work { + super::CompletedIO::Chunk(_) => unreachable!(), + super::CompletedIO::Item(_) => { + file_finished = true; + } + } + } + if file_finished == true { 
+ break; + } + } + assert_eq!(true, file_finished); + for _ in io_executor.join().collect::>() { + // no more work should be outstanding + unreachable!(); + } + Ok(()) + })?; + Ok(()) +} + +#[test] +fn test_incremental_file_immediate() -> Result<()> { + test_incremental_file("1") +} + +#[test] +fn test_incremental_file_threaded() -> Result<()> { + test_incremental_file("2") +} diff --git a/src/diskio/threaded.rs b/src/diskio/threaded.rs index 8fb4ea586d..64e504e7ee 100644 --- a/src/diskio/threaded.rs +++ b/src/diskio/threaded.rs @@ -4,17 +4,17 @@ /// than desired. In particular the docs workload with 20K files requires /// very low latency per file, which even a few ms per syscall per file /// will cause minutes of wall clock time. -use super::{perform, Executor, Item}; -use crate::utils::notifications::Notification; -use crate::utils::units::Unit; - use std::cell::Cell; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; +use super::{perform, CompletedIO, Executor, Item}; +use crate::utils::notifications::Notification; +use crate::utils::units::Unit; + enum Task { - Request(Item), + Request(CompletedIO), // Used to synchronise in the join method. Sentinel, } @@ -59,27 +59,31 @@ impl<'a> Threaded<'a> { self.n_files.fetch_add(1, Ordering::Relaxed); let n_files = self.n_files.clone(); self.pool.execute(move || { - perform(&mut item); + let chunk_complete_callback = |size| { + tx.send(Task::Request(CompletedIO::Chunk(size))) + .expect("receiver should be listening") + }; + perform(&mut item, chunk_complete_callback); n_files.fetch_sub(1, Ordering::Relaxed); - tx.send(Task::Request(item)) + tx.send(Task::Request(CompletedIO::Item(item))) .expect("receiver should be listening"); }); } } impl<'a> Executor for Threaded<'a> { - fn dispatch(&self, item: Item) -> Box + '_> { + fn dispatch(&self, item: Item) -> Box + '_> { // Yield any completed work before accepting new work - keep memory // pressure under control // - return an iterator that runs until we can submit and then submits // as its last action Box::new(SubmitIterator { executor: self, - item: Cell::new(Task::Request(item)), + item: Cell::new(Some(item)), }) } - fn join(&mut self) -> Box + '_> { + fn join(&mut self) -> Box + '_> { // Some explanation is in order. 
Even though the tar we are reading from (if // any) will have had its FileWithProgress download tracking // completed before we hit drop, that is not true if we are unwinding due to a @@ -145,12 +149,16 @@ impl<'a> Executor for Threaded<'a> { }) } - fn completed(&self) -> Box + '_> { + fn completed(&self) -> Box + '_> { Box::new(JoinIterator { iter: self.rx.try_iter(), consume_sentinel: true, }) } + + fn incremental_file_state(&self) -> super::IncrementalFileState { + super::IncrementalFileState::Threaded + } } impl<'a> Drop for Threaded<'a> { @@ -166,9 +174,9 @@ struct JoinIterator> { } impl> Iterator for JoinIterator { - type Item = Item; + type Item = CompletedIO; - fn next(&mut self) -> Option { + fn next(&mut self) -> Option { let task_o = self.iter.next(); match task_o { None => None, @@ -188,13 +196,13 @@ impl> Iterator for JoinIterator { struct SubmitIterator<'a, 'b> { executor: &'a Threaded<'b>, - item: Cell, + item: Cell>, } impl<'a, 'b> Iterator for SubmitIterator<'a, 'b> { - type Item = Item; + type Item = CompletedIO; - fn next(&mut self) -> Option { + fn next(&mut self) -> Option { // The number here is arbitrary; just a number to stop exhausting fd's on linux // and still allow rapid decompression to generate work to dispatch // This function could perhaps be tuned: e.g. it may wait in rx.iter() @@ -203,7 +211,7 @@ impl<'a, 'b> Iterator for SubmitIterator<'a, 'b> { // actually completes; however, results are presently ok. let threshold = 5; if self.executor.pool.queued_count() < threshold { - if let Task::Request(item) = self.item.take() { + if let Some(item) = self.item.take() { self.executor.submit(item); }; None @@ -213,7 +221,7 @@ impl<'a, 'b> Iterator for SubmitIterator<'a, 'b> { return Some(item); } if self.executor.pool.queued_count() < threshold { - if let Task::Request(item) = self.item.take() { + if let Some(item) = self.item.take() { self.executor.submit(item); }; return None; diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index 3888b5f876..4dd2b0ec26 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -10,7 +10,7 @@ use std::path::{Path, PathBuf}; use tar::EntryType; -use crate::diskio::{get_executor, Executor, Item, Kind}; +use crate::diskio::{get_executor, CompletedIO, Executor, Item, Kind}; use crate::dist::component::components::*; use crate::dist::component::transaction::*; use crate::dist::temp; @@ -165,7 +165,7 @@ struct MemoryBudget { // Probably this should live in diskio but ¯\_(ツ)_/¯ impl MemoryBudget { fn new( - max_file_size: usize, + io_chunk_size: usize, effective_max_ram: Option, notify_handler: Option<&dyn Fn(Notification<'_>)>, ) -> Self { @@ -195,30 +195,38 @@ impl MemoryBudget { } }; - // Future us: this can be removed when IO chunking within a single file is possible: it just helps generate good - // errors rather than allocator-failure panics when we hit the large file on a RAM limited system. 
- if max_file_size > unpack_ram { - panic!("RUSTUP_UNPACK_RAM must be larger than {}", max_file_size); + if io_chunk_size > unpack_ram { + panic!("RUSTUP_UNPACK_RAM must be larger than {}", io_chunk_size); } Self { limit: unpack_ram, used: 0, } } - fn reclaim(&mut self, op: &Item) { - match &op.kind { - Kind::Directory => {} - Kind::File(content) => self.used -= content.len(), - }; + + fn reclaim(&mut self, op: &CompletedIO) { + match &op { + CompletedIO::Item(op) => match &op.kind { + Kind::Directory => {} + Kind::File(content) => self.used -= content.len(), + Kind::IncrementalFile(_) => {} + }, + CompletedIO::Chunk(size) => self.used -= size, + } } fn claim(&mut self, op: &Item) { match &op.kind { Kind::Directory => {} Kind::File(content) => self.used += content.len(), + Kind::IncrementalFile(_) => {} }; } + fn claim_chunk(&mut self, len: usize) { + self.used += len; + } + fn available(&self) -> usize { self.limit - self.used } @@ -226,23 +234,27 @@ impl MemoryBudget { /// Handle the async result of io operations /// Replaces op.result with Ok(()) -fn filter_result(op: &mut Item) -> io::Result<()> { - let result = mem::replace(&mut op.result, Ok(())); - match result { - Ok(_) => Ok(()), - Err(e) => match e.kind() { - IOErrorKind::AlreadyExists => { - // mkdir of e.g. ~/.rustup already existing is just fine; - // for others it would be better to know whether it is - // expected to exist or not -so put a flag in the state. - if let Kind::Directory = op.kind { - Ok(()) - } else { - Err(e) +fn filter_result(op: &mut CompletedIO) -> io::Result<()> { + if let CompletedIO::Item(op) = op { + let result = mem::replace(&mut op.result, Ok(())); + match result { + Ok(_) => Ok(()), + Err(e) => match e.kind() { + IOErrorKind::AlreadyExists => { + // mkdir of e.g. ~/.rustup already existing is just fine; + // for others it would be better to know whether it is + // expected to exist or not -so put a flag in the state. + if let Kind::Directory = op.kind { + Ok(()) + } else { + Err(e) + } } - } - _ => Err(e), - }, + _ => Err(e), + }, + } + } else { + Ok(()) } } @@ -251,32 +263,35 @@ fn filter_result(op: &mut Item) -> io::Result<()> { /// /// Currently the volume of queued items does not count as backpressure against /// the main tar extraction process. 
+/// Returns the number of triggered children fn trigger_children( io_executor: &dyn Executor, directories: &mut HashMap, budget: &mut MemoryBudget, - item: Item, + op: CompletedIO, ) -> Result { let mut result = 0; - if let Kind::Directory = item.kind { - let mut pending = Vec::new(); - directories - .entry(item.full_path) - .and_modify(|status| match status { - DirStatus::Exists => unreachable!(), - DirStatus::Pending(pending_inner) => { - pending.append(pending_inner); - *status = DirStatus::Exists; + if let CompletedIO::Item(item) = op { + if let Kind::Directory = item.kind { + let mut pending = Vec::new(); + directories + .entry(item.full_path) + .and_modify(|status| match status { + DirStatus::Exists => unreachable!(), + DirStatus::Pending(pending_inner) => { + pending.append(pending_inner); + *status = DirStatus::Exists; + } + }) + .or_insert_with(|| unreachable!()); + result += pending.len(); + for pending_item in pending.into_iter() { + for mut item in io_executor.execute(pending_item).collect::>() { + // TODO capture metrics + budget.reclaim(&item); + filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; + result += trigger_children(io_executor, directories, budget, item)?; } - }) - .or_insert_with(|| unreachable!()); - result += pending.len(); - for pending_item in pending.into_iter() { - for mut item in io_executor.execute(pending_item).collect::>() { - // TODO capture metrics - budget.reclaim(&item); - filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; - result += trigger_children(io_executor, directories, budget, item)?; } } }; @@ -298,7 +313,7 @@ fn unpack_without_first_dir<'a, R: Read>( let entries = archive .entries() .chain_err(|| ErrorKind::ExtractingPackage)?; - const MAX_FILE_SIZE: u64 = 220_000_000; + const IO_CHUNK_SIZE: u64 = 16_777_216; let effective_max_ram = match effective_limits::memory_limit() { Ok(ram) => Some(ram as usize), Err(e) => { @@ -308,7 +323,7 @@ fn unpack_without_first_dir<'a, R: Read>( None } }; - let mut budget = MemoryBudget::new(MAX_FILE_SIZE as usize, effective_max_ram, notify_handler); + let mut budget = MemoryBudget::new(IO_CHUNK_SIZE as usize, effective_max_ram, notify_handler); let mut directories: HashMap = HashMap::new(); // Path is presumed to exist. Call it a precondition. @@ -349,36 +364,44 @@ fn unpack_without_first_dir<'a, R: Read>( continue; } - let size = entry.header().size()?; - if size > MAX_FILE_SIZE { - // If we cannot tell the user we will either succeed (great), or fail (and we may get a bug report), either - // way, we will most likely get reports from users about this, so the possible set of custom builds etc that - // don't report are not a great concern. - if let Some(notify_handler) = notify_handler { - notify_handler(Notification::Error(format!( - "File too big {} {}", - relpath.display(), - size - ))); - } - } - - fn flush_ios( + /// true if either no sender_entry was provided, or the incremental file + /// has been fully dispatched. 
+ fn flush_ios<'a, R: std::io::Read, P: AsRef>( mut budget: &mut MemoryBudget, io_executor: &dyn Executor, mut directories: &mut HashMap, - ) -> Result<()> { - for mut item in io_executor.completed().collect::>() { + mut sender_entry: Option<&mut ( + Box) -> bool + 'a>, + &mut tar::Entry<'_, R>, + )>, + full_path: P, + ) -> Result { + let mut result = sender_entry.is_none(); + for mut op in io_executor.completed().collect::>() { // TODO capture metrics - budget.reclaim(&item); - filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; - trigger_children(&*io_executor, &mut directories, &mut budget, item)?; + budget.reclaim(&op); + filter_result(&mut op).chain_err(|| ErrorKind::ExtractingPackage)?; + trigger_children(&*io_executor, &mut directories, &mut budget, op)?; } - Ok(()) - } - - while size > budget.available() as u64 { - flush_ios(&mut budget, &*io_executor, &mut directories)?; + // Maybe stream a file incrementally + if let Some((sender, entry)) = sender_entry.as_mut() { + if budget.available() as u64 >= IO_CHUNK_SIZE { + let mut v = vec![0; IO_CHUNK_SIZE as usize]; + let len = entry.read(&mut v)?; + if len == 0 { + result = true; + } + v.resize(len, 0); + budget.claim_chunk(len); + if !sender(v) { + return Err(ErrorKind::DisconnectedChannel( + full_path.as_ref().to_path_buf(), + ) + .into()); + } + } + } + Ok(result) } // Bail out if we get hard links, device nodes or any other unusual content @@ -409,15 +432,39 @@ fn unpack_without_first_dir<'a, R: Read>( let o_mode = g_mode >> 3; let mode = u_mode | g_mode | o_mode; + let file_size = entry.header().size()?; + let size = std::cmp::min(IO_CHUNK_SIZE, file_size); + + while size > budget.available() as u64 { + flush_ios::, _>( + &mut budget, + &*io_executor, + &mut directories, + None, + &full_path, + )?; + } + + let mut incremental_file_sender: Option) -> bool + '_>> = None; let mut item = match kind { EntryType::Directory => { directories.insert(full_path.to_owned(), DirStatus::Pending(Vec::new())); - Item::make_dir(full_path, mode) + Item::make_dir(full_path.clone(), mode) } EntryType::Regular => { - let mut v = Vec::with_capacity(size as usize); - entry.read_to_end(&mut v)?; - Item::write_file(full_path, v, mode) + if file_size > IO_CHUNK_SIZE { + let (item, sender) = Item::write_file_segmented( + full_path.clone(), + mode, + io_executor.incremental_file_state(), + )?; + incremental_file_sender = Some(sender); + item + } else { + let mut v = Vec::with_capacity(size as usize); + entry.read_to_end(&mut v)?; + Item::write_file(full_path.clone(), v, mode) + } } _ => return Err(ErrorKind::UnsupportedKind(format!("{:?}", kind)).into()), }; @@ -456,12 +503,28 @@ fn unpack_without_first_dir<'a, R: Read>( } }; + // Submit the new item for mut item in io_executor.execute(item).collect::>() { // TODO capture metrics budget.reclaim(&item); filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?; trigger_children(&*io_executor, &mut directories, &mut budget, item)?; } + let mut incremental_file_sender = + if let Some(incremental_file_sender) = incremental_file_sender { + Some((incremental_file_sender, &mut entry)) + } else { + None + }; + + // monitor io queue and feed in the content of the file (if needed) + while !flush_ios( + &mut budget, + &*io_executor, + &mut directories, + incremental_file_sender.as_mut(), + &full_path, + )? {} } loop { diff --git a/src/errors.rs b/src/errors.rs index 2e43e86b42..dafc69c9ed 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -106,6 +106,10 @@ error_chain! 
{ description("could not download file") display("could not download file from '{}' to '{}'", url, path.display()) } + DisconnectedChannel (v: PathBuf) { + description("IO channel disconnected") + display("IO receiver for '{}' disconnected", v.display()) + } InvalidUrl { url: String, } {
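
Reviewer note (not part of the patch): the sketch below shows how in-crate code
could drive the new segmented-write path end to end, following the shape of the
new src/diskio/test.rs. The helper name stream_one_file, the 0o644 mode and the
caller-supplied chunk list are illustrative assumptions only; the calls it makes
(get_executor, Item::write_file_segmented, incremental_file_state, CompletedIO)
are the ones introduced by this patch.

// Illustrative sketch only -- not part of the diff above.
use std::path::PathBuf;

use crate::diskio::{get_executor, CompletedIO, Executor, Item};
use crate::errors::Result;

fn stream_one_file(dest: PathBuf, chunks: Vec<Vec<u8>>) -> Result<()> {
    let mut io_executor: Box<dyn Executor> = get_executor(None)?;
    // Build the Item plus the chunk-submission callback from the executor's
    // incremental-file state.
    let (item, mut sender) =
        Item::write_file_segmented(dest, 0o644, io_executor.incremental_file_state())?;
    // Dispatching the Item opens the file; nothing has completed yet.
    for _ in io_executor.execute(item).collect::<Vec<_>>() {
        unreachable!("no IO can have completed yet");
    }
    // Feed the content chunk by chunk (each at most the 16M unit described in
    // the commit message); a zero-length chunk signals EOF.
    for chunk in chunks {
        sender(chunk);
    }
    sender(vec![]);
    // Poll the completion port: Chunk(size) acknowledgements are what a memory
    // budget reclaims, Item(..) reports the finished file and carries any IO
    // error.
    let mut finished = false;
    while !finished {
        for op in io_executor.completed().collect::<Vec<_>>() {
            match op {
                CompletedIO::Chunk(_size) => {}
                CompletedIO::Item(item) => {
                    item.result.expect("incremental write failed");
                    finished = true;
                }
            }
        }
    }
    // Nothing should still be outstanding, but join() drains the executor.
    for _ in io_executor.join().collect::<Vec<_>>() {}
    Ok(())
}

The real consumer in src/dist/component/package.rs follows the same
claim/submit/complete cycle, but interleaves reading the next chunk from the
tar entry with draining completions, so the memory budget bounds how many
chunks are in flight at any one time.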