Skip to content

Commit

Permalink
Prefork server experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
piscisaureus committed Aug 30, 2011
1 parent b087c21 commit cfc5182
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 13 deletions.
1 change: 1 addition & 0 deletions include/uv-win.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ typedef struct uv_buf_t {
OVERLAPPED overlapped; \
size_t queued_bytes; \
}; \
HANDLE wait; \
}; \
struct uv_req_s* next_req;

Expand Down
4 changes: 4 additions & 0 deletions include/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,10 @@ struct uv_tcp_s {

int uv_tcp_init(uv_tcp_t* handle);

#ifdef _WIN32
int uv_tcp_import(uv_tcp_t* handle, SOCKET sock);
#endif

int uv_tcp_bind(uv_tcp_t* handle, struct sockaddr_in);
int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6);

Expand Down
207 changes: 207 additions & 0 deletions prefork.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@


#include <uv.h>
#include <io.h>
#include <stdio.h>
#include <string.h>

#define NUM_CHILDREN 3 /* Not including the master server */

int server_id;
int accepted = 0;

uv_tcp_t server;
uv_timer_t timer;

char exe_path[1024];
size_t exe_path_size;

char message[] = "HTTP 1.1 200 OK\r\nContent-Length: 12\r\nConnection: close\r\n\r\nhello world\n";

#define CHECK(r) \
if (!(r)) abort();

#define LOG(msg, ...) \
printf("Server %d: " ## msg, server_id, __VA_ARGS__); \


void slave_close_cb(uv_handle_t* handle) {
free(handle);
}

void slave_pipe_close_cb(uv_handle_t* handle) {
free(handle);
}


void slave_exit_cb(uv_process_t* handle, int code, int sig) {
LOG("A child process exited with exit code %d\n", code);
uv_close((uv_handle_t*) handle->stdio_pipes[0].server_pipe, slave_pipe_close_cb);
uv_close((uv_handle_t*) handle, slave_close_cb);
}

void master_write_cb(uv_write_t* write, int status) {
CHECK(status == 0);

free(write->data);
free(write);
}

void spawn(int id, SOCKET sock) {
int r;
uv_pipe_t* in;
uv_process_t* process;
WSAPROTOCOL_INFOW* blob;
uv_process_options_t options;
char* args[3];
char id_str[3];
uv_write_t* wr_req;
uv_buf_t buf;

in = malloc(sizeof *in);
process = malloc(sizeof *process);

_snprintf(id_str, sizeof id_str, "%d", id);

args[0] = exe_path;
args[1] = id_str;
args[2] = NULL;

r = uv_pipe_init(in);
CHECK(r == 0);

memset(&options, 0, sizeof options);
options.file = exe_path;
options.args = args;
options.exit_cb = slave_exit_cb;
options.stdin_stream = in;

r = uv_spawn(process, options);
CHECK(r == 0);

// Duplicate the socket and send to to the child process
blob = malloc(sizeof *blob);
wr_req = malloc(sizeof *wr_req);

r = WSADuplicateSocketW(sock, GetProcessId(process->process_handle), blob);
CHECK(r == 0);

buf = uv_buf_init((char*) blob, sizeof *blob);
uv_write(wr_req, (uv_stream_t*) process->stdio_pipes[0].server_pipe, &buf, 1, master_write_cb);
wr_req->data = buf.base;
}


void cl_close_cb(uv_handle_t* handle) {
free(handle);
}

void cl_write_cb(uv_write_t* req, int status) {
CHECK(status == 0);

uv_close((uv_handle_t*) req->handle, cl_close_cb);

free(req);
}


void cl_write(uv_tcp_t* handle) {
int r;
uv_buf_t buf = uv_buf_init(message, (sizeof message) - 1);
uv_write_t* req = malloc(sizeof *req);

r = uv_write(req, (uv_stream_t*) handle, &buf, 1, cl_write_cb);
CHECK(r == 0);

// Pretend our server is very busy:
// Sleep(10);
}

void connection_cb(uv_stream_t* server, int status) {
int r;
uv_tcp_t* client = (uv_tcp_t*) malloc(sizeof *client);

CHECK(status == 0);

r = uv_tcp_init(client);
CHECK(r == 0);

r = uv_accept(server, (uv_stream_t*) client);
CHECK(r == 0);

accepted++;

cl_write(client);
}


void timer_cb(uv_timer_t* timer, int status) {
LOG("accepted %d connections\n", accepted);
}


void master() {
int i, r;

r = uv_tcp_init(&server);
CHECK(r == 0);

r = uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", 8000));
CHECK(r == 0);

exe_path_size = sizeof exe_path;
r = uv_exepath(exe_path, &exe_path_size);
CHECK(r == 0);
exe_path[exe_path_size] = '\0';

// Spawn slaves
for (i = NUM_CHILDREN; i > 0; i--) {
spawn(i, server.socket);
}
}


void slave() {
int r;
HANDLE in = (HANDLE) _get_osfhandle(0);
WSAPROTOCOL_INFOW blob;
DWORD bytes_read;
SOCKET sock;

r = ReadFile(in, (void*) &blob, sizeof blob, &bytes_read, NULL);
CHECK(r);
CHECK(bytes_read == sizeof blob);

sock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_IP, &blob, 0, WSA_FLAG_OVERLAPPED);
CHECK(sock != INVALID_SOCKET);

r = uv_tcp_import(&server, sock);
CHECK(r == 0);
}

