Showing with 286 additions and 31 deletions.
  1. +1 −0 Makefile.am
  2. +22 −0 include/uv.h
  3. +4 −16 src/unix/stream.c
  4. +49 −0 src/unix/udp.c
  5. +33 −0 src/uv-common.c
  6. +8 −0 src/uv-common.h
  7. +2 −2 src/win/pipe.c
  8. +0 −11 src/win/stream-inl.h
  9. +1 −1 src/win/tcp.c
  10. +19 −1 src/win/udp.c
  11. +3 −0 test/test-getsockname.c
  12. +2 −0 test/test-list.h
  13. +3 −0 test/test-udp-multicast-interface.c
  14. +2 −0 test/test-udp-open.c
  15. +3 −0 test/test-udp-send-and-recv.c
  16. +133 −0 test/test-udp-try-send.c
  17. +1 −0 uv.gyp
@@ -214,6 +214,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
test/test-udp-open.c \
test/test-udp-options.c \
test/test-udp-send-and-recv.c \
test/test-udp-try-send.c \
test/test-walk-handles.c \
test/test-watcher-cross-stop.c
test_run_tests_LDADD = libuv.la
@@ -900,6 +900,15 @@ typedef void (*uv_udp_recv_cb)(uv_udp_t* handle,
/* uv_udp_t is a subclass of uv_handle_t */
struct uv_udp_s {
UV_HANDLE_FIELDS
/* read-only */
/* Number of bytes queued for sending. The amount actually sent may be
 * less since udp packets are truncated to the MTU size.
 */
size_t send_queue_size;
/* Number of send requests currently in the queue awaiting
 * processing.
 */
size_t send_queue_count;
UV_UDP_PRIVATE_FIELDS
};

@@ -1067,6 +1076,19 @@ UV_EXTERN int uv_udp_send(uv_udp_send_t* req,
const struct sockaddr* addr,
uv_udp_send_cb send_cb);

/*
 * Same as `uv_udp_send()`, but won't queue a send request if it can't be
 * completed immediately.
 * Returns either:
 * - >= 0: the number of bytes sent (which can be less than the supplied
 *   buffer size if the packet is truncated)
 * - < 0: a negative error code (UV_EAGAIN is returned when the message
 *   can't be sent immediately)
 */
UV_EXTERN int uv_udp_try_send(uv_udp_t* handle,
const uv_buf_t bufs[],
unsigned int nbufs,
const struct sockaddr* addr);
/*
* Receive data. If the socket has not previously been bound with `uv_udp_bind`
* it is bound to 0.0.0.0 (the "all interfaces" address) and a random
@@ -63,18 +63,6 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static size_t uv__write_req_size(uv_write_t* req);


/* Return the total number of payload bytes held by `bufs`. */
static size_t uv_count_bufs(const uv_buf_t bufs[], unsigned int nbufs) {
  const uv_buf_t* p;
  const uv_buf_t* end;
  size_t total;

  total = 0;
  for (p = bufs, end = bufs + nbufs; p < end; p++)
    total += p->len;

  return total;
}


void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type) {
@@ -660,8 +648,8 @@ static size_t uv__write_req_size(uv_write_t* req) {
size_t size;

assert(req->bufs != NULL);
size = uv_count_bufs(req->bufs + req->write_index,
req->nbufs - req->write_index);
size = uv__count_bufs(req->bufs + req->write_index,
req->nbufs - req->write_index);
assert(req->handle->write_queue_size >= size);

return size;
@@ -1327,7 +1315,7 @@ int uv_write2(uv_write_t* req,
memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
req->nbufs = nbufs;
req->write_index = 0;
stream->write_queue_size += uv_count_bufs(bufs, nbufs);
stream->write_queue_size += uv__count_bufs(bufs, nbufs);

/* Append the request to write_queue. */
QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
@@ -1395,7 +1383,7 @@ int uv_try_write(uv_stream_t* stream,
return r;

/* Remove not written bytes from write queue size */
written = uv_count_bufs(bufs, nbufs);
written = uv__count_bufs(bufs, nbufs);
if (req.bufs != NULL)
req_size = uv__write_req_size(&req);
else
@@ -82,6 +82,9 @@ void uv__udp_finish_close(uv_udp_t* handle) {
req->send_cb(req, -ECANCELED);
}

handle->send_queue_size = 0;
handle->send_queue_count = 0;

/* Now tear down the handle. */
handle->recv_cb = NULL;
handle->alloc_cb = NULL;
@@ -127,6 +130,8 @@ static void uv__udp_run_pending(uv_udp_t* handle) {
* why we don't handle partial writes. Just pop the request
* off the write queue and onto the completed queue, done.
*/
handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
handle->send_queue_count--;
QUEUE_REMOVE(&req->queue);
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
}
@@ -433,6 +438,8 @@ int uv__udp_send(uv_udp_send_t* req,
return -ENOMEM;

memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
handle->send_queue_count++;
QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);
uv__handle_start(handle);
@@ -441,6 +448,46 @@ int uv__udp_send(uv_udp_send_t* req,
}


/* Attempt a synchronous, non-queued UDP send.
 *
 * Returns the number of bytes handed to the kernel on success, -EAGAIN when
 * the send cannot complete immediately (or when queued sends are still
 * pending, to preserve packet ordering), or another negative errno value on
 * failure.
 */
int uv__udp_try_send(uv_udp_t* handle,
                     const uv_buf_t bufs[],
                     unsigned int nbufs,
                     const struct sockaddr* addr,
                     unsigned int addrlen) {
  struct msghdr h;
  ssize_t nwritten;
  int err;

  assert(nbufs > 0);

  /* Refuse while async sends are queued; letting this packet jump the
   * queue would reorder datagrams relative to uv_udp_send(). */
  if (handle->send_queue_count != 0)
    return -EAGAIN;

  /* Bind lazily to the wildcard address if the socket is not bound yet. */
  err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
  if (err)
    return err;

  memset(&h, 0, sizeof(h));
  h.msg_name = (struct sockaddr*) addr;
  h.msg_namelen = addrlen;
  h.msg_iov = (struct iovec*) bufs;
  h.msg_iovlen = nbufs;

  for (;;) {
    nwritten = sendmsg(handle->io_watcher.fd, &h, 0);

    if (nwritten != -1)
      return nwritten;

    if (errno == EINTR)
      continue;  /* Interrupted by a signal; retry. */

    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return -EAGAIN;  /* Would block; caller may fall back to uv_udp_send(). */

    return -errno;
  }
}


static int uv__udp_set_membership4(uv_udp_t* handle,
const struct sockaddr_in* multicast_addr,
const char* interface_addr,
@@ -531,6 +578,8 @@ int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
handle->alloc_cb = NULL;
handle->recv_cb = NULL;
handle->send_queue_size = 0;
handle->send_queue_count = 0;
uv__io_init(&handle->io_watcher, uv__udp_io, -1);
QUEUE_INIT(&handle->write_queue);
QUEUE_INIT(&handle->write_completed_queue);
@@ -233,6 +233,26 @@ int uv_udp_send(uv_udp_send_t* req,
}


/* Public entry point for the synchronous UDP send. Validates the handle
 * and address family, derives the sockaddr length, and delegates to the
 * platform-specific uv__udp_try_send(). */
int uv_udp_try_send(uv_udp_t* handle,
                    const uv_buf_t bufs[],
                    unsigned int nbufs,
                    const struct sockaddr* addr) {
  unsigned int addrlen;

  if (handle->type != UV_UDP)
    return UV_EINVAL;

  switch (addr->sa_family) {
    case AF_INET:
      addrlen = sizeof(struct sockaddr_in);
      break;
    case AF_INET6:
      addrlen = sizeof(struct sockaddr_in6);
      break;
    default:
      return UV_EINVAL;
  }

  return uv__udp_try_send(handle, bufs, nbufs, addr, addrlen);
}


int uv_udp_recv_start(uv_udp_t* handle,
uv_alloc_cb alloc_cb,
uv_udp_recv_cb recv_cb) {
@@ -446,6 +466,19 @@ int uv__getaddrinfo_translate_error(int sys_err) {
return 0; /* Pacify compiler. */
}


/* Sum the lengths of all buffers in `bufs`, in bytes. */
size_t uv__count_bufs(const uv_buf_t bufs[], unsigned int nbufs) {
  size_t total;
  unsigned int left;

  total = 0;
  for (left = nbufs; left > 0; left--)
    total += (size_t) bufs[left - 1].len;

  return total;
}


int uv_fs_event_getpath(uv_fs_event_t* handle, char* buf, size_t* len) {
size_t required_len;

@@ -83,6 +83,12 @@ int uv__udp_send(uv_udp_send_t* req,
unsigned int addrlen,
uv_udp_send_cb send_cb);

int uv__udp_try_send(uv_udp_t* handle,
const uv_buf_t bufs[],
unsigned int nbufs,
const struct sockaddr* addr,
unsigned int addrlen);

int uv__udp_recv_start(uv_udp_t* handle, uv_alloc_cb alloccb,
uv_udp_recv_cb recv_cb);

@@ -99,6 +105,8 @@ void uv__work_submit(uv_loop_t* loop,

void uv__work_done(uv_async_t* handle);

size_t uv__count_bufs(const uv_buf_t bufs[], unsigned int nbufs);

#define uv__has_active_reqs(loop) \
(QUEUE_EMPTY(&(loop)->active_reqs) == 0)

@@ -1254,7 +1254,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop,
}

/* Request queued by the kernel. */
req->queued_bytes = uv_count_bufs(bufs, nbufs);
req->queued_bytes = uv__count_bufs(bufs, nbufs);
handle->write_queue_size += req->queued_bytes;
} else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
/* Using overlapped IO, but wait for completion before returning */
@@ -1311,7 +1311,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop,
req->queued_bytes = 0;
} else {
/* Request queued by the kernel. */
req->queued_bytes = uv_count_bufs(bufs, nbufs);
req->queued_bytes = uv__count_bufs(bufs, nbufs);
handle->write_queue_size += req->queued_bytes;
}

@@ -53,15 +53,4 @@ INLINE static void uv_connection_init(uv_stream_t* handle) {
}


/* Total number of payload bytes across all buffers in `bufs`. */
INLINE static size_t uv_count_bufs(const uv_buf_t bufs[], unsigned int nbufs) {
  size_t total = 0;

  while (nbufs > 0)
    total += (size_t) bufs[--nbufs].len;

  return total;
}

#endif /* UV_WIN_STREAM_INL_H_ */
@@ -839,7 +839,7 @@ int uv_tcp_write(uv_loop_t* loop,
uv_insert_pending_req(loop, (uv_req_t*) req);
} else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
/* Request queued by the kernel. */
req->queued_bytes = uv_count_bufs(bufs, nbufs);
req->queued_bytes = uv__count_bufs(bufs, nbufs);
handle->reqs_pending++;
handle->write_reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
@@ -129,6 +129,8 @@ int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
handle->activecnt = 0;
handle->func_wsarecv = WSARecv;
handle->func_wsarecvfrom = WSARecvFrom;
handle->send_queue_size = 0;
handle->send_queue_count = 0;

uv_req_init(loop, (uv_req_t*) &(handle->recv_req));
handle->recv_req.type = UV_UDP_RECV;
@@ -400,8 +402,10 @@ static int uv__send(uv_udp_send_t* req,
uv_insert_pending_req(loop, (uv_req_t*)req);
} else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
/* Request queued by the kernel. */
req->queued_bytes = uv_count_bufs(bufs, nbufs);
req->queued_bytes = uv__count_bufs(bufs, nbufs);
handle->reqs_pending++;
handle->send_queue_size += req->queued_bytes;
handle->send_queue_count++;
REGISTER_HANDLE_REQ(loop, handle, req);
} else {
/* Send failed due to an error. */
@@ -524,6 +528,11 @@ void uv_process_udp_send_req(uv_loop_t* loop, uv_udp_t* handle,

assert(handle->type == UV_UDP);

assert(handle->send_queue_size >= req->queued_bytes);
assert(handle->send_queue_count >= 1);
handle->send_queue_size -= req->queued_bytes;
handle->send_queue_count--;

UNREGISTER_HANDLE_REQ(loop, handle, req);

if (req->cb) {
@@ -860,3 +869,12 @@ int uv__udp_send(uv_udp_send_t* req,

return 0;
}


/* Synchronous UDP send is not implemented for this platform; callers
 * receive UV_ENOSYS and should fall back to the async uv_udp_send(). */
int uv__udp_try_send(uv_udp_t* handle,
const uv_buf_t bufs[],
unsigned int nbufs,
const struct sockaddr* addr,
unsigned int addrlen) {
return UV_ENOSYS;
}
@@ -353,6 +353,9 @@ TEST_IMPL(getsockname_udp) {

ASSERT(getsocknamecount == 2);

ASSERT(udp.send_queue_size == 0);
ASSERT(udpServer.send_queue_size == 0);

MAKE_VALGRIND_HAPPY();
return 0;
}
@@ -95,6 +95,7 @@ TEST_DECLARE (udp_ipv6_only)
TEST_DECLARE (udp_options)
TEST_DECLARE (udp_no_autobind)
TEST_DECLARE (udp_open)
TEST_DECLARE (udp_try_send)
TEST_DECLARE (pipe_bind_error_addrinuse)
TEST_DECLARE (pipe_bind_error_addrnotavail)
TEST_DECLARE (pipe_bind_error_inval)
@@ -376,6 +377,7 @@ TASK_LIST_START
TEST_ENTRY (udp_multicast_join)
TEST_ENTRY (udp_multicast_join6)
TEST_ENTRY (udp_multicast_ttl)
TEST_ENTRY (udp_try_send)

TEST_ENTRY (udp_open)
TEST_HELPER (udp_open, udp4_echo_server)
@@ -91,6 +91,9 @@ TEST_IMPL(udp_multicast_interface) {
ASSERT(sv_send_cb_called == 1);
ASSERT(close_cb_called == 1);

ASSERT(client.send_queue_size == 0);
ASSERT(server.send_queue_size == 0);

MAKE_VALGRIND_HAPPY();
return 0;
}
@@ -159,6 +159,8 @@ TEST_IMPL(udp_open) {
ASSERT(send_cb_called == 1);
ASSERT(close_cb_called == 1);

ASSERT(client.send_queue_size == 0);

MAKE_VALGRIND_HAPPY();
return 0;
}
@@ -206,6 +206,9 @@ TEST_IMPL(udp_send_and_recv) {
ASSERT(sv_recv_cb_called == 1);
ASSERT(close_cb_called == 2);

ASSERT(client.send_queue_size == 0);
ASSERT(server.send_queue_size == 0);

MAKE_VALGRIND_HAPPY();
return 0;
}