Auto merge of #411 - pietroalbini:cleanup-target-dir, r=pietroalbini
Clean up target directories when the disk is full

This PR removes all target directories once disk usage reaches 90% on the partition containing the work directory. Each worker thread cleans up its own directory while it is not busy, to avoid disrupting running tests.

cc @Mark-Simulacrum
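The watcher and worker types land in the new src/runner/worker.rs, which this page does not expand. As a rough sketch of the polling side of the mechanism described above, assuming a simple shared-flag design (all names and the AtomicBool signal here are illustrative, not crater's actual API):

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

// Poll disk usage at a fixed interval; past the threshold, raise a flag
// that each worker checks between tests, so a cleanup never interrupts a
// running test.
fn watch_disk_space<F: Fn() -> f32>(
    interval: Duration,
    threshold: f32,
    cleanup_requested: &AtomicBool,
    measure_usage: F,
) {
    loop {
        if measure_usage() >= threshold {
            cleanup_requested.store(true, Ordering::SeqCst);
        }
        thread::sleep(interval);
    }
}

Deferring the wipe to the workers themselves is what keeps the watcher from ever deleting a target directory that a running test is actively using.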
bors committed Apr 23, 2019
2 parents 57937d5 + c058716 commit 3347bbb
Showing 4 changed files with 326 additions and 76 deletions.
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -65,6 +65,7 @@ env_logger = "0.6.0"
hmac = "0.7"
sha-1 = "0.8"
rust_team_data = { git = "https://github.com/rust-lang/team" }
systemstat = "0.1.4"

[dev-dependencies]
assert_cmd = "0.10.1"
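systemstat is the dependency backing the disk-usage measurement. A minimal, self-contained sketch of the 90% check built on it, assuming its Platform::mount_at helper and the re-exported ByteSize::as_u64 (disk_usage_at is a made-up name, not crater's actual function):

use systemstat::{Platform, System};

// Measure how full the filesystem hosting `path` is, as a fraction in 0..=1.
fn disk_usage_at(path: &str) -> Option<f32> {
    let mount = System::new().mount_at(path).ok()?;
    let total = mount.total.as_u64() as f32;
    if total == 0.0 {
        return None;
    }
    Some((total - mount.avail.as_u64() as f32) / total)
}

fn main() {
    match disk_usage_at("/") {
        Some(usage) if usage >= 0.9 => println!("{:.1}% used: clean up target dirs", usage * 100.0),
        Some(usage) => println!("{:.1}% used", usage * 100.0),
        None => eprintln!("could not measure disk usage"),
    }
}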
140 changes: 64 additions & 76 deletions src/runner/mod.rs
@@ -4,21 +4,27 @@ mod tasks;
mod test;
mod toml_frobber;
mod unstable_features;
mod worker;

use crate::config::Config;
use crate::crates::Crate;
use crate::docker::DockerEnv;
use crate::experiments::{Experiment, Mode};
use crate::logs::LogStorage;
use crate::prelude::*;
use crate::results::{BrokenReason, TestResult, WriteResults};
use crate::runner::graph::{build_graph, WalkResult};
use crate::results::{TestResult, WriteResults};
use crate::runner::graph::build_graph;
use crate::runner::worker::{DiskSpaceWatcher, Worker};
use crate::utils;
use crossbeam_utils::thread::scope;
use crossbeam_utils::thread::{scope, ScopedJoinHandle};
use std::collections::HashMap;
use std::path::Path;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

const DISK_SPACE_WATCHER_INTERVAL: Duration = Duration::from_secs(1);
const DISK_SPACE_WATCHER_THRESHOLD: f32 = 0.9;

#[derive(Debug, Fail)]
#[fail(display = "overridden task result to {}", _0)]
@@ -101,86 +107,47 @@ fn run_ex_inner<DB: WriteResults + Sync>(
Mutex::new(HashMap::new());
let state = RunnerState::new();

let workers = (0..threads_count)
.map(|i| {
Worker::new(
format!("worker-{}", i),
ex,
config,
&graph,
&state,
db,
&docker_env,
&parked_threads,
)
})
.collect::<Vec<_>>();

let disk_watcher = DiskSpaceWatcher::new(
DISK_SPACE_WATCHER_INTERVAL,
DISK_SPACE_WATCHER_THRESHOLD,
&workers,
);

scope(|scope| -> Fallible<()> {
let mut threads = Vec::new();

for i in 0..threads_count {
let name = format!("worker-{}", i);
let join = scope.builder().name(name).spawn(|| -> Fallible<()> {
// This uses a `loop` instead of a `while let` to avoid locking the graph too much
loop {
let walk_result = graph.lock().unwrap().next_task(ex, db);
match walk_result {
WalkResult::Task(id, task) => {
info!("running task: {:?}", task);
if let Err(e) = task.run(config, ex, db, &docker_env, &state) {
error!("task failed, marking childs as failed too: {:?}", task);
utils::report_failure(&e);

let mut result = if config.is_broken(&task.krate) {
TestResult::BrokenCrate(BrokenReason::Unknown)
} else {
TestResult::Error
};

for err in e.iter_chain() {
if let Some(&OverrideResult(res)) = err.downcast_ctx() {
result = res;
break;
}
}

graph
.lock()
.unwrap()
.mark_as_failed(id, ex, db, &state, &config, &e, result)?;
} else {
graph.lock().unwrap().mark_as_completed(id);
}

// Unpark all the threads
let mut parked = parked_threads.lock().unwrap();
for (_id, thread) in parked.drain() {
thread.unpark();
}
}
WalkResult::Blocked => {
// Wait until another thread finished before looking for tasks again
// If the thread spuriously wake up (parking does not guarantee no
// spurious wakeups) it's not a big deal, it will just get parked again
{
let mut parked_threads = parked_threads.lock().unwrap();
let current = thread::current();
parked_threads.insert(current.id(), current);
}
thread::park();
}
WalkResult::NotBlocked => unreachable!("NotBlocked leaked from the run"),
WalkResult::Finished => break,
}
}

Ok(())
})?;
for worker in &workers {
let join = scope
.builder()
.name(worker.name().into())
.spawn(move || worker.run())?;
threads.push(join);
}
let disk_watcher_thread = scope
.builder()
.name("disk-space-watcher".into())
.spawn(|| disk_watcher.run())?;

let mut clean_exit = true;
for thread in threads.drain(..) {
match thread.join() {
Ok(Ok(())) => {}
Ok(Err(err)) => {
crate::utils::report_failure(&err);
clean_exit = false;
}
Err(panic) => {
crate::utils::report_panic(&panic);
clean_exit = false;
}
}
}
let clean_exit = join_threads(threads.drain(..));
disk_watcher.stop();
let disk_watcher_clean_exit = join_threads(std::iter::once(disk_watcher_thread));

if clean_exit {
if clean_exit && disk_watcher_clean_exit {
Ok(())
} else {
bail!("some threads returned an error");
@@ -195,6 +162,27 @@
Ok(())
}

fn join_threads<'a, I>(iter: I) -> bool
where
I: Iterator<Item = ScopedJoinHandle<'a, Fallible<()>>>,
{
let mut clean_exit = true;
for thread in iter {
match thread.join() {
Ok(Ok(())) => {}
Ok(Err(err)) => {
crate::utils::report_failure(&err);
clean_exit = false;
}
Err(panic) => {
crate::utils::report_panic(&panic);
clean_exit = false;
}
}
}
clean_exit
}

pub fn dump_dot(ex: &Experiment, crates: &[Crate], config: &Config, dest: &Path) -> Fallible<()> {
info!("computing the tasks graph...");
let graph = build_graph(&ex, crates, config);
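The inline scheduling loop removed above moves into Worker::run in the new src/runner/worker.rs, which this page does not show. Its park/unpark scheme is visible in the removed code, though, and reduces to this self-contained sketch (park_until_work and wake_all are hypothetical helper names):

use std::collections::HashMap;
use std::sync::Mutex;
use std::thread::{self, Thread, ThreadId};

// A worker with nothing runnable registers its handle and parks itself.
fn park_until_work(parked: &Mutex<HashMap<ThreadId, Thread>>) {
    {
        let mut map = parked.lock().unwrap();
        let current = thread::current();
        map.insert(current.id(), current);
    }
    thread::park();
}

// Whoever completes a task drains the map and unparks everyone, so blocked
// workers re-check the graph for newly unblocked tasks.
fn wake_all(parked: &Mutex<HashMap<ThreadId, Thread>>) {
    for (_id, thread) in parked.lock().unwrap().drain() {
        thread.unpark();
    }
}

As the removed comment notes, spurious wakeups are harmless: a woken worker simply asks the graph for a task again and parks itself if it is still blocked.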
