async-llvm(6): Make the LLVM work coordinator get its work package through a channel instead of upfront.
michaelwoerister committed Jul 31, 2017
1 parent b18a61a commit 8f6894e
Showing 1 changed file with 53 additions and 28 deletions.
81 changes: 53 additions & 28 deletions src/librustc_trans/back/write.rs
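Before the diff, a rough sketch of the pattern this commit moves to: instead of handing the coordinator thread a `Vec<WorkItem>` up front, the main thread passes only the item count and then streams each item over an mpsc channel. This is a minimal illustration only; `WorkItem` and `Message` below are placeholders, not the actual rustc types.

```rust
use std::sync::mpsc::channel;
use std::thread;

// Placeholder for rustc's real WorkItem type.
#[derive(Debug)]
struct WorkItem(usize);

enum Message {
    WorkItem(WorkItem),
    // ...other coordinator messages elided...
}

fn main() {
    let work_items: Vec<WorkItem> = (0..4).map(WorkItem).collect();

    let (coordinator_send, coordinator_receive) = channel::<Message>();

    // The coordinator only learns how many items to expect; the items
    // themselves arrive through the channel.
    let total = work_items.len();
    let coordinator = thread::spawn(move || {
        for _ in 0..total {
            match coordinator_receive.recv().unwrap() {
                Message::WorkItem(item) => println!("coordinator got {:?}", item),
            }
        }
    });

    // The main thread now feeds work to the coordinator instead of
    // moving the whole Vec into it.
    for item in work_items {
        coordinator_send.send(Message::WorkItem(item)).unwrap();
    }

    coordinator.join().unwrap();
}
```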
@@ -780,19 +780,31 @@ pub fn run_passes(sess: &Session,

let (shared_emitter, shared_emitter_main) = SharedEmitter::new();
let (trans_worker_send, trans_worker_receive) = channel();
let (coordinator_send, coordinator_receive) = channel();

let coordinator_thread = start_executing_work(sess,
work_items,
work_items.len(),
shared_emitter,
trans_worker_send,
coordinator_send.clone(),
coordinator_receive,
client,
trans.exported_symbols.clone());
for work_item in work_items {
coordinator_send.send(Message::WorkItem(work_item)).unwrap();
}

loop {
shared_emitter_main.check(sess);

match trans_worker_receive.recv() {
Ok(Message::AllWorkDone) |
Err(_) => break,
Err(_) => {
// An `Err` here means that all senders for this channel have
// been closed. This could happen because all work has
// completed successfully or there has been some error.
// At this point we don't care which it is.
break
}

Ok(Message::CheckErrorMessages) => continue,
Ok(msg) => {
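The `Err(_) => break` arm above relies on a standard mpsc property: `recv` only fails once every `Sender` clone for the channel has been dropped, so an error really does mean nothing more will arrive. A minimal illustration of that behaviour, unrelated to rustc itself:

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel();

    thread::spawn(move || {
        tx.send("last message").unwrap();
        // `tx` is dropped when this thread ends; no senders remain.
    });

    // The first recv succeeds; the second fails because the channel is
    // disconnected -- the same signal the loop above treats as "done".
    assert_eq!(rx.recv(), Ok("last message"));
    assert!(rx.recv().is_err());
}
```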
@@ -801,9 +813,15 @@ pub fn run_passes(sess: &Session,
}
}

coordinator_thread.join().unwrap();
match coordinator_thread.join() {
Ok(()) => {},
Err(err) => {
panic!("error: {:?}", err);
}
}

// Just in case, check this on the way out.
shared_emitter_main.check(sess);
sess.diagnostic().abort_if_errors();

// If in incr. comp. mode, preserve the `.o` files for potential re-use
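The match on `join()` above reports the coordinator's panic payload, which is a `Box<dyn Any + Send>` and only prints as an opaque `Any` via `{:?}`. For comparison, a small sketch (not part of this commit) of how such a payload can be downcast to recover the actual panic message:

```rust
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        panic!("coordinator exploded");
    });

    if let Err(payload) = handle.join() {
        // Panic payloads are usually a &str or a String; try both
        // before giving up and reporting an opaque error.
        let msg = payload
            .downcast_ref::<&str>()
            .map(|s| s.to_string())
            .or_else(|| payload.downcast_ref::<String>().cloned())
            .unwrap_or_else(|| "unknown panic payload".to_string());
        eprintln!("coordinator thread panicked: {}", msg);
    }
}
```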
@@ -1080,7 +1098,6 @@ pub enum Message {
Done { success: bool },
WorkItem(WorkItem),
CheckErrorMessages,
AllWorkDone,
}


@@ -1091,15 +1108,14 @@ pub struct Diagnostic {
}

fn start_executing_work(sess: &Session,
mut work_items: Vec<WorkItem>,
total_work_item_count: usize,
shared_emitter: SharedEmitter,
trans_worker_send: Sender<Message>,
coordinator_send: Sender<Message>,
coordinator_receive: Receiver<Message>,
jobserver: Client,
exported_symbols: Arc<ExportedSymbols>)
-> thread::JoinHandle<()> {
let (tx, rx) = channel();
let tx2 = tx.clone();

-> thread::JoinHandle<()> {
// First up, convert our jobserver into a helper thread so we can use normal
// mpsc channels to manage our messages and such. Once we've got the helper
// thread then request `n-1` tokens because all of our work items are ready
@@ -1110,10 +1126,11 @@ fn start_executing_work(sess: &Session,
//
// After we've requested all these tokens then we'll, when we can, get
// tokens on `rx` above which will get managed in the main loop below.
let coordinator_send2 = coordinator_send.clone();
let helper = jobserver.into_helper_thread(move |token| {
drop(tx2.send(Message::Token(token)));
drop(coordinator_send2.send(Message::Token(token)));
}).expect("failed to spawn helper thread");
for _ in 0..work_items.len() - 1 {
for _ in 0..total_work_item_count - 1 {
helper.request_token();
}
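The comments above describe turning the GNU-make jobserver `Client` into a helper thread that forwards each acquired token onto the coordinator channel. A hedged sketch of that pattern with the `jobserver` crate, which the patch itself uses; the token limit and the consuming loop here are made up for illustration:

```rust
use std::sync::mpsc::channel;

use jobserver::Client;

fn main() {
    // Normally the client is inherited from a parent make/cargo process;
    // here a fresh jobserver with 4 slots is created for illustration.
    let client = Client::new(4).expect("failed to create jobserver");

    let (coordinator_send, coordinator_receive) = channel();

    // Every token the jobserver grants is forwarded onto the ordinary
    // mpsc channel, so the coordinator loop only ever blocks on `recv`.
    let helper = client
        .into_helper_thread(move |token| {
            drop(coordinator_send.send(token));
        })
        .expect("failed to spawn helper thread");

    // As in the patch: one unit of work can always run on the implicit
    // token, so only `n - 1` extra tokens are requested up front.
    let total_work_item_count = 4;
    for _ in 0..total_work_item_count - 1 {
        helper.request_token();
    }

    for _ in 0..total_work_item_count - 1 {
        let token = coordinator_receive.recv().unwrap();
        println!("received a token: {}", token.is_ok());
    }
}
```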

@@ -1137,7 +1154,7 @@ fn start_executing_work(sess: &Session,
remark: sess.opts.cg.remark.clone(),
worker: 0,
incr_comp_session_dir: sess.incr_comp_session_dir_opt().map(|r| r.clone()),
coordinator_send: tx.clone(),
coordinator_send: coordinator_send,
diag_emitter: shared_emitter.clone(),
};

@@ -1198,36 +1215,46 @@ fn start_executing_work(sess: &Session,
// the jobserver.

thread::spawn(move || {
let mut work_items_left = total_work_item_count;
let mut work_items = Vec::with_capacity(total_work_item_count);
let mut tokens = Vec::new();
let mut running = 0;
while work_items.len() > 0 || running > 0 {
while work_items_left > 0 || running > 0 {

// Spin up what work we can, only doing this while we've got available
// parallelism slots and work left to spawn.
while work_items.len() > 0 && running < tokens.len() + 1 {
let item = work_items.pop().unwrap();
let worker_index = work_items.len();

let cgcx = CodegenContext {
worker: worker_index,
.. cgcx.clone()
};

spawn_work(cgcx, item);
running += 1;
while work_items_left > 0 && running < tokens.len() + 1 {
if let Some(item) = work_items.pop() {
work_items_left -= 1;
let worker_index = work_items_left;

let cgcx = CodegenContext {
worker: worker_index,
.. cgcx.clone()
};

spawn_work(cgcx, item);
running += 1;
} else {
break
}
}

// Relinquish accidentally acquired extra tokens
tokens.truncate(running.saturating_sub(1));

match rx.recv().unwrap() {
match coordinator_receive.recv().unwrap() {
// Save the token locally and the next turn of the loop will use
// this to spawn a new unit of work, or it may get dropped
// immediately if we have no more work to spawn.
Message::Token(token) => {
tokens.push(token.expect("failed to acquire jobserver token"));
}

Message::WorkItem(work_item) => {
work_items.push(work_item);
}

// If a thread exits successfully then we drop a token associated
// with that worker and update our `running` count. We may later
// re-acquire a token to continue running more work. We may also not
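Putting this hunk together, the coordinator's scheduling loop boils down to the sketch below: run at most `tokens + 1` workers (one on the implicit token), pull further work items and tokens off the same channel, and stop once the expected number of items has been both received and completed. The names, the fake "work", and the up-front token grants are placeholders for illustration, not rustc code:

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::time::Duration;

// Placeholder stand-ins for rustc's real types.
struct WorkItem(usize);

enum Message {
    Token,                  // a parallelism slot became available
    WorkItem(WorkItem),     // a unit of work streamed from the main thread
    Done { success: bool }, // a worker finished
}

fn spawn_work(item: WorkItem, coordinator_send: Sender<Message>) {
    thread::spawn(move || {
        // Fake "work"; the real code runs LLVM passes here.
        thread::sleep(Duration::from_millis(10 * item.0 as u64));
        drop(coordinator_send.send(Message::Done { success: true }));
    });
}

fn main() {
    let total_work_item_count = 5;
    let (coordinator_send, coordinator_receive) = channel();

    // Producer side: stream the work items in, as run_passes now does.
    for i in 0..total_work_item_count {
        coordinator_send.send(Message::WorkItem(WorkItem(i))).unwrap();
    }
    // Grant a couple of extra tokens up front (the helper thread's job
    // in the real code). Surplus tokens are simply never used here; the
    // real coordinator hands them back to the jobserver.
    for _ in 0..2 {
        coordinator_send.send(Message::Token).unwrap();
    }

    let mut work_items = Vec::with_capacity(total_work_item_count);
    let mut work_items_left = total_work_item_count;
    let mut tokens = 0usize;
    let mut running = 0usize;

    while work_items_left > 0 || running > 0 {
        // Spawn as much work as the available parallelism allows:
        // one implicit slot plus one per token currently held.
        while work_items_left > 0 && running < tokens + 1 {
            if let Some(item) = work_items.pop() {
                work_items_left -= 1;
                spawn_work(item, coordinator_send.clone());
                running += 1;
            } else {
                break; // expected items haven't arrived on the channel yet
            }
        }

        match coordinator_receive.recv().unwrap() {
            Message::Token => tokens += 1,
            Message::WorkItem(item) => work_items.push(item),
            Message::Done { success: _ } => running -= 1,
        }
    }

    println!("all {} work items completed", total_work_item_count);
}
```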
@@ -1245,8 +1272,6 @@ fn start_executing_work(sess: &Session,
shared_emitter.fatal("aborting due to worker thread panic".to_string());
trans_worker_send.send(Message::CheckErrorMessages).unwrap();
}
msg @ Message::WorkItem(_) |
msg @ Message::AllWorkDone |
msg @ Message::CheckErrorMessages => {
bug!("unexpected message: {:?}", msg);
}
