Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

unix: rework pending handle/req logic

  • Loading branch information
bnoordhuis committed May 29, 2012
1 parent 12ee388 commit 58a272e5563754eb51ef54d76f8ab3db1300c731
Showing with 75 additions and 110 deletions.
  1. +2 −2 include/uv-private/uv-unix.h
  2. +70 −86 src/unix/core.c
  3. +0 −14 src/unix/internal.h
  4. +1 −1 src/unix/loop.c
  5. +1 −1 src/unix/pipe.c
  6. +1 −6 src/unix/stream.c
@@ -106,7 +106,7 @@ struct uv__io_s {
uv_async_t uv_eio_want_poll_notifier; \
uv_async_t uv_eio_done_poll_notifier; \
uv_idle_t uv_eio_poller; \
uv_handle_t* pending_handles; \
uv_handle_t* closing_handles; \
ngx_queue_t prepare_handles; \
ngx_queue_t check_handles; \
ngx_queue_t idle_handles; \
@@ -144,7 +144,7 @@ struct uv__io_s {
/* TODO: union or classes please! */
#define UV_HANDLE_PRIVATE_FIELDS \
int flags; \
uv_handle_t* next_pending; \
uv_handle_t* next_closing; \


#define UV_STREAM_PRIVATE_FIELDS \
@@ -59,8 +59,6 @@
static uv_loop_t default_loop_struct;
static uv_loop_t* default_loop_ptr;

static void uv__finish_close(uv_handle_t* handle);


void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
handle->close_cb = close_cb;
@@ -116,7 +114,72 @@ void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
}

handle->flags |= UV_CLOSING;
uv__make_pending(handle);

handle->next_closing = handle->loop->closing_handles;
handle->loop->closing_handles = handle;
}


static void uv__finish_close(uv_handle_t* handle) {
assert(!uv__is_active(handle));
assert(handle->flags & UV_CLOSING);
assert(!(handle->flags & UV_CLOSED));
handle->flags |= UV_CLOSED;

switch (handle->type) {
case UV_PREPARE:
case UV_CHECK:
case UV_IDLE:
case UV_ASYNC:
case UV_TIMER:
case UV_PROCESS:
break;

case UV_NAMED_PIPE:
case UV_TCP:
case UV_TTY:
assert(!uv__io_active(&((uv_stream_t*)handle)->read_watcher));
assert(!uv__io_active(&((uv_stream_t*)handle)->write_watcher));
assert(((uv_stream_t*)handle)->fd == -1);
uv__stream_destroy((uv_stream_t*)handle);
break;

case UV_UDP:
uv__udp_finish_close((uv_udp_t*)handle);
break;

case UV_FS_EVENT:
break;

case UV_POLL:
break;

default:
assert(0);
break;
}


if (handle->close_cb) {
handle->close_cb(handle);
}

uv__handle_unref(handle);
}


static void uv__run_closing_handles(uv_loop_t* loop) {
uv_handle_t* p;
uv_handle_t* q;

p = loop->closing_handles;
loop->closing_handles = NULL;

while (p) {
q = p->next_closing;
uv__finish_close(p);
p = q;
}
}


@@ -163,36 +226,6 @@ void uv_loop_delete(uv_loop_t* loop) {
}


static void uv__run_pending(uv_loop_t* loop) {
uv_handle_t* p;
uv_handle_t* q;

if (!loop->pending_handles)
return;

for (p = loop->pending_handles, loop->pending_handles = NULL; p; p = q) {
q = p->next_pending;
p->next_pending = NULL;
p->flags &= ~UV__PENDING;

if (p->flags & UV_CLOSING) {
uv__finish_close(p);
continue;
}

switch (p->type) {
case UV_NAMED_PIPE:
case UV_TCP:
case UV_TTY:
uv__stream_pending((uv_stream_t*)p);
break;
default:
abort();
}
}
}


