From a63deeb3d32fc21f36d484d62a3ea1d3d0c82500 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 12 Mar 2014 17:04:34 -0700 Subject: [PATCH] io: Bind to shutdown() for TCP streams This is something that is plausibly useful, and is provided by libuv. This is not currently surfaced as part of the `TcpStream` type, but it may possibly appear in the future. For now only the raw functionality is provided through the Rtio objects. --- src/libnative/io/net.rs | 5 +++++ src/librustuv/net.rs | 32 ++++++++++++++++++++++++++++++++ src/librustuv/uvll.rs | 4 ++++ src/libstd/io/net/tcp.rs | 18 ++++++++++++++++++ src/libstd/libc.rs | 18 ++++++++++++++++++ src/libstd/rt/rtio.rs | 1 + 6 files changed, 78 insertions(+) diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 7445e4c099261..6a71107294207 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -351,6 +351,11 @@ impl rtio::RtioTcpStream for TcpStream { fn clone(&self) -> ~rtio::RtioTcpStream { ~TcpStream { inner: self.inner.clone() } as ~rtio::RtioTcpStream } + fn close_write(&mut self) -> IoResult<()> { + super::mkerr_libc(unsafe { + libc::shutdown(self.fd(), libc::SHUT_WR) + }) + } } impl rtio::RtioSocket for TcpStream { diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index a091829f297e8..a0eb2be3d4d2f 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -305,6 +305,38 @@ impl rtio::RtioTcpStream for TcpWatcher { read_access: self.read_access.clone(), } as ~rtio::RtioTcpStream } + + fn close_write(&mut self) -> Result<(), IoError> { + struct Ctx { + slot: Option, + status: c_int, + } + let mut req = Request::new(uvll::UV_SHUTDOWN); + + return match unsafe { + uvll::uv_shutdown(req.handle, self.handle, shutdown_cb) + } { + 0 => { + req.defuse(); // uv callback now owns this request + let mut cx = Ctx { slot: None, status: 0 }; + + wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || { + req.set_data(&cx); + }); + + status_to_io_result(cx.status) + } + n => Err(uv_error_to_io_error(UvError(n))) + }; + + extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) { + let req = Request::wrap(req); + assert!(status != uvll::ECANCELED); + let cx: &mut Ctx = unsafe { req.get_data() }; + cx.status = status; + wakeup(&mut cx.slot); + } + } } impl UvHandle for TcpWatcher { diff --git a/src/librustuv/uvll.rs b/src/librustuv/uvll.rs index 039f2e8bc85de..b9b7ed13cc1b1 100644 --- a/src/librustuv/uvll.rs +++ b/src/librustuv/uvll.rs @@ -157,6 +157,7 @@ pub type uv_process_t = c_void; pub type uv_pipe_t = c_void; pub type uv_tty_t = c_void; pub type uv_signal_t = c_void; +pub type uv_shutdown_t = c_void; pub struct uv_timespec_t { tv_sec: libc::c_long, @@ -248,6 +249,7 @@ pub type uv_exit_cb = extern "C" fn(handle: *uv_process_t, pub type uv_signal_cb = extern "C" fn(handle: *uv_signal_t, signum: c_int); pub type uv_fs_cb = extern "C" fn(req: *uv_fs_t); +pub type uv_shutdown_cb = extern "C" fn(req: *uv_shutdown_t, status: c_int); #[cfg(unix)] pub type uv_uid_t = libc::types::os::arch::posix88::uid_t; #[cfg(unix)] pub type uv_gid_t = libc::types::os::arch::posix88::gid_t; @@ -539,6 +541,8 @@ extern { on_alloc: uv_alloc_cb, on_read: uv_read_cb) -> c_int; pub fn uv_read_stop(stream: *uv_stream_t) -> c_int; + pub fn uv_shutdown(req: *uv_shutdown_t, handle: *uv_stream_t, + cb: uv_shutdown_cb) -> c_int; // idle bindings pub fn uv_idle_init(l: *uv_loop_t, i: *uv_idle_t) -> c_int; diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index 53129f3df9b6f..95be3add0db95 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -751,5 +751,23 @@ mod test { p.recv(); }) + + iotest!(fn shutdown_smoke() { + use rt::rtio::RtioTcpStream; + + let addr = next_test_ip4(); + let a = TcpListener::bind(addr).unwrap().listen(); + spawn(proc() { + let mut a = a; + let mut c = a.accept().unwrap(); + assert_eq!(c.read_to_end(), Ok(~[])); + c.write([1]).unwrap(); + }); + + let mut s = TcpStream::connect(addr).unwrap(); + assert!(s.obj.close_write().is_ok()); + assert!(s.write([1]).is_err()); + assert_eq!(s.read_to_end(), Ok(~[1])); + }) } diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs index afd524e9d7afe..585ffebd979b1 100644 --- a/src/libstd/libc.rs +++ b/src/libstd/libc.rs @@ -1611,6 +1611,10 @@ pub mod consts { pub static SO_KEEPALIVE: c_int = 8; pub static SO_BROADCAST: c_int = 32; pub static SO_REUSEADDR: c_int = 4; + + pub static SHUT_RD: c_int = 0; + pub static SHUT_WR: c_int = 1; + pub static SHUT_RDWR: c_int = 2; } pub mod extra { use libc::types::os::arch::c95::c_int; @@ -2391,6 +2395,10 @@ pub mod consts { pub static SO_KEEPALIVE: c_int = 9; pub static SO_BROADCAST: c_int = 6; pub static SO_REUSEADDR: c_int = 2; + + pub static SHUT_RD: c_int = 0; + pub static SHUT_WR: c_int = 1; + pub static SHUT_RDWR: c_int = 2; } #[cfg(target_arch = "x86")] #[cfg(target_arch = "x86_64")] @@ -2842,6 +2850,10 @@ pub mod consts { pub static SO_KEEPALIVE: c_int = 0x0008; pub static SO_BROADCAST: c_int = 0x0020; pub static SO_REUSEADDR: c_int = 0x0004; + + pub static SHUT_RD: c_int = 0; + pub static SHUT_WR: c_int = 1; + pub static SHUT_RDWR: c_int = 2; } pub mod extra { use libc::types::os::arch::c95::c_int; @@ -3221,6 +3233,10 @@ pub mod consts { pub static SO_KEEPALIVE: c_int = 0x0008; pub static SO_BROADCAST: c_int = 0x0020; pub static SO_REUSEADDR: c_int = 0x0004; + + pub static SHUT_RD: c_int = 0; + pub static SHUT_WR: c_int = 1; + pub static SHUT_RDWR: c_int = 2; } pub mod extra { use libc::types::os::arch::c95::c_int; @@ -3939,6 +3955,7 @@ pub mod funcs { pub fn sendto(socket: c_int, buf: *c_void, len: size_t, flags: c_int, addr: *sockaddr, addrlen: socklen_t) -> ssize_t; + pub fn shutdown(socket: c_int, how: c_int) -> c_int; } } @@ -3975,6 +3992,7 @@ pub mod funcs { pub fn sendto(socket: SOCKET, buf: *c_void, len: c_int, flags: c_int, addr: *sockaddr, addrlen: c_int) -> c_int; + pub fn shutdown(socket: SOCKET, how: c_int) -> c_int; } } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index edb480fe4cb33..0dc1a11d267d2 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -206,6 +206,7 @@ pub trait RtioTcpStream : RtioSocket { fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError>; fn letdie(&mut self) -> Result<(), IoError>; fn clone(&self) -> ~RtioTcpStream; + fn close_write(&mut self) -> Result<(), IoError>; } pub trait RtioSocket {