Skip to content

Commit

Permalink
io: Bind to shutdown() for TCP streams
Browse files Browse the repository at this point in the history
This is something that is plausibly useful, and is provided by libuv. This is
not currently surfaced as part of the `TcpStream` type, but it may possibly
appear in the future. For now only the raw functionality is provided through the
Rtio objects.
  • Loading branch information
alexcrichton committed Mar 13, 2014
1 parent 3316a0e commit a63deeb
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/libnative/io/net.rs
Expand Up @@ -351,6 +351,11 @@ impl rtio::RtioTcpStream for TcpStream {
fn clone(&self) -> ~rtio::RtioTcpStream {
~TcpStream { inner: self.inner.clone() } as ~rtio::RtioTcpStream
}
fn close_write(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe {
libc::shutdown(self.fd(), libc::SHUT_WR)
})
}
}

impl rtio::RtioSocket for TcpStream {
Expand Down
32 changes: 32 additions & 0 deletions src/librustuv/net.rs
Expand Up @@ -305,6 +305,38 @@ impl rtio::RtioTcpStream for TcpWatcher {
read_access: self.read_access.clone(),
} as ~rtio::RtioTcpStream
}

fn close_write(&mut self) -> Result<(), IoError> {
struct Ctx {
slot: Option<BlockedTask>,
status: c_int,
}
let mut req = Request::new(uvll::UV_SHUTDOWN);

return match unsafe {
uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
} {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { slot: None, status: 0 };

wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
req.set_data(&cx);
});

status_to_io_result(cx.status)
}
n => Err(uv_error_to_io_error(UvError(n)))
};

extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.status = status;
wakeup(&mut cx.slot);
}
}
}

impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
Expand Down
4 changes: 4 additions & 0 deletions src/librustuv/uvll.rs
Expand Up @@ -157,6 +157,7 @@ pub type uv_process_t = c_void;
pub type uv_pipe_t = c_void;
pub type uv_tty_t = c_void;
pub type uv_signal_t = c_void;
pub type uv_shutdown_t = c_void;

pub struct uv_timespec_t {
tv_sec: libc::c_long,
Expand Down Expand Up @@ -248,6 +249,7 @@ pub type uv_exit_cb = extern "C" fn(handle: *uv_process_t,
pub type uv_signal_cb = extern "C" fn(handle: *uv_signal_t,
signum: c_int);
pub type uv_fs_cb = extern "C" fn(req: *uv_fs_t);
pub type uv_shutdown_cb = extern "C" fn(req: *uv_shutdown_t, status: c_int);

#[cfg(unix)] pub type uv_uid_t = libc::types::os::arch::posix88::uid_t;
#[cfg(unix)] pub type uv_gid_t = libc::types::os::arch::posix88::gid_t;
Expand Down Expand Up @@ -539,6 +541,8 @@ extern {
on_alloc: uv_alloc_cb,
on_read: uv_read_cb) -> c_int;
pub fn uv_read_stop(stream: *uv_stream_t) -> c_int;
pub fn uv_shutdown(req: *uv_shutdown_t, handle: *uv_stream_t,
cb: uv_shutdown_cb) -> c_int;

// idle bindings
pub fn uv_idle_init(l: *uv_loop_t, i: *uv_idle_t) -> c_int;
Expand Down
18 changes: 18 additions & 0 deletions src/libstd/io/net/tcp.rs
Expand Up @@ -751,5 +751,23 @@ mod test {

p.recv();
})

iotest!(fn shutdown_smoke() {
use rt::rtio::RtioTcpStream;

let addr = next_test_ip4();
let a = TcpListener::bind(addr).unwrap().listen();
spawn(proc() {
let mut a = a;
let mut c = a.accept().unwrap();
assert_eq!(c.read_to_end(), Ok(~[]));
c.write([1]).unwrap();
});

let mut s = TcpStream::connect(addr).unwrap();
assert!(s.obj.close_write().is_ok());
assert!(s.write([1]).is_err());
assert_eq!(s.read_to_end(), Ok(~[1]));
})
}

18 changes: 18 additions & 0 deletions src/libstd/libc.rs
Expand Up @@ -1611,6 +1611,10 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 8;
pub static SO_BROADCAST: c_int = 32;
pub static SO_REUSEADDR: c_int = 4;

pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
pub static SHUT_RDWR: c_int = 2;
}
pub mod extra {
use libc::types::os::arch::c95::c_int;
Expand Down Expand Up @@ -2391,6 +2395,10 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 9;
pub static SO_BROADCAST: c_int = 6;
pub static SO_REUSEADDR: c_int = 2;

pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
pub static SHUT_RDWR: c_int = 2;
}
#[cfg(target_arch = "x86")]
#[cfg(target_arch = "x86_64")]
Expand Down Expand Up @@ -2842,6 +2850,10 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 0x0008;
pub static SO_BROADCAST: c_int = 0x0020;
pub static SO_REUSEADDR: c_int = 0x0004;

pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
pub static SHUT_RDWR: c_int = 2;
}
pub mod extra {
use libc::types::os::arch::c95::c_int;
Expand Down Expand Up @@ -3221,6 +3233,10 @@ pub mod consts {
pub static SO_KEEPALIVE: c_int = 0x0008;
pub static SO_BROADCAST: c_int = 0x0020;
pub static SO_REUSEADDR: c_int = 0x0004;

pub static SHUT_RD: c_int = 0;
pub static SHUT_WR: c_int = 1;
pub static SHUT_RDWR: c_int = 2;
}
pub mod extra {
use libc::types::os::arch::c95::c_int;
Expand Down Expand Up @@ -3939,6 +3955,7 @@ pub mod funcs {
pub fn sendto(socket: c_int, buf: *c_void, len: size_t,
flags: c_int, addr: *sockaddr,
addrlen: socklen_t) -> ssize_t;
pub fn shutdown(socket: c_int, how: c_int) -> c_int;
}
}

Expand Down Expand Up @@ -3975,6 +3992,7 @@ pub mod funcs {
pub fn sendto(socket: SOCKET, buf: *c_void, len: c_int,
flags: c_int, addr: *sockaddr,
addrlen: c_int) -> c_int;
pub fn shutdown(socket: SOCKET, how: c_int) -> c_int;
}
}

Expand Down
1 change: 1 addition & 0 deletions src/libstd/rt/rtio.rs
Expand Up @@ -206,6 +206,7 @@ pub trait RtioTcpStream : RtioSocket {
fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError>;
fn letdie(&mut self) -> Result<(), IoError>;
fn clone(&self) -> ~RtioTcpStream;
fn close_write(&mut self) -> Result<(), IoError>;
}

pub trait RtioSocket {
Expand Down

5 comments on commit a63deeb

@bors
Copy link
Contributor

@bors bors commented on a63deeb Mar 14, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saw approval from brson
at alexcrichton@a63deeb

@bors
Copy link
Contributor

@bors bors commented on a63deeb Mar 14, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging alexcrichton/rust/shutdown = a63deeb into auto

@bors
Copy link
Contributor

@bors bors commented on a63deeb Mar 14, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alexcrichton/rust/shutdown = a63deeb merged ok, testing candidate = 4443fb3

@bors
Copy link
Contributor

@bors bors commented on a63deeb Mar 14, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fast-forwarding master to auto = 4443fb3

Please sign in to comment.