Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

import/export streams accross loops

  • Loading branch information...
commit 2ce0058251373bc08016acd579aaf5bb0d8494c3 1 parent cdb44df
Igor Zinkovsky authored bnoordhuis committed
View
5 include/uv-private/uv-unix.h
@@ -192,6 +192,11 @@ typedef void* uv_lib_t;
struct termios orig_termios; \
int mode;
+#define UV_STREAM_INFO_PRIVATE_FIELDS \
+ union { \
+ int fd; \
+ };
+
/* UV_FS_EVENT_PRIVATE_FIELDS */
#if defined(__linux__)
View
5 include/uv-private/uv-win.h
@@ -450,6 +450,11 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
wchar_t* dirw; \
char* buffer;
+#define UV_STREAM_INFO_PRIVATE_FIELDS \
+ union { \
+ WSAPROTOCOL_INFOW socket_info; \
+ };
+
int uv_utf16_to_utf8(const wchar_t* utf16Buffer, size_t utf16Size,
char* utf8Buffer, size_t utf8Size);
int uv_utf8_to_utf16(const char* utf8Buffer, wchar_t* utf16Buffer,
View
23 include/uv.h
@@ -180,6 +180,7 @@ typedef struct uv_process_s uv_process_t;
typedef struct uv_counters_s uv_counters_t;
typedef struct uv_cpu_info_s uv_cpu_info_t;
typedef struct uv_interface_address_s uv_interface_address_t;
+typedef struct uv_stream_info_s uv_stream_info_t;
/* Request types */
typedef struct uv_req_s uv_req_t;
typedef struct uv_shutdown_s uv_shutdown_t;
@@ -531,6 +532,28 @@ UV_EXTERN int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name,
int* namelen);
/*
+ * uv_stream_info_t is used to store exported stream (using uv_export),
+ * which can be imported into a different event-loop within the same process
+ * (using uv_import).
+ */
+struct uv_stream_info_s {
+ uv_handle_type type;
+ UV_STREAM_INFO_PRIVATE_FIELDS
+};
+
+/*
+ * Exports uv_stream_t as uv_stream_info_t value, which could
+ * be used to initialize shared streams within the same process.
+ */
+UV_EXTERN int uv_export(uv_stream_t* stream, uv_stream_info_t* info);
+
+/*
+ * Imports uv_stream_info_t value into uv_stream_t to initialize
+ * shared stream.
+ */
+UV_EXTERN int uv_import(uv_stream_t* stream, uv_stream_info_t* info);
+
+/*
* uv_tcp_connect, uv_tcp_connect6
* These functions establish IPv4 and IPv6 TCP connections. Provide an
* initialized TCP handle and an uninitialized uv_connect_t*. The callback
View
10 src/unix/stream.c
@@ -966,3 +966,13 @@ int uv_read_stop(uv_stream_t* stream) {
}
+int uv_export(uv_stream_t* stream, uv_stream_info_t* info) {
+ /* Implement me */
+ return uv__new_artificial_error(UV_ENOSYS);
+}
+
+
+int uv_import(uv_stream_t* stream, uv_stream_info_t* info) {
+ /* Implement me */
+ return uv__new_artificial_error(UV_ENOSYS);
+}
View
5 src/win/internal.h
@@ -143,11 +143,14 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle);
-int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info);
+int uv__tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info);
int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
LPWSAPROTOCOL_INFOW protocol_info);
+int uv_tcp_export(uv_tcp_t* tcp, uv_stream_info_t* info);
+int uv_tcp_import(uv_tcp_t* tcp, uv_stream_info_t* info);
+
/*
* UDP
View
2  src/win/pipe.c
@@ -649,7 +649,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
return -1;
}
- return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info);
+ return uv__tcp_import((uv_tcp_t*)client, server->pending_socket_info);
} else {
pipe_client = (uv_pipe_t*)client;
View
24 src/win/stream.c
@@ -186,3 +186,27 @@ size_t uv_count_bufs(uv_buf_t bufs[], int count) {
return bytes;
}
+
+
+int uv_export(uv_stream_t* stream, uv_stream_info_t* info) {
+ switch (stream->type) {
+ case UV_TCP:
+ return uv_tcp_export((uv_tcp_t*)stream, info);
+ default:
+ assert(0);
+ uv__set_sys_error(stream->loop, WSAEINVAL);
+ return -1;
+ }
+}
+
+
+int uv_import(uv_stream_t* stream, uv_stream_info_t* info) {
+ switch (stream->type) {
+ case UV_TCP:
+ return uv_tcp_import((uv_tcp_t*)stream, info);
+ default:
+ assert(0);
+ uv__set_sys_error(stream->loop, WSAEINVAL);
+ return -1;
+ }
+}
View
25 src/win/tcp.c
@@ -1019,7 +1019,7 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
}
-int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) {
+int uv__tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) {
SOCKET socket = WSASocketW(AF_INET,
SOCK_STREAM,
IPPROTO_IP,
@@ -1140,4 +1140,25 @@ int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
}
return 0;
-}
+}
+
+
+int uv_tcp_export(uv_tcp_t* tcp, uv_stream_info_t* info) {
+ if (uv_tcp_duplicate_socket(tcp, GetCurrentProcessId(),
+ &info->socket_info) == -1) {
+ return -1;
+ }
+
+ info->type = UV_TCP;
+ return 0;
+}
+
+
+int uv_tcp_import(uv_tcp_t* tcp, uv_stream_info_t* info) {
+ if (info->type != UV_TCP) {
+ uv__set_sys_error(tcp->loop, WSAEINVAL);
+ return -1;
+ }
+
+ return uv__tcp_import(tcp, &info->socket_info);
+}
View
224 test/test-ipc-threads.c
@@ -0,0 +1,224 @@
+/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "uv.h"
+#include "runner.h"
+#include "task.h"
+
+#include <stdio.h>
+#include <string.h>
+
+typedef struct {
+ uv_loop_t* loop;
+ uv_thread_t thread;
+ uv_async_t recv_channel;
+ uv_async_t send_channel;
+ uv_tcp_t server;
+ uv_tcp_t conn;
+ int connection_accepted;
+ int close_cb_called;
+} worker_t;
+
+static worker_t parent, child;
+
+static volatile uv_stream_info_t dup_stream;
+
+typedef struct {
+ uv_connect_t conn_req;
+ uv_tcp_t conn;
+} tcp_conn;
+
+#define CONN_COUNT 100
+
+static void close_cb(uv_handle_t* handle) {
+ worker_t* worker = (worker_t*)handle->data;
+ ASSERT(worker);
+ worker->close_cb_called++;
+}
+
+
+static void on_connection(uv_stream_t* server, int status) {
+ int r;
+ worker_t* worker = CONTAINING_RECORD(server, worker_t, server);
+
+ if (!worker->connection_accepted) {
+ /*
+ * Accept the connection and close it.
+ */
+ ASSERT(status == 0);
+
+ r = uv_tcp_init(server->loop, &worker->conn);
+ ASSERT(r == 0);
+
+ worker->conn.data = worker;
+
+ r = uv_accept(server, (uv_stream_t*)&worker->conn);
+ ASSERT(r == 0);
+
+ worker->connection_accepted = 1;
+
+ uv_close((uv_handle_t*)&worker->conn, close_cb);
+ uv_close((uv_handle_t*)&worker->recv_channel, close_cb);
+ uv_close((uv_handle_t*)server, close_cb);
+ }
+}
+
+
+static void close_client_conn_cb(uv_handle_t* handle) {
+ tcp_conn* p = (tcp_conn*)handle->data;
+ free(p);
+}
+
+
+static void connect_cb(uv_connect_t* req, int status) {
+ uv_close((uv_handle_t*)req->handle, close_client_conn_cb);
+}
+
+
+static void make_many_connections() {
+ tcp_conn* conn;
+ struct sockaddr_in addr;
+ int r, i;
+
+ for (i = 0; i < CONN_COUNT; i++) {
+ conn = malloc(sizeof(*conn));
+ ASSERT(conn);
+
+ r = uv_tcp_init(uv_default_loop(), &conn->conn);
+ ASSERT(r == 0);
+
+ addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
+
+ r = uv_tcp_connect(&conn->conn_req, (uv_tcp_t*)&conn->conn, addr, connect_cb);
+ ASSERT(r == 0);
+
+ conn->conn.data = conn;
+ }
+}
+
+
+void on_parent_msg(uv_async_t* handle, int status) {
+ int r;
+
+ ASSERT(dup_stream.type == UV_TCP);
+
+ /* Import the shared TCP server, and start listening on it. */
+ r = uv_tcp_init(parent.loop, &parent.server);
+ ASSERT(r == 0);
+
+ parent.server.data = &parent;
+
+ r = uv_import((uv_stream_t*)&parent.server,
+ (uv_stream_info_t*)&dup_stream);
+ ASSERT(r == 0);
+
+ r = uv_listen((uv_stream_t*)&parent.server, 12, on_connection);
+ ASSERT(r == 0);
+
+ /* Create a bunch of connections to get both servers to accept. */
+ make_many_connections();
+}
+
+
+void on_child_msg(uv_async_t* handle, int status) {
+ ASSERT(!"no");
+}
+
+
+static void child_thread_entry(void* arg) {
+ int r;
+ int listen_after_write = (int)arg;
+
+ r = uv_tcp_init(child.loop, &child.server);
+ ASSERT(r == 0);
+
+ child.server.data = &child;
+
+ r = uv_tcp_bind(&child.server, uv_ip4_addr("0.0.0.0", TEST_PORT));
+ ASSERT(r == 0);
+
+ if (!listen_after_write) {
+ r = uv_listen((uv_stream_t*)&child.server, 12, on_connection);
+ ASSERT(r == 0);
+ }
+
+ r = uv_export((uv_stream_t*)&child.server,
+ (uv_stream_info_t*)&dup_stream);
+ ASSERT(r == 0);
+
+ r = uv_async_send(&child.send_channel);
+ ASSERT(r == 0);
+
+ if (listen_after_write) {
+ r = uv_listen((uv_stream_t*)&child.server, 12, on_connection);
+ ASSERT(r == 0);
+ }
+
+ r = uv_run(child.loop);
+ ASSERT(r == 0);
+
+ ASSERT(child.connection_accepted == 1);
+ ASSERT(child.close_cb_called == 3);
+}
+
+
+static void run_ipc_threads_test(int listen_after_write) {
+ int r;
+
+ parent.loop = uv_default_loop();
+ child.loop = uv_loop_new();
+ ASSERT(child.loop);
+
+ r = uv_async_init(parent.loop, &parent.recv_channel, on_parent_msg);
+ ASSERT(r == 0);
+ parent.recv_channel.data = &parent;
+
+ r = uv_async_init(child.loop, &parent.send_channel, on_child_msg);
+ ASSERT(r == 0);
+ parent.send_channel.data = &child;
+
+ child.send_channel = parent.recv_channel;
+ child.recv_channel = parent.send_channel;
+
+ r = uv_thread_create(&child.thread, child_thread_entry, (void*)listen_after_write);
+ ASSERT(r == 0);
+
+ r = uv_run(parent.loop);
+ ASSERT(r == 0);
+
+ ASSERT(parent.connection_accepted == 1);
+ ASSERT(parent.close_cb_called == 3);
+
+ r = uv_thread_join(&child.thread);
+ ASSERT(r == 0);
+}
+
+
+TEST_IMPL(ipc_threads_listen_after_write) {
+ run_ipc_threads_test(1);
+ return 0;
+}
+
+
+TEST_IMPL(ipc_threads_listen_before_write) {
+ run_ipc_threads_test(0);
+ return 0;
+}
View
4 test/test-list.h
@@ -24,6 +24,8 @@ TEST_DECLARE (tty)
TEST_DECLARE (stdio_over_pipes)
TEST_DECLARE (ipc_listen_before_write)
TEST_DECLARE (ipc_listen_after_write)
+TEST_DECLARE (ipc_threads_listen_after_write)
+TEST_DECLARE (ipc_threads_listen_before_write)
TEST_DECLARE (tcp_ping_pong)
TEST_DECLARE (tcp_ping_pong_v6)
TEST_DECLARE (pipe_ping_pong)
@@ -161,6 +163,8 @@ TASK_LIST_START
TEST_ENTRY (stdio_over_pipes)
TEST_ENTRY (ipc_listen_before_write)
TEST_ENTRY (ipc_listen_after_write)
+ TEST_ENTRY (ipc_threads_listen_after_write)
+ TEST_ENTRY (ipc_threads_listen_before_write)
TEST_ENTRY (tcp_ping_pong)
TEST_HELPER (tcp_ping_pong, tcp4_echo_server)
View
1  uv.gyp
@@ -301,6 +301,7 @@
'test/test-hrtime.c',
'test/test-idle.c',
'test/test-ipc.c',
+ 'test/test-ipc-threads.c',
'test/test-list.h',
'test/test-loop-handles.c',
'test/test-multiple-listen.c',
Please sign in to comment.
Something went wrong with that request. Please try again.