Skip to content

Commit

Permalink
Working homing UDP socket prototype.
Browse files Browse the repository at this point in the history
  • Loading branch information
anasazi committed Aug 19, 2013
1 parent 88f7183 commit d7b6fcb
Showing 1 changed file with 151 additions and 1 deletion.
152 changes: 151 additions & 1 deletion src/libstd/rt/uv/uvio.rs
Expand Up @@ -23,7 +23,7 @@ use rt::io::net::ip::{SocketAddr, IpAddr};
use rt::io::{standard_error, OtherIoError};
use rt::local::Local;
use rt::rtio::*;
use rt::sched::Scheduler;
use rt::sched::{Scheduler, SchedHandle};
use rt::tube::Tube;
use rt::uv::*;
use rt::uv::idle::IdleWatcher;
Expand Down Expand Up @@ -239,6 +239,27 @@ impl UvIoFactory {
pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
match self { &UvIoFactory(ref mut ptr) => ptr }
}

pub fn homed_udp_bind(&mut self, addr: SocketAddr) -> Result<~HomedUvUdpSocket, IoError> {
let mut watcher = UdpWatcher::new(self.uv_loop());
match watcher.bind(addr) {
Ok(_) => {
let home = do Local::borrow::<Scheduler, SchedHandle> |sched| {sched.make_handle()};
Ok(~HomedUvUdpSocket { watcher: watcher, home: home })
}
Err(uverr) => {
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do watcher.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
Err(uv_error_to_io_error(uverr))
}
}
}
}

impl IoFactory for UvIoFactory {
Expand Down Expand Up @@ -582,6 +603,135 @@ impl RtioTcpStream for UvTcpStream {
}
}

pub struct HomedUvUdpSocket {
watcher: UdpWatcher,
home: SchedHandle,
}

impl HomedUvUdpSocket {
fn go_home(&mut self) {
use rt::sched::PinnedTask;
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
do task.wake().map_move |task| { self.home.send(PinnedTask(task)); };
}
}
}

impl Drop for HomedUvUdpSocket {
fn drop(&self) {
rtdebug!("closing homed udp socket");
// first go home
// XXX need mutable finalizer
let this = unsafe { transmute::<&HomedUvUdpSocket, &mut HomedUvUdpSocket>(self) };
this.go_home();
// now we're home so block the task and start IO
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do this.watcher.close {
// now IO is finished so resume the blocked task
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
}

impl RtioSocket for HomedUvUdpSocket {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
self.go_home();
socket_name(Udp, self.watcher)
}
}

#[test]
fn test_simple_homed_udp_io_bind_only() {
do run_in_newsched_task {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let addr = next_test_ip4();
let maybe_socket = (*io).homed_udp_bind(addr);
assert!(maybe_socket.is_ok());
}
}
}

#[test]
fn test_simple_homed_udp_io_bind_then_move_then_home_and_close() {
use rt::sleeper_list::SleeperList;
use rt::work_queue::WorkQueue;
use rt::thread::Thread;
use rt::task::Task;
use rt::sched::{Shutdown, TaskFromFriend};
do run_in_bare_thread {
let sleepers = SleeperList::new();
let work_queue1 = WorkQueue::new();
let work_queue2 = WorkQueue::new();
let queues = ~[work_queue1.clone(), work_queue2.clone()];

let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(),
sleepers.clone());
let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(),
sleepers.clone());

let handle1 = Cell::new(sched1.make_handle());
let handle2 = Cell::new(sched2.make_handle());
let tasksFriendHandle = Cell::new(sched2.make_handle());

let on_exit: ~fn(bool) = |exit_status| {
handle1.take().send(Shutdown);
handle2.take().send(Shutdown);
rtassert!(exit_status);
};

let test_function: ~fn() = || {
let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
let addr = next_test_ip4();
let maybe_socket = unsafe { (*io).homed_udp_bind(addr) };
// this socket is bound to this event loop
assert!(maybe_socket.is_ok());

// block self on sched1
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
// unblock task
do task.wake().map_move |task| {
// send self to sched2
tasksFriendHandle.take().send(TaskFromFriend(task));
};
// sched1 should now sleep since it has nothing else to do
}
// sched2 will wake up and get the task
// as we do nothing else, the function ends and the socket goes out of scope
// sched2 will start to run the destructor
// the destructor will first block the task, set it's home as sched1, then enqueue it
// sched2 will dequeue the task, see that it has a home, and send it to sched1
// sched1 will wake up, execute the close function on the correct loop, and then we're done
};

let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function);
main_task.death.on_exit = Some(on_exit);
let main_task = Cell::new(main_task);

let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {});

let sched1 = Cell::new(sched1);
let sched2 = Cell::new(sched2);

// XXX could there be a race on the threads that causes a crash?
let thread1 = do Thread::start {
sched1.take().bootstrap(main_task.take());
};
let thread2 = do Thread::start {
sched2.take().bootstrap(null_task.take());
};

thread1.join();
thread2.join();
}
}

pub struct UvUdpSocket(UdpWatcher);

impl Drop for UvUdpSocket {
Expand Down

0 comments on commit d7b6fcb

Please sign in to comment.