int main(int argv, char** argc) {
int r;

uv_init();

if (argv == 1) {
/* We're the master process */
server_id = 0;
master();

} else {
/* We're a slave process */
server_id = strtol(argc[1], NULL, 10);
slave();
}

// Start listening now
r = uv_listen((uv_stream_t*) &server, 512, connection_cb);
CHECK(r == 0);

r = uv_timer_init(&timer);
CHECK(r == 0);
uv_timer_start(&timer, timer_cb, 1000, 1000);

uv_run();
}
1 change: 1 addition & 0 deletions src/win/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ extern uv_loop_t uv_main_loop_;
#define UV_HANDLE_UV_ALLOCED 0x20000
#define UV_HANDLE_SYNC_BYPASS_IOCP 0x40000
#define UV_HANDLE_ZERO_READ 0x80000
#define UV_HANDLE_EMULATE_IOCP 0x100000

void uv_want_endgame(uv_handle_t* handle);
void uv_process_endgames();
Expand Down
35 changes: 24 additions & 11 deletions src/win/process.c
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,15 @@ int uv_spawn(uv_process_t* process, uv_process_options_t options) {
application_path = application;
}

/* Initialize the startup struct minus the stdio handles. */
startup.cb = sizeof(startup);
startup.lpReserved = NULL;
startup.lpDesktop = NULL;
startup.lpTitle = NULL;
startup.dwFlags = STARTF_USESTDHANDLES;
startup.cbReserved2 = 0;
startup.lpReserved2 = NULL;

/* Create stdio pipes. */
if (options.stdin_stream) {
err = uv_create_stdio_pipe_pair(options.stdin_stream, &process->stdio_pipes[0].child_pipe, PIPE_ACCESS_OUTBOUND, GENERIC_READ | FILE_WRITE_ATTRIBUTES);
Expand All @@ -849,6 +858,11 @@ int uv_spawn(uv_process_t* process, uv_process_options_t options) {
}

process->stdio_pipes[0].server_pipe = options.stdin_stream;
startup.hStdInput = process->stdio_pipes[0].child_pipe;
} else {
process->stdio_pipes[0].server_pipe = NULL;
process->stdio_pipes[0].child_pipe = INVALID_HANDLE_VALUE;
startup.hStdInput = GetStdHandle(STD_INPUT_HANDLE);
}

if (options.stdout_stream) {
Expand All @@ -858,6 +872,11 @@ int uv_spawn(uv_process_t* process, uv_process_options_t options) {
}

process->stdio_pipes[1].server_pipe = options.stdout_stream;
startup.hStdOutput = process->stdio_pipes[1].child_pipe;
} else {
process->stdio_pipes[1].server_pipe = NULL;
process->stdio_pipes[1].child_pipe = INVALID_HANDLE_VALUE;
startup.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE);
}