static void uv__poll(uv_loop_t* loop, int block) {
/* bump the loop's refcount, otherwise libev does
* a zero timeout poll and we end up busy looping
@@ -210,7 +243,6 @@ static int uv__should_block(uv_loop_t* loop) {

static int uv__run(uv_loop_t* loop) {
uv__run_idle(loop);
uv__run_pending(loop);

if (uv__has_active_handles(loop) || uv__has_active_reqs(loop)) {
uv__run_prepare(loop);
@@ -221,9 +253,9 @@ static int uv__run(uv_loop_t* loop) {
uv__run_check(loop);
}

return uv__has_pending_handles(loop)
|| uv__has_active_handles(loop)
|| uv__has_active_reqs(loop);
uv__run_closing_handles(loop);

return uv__has_active_handles(loop) || uv__has_active_reqs(loop);
}


@@ -245,55 +277,7 @@ void uv__handle_init(uv_loop_t* loop, uv_handle_t* handle,
handle->loop = loop;
handle->type = type;
handle->flags = UV__REF; /* ref the loop when active */
handle->next_pending = NULL;
}


void uv__finish_close(uv_handle_t* handle) {
assert(!uv__is_active(handle));
assert(handle->flags & UV_CLOSING);
assert(!(handle->flags & UV_CLOSED));
handle->flags |= UV_CLOSED;

switch (handle->type) {
case UV_PREPARE:
case UV_CHECK:
case UV_IDLE:
case UV_ASYNC:
case UV_TIMER:
case UV_PROCESS:
break;

case UV_NAMED_PIPE:
case UV_TCP:
case UV_TTY:
assert(!uv__io_active(&((uv_stream_t*)handle)->read_watcher));
assert(!uv__io_active(&((uv_stream_t*)handle)->write_watcher));
assert(((uv_stream_t*)handle)->fd == -1);
uv__stream_destroy((uv_stream_t*)handle);
break;

case UV_UDP:
uv__udp_finish_close((uv_udp_t*)handle);
break;

case UV_FS_EVENT:
break;

case UV_POLL:
break;

default:
assert(0);
break;
}


if (handle->close_cb) {
handle->close_cb(handle);
}

uv__handle_unref(handle);
handle->next_closing = NULL;
}


@@ -98,18 +98,6 @@ enum {
UV__PENDING = 0x800
};

inline static int uv__has_pending_handles(const uv_loop_t* loop) {
return loop->pending_handles != NULL;
}

inline static void uv__make_pending(uv_handle_t* h) {
if (h->flags & UV__PENDING) return;
h->next_pending = h->loop->pending_handles;
h->loop->pending_handles = h;
h->flags |= UV__PENDING;
}
#define uv__make_pending(h) uv__make_pending((uv_handle_t*)(h))

inline static void uv__req_init(uv_loop_t* loop,
uv_req_t* req,
uv_req_type type) {
@@ -180,8 +168,6 @@ void uv__timer_close(uv_timer_t* handle);
void uv__udp_close(uv_udp_t* handle);
void uv__udp_finish_close(uv_udp_t* handle);

void uv__stream_pending(uv_stream_t* handle);

#define UV__F_IPC (1 << 0)
#define UV__F_NONBLOCK (1 << 1)
int uv__make_socketpair(int fds[2], int flags);
@@ -41,7 +41,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
ngx_queue_init(&loop->idle_handles);
ngx_queue_init(&loop->check_handles);
ngx_queue_init(&loop->prepare_handles);
loop->pending_handles = NULL;
loop->closing_handles = NULL;
loop->channel = NULL;
loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags);
ev_set_userdata(loop->ev, loop);
@@ -218,7 +218,7 @@ void uv_pipe_connect(uv_connect_t* req,
ngx_queue_init(&req->queue);

/* Run callback on next tick. */
uv__make_pending(handle);
uv__io_feed(handle->loop, &handle->write_watcher, UV__IO_WRITE);

/* Mimic the Windows pipe implementation, always
* return 0 and let the callback handle errors.
@@ -718,11 +718,6 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
}


void uv__stream_pending(uv_stream_t* handle) {
uv__stream_io(handle->loop, &handle->write_watcher, UV__IO_WRITE);
}


static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, int events) {
uv_stream_t* stream;

@@ -859,7 +854,7 @@ int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr,
uv__io_start(stream->loop, &stream->write_watcher);

if (stream->delayed_error)
uv__make_pending(stream);
uv__io_feed(stream->loop, &stream->write_watcher, UV__IO_WRITE);

return 0;
}

0 comments on commit 58a272e

Please sign in to comment.
You can’t perform that action at this time.