From a045e63639ae34feaa63fe70850e2d8468335eba Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Wed, 18 Apr 2012 23:49:20 -0700 Subject: [PATCH] std: get_monitor_task_gl() is global_loop::get() default --- src/libstd/uv_global_loop.rs | 107 +++++++++++++++++++++++++++++++++-- src/libstd/uv_hl.rs | 17 ++++-- src/rt/rust_uv.cpp | 5 ++ src/rt/rustrt.def.in | 1 + 4 files changed, 120 insertions(+), 10 deletions(-) diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs index b220ba9fe19ef..e710ef1fc1306 100644 --- a/src/libstd/uv_global_loop.rs +++ b/src/libstd/uv_global_loop.rs @@ -1,15 +1,16 @@ #[doc=" -Process-wide, lazily started/stopped libuv event loop interaction. +A process-wide libuv event loop for library use. "]; import ll = uv_ll; import hl = uv_hl; import get_gl = get; -export get, get_single_task_gl; +export get, get_single_task_gl, get_monitor_task_gl; native mod rustrt { fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t; + fn rust_uv_get_kernel_monitor_global_chan_ptr() -> *libc::uintptr_t; fn rust_uv_get_kernel_global_async_handle() -> *libc::uintptr_t; fn rust_compare_and_swap_ptr(address: *libc::uintptr_t, oldval: libc::uintptr_t, @@ -20,13 +21,34 @@ native mod rustrt { Race-free helper to get access to a global task where a libuv loop is running. +Use `uv::hl::interact`, `uv::hl::ref_handle` and `uv::hl::unref_handle` to +do operations against the global loop that this function returns. + # Return * A `hl::high_level_loop` that encapsulates communication with the global loop. "] fn get() -> hl::high_level_loop { - ret get_single_task_gl(); + ret get_monitor_task_gl(); +} + +#[doc(hidden)] +fn get_monitor_task_gl() -> hl::high_level_loop { + let monitor_loop_chan = + rustrt::rust_uv_get_kernel_monitor_global_chan_ptr(); + ret spawn_global_weak_task( + monitor_loop_chan, + {|weak_exit_po, msg_po, loop_ptr, first_msg| + log(debug, "monitor gl: entering inner loop"); + unsafe { + monitor_task_loop_body(weak_exit_po, msg_po, loop_ptr, + copy(first_msg)) + } + }, + {|msg_ch| + hl::monitor_task_loop({op_chan: msg_ch}) + }); } #[doc(hidden)] @@ -35,7 +57,7 @@ fn get_single_task_gl() -> hl::high_level_loop { ret spawn_global_weak_task( global_loop_chan_ptr, {|weak_exit_po, msg_po, loop_ptr, first_msg| - log(debug, "about to enter inner loop"); + log(debug, "single-task gl: about to enter inner loop"); unsafe { single_task_loop_body(weak_exit_po, msg_po, loop_ptr, copy(first_msg)) @@ -135,6 +157,83 @@ unsafe fn outer_global_loop_body( ll::loop_delete(loop_ptr); } + +unsafe fn monitor_task_loop_body(weak_exit_po_in: comm::port<()>, + msg_po_in: comm::port, + loop_ptr: *libc::c_void, + -first_interaction: hl::high_level_msg) -> bool { + // resend the msg to be handled in the select2 loop below.. + comm::send(comm::chan(msg_po_in), first_interaction); + + // our async_handle + let async_handle_po = comm::port::<*ll::uv_async_t>(); + let async_handle_ch = comm::chan(async_handle_po); + + // the msg_po that libuv will be receiving on.. + let loop_msg_po = comm::port::(); + let loop_msg_po_ptr = ptr::addr_of(loop_msg_po); + let loop_msg_ch = comm::chan(loop_msg_po); + + // the question of whether unsupervising this will even do any + // good is there.. but since this'll go into blocking in libuv with + // a quickness.. any errors that occur (including inside crust) will + // be segfaults.. so yeah. + task::spawn_sched(task::manual_threads(1u)) {|| + let loop_msg_po_in = *loop_msg_po_ptr; + hl::run_high_level_loop( + loop_ptr, + loop_msg_po_in, // here the loop gets handed a different message + // port, as we'll be receiving all of the messages + // initially and then passing them on.. + // before_run + {|async_handle| + log(debug,#fmt("monitor gl: before_run: async_handle %?", + async_handle)); + // when this is ran, our async_handle is set up, so let's + // do an async_send with it.. letting the loop know, once it + // starts, that is has work + ll::async_send(async_handle); + comm::send(async_handle_ch, copy(async_handle)); + }, + // before_msg_drain + {|async_handle| + log(debug,#fmt("monitor gl: b4_msg_drain: async_handle %?", + async_handle)); + true + }, + // before_tear_down + {|async_handle| + log(debug,#fmt("monitor gl: b4_tear_down: async_handle %?", + async_handle)); + }); + }; + + // our loop is set up, so let's emit the handle back out to our users.. + let async_handle = comm::recv(async_handle_po); + // supposed to return a bool to indicate to the enclosing loop whether + // it should continue or not.. + let mut continue_inner_loop = true; + let mut didnt_get_hl_bailout = true; + while continue_inner_loop { + log(debug, "monitor task inner loop.. about to block on select2"); + continue_inner_loop = either::either( + {|left_val| + // bail out.. + log(debug, #fmt("monitor inner weak_exit_po recv'd msg: %?", + left_val)); + // TODO: make loop bail out + didnt_get_hl_bailout = false; + false + }, {|right_val| + // wake up our inner loop and pass it a msg.. + comm::send(loop_msg_ch, copy(right_val)); + ll::async_send(async_handle); + true + }, comm::select2(weak_exit_po_in, msg_po_in) + ) + } + didnt_get_hl_bailout +} unsafe fn single_task_loop_body(weak_exit_po_in: comm::port<()>, msg_po_in: comm::port, diff --git a/src/libstd/uv_hl.rs b/src/libstd/uv_hl.rs index b235cdd5e07c8..7eaf52ad96f5a 100644 --- a/src/libstd/uv_hl.rs +++ b/src/libstd/uv_hl.rs @@ -193,12 +193,17 @@ unsafe fn send_high_level_msg(hl_loop: high_level_loop, // the loop isn't active, so we don't need to wake it up, // (the loop's enclosing task should be blocking on a message // receive on this port) - if (*(hl_loop.async_handle()) != 0 as *ll::uv_async_t) { - log(debug,"global async handle != 0, waking up loop.."); - ll::async_send(*(hl_loop.async_handle())); - } - else { - log(debug,"GLOBAL ASYNC handle == 0"); + alt hl_loop { + single_task_loop({async_handle, op_chan}) { + if ((*async_handle) != 0 as *ll::uv_async_t) { + log(debug,"global async handle != 0, waking up loop.."); + ll::async_send((*async_handle)); + } + else { + log(debug,"GLOBAL ASYNC handle == 0"); + } + } + _ {} } } diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index a0184f78e5f01..199a1aeb55e96 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -446,6 +446,11 @@ rust_uv_get_kernel_global_chan_ptr() { return result; } +extern "C" uintptr_t* +rust_uv_get_kernel_monitor_global_chan_ptr() { + return rust_uv_get_kernel_global_chan_ptr(); +} + extern "C" uintptr_t* rust_uv_get_kernel_global_async_handle() { return rust_get_current_task()->kernel->get_global_async_handle(); diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 029e13e2ca0d6..545c3581425e7 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -138,6 +138,7 @@ rust_uv_set_data_for_req rust_uv_get_base_from_buf rust_uv_get_len_from_buf rust_uv_get_kernel_global_chan_ptr +rust_uv_get_kernel_monitor_global_chan_ptr rust_uv_get_kernel_global_async_handle rust_dbg_lock_create rust_dbg_lock_destroy