@@ -31,51 +31,28 @@
static void uv__udp_run_completed (uv_udp_t * handle);
static void uv__udp_run_pending (uv_udp_t * handle);
static void uv__udp_recvmsg (uv_loop_t * loop, uv__io_t * w, int revents);
static void uv__udp_sendmsg (uv_loop_t * loop, uv__io_t * w, int revents);
static void uv__udp_io (uv_loop_t * loop, uv__io_t * w, unsigned int revents);
static void uv__udp_recvmsg (uv_loop_t * loop, uv__io_t * w, unsigned int revents);
static void uv__udp_sendmsg (uv_loop_t * loop, uv__io_t * w, unsigned int revents);
static int uv__udp_maybe_deferred_bind (uv_udp_t * handle, int domain);
static int uv__udp_send (uv_udp_send_t * req, uv_udp_t * handle, uv_buf_t bufs[],
int bufcnt, struct sockaddr* addr, socklen_t addrlen, uv_udp_send_cb send_cb);
static void uv__udp_start_watcher (uv_udp_t * handle,
uv__io_t * w,
uv__io_cb cb,
int events) {
if (uv__io_active (w)) return ;
uv__io_init (w, cb, handle->fd , events);
uv__io_start (handle->loop , w);
uv__handle_start (handle);
}
static void uv__udp_stop_watcher (uv_udp_t * handle, uv__io_t * w) {
if (!uv__io_active (w)) return ;
uv__io_stop (handle->loop , w);
if (!uv__io_active (&handle->read_watcher ) &&
!uv__io_active (&handle->write_watcher ))
{
uv__handle_stop (handle);
}
}
void uv__udp_close (uv_udp_t * handle) {
uv__udp_stop_watcher (handle, &handle->write_watcher );
uv__udp_stop_watcher (handle, &handle-> read_watcher );
close (handle->fd );
handle->fd = -1 ;
uv__io_stop (handle-> loop , &handle->io_watcher , UV__POLLIN | UV__POLLOUT );
uv__handle_stop (handle);
close (handle->io_watcher . fd );
handle->io_watcher . fd = -1 ;
}
void uv__udp_finish_close (uv_udp_t * handle) {
uv_udp_send_t * req;
ngx_queue_t * q;
assert (!uv__io_active (&handle->write_watcher ));
assert (!uv__io_active (&handle->read_watcher ));
assert (handle->fd == -1 );
assert (!uv__io_active (&handle->io_watcher , UV__POLLIN | UV__POLLOUT));
assert (handle->io_watcher .fd == -1 );
uv__udp_run_completed (handle);
@@ -126,7 +103,7 @@ static void uv__udp_run_pending(uv_udp_t* handle) {
h.msg_iovlen = req->bufcnt ;
do {
size = sendmsg (handle->fd , &h, 0 );
size = sendmsg (handle->io_watcher . fd , &h, 0 );
}
while (size == -1 && errno == EINTR);
@@ -194,7 +171,18 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
}
static void uv__udp_recvmsg (uv_loop_t * loop, uv__io_t * w, int revents) {
static void uv__udp_io (uv_loop_t * loop, uv__io_t * w, unsigned int revents) {
if (revents & UV__POLLIN)
uv__udp_recvmsg (loop, w, revents);
if (revents & UV__POLLOUT)
uv__udp_sendmsg (loop, w, revents);
}
static void uv__udp_recvmsg (uv_loop_t * loop,
uv__io_t * w,
unsigned int revents) {
struct sockaddr_storage peer;
struct msghdr h;
uv_udp_t * handle;
@@ -203,9 +191,9 @@ static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) {
int flags;
int count;
handle = container_of (w, uv_udp_t , read_watcher );
handle = container_of (w, uv_udp_t , io_watcher );
assert (handle->type == UV_UDP);
assert (revents & UV__IO_READ );
assert (revents & UV__POLLIN );
assert (handle->recv_cb != NULL );
assert (handle->alloc_cb != NULL );
@@ -228,7 +216,7 @@ static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) {
h.msg_iovlen = 1 ;
do {
nread = recvmsg (handle->fd , &h, 0 );
nread = recvmsg (handle->io_watcher . fd , &h, 0 );
}
while (nread == -1 && errno == EINTR);
@@ -258,17 +246,19 @@ static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) {
/* recv_cb callback may decide to pause or close the handle */
while (nread != -1
&& count-- > 0
&& handle->fd != -1
&& handle->io_watcher . fd != -1
&& handle->recv_cb != NULL );
}
static void uv__udp_sendmsg (uv_loop_t * loop, uv__io_t * w, int revents) {
static void uv__udp_sendmsg (uv_loop_t * loop,
uv__io_t * w,
unsigned int revents) {
uv_udp_t * handle;
handle = container_of (w, uv_udp_t , write_watcher );
handle = container_of (w, uv_udp_t , io_watcher );
assert (handle->type == UV_UDP);
assert (revents & UV__IO_WRITE );
assert (revents & UV__POLLOUT );
assert (!ngx_queue_empty (&handle->write_queue )
|| !ngx_queue_empty (&handle->write_completed_queue ));
@@ -281,11 +271,14 @@ static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, int revents) {
if (!ngx_queue_empty (&handle->write_completed_queue )) {
/* Schedule completion callbacks. */
uv__io_feed (handle->loop , &handle->write_watcher , UV__IO_WRITE );
uv__io_feed (handle->loop , &handle->io_watcher );
}
else if (ngx_queue_empty (&handle->write_queue )) {
/* Pending queue and completion queue empty, stop watcher. */
uv__udp_stop_watcher (handle, &handle->write_watcher );
uv__io_stop (loop, &handle->io_watcher , UV__POLLOUT);
if (!uv__io_active (&handle->io_watcher , UV__POLLIN))
uv__handle_stop (handle);
}
}
@@ -316,15 +309,15 @@ static int uv__bind(uv_udp_t* handle,
goto out;
}
if (handle->fd == -1 ) {
if (handle->io_watcher . fd == -1 ) {
if ((fd = uv__socket (domain, SOCK_DGRAM, 0 )) == -1 ) {
uv__set_sys_error (handle->loop , errno);
goto out;
}
handle->fd = fd;
handle->io_watcher . fd = fd;
}
fd = handle->fd ;
fd = handle->io_watcher . fd ;
yes = 1 ;
if (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1 ) {
uv__set_sys_error (handle->loop , errno);
@@ -365,12 +358,13 @@ static int uv__bind(uv_udp_t* handle,
goto out;
}
handle->io_watcher .fd = fd;
status = 0 ;
out:
if (status) {
close (handle->fd );
handle->fd = -1 ;
close (handle->io_watcher . fd );
handle->io_watcher . fd = -1 ;
}
errno = saved_errno;
@@ -384,7 +378,7 @@ static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain) {
assert (domain == AF_INET || domain == AF_INET6);
if (handle->fd != -1 )
if (handle->io_watcher . fd != -1 )
return 0 ;
switch (domain) {
@@ -442,14 +436,11 @@ static int uv__udp_send(uv_udp_send_t* req,
uv__set_sys_error (handle->loop , ENOMEM);
return -1 ;
}
memcpy (req->bufs , bufs, bufcnt * sizeof (bufs[0 ]));
memcpy (req->bufs , bufs, bufcnt * sizeof (bufs[0 ]));
ngx_queue_insert_tail (&handle->write_queue , &req->queue );
uv__udp_start_watcher (handle,
&handle->write_watcher ,
uv__udp_sendmsg,
UV__IO_WRITE);
uv__io_start (handle->loop , &handle->io_watcher , UV__POLLOUT);
uv__handle_start (handle);
return 0 ;
}
@@ -459,9 +450,10 @@ int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
memset (handle, 0 , sizeof *handle);
uv__handle_init (loop, (uv_handle_t *)handle, UV_UDP);
handle->fd = -1 ;
handle->io_watcher . fd = -1 ;
ngx_queue_init (&handle->write_queue );
ngx_queue_init (&handle->write_completed_queue );
uv__io_init (&handle->io_watcher , uv__udp_io, -1 );
return 0 ;
}
@@ -494,7 +486,7 @@ int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
status = -1 ;
/* Check for already active socket. */
if (handle->fd != -1 ) {
if (handle->io_watcher . fd != -1 ) {
uv__set_artificial_error (handle->loop , UV_EALREADY);
goto out;
}
@@ -521,7 +513,7 @@ int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
}
#endif
handle->fd = sock;
handle->io_watcher . fd = sock;
status = 0 ;
out:
@@ -557,7 +549,7 @@ int uv_udp_set_membership(uv_udp_t* handle, const char* multicast_addr,
return -1 ;
}
if (setsockopt (handle->fd , IPPROTO_IP, optname, (void *) &mreq, sizeof mreq) == -1 ) {
if (setsockopt (handle->io_watcher . fd , IPPROTO_IP, optname, (void *) &mreq, sizeof mreq) == -1 ) {
uv__set_sys_error (handle->loop , errno);
return -1 ;
}
@@ -576,15 +568,15 @@ static int uv__setsockopt_maybe_char(uv_udp_t* handle, int option, int val) {
if (val < 0 || val > 255 )
return uv__set_sys_error (handle->loop , EINVAL);
if (setsockopt (handle->fd , IPPROTO_IP, option, &arg, sizeof (arg)))
if (setsockopt (handle->io_watcher . fd , IPPROTO_IP, option, &arg, sizeof (arg)))
return uv__set_sys_error (handle->loop , errno);
return 0 ;
}
int uv_udp_set_broadcast (uv_udp_t * handle, int on) {
if (setsockopt (handle->fd , SOL_SOCKET, SO_BROADCAST, &on, sizeof (on)))
if (setsockopt (handle->io_watcher . fd , SOL_SOCKET, SO_BROADCAST, &on, sizeof (on)))
return uv__set_sys_error (handle->loop , errno);
return 0 ;
@@ -595,7 +587,7 @@ int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
if (ttl < 1 || ttl > 255 )
return uv__set_sys_error (handle->loop , EINVAL);
if (setsockopt (handle->fd , IPPROTO_IP, IP_TTL, &ttl, sizeof (ttl)))
if (setsockopt (handle->io_watcher . fd , IPPROTO_IP, IP_TTL, &ttl, sizeof (ttl)))
return uv__set_sys_error (handle->loop , errno);
return 0 ;
@@ -620,7 +612,7 @@ int uv_udp_getsockname(uv_udp_t* handle, struct sockaddr* name, int* namelen) {
/* Don't clobber errno. */
saved_errno = errno;
if (handle->fd < 0 ) {
if (handle->io_watcher . fd == - 1 ) {
uv__set_sys_error (handle->loop , EINVAL);
rv = -1 ;
goto out;
@@ -629,7 +621,7 @@ int uv_udp_getsockname(uv_udp_t* handle, struct sockaddr* name, int* namelen) {
/* sizeof(socklen_t) != sizeof(int) on some systems. */
socklen = (socklen_t )*namelen;
if (getsockname (handle->fd , name, &socklen) == -1 ) {
if (getsockname (handle->io_watcher . fd , name, &socklen) == -1 ) {
uv__set_sys_error (handle->loop , errno);
rv = -1 ;
} else {
@@ -682,7 +674,7 @@ int uv_udp_recv_start(uv_udp_t* handle,
return -1 ;
}
if (uv__io_active (&handle->read_watcher )) {
if (uv__io_active (&handle->io_watcher , UV__POLLIN )) {
uv__set_artificial_error (handle->loop , UV_EALREADY);
return -1 ;
}
@@ -693,18 +685,21 @@ int uv_udp_recv_start(uv_udp_t* handle,
handle->alloc_cb = alloc_cb;
handle->recv_cb = recv_cb;
uv__udp_start_watcher (handle,
&handle->read_watcher ,
uv__udp_recvmsg,
UV__IO_READ);
uv__io_start (handle->loop , &handle->io_watcher , UV__POLLIN);
uv__handle_start (handle);
return 0 ;
}
int uv_udp_recv_stop (uv_udp_t * handle) {
uv__udp_stop_watcher (handle, &handle->read_watcher );
uv__io_stop (handle->loop , &handle->io_watcher , UV__POLLIN);
if (!uv__io_active (&handle->io_watcher , UV__POLLOUT))
uv__handle_stop (handle);
handle->alloc_cb = NULL ;
handle->recv_cb = NULL ;
return 0 ;
}