Permalink
Browse files

Prefork server experiment

  • Loading branch information...
piscisaureus committed Aug 30, 2011
1 parent b087c21 commit cfc5182bd5f75c0271006cda9caffe22500e8a86
Showing with 308 additions and 13 deletions.
  1. +1 −0 include/uv-win.h
  2. +4 −0 include/uv.h
  3. +207 −0 prefork.c
  4. +1 −0 src/win/internal.h
  5. +24 −11 src/win/process.c
  6. +52 −2 src/win/tcp.c
  7. +19 −0 uv.gyp
View
@@ -61,6 +61,7 @@ typedef struct uv_buf_t {
OVERLAPPED overlapped; \
size_t queued_bytes; \
}; \
+ HANDLE wait; \
}; \
struct uv_req_s* next_req;
View
@@ -404,6 +404,10 @@ struct uv_tcp_s {
int uv_tcp_init(uv_tcp_t* handle);
+#ifdef _WIN32
+ int uv_tcp_import(uv_tcp_t* handle, SOCKET sock);
+#endif
+
int uv_tcp_bind(uv_tcp_t* handle, struct sockaddr_in);
int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6);
View
207 prefork.c
@@ -0,0 +1,207 @@
+
+
+#include <uv.h>
+#include <io.h>
+#include <stdio.h>
+#include <string.h>
+
+#define NUM_CHILDREN 3 /* Not including the master server */
+
+int server_id;
+int accepted = 0;
+
+uv_tcp_t server;
+uv_timer_t timer;
+
+char exe_path[1024];
+size_t exe_path_size;
+
+char message[] = "HTTP 1.1 200 OK\r\nContent-Length: 12\r\nConnection: close\r\n\r\nhello world\n";
+
+#define CHECK(r) \
+ if (!(r)) abort();
+
+#define LOG(msg, ...) \
+ printf("Server %d: " ## msg, server_id, __VA_ARGS__); \
+
+
+void slave_close_cb(uv_handle_t* handle) {
+ free(handle);
+}
+
+void slave_pipe_close_cb(uv_handle_t* handle) {
+ free(handle);
+}
+
+
+void slave_exit_cb(uv_process_t* handle, int code, int sig) {
+ LOG("A child process exited with exit code %d\n", code);
+ uv_close((uv_handle_t*) handle->stdio_pipes[0].server_pipe, slave_pipe_close_cb);
+ uv_close((uv_handle_t*) handle, slave_close_cb);
+}
+
+void master_write_cb(uv_write_t* write, int status) {
+ CHECK(status == 0);
+
+ free(write->data);
+ free(write);
+}
+
+void spawn(int id, SOCKET sock) {
+ int r;
+ uv_pipe_t* in;
+ uv_process_t* process;
+ WSAPROTOCOL_INFOW* blob;
+ uv_process_options_t options;
+ char* args[3];
+ char id_str[3];
+ uv_write_t* wr_req;
+ uv_buf_t buf;
+
+ in = malloc(sizeof *in);
+ process = malloc(sizeof *process);
+
+ _snprintf(id_str, sizeof id_str, "%d", id);
+
+ args[0] = exe_path;
+ args[1] = id_str;
+ args[2] = NULL;
+
+ r = uv_pipe_init(in);
+ CHECK(r == 0);
+
+ memset(&options, 0, sizeof options);
+ options.file = exe_path;
+ options.args = args;
+ options.exit_cb = slave_exit_cb;
+ options.stdin_stream = in;
+
+ r = uv_spawn(process, options);
+ CHECK(r == 0);
+
+ // Duplicate the socket and send to to the child process
+ blob = malloc(sizeof *blob);
+ wr_req = malloc(sizeof *wr_req);
+
+ r = WSADuplicateSocketW(sock, GetProcessId(process->process_handle), blob);
+ CHECK(r == 0);
+
+ buf = uv_buf_init((char*) blob, sizeof *blob);
+ uv_write(wr_req, (uv_stream_t*) process->stdio_pipes[0].server_pipe, &buf, 1, master_write_cb);
+ wr_req->data = buf.base;
+}
+
+
+void cl_close_cb(uv_handle_t* handle) {
+ free(handle);
+}
+
+void cl_write_cb(uv_write_t* req, int status) {
+ CHECK(status == 0);
+
+ uv_close((uv_handle_t*) req->handle, cl_close_cb);
+
+ free(req);
+}
+
+
+void cl_write(uv_tcp_t* handle) {
+ int r;
+ uv_buf_t buf = uv_buf_init(message, (sizeof message) - 1);
+ uv_write_t* req = malloc(sizeof *req);
+
+ r = uv_write(req, (uv_stream_t*) handle, &buf, 1, cl_write_cb);
+ CHECK(r == 0);
+
+ // Pretend our server is very busy:
+ // Sleep(10);
+}
+
+void connection_cb(uv_stream_t* server, int status) {
+ int r;
+ uv_tcp_t* client = (uv_tcp_t*) malloc(sizeof *client);
+
+ CHECK(status == 0);
+
+ r = uv_tcp_init(client);
+ CHECK(r == 0);
+
+ r = uv_accept(server, (uv_stream_t*) client);
+ CHECK(r == 0);
+
+ accepted++;
+
+ cl_write(client);
+}
+
+
+void timer_cb(uv_timer_t* timer, int status) {
+ LOG("accepted %d connections\n", accepted);
+}
+
+
+void master() {
+ int i, r;
+
+ r = uv_tcp_init(&server);
+ CHECK(r == 0);
+
+ r = uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", 8000));
+ CHECK(r == 0);
+
+ exe_path_size = sizeof exe_path;
+ r = uv_exepath(exe_path, &exe_path_size);
+ CHECK(r == 0);
+ exe_path[exe_path_size] = '\0';
+
+ // Spawn slaves
+ for (i = NUM_CHILDREN; i > 0; i--) {
+ spawn(i, server.socket);
+ }
+}
+
+
+void slave() {
+ int r;
+ HANDLE in = (HANDLE) _get_osfhandle(0);
+ WSAPROTOCOL_INFOW blob;
+ DWORD bytes_read;
+ SOCKET sock;
+
+ r = ReadFile(in, (void*) &blob, sizeof blob, &bytes_read, NULL);
+ CHECK(r);
+ CHECK(bytes_read == sizeof blob);
+
+ sock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_IP, &blob, 0, WSA_FLAG_OVERLAPPED);
+ CHECK(sock != INVALID_SOCKET);
+
+ r = uv_tcp_import(&server, sock);
+ CHECK(r == 0);
+}
+
+int main(int argv, char** argc) {
+ int r;
+
+ uv_init();
+
+ if (argv == 1) {
+ /* We're the master process */
+ server_id = 0;
+ master();
+
+ } else {
+ /* We're a slave process */
+ server_id = strtol(argc[1], NULL, 10);
+ slave();
+ }
+
+ // Start listening now
+ r = uv_listen((uv_stream_t*) &server, 512, connection_cb);
+ CHECK(r == 0);
+
+ r = uv_timer_init(&timer);
+ CHECK(r == 0);
+ uv_timer_start(&timer, timer_cb, 1000, 1000);
+
+ uv_run();
+}
View
@@ -109,6 +109,7 @@ extern uv_loop_t uv_main_loop_;
#define UV_HANDLE_UV_ALLOCED 0x20000
#define UV_HANDLE_SYNC_BYPASS_IOCP 0x40000
#define UV_HANDLE_ZERO_READ 0x80000
+#define UV_HANDLE_EMULATE_IOCP 0x100000
void uv_want_endgame(uv_handle_t* handle);
void uv_process_endgames();
View
@@ -841,6 +841,15 @@ int uv_spawn(uv_process_t* process, uv_process_options_t options) {
application_path = application;
}
+ /* Initialize the startup struct minus the stdio handles. */
+ startup.cb = sizeof(startup);
+ startup.lpReserved = NULL;
+ startup.lpDesktop = NULL;
+ startup.lpTitle = NULL;
+ startup.dwFlags = STARTF_USESTDHANDLES;
+ startup.cbReserved2 = 0;
+ startup.lpReserved2 = NULL;
+
/* Create stdio pipes. */
if (options.stdin_stream) {
err = uv_create_stdio_pipe_pair(options.stdin_stream, &process->stdio_pipes[0].child_pipe, PIPE_ACCESS_OUTBOUND, GENERIC_READ | FILE_WRITE_ATTRIBUTES);
@@ -849,6 +858,11 @@ int uv_spawn(uv_process_t* process, uv_process_options_t options) {
}
process->stdio_pipes[0].server_pipe = options.stdin_stream;
+ startup.hStdInput = process->stdio_pipes[0].child_pipe;
+ } else {
+ process->stdio_pipes[0].server_pipe = NULL;
+ process->stdio_pipes[0].child_pipe = INVALID_HANDLE_VALUE;
+ startup.hStdInput = GetStdHandle(STD_INPUT_HANDLE);
}
if (options.stdout_stream) {
@@ -858,6 +872,11 @@ int uv_spawn(uv_process_t* process, uv_process_options_t options) {
}
process->stdio_pipes[1].server_pipe = options.stdout_stream;
+ startup.hStdOutput = process->stdio_pipes[1].child_pipe;
+ } else {
+ process->stdio_pipes[1].server_pipe = NULL;
+ process->stdio_pipes[1].child_pipe = INVALID_HANDLE_VALUE;
+ startup.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE);
}
if (options.stderr_stream) {
@@ -867,19 +886,13 @@ int uv_spawn(uv_process_t* process, uv_process_options_t options) {
}
process->stdio_pipes[2].server_pipe = options.stderr_stream;
+ startup.hStdError = process->stdio_pipes[2].child_pipe;
+ } else {
+ process->stdio_pipes[2].server_pipe = NULL;
+ process->stdio_pipes[2].child_pipe = INVALID_HANDLE_VALUE;
+ startup.hStdError = GetStdHandle(STD_ERROR_HANDLE);
}
- startup.cb = sizeof(startup);
- startup.lpReserved = NULL;
- startup.lpDesktop = NULL;
- startup.lpTitle = NULL;
- startup.dwFlags = STARTF_USESTDHANDLES;
- startup.cbReserved2 = 0;
- startup.lpReserved2 = NULL;
- startup.hStdInput = process->stdio_pipes[0].child_pipe;
- startup.hStdOutput = process->stdio_pipes[1].child_pipe;
- startup.hStdError = process->stdio_pipes[2].child_pipe;
-
if (CreateProcessW(application_path,
arguments,
NULL,
View
@@ -69,8 +69,7 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) {
LOOP->iocp,
(ULONG_PTR)socket,
0) == NULL) {
- uv_set_sys_error(GetLastError());
- return -1;
+ handle->flags |= UV_HANDLE_EMULATE_IOCP;
}
if (pSetFileCompletionNotificationModes) {
@@ -104,6 +103,23 @@ int uv_tcp_init(uv_tcp_t* handle) {
}
+int uv_tcp_import(uv_tcp_t* handle, SOCKET sock) {
+ int r;
+
+ if (uv_tcp_init(handle) != 0) {
+ return -1;
+ }
+
+ if (uv_tcp_set_socket(handle, sock) != 0) {
+ return -1;
+ }
+
+ handle->flags |= UV_HANDLE_BOUND;
+
+ return 0;
+}
+
+
void uv_tcp_endgame(uv_tcp_t* handle) {
uv_err_t err;
int status;
@@ -212,6 +228,18 @@ int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) {
}
}
+static void CALLBACK post_completion(void* context, BOOLEAN timed_out) {
+ uv_req_t* req = (uv_req_t*) context;
+
+ assert(req != NULL);
+ assert(!timed_out);
+
+ UnregisterWait(req->wait);
+ CloseHandle(req->overlapped.hEvent);
+ req->overlapped.hEvent = NULL;
+
+ PostQueuedCompletionStatus(LOOP->iocp, req->overlapped.InternalHigh, NULL, &req->overlapped);
+}
static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
BOOL success;
@@ -244,6 +272,17 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
/* Prepare the overlapped structure. */
memset(&(req->overlapped), 0, sizeof(req->overlapped));
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ req->overlapped.hEvent = CreateEvent(NULL, 0, 0, NULL);
+ if (!req->overlapped.hEvent) {
+ SET_REQ_ERROR(req, GetLastError());
+ uv_insert_pending_req((uv_req_t*)req);
+ handle->reqs_pending++;
+ return;
+ }
+ req->overlapped.hEvent = (HANDLE) ((DWORD) req->overlapped.hEvent | 1);
+ }
+
success = pAcceptExFamily(handle->socket,
accept_socket,
(void*)req->accept_buffer,
@@ -262,13 +301,24 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
/* The req will be processed with IOCP. */
req->accept_socket = accept_socket;
handle->reqs_pending++;
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ if (!RegisterWaitForSingleObject(&req->wait, req->overlapped.hEvent, post_completion, (void*) req, INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
+ SET_REQ_ERROR(req, GetLastError());
+ uv_insert_pending_req((uv_req_t*)req);
+ return;
+ }
+ }
} else {
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, WSAGetLastError());
uv_insert_pending_req((uv_req_t*)req);
handle->reqs_pending++;
/* Destroy the preallocated client socket. */
closesocket(accept_socket);
+ /* Destroy the event handle */
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ CloseHandle(req->overlapped.hEvent);
+ }
}
}
Oops, something went wrong.

0 comments on commit cfc5182

Please sign in to comment.