Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also .

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also .
...
Checking mergeability… Don’t worry, you can still create the pull request.
  • 5 commits
  • 11 files changed
  • 0 commit comments
  • 1 contributor
Showing with 415 additions and 29 deletions.
  1. +2 −0 include/uv-unix.h
  2. +6 −0 include/uv-win.h
  3. +23 −12 include/uv.h
  4. +281 −0 prefork.c
  5. +1 −1 src/uv-unix.c
  6. +4 −1 src/win/core.c
  7. +1 −0 src/win/internal.h
  8. +24 −11 src/win/process.c
  9. +4 −1 src/win/req.c
  10. +50 −3 src/win/tcp.c
  11. +19 −0 uv.gyp
View
2 include/uv-unix.h
@@ -39,6 +39,8 @@ typedef struct {
size_t len;
} uv_buf_t;
+#define UV_OBJECT_PRIVATE_FIELDS /* empty */
+
#define UV_REQ_BUFSML_SIZE (4)
#define UV_REQ_PRIVATE_FIELDS /* empty */
View
6 include/uv-win.h
@@ -51,6 +51,9 @@ typedef struct uv_buf_t {
UV_PROCESS_CLOSE, \
UV_UDP_RECV
+#define UV_OBJECT_PRIVATE_FIELDS \
+ /* empty */
+
#define UV_REQ_PRIVATE_FIELDS \
union { \
/* Used by I/O operations */ \
@@ -79,10 +82,13 @@ typedef struct uv_buf_t {
HANDLE pipeHandle; \
struct uv_pipe_accept_s* next_pending; \
} uv_pipe_accept_t; \
+ \
typedef struct uv_tcp_accept_s { \
UV_REQ_FIELDS \
SOCKET accept_socket; \
char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
+ HANDLE event_handle; \
+ HANDLE wait_handle; \
struct uv_tcp_accept_s* next_pending; \
} uv_tcp_accept_t;
View
35 include/uv.h
@@ -42,6 +42,7 @@ typedef intptr_t ssize_t;
#endif
typedef struct uv_err_s uv_err_t;
+typedef struct uv_object_s uv_object_t;
typedef struct uv_handle_s uv_handle_t;
typedef struct uv_stream_s uv_stream_t;
typedef struct uv_tcp_s uv_tcp_t;
@@ -173,7 +174,7 @@ typedef enum {
} uv_err_code;
typedef enum {
- UV_UNKNOWN_HANDLE = 0,
+ UV_UNKNOWN_OBJECT = 0,
UV_TCP,
UV_UDP,
UV_NAMED_PIPE,
@@ -187,11 +188,7 @@ typedef enum {
UV_ARES_TASK,
UV_ARES_EVENT,
UV_GETADDRINFO,
- UV_PROCESS
-} uv_handle_type;
-
-typedef enum {
- UV_UNKNOWN_REQ = 0,
+ UV_PROCESS,
UV_CONNECT,
UV_ACCEPT,
UV_READ,
@@ -200,7 +197,7 @@ typedef enum {
UV_WAKEUP,
UV_UDP_SEND,
UV_REQ_TYPE_PRIVATE
-} uv_req_type;
+} uv_object_type;
struct uv_err_s {
@@ -221,9 +218,20 @@ char* uv_strerror(uv_err_t err);
const char* uv_err_name(uv_err_t err);
-#define UV_REQ_FIELDS \
+#define UV_OBJECT_FIELDS \
/* read-only */ \
- uv_req_type type; \
+ uv_object_type type; \
+ /* private */ \
+ UV_OBJECT_PRIVATE_FIELDS
+
+/* Abstract base class of all requests and handles. */
+struct uv_object_s {
+ UV_OBJECT_FIELDS
+};
+
+
+#define UV_REQ_FIELDS \
+ UV_OBJECT_FIELDS \
/* public */ \
void* data; \
/* private */ \
@@ -258,15 +266,14 @@ struct uv_shutdown_s {
#define UV_HANDLE_FIELDS \
- /* read-only */ \
- uv_handle_type type; \
+ UV_OBJECT_FIELDS \
/* public */ \
uv_close_cb close_cb; \
void* data; \
/* private */ \
UV_HANDLE_PRIVATE_FIELDS
-/* The abstract base class of all handles. */
+/* The abstract base class of all handles. */
struct uv_handle_s {
UV_HANDLE_FIELDS
};
@@ -397,6 +404,10 @@ struct uv_tcp_s {
int uv_tcp_init(uv_tcp_t* handle);
+#ifdef _WIN32
+ int uv_tcp_import(uv_tcp_t* handle, SOCKET sock);
+#endif
+
int uv_tcp_bind(uv_tcp_t* handle, struct sockaddr_in);
int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6);
View
281 prefork.c
@@ -0,0 +1,281 @@
+
+
+#include <uv.h>
+#include <io.h>
+#include <stdio.h>
+#include <string.h>
+
+int server_id;
+int accepted = 0;
+
+uv_tcp_t server;
+uv_timer_t timer;
+
+char exe_path[1024];
+size_t exe_path_size;
+
+char message[] = "HTTP 1.1 200 OK\r\nContent-Length: 12\r\nConnection: close\r\n\r\nhello world\n";
+
+#define CHECK(r) \
+ if (!(r)) abort();
+
+#define LOG(msg, ...) \
+ printf("Server %d: " ## msg, server_id, __VA_ARGS__); \
+
+
+typedef struct client_s {
+ uv_tcp_t handle;
+ uv_write_t write;
+ char read_buffer[8192];
+ struct client_s* next_free;
+} client_t;
+
+static client_t* free_client_list = NULL;
+
+static client_t* alloc_client() {
+ if (free_client_list == NULL) {
+ return (client_t*) malloc(sizeof(client_t));
+ } else {
+ client_t* client = free_client_list;
+ free_client_list = client->next_free;
+ return client;
+ }
+}
+
+static void free_client(client_t* client) {
+ client->next_free = free_client_list;
+ free_client_list = client;
+}
+
+static client_t* get_client_from_handle(uv_tcp_t* handle) {
+ return CONTAINING_RECORD(handle, client_t, handle);
+}
+
+static client_t* get_client_from_write(uv_write_t* write) {
+ return CONTAINING_RECORD(write, client_t, write);
+}
+
+
+void slave_close_cb(uv_handle_t* handle) {
+ free(handle);
+}
+
+void slave_pipe_close_cb(uv_handle_t* handle) {
+ free(handle);
+}
+
+
+void slave_exit_cb(uv_process_t* handle, int code, int sig) {
+ LOG("A child process exited with exit code %d\n", code);
+ uv_close((uv_handle_t*) handle->stdio_pipes[0].server_pipe, slave_pipe_close_cb);
+ uv_close((uv_handle_t*) handle, slave_close_cb);
+}
+
+void master_write_cb(uv_write_t* write, int status) {
+ CHECK(status == 0);
+
+ free(write->data);
+ free(write);
+}
+
+void spawn(int id, SOCKET sock) {
+ int r;
+ uv_pipe_t* in;
+ uv_process_t* process;
+ WSAPROTOCOL_INFOW* blob;
+ uv_process_options_t options;
+ char* args[4];
+ char id_str[3];
+ uv_write_t* wr_req;
+ uv_buf_t buf;
+
+ in = malloc(sizeof *in);
+ process = malloc(sizeof *process);
+
+ _snprintf(id_str, sizeof id_str, "%d", id);
+
+ args[0] = exe_path;
+ args[1] = "--child";
+ args[2] = id_str;
+ args[3] = NULL;
+
+ r = uv_pipe_init(in);
+ CHECK(r == 0);
+
+ memset(&options, 0, sizeof options);
+ options.file = exe_path;
+ options.args = args;
+ options.exit_cb = slave_exit_cb;
+ options.stdin_stream = in;
+
+ r = uv_spawn(process, options);
+ CHECK(r == 0);
+
+ // Duplicate the socket and send to to the child process
+ blob = malloc(sizeof *blob);
+ wr_req = malloc(sizeof *wr_req);
+
+ r = WSADuplicateSocketW(sock, GetProcessId(process->process_handle), blob);
+ CHECK(r == 0);
+
+ buf = uv_buf_init((char*) blob, sizeof *blob);
+ uv_write(wr_req, (uv_stream_t*) process->stdio_pipes[0].server_pipe, &buf, 1, master_write_cb);
+ wr_req->data = buf.base;
+}
+
+
+static void cl_close_cb(uv_handle_t* handle) {
+ free_client(get_client_from_handle((uv_tcp_t*) handle));
+}
+
+static void cl_write_cb(uv_write_t* req, int status) {
+ uv_close((uv_handle_t*) req->handle, cl_close_cb);
+}
+
+
+static void cl_write(uv_tcp_t* handle) {
+ int r;
+ client_t* client = get_client_from_handle(handle);
+ uv_buf_t buf = uv_buf_init(message, (sizeof message) - 1);
+ uv_write_t* req = &client->write;
+
+ r = uv_write(req, (uv_stream_t*) handle, &buf, 1, cl_write_cb);
+ if (r) {
+ LOG("error");
+ uv_close((uv_handle_t*) handle, cl_close_cb);
+ }
+
+
+ // Pretend our server is very busy:
+ // Sleep(10);
+}
+
+volatile int fubar;
+
+
+static void cl_read_cb(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
+ uv_tcp_t* tcp = (uv_tcp_t*) handle;
+ int r;
+
+ r = uv_read_stop(handle);
+ CHECK(r == 0);
+
+ // Do some work
+ for (fubar = 50000; fubar > 0; fubar--) {}
+
+ cl_write(tcp);
+}
+
+uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) {
+ client_t* client = get_client_from_handle((uv_tcp_t*) handle);
+ uv_buf_t buf;
+ buf.base = client->read_buffer;
+ buf.len = sizeof client->read_buffer;
+ return buf;
+}
+
+
+void connection_cb(uv_stream_t* server, int status) {
+ int r;
+ client_t* client = alloc_client();
+
+ CHECK(status == 0);
+
+ r = uv_tcp_init(&client->handle);
+ CHECK(r == 0);
+
+ r = uv_accept(server, (uv_stream_t*) &client->handle);
+ CHECK(r == 0);
+
+ accepted++;
+
+ r = uv_read_start((uv_stream_t*) &client->handle, alloc_cb, cl_read_cb);
+ CHECK(r == 0);
+}
+
+
+void timer_cb(uv_timer_t* timer, int status) {
+ LOG("accepted %d connections\n", accepted);
+}
+
+
+void master() {
+ int r;
+
+ r = uv_tcp_init(&server);
+ CHECK(r == 0);
+
+ r = uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", 80));
+ CHECK(r == 0);
+
+ exe_path_size = sizeof exe_path;
+ r = uv_exepath(exe_path, &exe_path_size);
+ CHECK(r == 0);
+ exe_path[exe_path_size] = '\0';
+}
+
+
+void slave() {
+ int r;
+ HANDLE in = (HANDLE) _get_osfhandle(0);
+ WSAPROTOCOL_INFOW blob;
+ DWORD bytes_read;
+ SOCKET sock;
+
+ r = ReadFile(in, (void*) &blob, sizeof blob, &bytes_read, NULL);
+ CHECK(r);
+ CHECK(bytes_read == sizeof blob);
+
+ sock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_IP, &blob, 0, WSA_FLAG_OVERLAPPED);
+ CHECK(sock != INVALID_SOCKET);
+
+ r = uv_tcp_import(&server, sock);
+ CHECK(r == 0);
+}
+
+
+int main(int argv, char** argc) {
+ int r;
+
+ uv_init();
+
+ if (argv < 3) {
+ int i;
+ int num_children = 0;
+
+ if (argv == 2) {
+ num_children = strtol(argc[1], NULL, 10);
+ }
+
+ /* We're the master process */
+ server_id = 0;
+
+ master();
+
+ // Start listening now
+ LOG("listen\n");
+ r = uv_listen((uv_stream_t*) &server, 512, connection_cb);
+ CHECK(r == 0);
+
+ // Spawn slaves
+ for (i = num_children; i > 0; i--) {
+ spawn(i, server.socket);
+ }
+ } else {
+ /* We're a slave process */
+ server_id = strtol(argc[2], NULL, 10);
+ slave();
+
+ // Start listening now
+ LOG("listen\n");
+ r = uv_listen((uv_stream_t*) &server, 10, connection_cb);
+ CHECK(r == 0);
+ }
+
+
+ r = uv_timer_init(&timer);
+ CHECK(r == 0);
+ uv_timer_start(&timer, timer_cb, 1000, 1000);
+
+ uv_run();
+}
View
2 src/uv-unix.c
@@ -1837,7 +1837,7 @@ int uv_read_stop(uv_stream_t* stream) {
void uv__req_init(uv_req_t* req) {
uv_counters()->req_init++;
- req->type = UV_UNKNOWN_REQ;
+ req->type = UV_UNKNOWN_OBJECT;
req->data = NULL;
}
View
5 src/win/core.c
@@ -156,6 +156,7 @@ static void uv_poll_ex(int block) {
#define UV_LOOP(poll) \
+ int mb; \
while (LOOP->refs > 0) { \
uv_update_time(); \
uv_process_timers(); \
@@ -168,9 +169,11 @@ static void uv_poll_ex(int block) {
/* Completely flush all pending reqs and endgames. */ \
/* We do even when we just called the idle callbacks because those may */ \
/* have closed handles or started requests that short-circuited. */ \
+ mb = 100; \
while (LOOP->pending_reqs_tail || LOOP->endgame_handles) { \
uv_process_endgames(); \
uv_process_reqs(); \
+ if (--mb <= 0) break; \
} \
\
if (LOOP->refs <= 0) { \
@@ -179,7 +182,7 @@ static void uv_poll_ex(int block) {
\
uv_prepare_invoke(); \
\
- poll(LOOP->idle_handles == NULL && LOOP->refs > 0); \
+ poll(LOOP->idle_handles == NULL && LOOP->pending_reqs_tail == NULL && LOOP->refs > 0); \
\
uv_check_invoke(); \
}
View
1 src/win/internal.h
@@ -109,6 +109,7 @@ extern uv_loop_t uv_main_loop_;
#define UV_HANDLE_UV_ALLOCED 0x20000
#define UV_HANDLE_SYNC_BYPASS_IOCP 0x40000
#define UV_HANDLE_ZERO_READ 0x80000
+#define UV_HANDLE_EMULATE_IOCP 0x100000
void uv_want_endgame(uv_handle_t* handle);
void uv_process_endgames();
View
35 src/win/process.c
@@ -841,6 +841,15 @@ int uv_spawn(uv_process_t* process, uv_process_options_t options) {
application_path = application;
}
+ /* Initialize the startup struct minus the stdio handles. */
+ startup.cb = sizeof(startup);
+ startup.lpReserved = NULL;
+ startup.lpDesktop = NULL;
+ startup.lpTitle = NULL;
+ startup.dwFlags = STARTF_USESTDHANDLES;
+ startup.cbReserved2 = 0;
+ startup.lpReserved2 = NULL;
+
/* Create stdio pipes. */
if (options.stdin_stream) {
err = uv_create_stdio_pipe_pair(options.stdin_stream, &process->stdio_pipes[0].child_pipe, PIPE_ACCESS_OUTBOUND, GENERIC_READ | FILE_WRITE_ATTRIBUTES);
@@ -849,6 +858,11 @@ int uv_spawn(uv_process_t* process, uv_process_options_t options) {
}
process->stdio_pipes[0].server_pipe = options.stdin_stream;
+ startup.hStdInput = process->stdio_pipes[0].child_pipe;
+ } else {
+ process->stdio_pipes[0].server_pipe = NULL;
+ process->stdio_pipes[0].child_pipe = INVALID_HANDLE_VALUE;
+ startup.hStdInput = GetStdHandle(STD_INPUT_HANDLE);
}
if (options.stdout_stream) {
@@ -858,6 +872,11 @@ int uv_spawn(uv_process_t* process, uv_process_options_t options) {
}
process->stdio_pipes[1].server_pipe = options.stdout_stream;
+ startup.hStdOutput = process->stdio_pipes[1].child_pipe;
+ } else {
+ process->stdio_pipes[1].server_pipe = NULL;
+ process->stdio_pipes[1].child_pipe = INVALID_HANDLE_VALUE;
+ startup.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE);
}
if (options.stderr_stream) {
@@ -867,19 +886,13 @@ int uv_spawn(uv_process_t* process, uv_process_options_t options) {
}
process->stdio_pipes[2].server_pipe = options.stderr_stream;
+ startup.hStdError = process->stdio_pipes[2].child_pipe;
+ } else {
+ process->stdio_pipes[2].server_pipe = NULL;
+ process->stdio_pipes[2].child_pipe = INVALID_HANDLE_VALUE;
+ startup.hStdError = GetStdHandle(STD_ERROR_HANDLE);
}
- startup.cb = sizeof(startup);
- startup.lpReserved = NULL;
- startup.lpDesktop = NULL;
- startup.lpTitle = NULL;
- startup.dwFlags = STARTF_USESTDHANDLES;
- startup.cbReserved2 = 0;
- startup.lpReserved2 = NULL;
- startup.hStdInput = process->stdio_pipes[0].child_pipe;
- startup.hStdOutput = process->stdio_pipes[1].child_pipe;
- startup.hStdError = process->stdio_pipes[2].child_pipe;
-
if (CreateProcessW(application_path,
arguments,
NULL,
View
5 src/win/req.c
@@ -28,7 +28,7 @@
void uv_req_init(uv_req_t* req) {
uv_counters()->req_init++;
- req->type = UV_UNKNOWN_REQ;
+ req->type = UV_UNKNOWN_OBJECT;
SET_REQ_SUCCESS(req);
}
@@ -91,6 +91,7 @@ static uv_req_t* uv_remove_pending_req() {
void uv_process_reqs() {
uv_req_t* req;
+ int mb = 100;
while (req = uv_remove_pending_req()) {
switch (req->type) {
@@ -153,5 +154,7 @@ void uv_process_reqs() {
default:
assert(0);
}
+
+ if (mb-- <= 0) break;
}
}
View
53 src/win/tcp.c
@@ -32,7 +32,7 @@
* the optimization is temporarily disabled (threshold=0). This will be
* revisited once node allocator is improved.)
*/
-const unsigned int uv_active_tcp_streams_threshold = 0;
+const unsigned int uv_active_tcp_streams_threshold = 1000;
/*
* Number of simultaneous pending AcceptEx calls.
@@ -69,8 +69,7 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) {
LOOP->iocp,
(ULONG_PTR)socket,
0) == NULL) {
- uv_set_sys_error(GetLastError());
- return -1;
+ handle->flags |= UV_HANDLE_EMULATE_IOCP;
}
if (pSetFileCompletionNotificationModes) {
@@ -104,6 +103,23 @@ int uv_tcp_init(uv_tcp_t* handle) {
}
+int uv_tcp_import(uv_tcp_t* handle, SOCKET sock) {
+ int r;
+
+ if (uv_tcp_init(handle) != 0) {
+ return -1;
+ }
+
+ if (uv_tcp_set_socket(handle, sock) != 0) {
+ return -1;
+ }
+
+ handle->flags |= UV_HANDLE_BOUND;
+
+ return 0;
+}
+
+
void uv_tcp_endgame(uv_tcp_t* handle) {
uv_err_t err;
int status;
@@ -212,6 +228,14 @@ int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) {
}
}
+static void CALLBACK post_completion(void* context, BOOLEAN timed_out) {
+ uv_req_t* req = (uv_req_t*) context;
+
+ assert(req != NULL);
+ assert(!timed_out);
+
+ PostQueuedCompletionStatus(LOOP->iocp, req->overlapped.InternalHigh, 0, &req->overlapped);
+}
static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
BOOL success;
@@ -243,6 +267,9 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
/* Prepare the overlapped structure. */
memset(&(req->overlapped), 0, sizeof(req->overlapped));
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1);
+ }
success = pAcceptExFamily(handle->socket,
accept_socket,
@@ -262,13 +289,24 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
/* The req will be processed with IOCP. */
req->accept_socket = accept_socket;
handle->reqs_pending++;
+ if ((handle->flags & UV_HANDLE_EMULATE_IOCP) && !req->wait_handle) {
+ if (!RegisterWaitForSingleObject(&req->wait_handle, req->overlapped.hEvent, post_completion, (void*) req, INFINITE, WT_EXECUTEINWAITTHREAD)) {
+ SET_REQ_ERROR(req, GetLastError());
+ uv_insert_pending_req((uv_req_t*)req);
+ return;
+ }
+ }
} else {
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, WSAGetLastError());
uv_insert_pending_req((uv_req_t*)req);
handle->reqs_pending++;
/* Destroy the preallocated client socket. */
closesocket(accept_socket);
+ /* Destroy the event handle */
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ CloseHandle(req->overlapped.hEvent);
+ }
}
}
@@ -371,6 +409,15 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
req->type = UV_ACCEPT;
req->accept_socket = INVALID_SOCKET;
req->data = handle;
+ req->wait_handle = NULL;
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ req->event_handle = CreateEvent(NULL, 0, 0, NULL);
+ if (!req->event_handle) {
+ uv_fatal_error(GetLastError(), "CreateEvent");
+ }
+ } else {
+ req->event_handle = NULL;
+ }
uv_tcp_queue_accept(handle, req);
}
View
19 uv.gyp
@@ -261,6 +261,25 @@
},
{
+ 'target_name': 'prefork',
+ 'type': 'executable',
+ 'dependencies': [ 'uv' ],
+ 'sources': [
+ 'prefork.c',
+ ],
+ 'conditions': [
+ [ 'OS=="win"', {
+ 'libraries': [ 'ws2_32.lib' ]
+ }]
+ ],
+ 'msvs-settings': {
+ 'VCLinkerTool': {
+ 'SubSystem': 1, # /subsystem:console
+ },
+ },
+ },
+
+ {
'target_name': 'run-benchmarks',
'type': 'executable',
'dependencies': [ 'uv' ],

No commit comments for this range

Something went wrong with that request. Please try again.