Thread dir creation as well
When a directory completes, start writing the files/dirs within that
directory that have already been decompressed.

Avoids a stat() + create_dir_all() in the main thread, permitting more
concurrent IO dispatch in exchange for increased memory pressure.
rbtcollins committed Jun 6, 2019
1 parent b29c436 commit 970e041
Showing 4 changed files with 131 additions and 57 deletions.
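
The core of the change is in src/dist/component/package.rs: each directory seen in the tar is tracked in a HashMap<PathBuf, DirStatus>, entries whose parent directory has not finished being created are parked under that parent, and once the mkdir completes every parked child is dispatched. Reduced to a standalone sketch (the names Entry, queue, dispatch, and on_dir_created below are illustrative stand-ins, not the rustup types; the actual diff uses Item, DirStatus, and trigger_children):

```rust
use std::collections::HashMap;
use std::path::{Path, PathBuf};

/// What is known about a directory so far (mirrors the DirStatus in the diff).
enum DirStatus {
    Exists,
    Pending(Vec<Entry>),
}

/// Simplified stand-in for the real `Item` submitted to the IO executor.
struct Entry {
    path: PathBuf,
    is_dir: bool,
}

/// Stand-in for submitting work to the IO executor.
fn dispatch(entry: Entry) {
    println!("submit {} (dir: {})", entry.path.display(), entry.is_dir);
}

/// Queue an entry: if its parent directory is still being created, park the
/// entry under that directory; otherwise dispatch it immediately.
fn queue(dirs: &mut HashMap<PathBuf, DirStatus>, entry: Entry) {
    let parent = entry.path.parent().map(Path::to_owned).unwrap_or_default();
    if let Some(DirStatus::Pending(waiting)) = dirs.get_mut(&parent) {
        waiting.push(entry);
        return;
    }
    if dirs.contains_key(&parent) {
        // Parent already exists on disk.
        dispatch(entry);
    } else {
        // Parent not seen yet: park the entry and submit a mkdir for it.
        // (The real code also keeps walking up if the grandparent is missing.)
        dirs.insert(parent.clone(), DirStatus::Pending(vec![entry]));
        dispatch(Entry { path: parent, is_dir: true });
    }
}

/// Called when a directory creation completes: mark it as existing and
/// dispatch everything that was parked underneath it.
fn on_dir_created(dirs: &mut HashMap<PathBuf, DirStatus>, dir: PathBuf) {
    if let Some(DirStatus::Pending(waiting)) = dirs.insert(dir, DirStatus::Exists) {
        for child in waiting {
            dispatch(child);
        }
    }
}

fn main() {
    let mut dirs = HashMap::new();
    dirs.insert(PathBuf::from("root"), DirStatus::Exists);
    queue(&mut dirs, Entry { path: "root/a/file.txt".into(), is_dir: false });
    on_dir_created(&mut dirs, "root/a".into());
}
```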
8 changes: 4 additions & 4 deletions src/diskio/immediate.rs
@@ -42,11 +42,11 @@ impl Executor for ImmediateUnpacker {
Box::new(ImmediateIterator(Cell::new(IterateOne::Item(item))))
}

fn join(&mut self) -> Option<Box<dyn Iterator<Item = Item>>> {
None
fn join(&mut self) -> Box<dyn Iterator<Item = Item>> {
Box::new(ImmediateIterator(Cell::new(IterateOne::None)))
}

fn completed(&mut self) -> Option<Box<dyn Iterator<Item = Item>>> {
None
fn completed(&mut self) -> Box<dyn Iterator<Item = Item>> {
Box::new(ImmediateIterator(Cell::new(IterateOne::None)))
}
}
4 changes: 2 additions & 2 deletions src/diskio/mod.rs
@@ -136,10 +136,10 @@ pub trait Executor {
/// All operations submitted before the join will have been
/// returned either through ready/complete or join once join
/// returns.
fn join(&mut self) -> Option<Box<dyn '_ + Iterator<Item = Item>>>;
fn join(&mut self) -> Box<dyn '_ + Iterator<Item = Item>>;

/// Iterate over completed items.
fn completed(&mut self) -> Option<Box<dyn '_ + Iterator<Item = Item>>>;
fn completed(&mut self) -> Box<dyn '_ + Iterator<Item = Item>>;
}

/// Trivial single threaded IO to be used from executors.
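With this trait change both drain points always hand back an iterator: an executor with nothing outstanding returns an empty iterator rather than None, so every call site collapses into a plain for loop (visible in the Drop impl and the package.rs changes below). A toy illustration of the shape, using simplified Item/Executor stand-ins rather than the real diskio types:

```rust
struct Item(u32);

trait Executor {
    /// Block until all submitted work has been returned.
    fn join(&mut self) -> Box<dyn '_ + Iterator<Item = Item>>;
    /// Return whatever has completed so far, without blocking.
    fn completed(&mut self) -> Box<dyn '_ + Iterator<Item = Item>>;
}

/// An executor that performs work inline and so never has anything pending.
struct Immediate;

impl Executor for Immediate {
    fn join(&mut self) -> Box<dyn '_ + Iterator<Item = Item>> {
        // Nothing outstanding: an empty iterator takes the place of `None`.
        Box::new(std::iter::empty::<Item>())
    }
    fn completed(&mut self) -> Box<dyn '_ + Iterator<Item = Item>> {
        Box::new(std::iter::empty::<Item>())
    }
}

fn main() {
    let mut exec = Immediate;
    // Callers no longer need `if let Some(iter) = ...` around the drain.
    for Item(n) in exec.completed() {
        println!("completed item {}", n);
    }
    exec.join().for_each(drop);
}
```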
16 changes: 7 additions & 9 deletions src/diskio/threaded.rs
@@ -98,7 +98,7 @@ impl<'a> Executor for Threaded<'a> {
})
}

fn join(&mut self) -> Option<Box<dyn '_ + Iterator<Item = Item>>> {
fn join(&mut self) -> Box<dyn '_ + Iterator<Item = Item>> {
// Some explanation is in order. Even though the tar we are reading from (if
// any) will have had its FileWithProgress download tracking
// completed before we hit drop, that is not true if we are unwinding due to a
@@ -157,26 +157,24 @@ impl<'a> Executor for Threaded<'a> {
self.tx
.send(Task::Sentinel)
.expect("must still be listening");
Some(Box::new(JoinIterator {
Box::new(JoinIterator {
iter: self.rx.iter(),
consume_sentinel: false,
}))
})
}

fn completed(&mut self) -> Option<Box<dyn '_ + Iterator<Item = Item>>> {
Some(Box::new(JoinIterator {
fn completed(&mut self) -> Box<dyn '_ + Iterator<Item = Item>> {
Box::new(JoinIterator {
iter: self.rx.try_iter(),
consume_sentinel: true,
}))
})
}
}

impl<'a> Drop for Threaded<'a> {
fn drop(&mut self) {
// We are not permitted to fail - consume but do not handle the items.
if let Some(iter) = self.join() {
for _ in iter {}
}
self.join().for_each(drop);
}
}

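In the threaded executor, join() works by pushing a Task::Sentinel through the work channel and then reading results until that sentinel is echoed back; completed() does the same non-blocking drain via try_iter(). The JoinIterator internals are not part of this diff, so the following is only a guess at the shape of the mechanism with a single worker thread, not the actual rustup code:

```rust
use std::sync::mpsc;
use std::thread;

enum Task {
    Work(u32),
    Sentinel,
}

fn main() {
    let (task_tx, task_rx) = mpsc::channel::<Task>();
    let (done_tx, done_rx) = mpsc::channel::<Option<u32>>();

    // Worker: performs work and echoes a marker back when it sees the sentinel.
    let worker = thread::spawn(move || {
        for task in task_rx {
            match task {
                Task::Work(n) => done_tx.send(Some(n * 2)).unwrap(),
                Task::Sentinel => done_tx.send(None).unwrap(),
            }
        }
    });

    for n in 0..3 {
        task_tx.send(Task::Work(n)).unwrap();
    }

    // join(): push a sentinel, then drain results until it comes back. With a
    // single worker the channel preserves ordering, so everything submitted
    // before the sentinel is seen before it.
    task_tx.send(Task::Sentinel).unwrap();
    for result in done_rx.iter() {
        match result {
            Some(v) => println!("completed: {}", v),
            None => break, // the sentinel came back: all prior work is done
        }
    }

    drop(task_tx);
    worker.join().unwrap();
}
```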
160 changes: 118 additions & 42 deletions src/dist/component/package.rs
@@ -10,9 +10,11 @@ use crate::errors::*;
use crate::utils::notifications::Notification;
use crate::utils::utils;

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::io::{self, ErrorKind as IOErrorKind, Read};
use std::iter::FromIterator;
use std::mem;
use std::path::{Path, PathBuf};

use tar::EntryType;
@@ -155,14 +157,17 @@ impl<'a> TarPackage<'a> {
}
}

// Handle the async result of io operations
fn filter_result(op: Item) -> io::Result<()> {
match op.result {
/// Handle the async result of io operations
/// Replaces op.result with Ok(())
fn filter_result(op: &mut Item) -> io::Result<()> {
let result = mem::replace(&mut op.result, Ok(()));
match result {
Ok(_) => Ok(()),
Err(e) => match e.kind() {
// TODO: the IO execution logic should pass this back rather than
// being the code to ignore it.
IOErrorKind::AlreadyExists => {
// mkdir of e.g. ~/.rustup already existing is just fine;
// for others it would be better to know whether it is
// expected to exist or not - so put a flag in the state.
if let Kind::Directory = op.kind {
Ok(())
} else {
@@ -174,6 +179,47 @@ fn filter_result(op: Item) -> io::Result<()> {
}
}

/// Dequeue the children of directories queued up waiting for the directory to
/// be created.
///
/// Currently the volume of queued items does not count as backpressure against
/// the main tar extraction process.
fn trigger_children(
io_executor: &mut dyn Executor,
directories: &mut HashMap<PathBuf, DirStatus>,
item: Item,
) -> Result<usize> {
let mut result = 0;
if let Kind::Directory = item.kind {
let mut pending = Vec::new();
directories
.entry(item.full_path)
.and_modify(|status| match status {
DirStatus::Exists => unreachable!(),
DirStatus::Pending(pending_inner) => {
pending.append(pending_inner);
*status = DirStatus::Exists;
}
})
.or_insert_with(|| unreachable!());
result += pending.len();
for pending_item in pending.into_iter() {
for mut item in Vec::from_iter(io_executor.execute(pending_item)) {
// TODO capture metrics
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
result += trigger_children(io_executor, directories, item)?;
}
}
};
Ok(result)
}

/// What is the status of this directory?
enum DirStatus {
Exists,
Pending(Vec<Item>),
}

fn unpack_without_first_dir<'a, R: Read>(
archive: &mut tar::Archive<R>,
path: &Path,
Expand All @@ -183,9 +229,20 @@ fn unpack_without_first_dir<'a, R: Read>(
let entries = archive
.entries()
.chain_err(|| ErrorKind::ExtractingPackage)?;
let mut checked_parents: HashSet<PathBuf> = HashSet::new();
let mut directories: HashMap<PathBuf, DirStatus> = HashMap::new();
// Path is presumed to exist. Call it a precondition.
directories.insert(path.to_owned(), DirStatus::Exists);

'entries: for entry in entries {
// drain completed results to keep memory pressure low and respond
// rapidly to completed events even if we couldn't submit work (because
// our unpacked item is pending dequeue)
for mut item in Vec::from_iter(io_executor.completed()) {
// TODO capture metrics
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
trigger_children(&mut *io_executor, &mut directories, item)?;
}

for entry in entries {
let mut entry = entry.chain_err(|| ErrorKind::ExtractingPackage)?;
let relpath = {
let path = entry.path();
@@ -200,9 +257,13 @@ fn unpack_without_first_dir<'a, R: Read>(
}
}
let mut components = relpath.components();
// Throw away the first path component: we make our own root
// Throw away the first path component: our root was supplied.
components.next();
let full_path = path.join(&components.as_path());
if full_path == path {
// The tmp dir code makes the root dir for us.
continue;
}

let size = entry.header().size()?;
if size > 100_000_000 {
@@ -236,8 +297,11 @@ fn unpack_without_first_dir<'a, R: Read>(
let o_mode = g_mode >> 3;
let mode = u_mode | g_mode | o_mode;

let item = match kind {
EntryType::Directory => Item::make_dir(full_path, mode),
let mut item = match kind {
EntryType::Directory => {
directories.insert(full_path.to_owned(), DirStatus::Pending(Vec::new()));
Item::make_dir(full_path, mode)
}
EntryType::Regular => {
let mut v = Vec::with_capacity(size as usize);
entry.read_to_end(&mut v)?;
@@ -246,45 +310,57 @@ fn unpack_without_first_dir<'a, R: Read>(
_ => return Err(ErrorKind::UnsupportedKind(format!("{:?}", kind)).into()),
};

// FUTURE: parallelise or delete (surely all distribution tars are well formed in this regard).
// Create the full path to the entry if it does not exist already
if let Some(parent) = item.full_path.parent() {
if !checked_parents.contains(parent) {
checked_parents.insert(parent.to_owned());
// It would be nice to optimise this stat out, but the tar could be like so:
// a/deep/file.txt
// a/file.txt
// which would require tracking the segments rather than a simple hash.
// Until profile shows that one stat per dir is a problem (vs one stat per file)
// leave till later.

if !parent.exists() {
let path_display = format!("{}", parent.display());
trace_scoped!("create_dir_all", "name": path_display);
std::fs::create_dir_all(&parent).chain_err(|| ErrorKind::ExtractingPackage)?
let item = loop {
// Create the full path to the entry if it does not exist already
if let Some(parent) = item.full_path.to_owned().parent() {
match directories.get_mut(parent) {
None => {
// Tar has item before containing directory
// Complain about this so we can see if these exist.
eprintln!(
"Unexpected: missing parent '{}' for '{}'",
parent.display(),
entry.path()?.display()
);
directories.insert(parent.to_owned(), DirStatus::Pending(vec![item]));
item = Item::make_dir(parent.to_owned(), 0o755);
// Check the parent's parent
continue;
}
Some(DirStatus::Exists) => {
break item;
}
Some(DirStatus::Pending(pending)) => {
// Parent dir is being made, take next item from tar
pending.push(item);
continue 'entries;
}
}
} else {
unreachable!();
}
}
};

for item in io_executor.execute(item) {
// TODO capture metrics, add directories to created cache
filter_result(item).chain_err(|| ErrorKind::ExtractingPackage)?;
for mut item in Vec::from_iter(io_executor.execute(item)) {
// TODO capture metrics
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
trigger_children(&mut *io_executor, &mut directories, item)?;
}

// drain completed results to keep memory pressure low
if let Some(iter) = io_executor.completed() {
for prev_item in iter {
// TODO capture metrics, add directories to created cache
filter_result(prev_item).chain_err(|| ErrorKind::ExtractingPackage)?;
}
}
}

if let Some(iter) = io_executor.join() {
for item in iter {
loop {
let mut triggered = 0;
for mut item in Vec::from_iter(io_executor.join()) {
// handle final IOs
// TODO capture metrics, add directories to created cache
filter_result(item).chain_err(|| ErrorKind::ExtractingPackage)?;
// TODO capture metrics
filter_result(&mut item).chain_err(|| ErrorKind::ExtractingPackage)?;
triggered += trigger_children(&mut *io_executor, &mut directories, item)?;
}
if triggered == 0 {
// None of the IO submitted before the prior join triggered any new
// submissions
break;
}
}

