Skip to content

Commit

Permalink
Homed UDP sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
anasazi committed Aug 19, 2013
1 parent d7b6fcb commit d09412a
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/libstd/rt/rtio.rs
Expand Up @@ -22,7 +22,7 @@ pub type RemoteCallbackObject = uvio::UvRemoteCallback;
pub type IoFactoryObject = uvio::UvIoFactory;
pub type RtioTcpStreamObject = uvio::UvTcpStream;
pub type RtioTcpListenerObject = uvio::UvTcpListener;
pub type RtioUdpSocketObject = uvio::UvUdpSocket;
pub type RtioUdpSocketObject = uvio::HomedUvUdpSocket; //uvio::UvUdpSocket;
pub type RtioTimerObject = uvio::UvTimer;

pub trait EventLoop {
Expand Down
235 changes: 212 additions & 23 deletions src/libstd/rt/uv/uvio.rs
Expand Up @@ -239,27 +239,6 @@ impl UvIoFactory {
pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
match self { &UvIoFactory(ref mut ptr) => ptr }
}

pub fn homed_udp_bind(&mut self, addr: SocketAddr) -> Result<~HomedUvUdpSocket, IoError> {
let mut watcher = UdpWatcher::new(self.uv_loop());
match watcher.bind(addr) {
Ok(_) => {
let home = do Local::borrow::<Scheduler, SchedHandle> |sched| {sched.make_handle()};
Ok(~HomedUvUdpSocket { watcher: watcher, home: home })
}
Err(uverr) => {
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do watcher.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
Err(uv_error_to_io_error(uverr))
}
}
}
}

impl IoFactory for UvIoFactory {
Expand Down Expand Up @@ -331,6 +310,7 @@ impl IoFactory for UvIoFactory {
}
}

/*
fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
let mut watcher = UdpWatcher::new(self.uv_loop());
match watcher.bind(addr) {
Expand All @@ -348,6 +328,28 @@ impl IoFactory for UvIoFactory {
}
}
}
*/

pub fn /*homed_*/udp_bind(&mut self, addr: SocketAddr) -> Result<~/*HomedUvUdpSocket*/RtioUdpSocketObject, IoError> {
let mut watcher = UdpWatcher::new(self.uv_loop());
match watcher.bind(addr) {
Ok(_) => {
let home = do Local::borrow::<Scheduler, SchedHandle> |sched| {sched.make_handle()};
Ok(~HomedUvUdpSocket { watcher: watcher, home: home })
}
Err(uverr) => {
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do watcher.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
Err(uv_error_to_io_error(uverr))
}
}
}

fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
Ok(~UvTimer(TimerWatcher::new(self.uv_loop())))
Expand Down Expand Up @@ -640,18 +642,205 @@ impl Drop for HomedUvUdpSocket {

impl RtioSocket for HomedUvUdpSocket {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
// first go home
self.go_home();
socket_name(Udp, self.watcher)
}
}

impl RtioUdpSocket for HomedUvUdpSocket {
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
// first go home
self.go_home();

let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;

let scheduler = Local::take::<Scheduler>();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
rtdebug!("recvfrom: entered scheduler context");
let task_cell = Cell::new(task);
let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
let _ = flags; // /XXX add handling for partials?

watcher.recv_stop();

let result = match status {
None => {
assert!(nread >= 0);
Ok((nread as uint, addr))
}
Some(err) => Err(uv_error_to_io_error(err)),
};

unsafe { (*result_cell_ptr).put_back(result); }

let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}

assert!(!result_cell.is_empty());
return result_cell.take();
}

fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
// first go home
self.go_home();

let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
do self.watcher.send(buf, dst) |_watcher, status| {

let result = match status {
None => Ok(()),
Some(err) => Err(uv_error_to_io_error(err)),
};

unsafe { (*result_cell_ptr).put_back(result); }

let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}

assert!(!result_cell.is_empty());
return result_cell.take();
}

fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
// first go home
self.go_home();

let r = unsafe {
do multi.to_str().as_c_str |m_addr| {
uvll::udp_set_membership(self.watcher.native_handle(), m_addr,
ptr::null(), uvll::UV_JOIN_GROUP)
}
};

match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}

fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
// first go home
self.go_home();

let r = unsafe {
do multi.to_str().as_c_str |m_addr| {
uvll::udp_set_membership(self.watcher.native_handle(), m_addr,
ptr::null(), uvll::UV_LEAVE_GROUP)
}
};

match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}

fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
// first go home
self.go_home();

let r = unsafe {
uvll::udp_set_multicast_loop(self.watcher.native_handle(), 1 as c_int)
};

match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}

fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
// first go home
self.go_home();

let r = unsafe {
uvll::udp_set_multicast_loop(self.watcher.native_handle(), 0 as c_int)
};

match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}

fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
// first go home
self.go_home();

let r = unsafe {
uvll::udp_set_multicast_ttl(self.watcher.native_handle(), ttl as c_int)
};

match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}

fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
// first go home
self.go_home();

let r = unsafe {
uvll::udp_set_ttl(self.watcher.native_handle(), ttl as c_int)
};

match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}

fn hear_broadcasts(&mut self) -> Result<(), IoError> {
// first go home
self.go_home();

let r = unsafe {
uvll::udp_set_broadcast(self.watcher.native_handle(), 1 as c_int)
};

match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}

fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
// first go home
self.go_home();

let r = unsafe {
uvll::udp_set_broadcast(self.watcher.native_handle(), 0 as c_int)
};

match status_to_maybe_uv_error(self.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
}
}

#[test]
fn test_simple_homed_udp_io_bind_only() {
do run_in_newsched_task {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let addr = next_test_ip4();
let maybe_socket = (*io).homed_udp_bind(addr);
let maybe_socket = (*io)./*homed_*/udp_bind(addr);
assert!(maybe_socket.is_ok());
}
}
Expand Down Expand Up @@ -688,7 +877,7 @@ fn test_simple_homed_udp_io_bind_then_move_then_home_and_close() {
let test_function: ~fn() = || {
let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
let addr = next_test_ip4();
let maybe_socket = unsafe { (*io).homed_udp_bind(addr) };
let maybe_socket = unsafe { (*io)./*homed_*/udp_bind(addr) };
// this socket is bound to this event loop
assert!(maybe_socket.is_ok());

Expand Down

0 comments on commit d09412a

Please sign in to comment.