Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: Bind to shutdown() for TCP streams #12855

Merged
merged 1 commit into from Mar 14, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/libnative/io/net.rs
Expand Up @@ -351,6 +351,11 @@ impl rtio::RtioTcpStream for TcpStream {
fn clone(&self) -> ~rtio::RtioTcpStream {
~TcpStream { inner: self.inner.clone() } as ~rtio::RtioTcpStream
}
fn close_write(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe {
libc::shutdown(self.fd(), libc::SHUT_WR)
})
}
}

impl rtio::RtioSocket for TcpStream {
Expand Down
32 changes: 32 additions & 0 deletions src/librustuv/net.rs
Expand Up @@ -305,6 +305,38 @@ impl rtio::RtioTcpStream for TcpWatcher {
read_access: self.read_access.clone(),
} as ~rtio::RtioTcpStream
}

fn close_write(&mut self) -> Result<(), IoError> {
struct Ctx {
slot: Option<BlockedTask>,
status: c_int,
}
let mut req = Request::new(uvll::UV_SHUTDOWN);

return match unsafe {
uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
} {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { slot: None, status: 0 };

wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
req.set_data(&cx);
});

status_to_io_result(cx.status)
}
n => Err(uv_error_to_io_error(UvError(n)))
};

extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.status = status;
wakeup(&mut cx.slot);
}
}
}

impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
Expand Down
4 changes: 4 additions & 0 deletions src/librustuv/uvll.rs
Expand Up @@ -157,6 +157,7 @@ pub type uv_process_t = c_void;
pub type uv_pipe_t = c_void;
pub type uv_tty_t = c_void;
pub type uv_signal_t = c_void;
pub type uv_shutdown_t = c_void;

pub struct uv_timespec_t {
tv_sec: libc::c_long,
Expand Down Expand Up @@ -248,6 +249,7 @@ pub type uv_exit_cb = extern "C" fn(handle: *uv_process_t,
pub type uv_signal_cb = extern "C" fn(handle: *uv_signal_t,
signum: c_int);
pub type uv_fs_cb = extern "C" fn(req: *uv_fs_t);
pub type uv_shutdown_cb = extern "C" fn(req: *uv_shutdown_t, status: c_int);

#[cfg(unix)] pub type uv_uid_t = libc::types::os::arch::posix88::uid_t;
#[cfg(unix)] pub type uv_gid_t = libc::types::os::arch::posix88::gid_t;
Expand Down Expand Up @@ -539,6 +541,8 @@ extern {
on_alloc: uv_alloc_cb,
on_read: uv_read_cb) -> c_int;
pub fn uv_read_stop(stream: *uv_stream_t) -> c_int;
pub fn uv_shutdown(req: *uv_shutdown_t, handle: *uv_stream_t,
cb: uv_shutdown_cb) -> c_int;

// idle bindings
pub fn uv_idle_init(l: *uv_loop_t, i: *uv_idle_t) -> c_int;
Expand Down
18 changes: 18 additions & 0 deletions src/libstd/io/net/tcp.rs
Expand Up @@ -751,5 +751,23 @@ mod test {

p.recv();
})

iotest!(fn shutdown_smoke() {
use rt::rtio::RtioTcpStream;

let addr = next_test_ip4();
let a = TcpListener::bind(addr).unwrap().listen();
spawn(proc() {
let mut a = a;
let mut c = a.accept().unwrap();
assert_eq!(c.read_to_end(), Ok(~[]));
c.write([1]).unwrap();
});

let mut s = TcpStream::connect(addr).unwrap();
assert!(s.obj.close_write().is_ok());
assert!(s.write([1]).is_err());
assert_eq!(s.read_to_end(), Ok(~[1]));
})
}

18 changes: 18 additions & 0 deletions src/libstd/libc.rs
Expand Up @@ -1611,6 +1611,10 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 8;
pub static SO_BROADCAST: c_int = 32;
pub static SO_REUSEADDR: c_int = 4;

pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
pub static SHUT_RDWR: c_int = 2;
}
pub mod extra {
use libc::types::os::arch::c95::c_int;
Expand Down Expand Up @@ -2391,6 +2395,10 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 9;
pub static SO_BROADCAST: c_int = 6;
pub static SO_REUSEADDR: c_int = 2;

pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
pub static SHUT_RDWR: c_int = 2;
}
#[cfg(target_arch = "x86")]
#[cfg(target_arch = "x86_64")]
Expand Down Expand Up @@ -2842,6 +2850,10 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 0x0008;
pub static SO_BROADCAST: c_int = 0x0020;
pub static SO_REUSEADDR: c_int = 0x0004;

pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
pub static SHUT_RDWR: c_int = 2;
}
pub mod extra {
use libc::types::os::arch::c95::c_int;
Expand Down Expand Up @@ -3221,6 +3233,10 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 0x0008;
pub static SO_BROADCAST: c_int = 0x0020;
pub static SO_REUSEADDR: c_int = 0x0004;

pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
pub static SHUT_RDWR: c_int = 2;
}
pub mod extra {
use libc::types::os::arch::c95::c_int;
Expand Down Expand Up @@ -3939,6 +3955,7 @@ pub mod funcs {
pub fn sendto(socket: c_int, buf: *c_void, len: size_t,
flags: c_int, addr: *sockaddr,
addrlen: socklen_t) -> ssize_t;
pub fn shutdown(socket: c_int, how: c_int) -> c_int;
}
}

Expand Down Expand Up @@ -3975,6 +3992,7 @@ pub mod funcs {
pub fn sendto(socket: SOCKET, buf: *c_void, len: c_int,
flags: c_int, addr: *sockaddr,
addrlen: c_int) -> c_int;
pub fn shutdown(socket: SOCKET, how: c_int) -> c_int;
}
}

Expand Down
1 change: 1 addition & 0 deletions src/libstd/rt/rtio.rs
Expand Up @@ -206,6 +206,7 @@ pub trait RtioTcpStream : RtioSocket {
fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError>;
fn letdie(&mut self) -> Result<(), IoError>;
fn clone(&self) -> ~RtioTcpStream;
fn close_write(&mut self) -> Result<(), IoError>;
}

pub trait RtioSocket {
Expand Down