if (options.stderr_stream) {
Expand All @@ -867,19 +886,13 @@ int uv_spawn(uv_process_t* process, uv_process_options_t options) {
}

process->stdio_pipes[2].server_pipe = options.stderr_stream;
startup.hStdError = process->stdio_pipes[2].child_pipe;
} else {
process->stdio_pipes[2].server_pipe = NULL;
process->stdio_pipes[2].child_pipe = INVALID_HANDLE_VALUE;
startup.hStdError = GetStdHandle(STD_ERROR_HANDLE);
}

startup.cb = sizeof(startup);
startup.lpReserved = NULL;
startup.lpDesktop = NULL;
startup.lpTitle = NULL;
startup.dwFlags = STARTF_USESTDHANDLES;
startup.cbReserved2 = 0;
startup.lpReserved2 = NULL;
startup.hStdInput = process->stdio_pipes[0].child_pipe;
startup.hStdOutput = process->stdio_pipes[1].child_pipe;
startup.hStdError = process->stdio_pipes[2].child_pipe;

if (CreateProcessW(application_path,
arguments,
NULL,
Expand Down
54 changes: 52 additions & 2 deletions src/win/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) {
LOOP->iocp,
(ULONG_PTR)socket,
0) == NULL) {
uv_set_sys_error(GetLastError());
return -1;
handle->flags |= UV_HANDLE_EMULATE_IOCP;
}

if (pSetFileCompletionNotificationModes) {
Expand Down Expand Up @@ -104,6 +103,23 @@ int uv_tcp_init(uv_tcp_t* handle) {
}


int uv_tcp_import(uv_tcp_t* handle, SOCKET sock) {
int r;

if (uv_tcp_init(handle) != 0) {
return -1;
}

if (uv_tcp_set_socket(handle, sock) != 0) {
return -1;
}

handle->flags |= UV_HANDLE_BOUND;

return 0;
}


void uv_tcp_endgame(uv_tcp_t* handle) {
uv_err_t err;
int status;
Expand Down Expand Up @@ -212,6 +228,18 @@ int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) {
}
}

static void CALLBACK post_completion(void* context, BOOLEAN timed_out) {
uv_req_t* req = (uv_req_t*) context;

assert(req != NULL);
assert(!timed_out);

UnregisterWait(req->wait);
CloseHandle(req->overlapped.hEvent);
req->overlapped.hEvent = NULL;

PostQueuedCompletionStatus(LOOP->iocp, req->overlapped.InternalHigh, NULL, &req->overlapped);
}

static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
BOOL success;
Expand Down Expand Up @@ -244,6 +272,17 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
/* Prepare the overlapped structure. */
memset(&(req->overlapped), 0, sizeof(req->overlapped));

if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->overlapped.hEvent = CreateEvent(NULL, 0, 0, NULL);
if (!req->overlapped.hEvent) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req((uv_req_t*)req);
handle->reqs_pending++;
return;
}
req->overlapped.hEvent = (HANDLE) ((DWORD) req->overlapped.hEvent | 1);
}

success = pAcceptExFamily(handle->socket,
accept_socket,
(void*)req->accept_buffer,
Expand All @@ -262,13 +301,24 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
/* The req will be processed with IOCP. */
req->accept_socket = accept_socket;
handle->reqs_pending++;
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (!RegisterWaitForSingleObject(&req->wait, req->overlapped.hEvent, post_completion, (void*) req, INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req((uv_req_t*)req);
return;
}
}
} else {
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, WSAGetLastError());
uv_insert_pending_req((uv_req_t*)req);
handle->reqs_pending++;
/* Destroy the preallocated client socket. */
closesocket(accept_socket);
/* Destroy the event handle */
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
CloseHandle(req->overlapped.hEvent);
}
}
}

Expand Down
Loading

0 comments on commit cfc5182

Please sign in to comment.