Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

stream: uv_thread_listen

  • Loading branch information...
indutny committed Feb 17, 2013
1 parent b6f72e5 commit 8176142fabe70d8c03cfde43d1f6eaa59ed7836b
Showing with 189 additions and 0 deletions.
  1. +1 −0 include/uv-private/uv-unix.h
  2. +1 −0 include/uv.h
  3. +14 −0 src/unix/internal.h
  4. +153 −0 src/unix/stream.c
  5. +20 −0 src/unix/tcp.c
@@ -222,6 +222,7 @@ typedef struct {
uv_connection_cb connection_cb; \
int delayed_error; \
int accepted_fd; \
void* thread_listen_state; \
UV_STREAM_PRIVATE_PLATFORM_FIELDS \

#define UV_TCP_PRIVATE_FIELDS /* empty */
@@ -523,6 +523,7 @@ struct uv_stream_s {
};

UV_EXTERN int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb);
UV_EXTERN int uv_thread_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb);

/*
* This call is used in conjunction with uv_listen() to accept incoming
@@ -143,6 +143,7 @@ int uv__accept(int sockfd);

/* tcp */
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb);
int uv_tcp_thread_listen_init(uv_tcp_t* tcp, int backlog);
int uv__tcp_nodelay(int fd, int on);
int uv__tcp_keepalive(int fd, int on, unsigned int delay);

@@ -171,6 +172,19 @@ int uv__kqueue_init(uv_loop_t* loop);
int uv__platform_loop_init(uv_loop_t* loop, int default_loop);
void uv__platform_loop_delete(uv_loop_t* loop);

/* Forward declaration */
typedef struct uv__stream_thread_listen_s uv__stream_thread_listen_t;

struct uv__stream_thread_listen_s {
uv_thread_t thread;
uv_async_t conn_notify;
uv_sem_t conn_sem;
uv_stream_t* stream;
int accepted_fd;
int accept_error;
int wait_accept;
};

/* various */
void uv__async_close(uv_async_t* handle);
void uv__check_close(uv_check_t* handle);
@@ -109,6 +109,7 @@ void uv__stream_init(uv_loop_t* loop,
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->delayed_error = 0;
stream->thread_listen_state = NULL;
ngx_queue_init(&stream->write_queue);
ngx_queue_init(&stream->write_completed_queue);
stream->write_queue_size = 0;
@@ -268,6 +269,14 @@ void uv__stream_osx_cb_close(uv_handle_t* async) {
}


void uv__stream_osx_thread_cb_close(uv_handle_t* async) {
uv__stream_thread_listen_t* s;

s = container_of(async, uv__stream_thread_listen_t, conn_notify);
free(s);
}


int uv__stream_try_select(uv_stream_t* stream, int fd) {
/*
* kqueue doesn't work with some files from /dev mount on osx.
@@ -624,6 +633,17 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
status = 0;

out:
if (streamServer->thread_listen_state != NULL) {
uv__stream_thread_listen_t* s;

s = streamServer->thread_listen_state;

/* We was waiting for accept, accept() thread should continue now */
if (s->wait_accept) {
s->wait_accept = 0;
uv_sem_post(&s->conn_sem);
}
}
errno = saved_errno;
return status;
}
@@ -653,6 +673,127 @@ int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
}


void uv__thread_listen_notify(uv_async_t* handle, int status) {
uv__stream_thread_listen_t* s;
uv_stream_t* stream;

s = container_of(handle, uv__stream_thread_listen_t, conn_notify);
stream = s->stream;

if (s->accepted_fd == -1) {
uv__set_sys_error(stream->loop, s->accepted_fd);
stream->connection_cb(stream, -1);
} else {
stream->accepted_fd = s->accepted_fd;
stream->connection_cb(stream, 0);

if (stream->accepted_fd != -1) {
/* The user hasn't yet accepted called uv_accept() */
s->wait_accept = 1;
return;
}
}

uv_sem_post(&s->conn_sem);
}


void uv__thread_listen_accept(void* arg) {
int fd;
uv__stream_thread_listen_t* s;
uv_stream_t* stream;

stream = arg;
s = stream->thread_listen_state;
assert(s != NULL);

while (uv__stream_fd(stream) != -1) {
fd = uv__accept(uv__stream_fd(stream));
fprintf(stdout, "accept %d\n", fd);

s->accepted_fd = fd;
if (fd == -1) {
switch (errno) {
#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
case EAGAIN:
case ECONNABORTED:
continue; /* Ignore. */

case EMFILE:
case ENFILE:
abort();
return;

default:
s->accept_error = errno;
goto send;
}
}

s->accept_error = 0;

send:
uv_async_send(&s->conn_notify);
/* XXX: This is a little bit oversynchronization */
uv_sem_wait(&s->conn_sem);
}
}


int uv_thread_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
int r;
uv__stream_thread_listen_t* s;

assert(stream->thread_listen_state == NULL);

/* Only TCP is supported */
if (stream->type != UV_TCP)
return uv__set_artificial_error(stream->loop, UV_ENOTSUP);

if ((r = uv_tcp_thread_listen_init((uv_tcp_t*) stream, backlog)))
return r;

s = malloc(sizeof(*s));
if (s == NULL)
return uv__set_artificial_error(stream->loop, UV_ENOMEM);

if ((r = uv_async_init(stream->loop,
&s->conn_notify,
uv__thread_listen_notify))) {
goto fatal0;
}

if ((r = uv_sem_init(&s->conn_sem, 0)))
goto fatal1;

s->wait_accept = 0;
s->stream = stream;
stream->connection_cb = cb;
stream->thread_listen_state = s;
if ((r = uv_thread_create(&s->thread, uv__thread_listen_accept, stream)))
goto fatal2;

return 0;

fatal2:
stream->connection_cb = NULL;
stream->thread_listen_state = NULL;
uv_sem_destroy(&s->conn_sem);

fatal1:
uv_close((uv_handle_t*) &s->conn_notify, uv__stream_osx_thread_cb_close);
return r;

fatal0:
free(s);
stream->thread_listen_state = NULL;

return r;
}


static void uv__drain(uv_stream_t* stream) {
uv_shutdown_t* req;

@@ -1388,4 +1529,16 @@ void uv__stream_close(uv_stream_t* handle) {
}

assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));

/* Terminate thread listen thread */
if (handle->thread_listen_state != NULL) {
uv__stream_thread_listen_t* s;

s = handle->thread_listen_state;
uv_thread_join(&s->thread);

uv_sem_destroy(&s->conn_sem);
uv_close((uv_handle_t*) &s->conn_notify, uv__stream_osx_thread_cb_close);
handle->thread_listen_state = NULL;
}
}
@@ -255,6 +255,26 @@ int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
}


int uv_tcp_thread_listen_init(uv_tcp_t* tcp, int backlog) {
int r;

if (tcp->delayed_error)
return uv__set_sys_error(tcp->loop, tcp->delayed_error);

/* Socket should become blocking again */
if ((r = uv__nonblock(uv__stream_fd(tcp), 0)))
return r;

if (maybe_new_socket(tcp, AF_INET, UV_STREAM_READABLE))
return -1;

if (listen(uv__stream_fd(tcp), backlog))
return uv__set_sys_error(tcp->loop, errno);

return 0;
}


int uv__tcp_connect(uv_connect_t* req,
uv_tcp_t* handle,
struct sockaddr_in addr,

0 comments on commit 8176142

Please sign in to comment.
You can’t perform that action at this time.