Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

unix: clean up udp read/write watchers

  • Loading branch information...
bnoordhuis committed Jan 14, 2012
1 parent 28b0867 commit 454e0212b0615037a4becd3f0eb944cce025ed62
Showing with 54 additions and 55 deletions.
  1. +54 −55 src/unix/udp.c
@@ -28,60 +28,65 @@
#include <stdlib.h>


static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w);
static void uv__udp_run_completed(uv_udp_t* handle);
static void uv__udp_run_pending(uv_udp_t* handle);
static void uv__udp_recvmsg(uv_udp_t* handle);
static void uv__udp_sendmsg(uv_udp_t* handle);
static void uv__udp_io(EV_P_ ev_io* w, int events);
static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents);
static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents);
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain);
static int uv__udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[],
int bufcnt, struct sockaddr* addr, socklen_t addrlen, uv_udp_send_cb send_cb);


static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w) {
int flags;
static void uv__udp_start_watcher(uv_udp_t* handle,
ev_io* w,
void (*cb)(EV_P_ ev_io*, int),
int flags) {
if (ev_is_active(w)) return;
ev_set_cb(w, cb);
ev_io_set(w, handle->fd, flags);
ev_io_start(handle->loop->ev, w);
ev_unref(handle->loop->ev);
}

if (ev_is_active(w)) {
return;
}

assert(w == &handle->read_watcher
|| w == &handle->write_watcher);
static void uv__udp_stop_watcher(uv_udp_t* handle, ev_io* w) {
if (!ev_is_active(w)) return;
ev_ref(handle->loop->ev);
ev_io_stop(handle->loop->ev, w);
ev_io_set(w, -1, 0);
ev_set_cb(w, NULL);
}

flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE);

w->data = handle;
ev_set_cb(w, uv__udp_io);
ev_io_set(w, handle->fd, flags);
ev_io_start(handle->loop->ev, w);
ev_unref(handle->loop->ev);
static void uv__udp_start_read_watcher(uv_udp_t* handle) {
uv__udp_start_watcher(handle,
&handle->read_watcher,
uv__udp_recvmsg,
EV_READ);
}


void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w) {
int flags;
static void uv__udp_start_write_watcher(uv_udp_t* handle) {
uv__udp_start_watcher(handle,
&handle->write_watcher,
uv__udp_sendmsg,
EV_WRITE);
}

if (!ev_is_active(w)) {
return;
}

assert(w == &handle->read_watcher
|| w == &handle->write_watcher);
static void uv__udp_stop_read_watcher(uv_udp_t* handle) {
uv__udp_stop_watcher(handle, &handle->read_watcher);
}

flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE);

ev_ref(handle->loop->ev);
ev_io_stop(handle->loop->ev, w);
ev_io_set(w, -1, flags);
ev_set_cb(w, NULL);
w->data = (void*)0xDEADBABE;
static void uv__udp_stop_write_watcher(uv_udp_t* handle) {
uv__udp_stop_watcher(handle, &handle->write_watcher);
}


void uv__udp_start_close(uv_udp_t* handle) {
uv__udp_watcher_stop(handle, &handle->read_watcher);
uv__udp_watcher_stop(handle, &handle->write_watcher);
uv__udp_stop_write_watcher(handle);
uv__udp_stop_read_watcher(handle);
uv__close(handle->fd);
handle->fd = -1;
}
@@ -206,13 +211,18 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
}


static void uv__udp_recvmsg(uv_udp_t* handle) {
static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents) {
struct sockaddr_storage peer;
struct msghdr h;
uv_udp_t* handle;
ssize_t nread;
uv_buf_t buf;
int flags;

handle = container_of(w, uv_udp_t, read_watcher);
assert(handle->type == UV_UDP);
assert(revents & EV_READ);

assert(handle->recv_cb != NULL);
assert(handle->alloc_cb != NULL);

@@ -263,7 +273,13 @@ static void uv__udp_recvmsg(uv_udp_t* handle) {
}


static void uv__udp_sendmsg(uv_udp_t* handle) {
static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents) {
uv_udp_t* handle;

handle = container_of(w, uv_udp_t, write_watcher);
assert(handle->type == UV_UDP);
assert(revents & EV_WRITE);

assert(!ngx_queue_empty(&handle->write_queue)
|| !ngx_queue_empty(&handle->write_completed_queue));

@@ -279,28 +295,11 @@ static void uv__udp_sendmsg(uv_udp_t* handle) {
}
else if (ngx_queue_empty(&handle->write_queue)) {
/* Pending queue and completion queue empty, stop watcher. */
uv__udp_watcher_stop(handle, &handle->write_watcher);
uv__udp_stop_write_watcher(handle);
}
}


static void uv__udp_io(EV_P_ ev_io* w, int events) {
uv_udp_t* handle;

handle = w->data;
assert(handle != NULL);
assert(handle->type == UV_UDP);
assert(handle->fd >= 0);
assert(!(events & ~(EV_READ|EV_WRITE)));

if (events & EV_READ)
uv__udp_recvmsg(handle);

if (events & EV_WRITE)
uv__udp_sendmsg(handle);
}


static int uv__bind(uv_udp_t* handle,
int domain,
struct sockaddr* addr,
@@ -434,7 +433,7 @@ static int uv__udp_send(uv_udp_send_t* req,
memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0]));

ngx_queue_insert_tail(&handle->write_queue, &req->queue);
uv__udp_watcher_start(handle, &handle->write_watcher);
uv__udp_start_write_watcher(handle);

return 0;
}
@@ -589,14 +588,14 @@ int uv_udp_recv_start(uv_udp_t* handle,

handle->alloc_cb = alloc_cb;
handle->recv_cb = recv_cb;
uv__udp_watcher_start(handle, &handle->read_watcher);
uv__udp_start_read_watcher(handle);

return 0;
}


int uv_udp_recv_stop(uv_udp_t* handle) {
uv__udp_watcher_stop(handle, &handle->read_watcher);
uv__udp_stop_read_watcher(handle);
handle->alloc_cb = NULL;
handle->recv_cb = NULL;
return 0;

0 comments on commit 454e021

Please sign in to comment.
You can’t perform that action at this time.