Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add rbffi_thread_blocking_region and DEFER_ASYNC_CALLBACK for Ruby 1.…

…8 on win32

This mostly done by replacing posix with win32 functions,
but it's working with rb_thread_select() and blocking _pipes,
since rb_io_wait_readable() doesn't work with nonblocking pipes.
  • Loading branch information...
commit 23aedab7ca947845bec5f379c0c66bb7acc4ad86 1 parent 149f544
@larskanis authored
Showing with 160 additions and 10 deletions.
  1. +53 −10 ext/ffi_c/Function.c
  2. +107 −0 ext/ffi_c/Thread.c
View
63 ext/ffi_c/Function.c
@@ -69,9 +69,7 @@ static void callback_invoke(ffi_cif* cif, void* retval, void** parameters, void*
static bool callback_prep(void* ctx, void* code, Closure* closure, char* errmsg, size_t errmsgsize);
static void* callback_with_gvl(void* data);
-#if !defined(_WIN32) || defined(HAVE_RB_THREAD_BLOCKING_REGION)
-# define DEFER_ASYNC_CALLBACK 1
-#endif
+#define DEFER_ASYNC_CALLBACK 1
#if defined(DEFER_ASYNC_CALLBACK)
@@ -117,8 +115,11 @@ static struct gvl_callback* async_cb_list = NULL;
static int async_cb_pipe[2];
# endif
# else
-static HANDLE async_cb_cond;
-static CRITICAL_SECTION async_cb_lock;
+ static HANDLE async_cb_cond;
+ static CRITICAL_SECTION async_cb_lock;
+# if !defined(HAVE_RB_THREAD_BLOCKING_REGION)
+ static int async_cb_pipe[2];
+# endif
# endif
#endif
@@ -277,7 +278,9 @@ function_init(VALUE self, VALUE rbFunctionInfo, VALUE rbProc)
#if defined(DEFER_ASYNC_CALLBACK)
if (async_cb_thread == Qnil) {
-#if !defined(HAVE_RB_THREAD_BLOCKING_REGION)
+#if !defined(HAVE_RB_THREAD_BLOCKING_REGION) && defined(_WIN32)
+ _pipe(async_cb_pipe, 1024, O_BINARY);
+#elif !defined(HAVE_RB_THREAD_BLOCKING_REGION)
pipe(async_cb_pipe);
fcntl(async_cb_pipe[0], F_SETFL, fcntl(async_cb_pipe[0], F_GETFL) | O_NONBLOCK);
fcntl(async_cb_pipe[1], F_SETFL, fcntl(async_cb_pipe[1], F_GETFL) | O_NONBLOCK);
@@ -441,16 +444,27 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data)
#elif defined(DEFER_ASYNC_CALLBACK) && defined(_WIN32)
} else {
+ bool empty = false;
+
cb.async_event = CreateEvent(NULL, FALSE, FALSE, NULL);
-
+
// Now signal the async callback thread
EnterCriticalSection(&async_cb_lock);
+ empty = async_cb_list == NULL;
cb.next = async_cb_list;
async_cb_list = &cb;
LeaveCriticalSection(&async_cb_lock);
+#if !defined(HAVE_RB_THREAD_BLOCKING_REGION)
+ // Only signal if the list was empty
+ if (empty) {
+ char c;
+ write(async_cb_pipe[1], &c, 1);
+ }
+#else
SetEvent(async_cb_cond);
-
+#endif
+
// Wait for the thread executing the ruby callback to signal it is done
WaitForSingleObject(cb.async_event, INFINITE);
CloseHandle(cb.async_event);
@@ -481,10 +495,39 @@ async_cb_event(void* unused)
rb_thread_create(async_cb_call, w.cb);
}
}
-
+
return Qnil;
}
+#elif defined(_WIN32)
+static VALUE
+async_cb_event(void* unused)
+{
+ while (true) {
+ struct gvl_callback* cb;
+ char buf[64];
+ fd_set rfds;
+
+ FD_ZERO(&rfds);
+ FD_SET(async_cb_pipe[0], &rfds);
+ rb_thread_select(async_cb_pipe[0] + 1, &rfds, NULL, NULL, NULL);
+ read(async_cb_pipe[0], buf, sizeof(buf));
+
+ EnterCriticalSection(&async_cb_lock);
+ cb = async_cb_list;
+ async_cb_list = NULL;
+ LeaveCriticalSection(&async_cb_lock);
+
+ while (cb != NULL) {
+ struct gvl_callback* next = cb->next;
+ // Start up a new ruby thread to run the ruby callback
+ rb_thread_create(async_cb_call, cb);
+ cb = next;
+ }
+ }
+
+ return Qnil;
+}
#else
static VALUE
async_cb_event(void* unused)
@@ -842,7 +885,7 @@ rbffi_Function_Init(VALUE moduleFFI)
id_cb_ref = rb_intern("@__ffi_callback__");
id_to_native = rb_intern("to_native");
id_from_native = rb_intern("from_native");
-#if defined(_WIN32) && defined(HAVE_RB_THREAD_BLOCKING_REGION)
+#if defined(_WIN32)
InitializeCriticalSection(&async_cb_lock);
async_cb_cond = CreateEvent(NULL, FALSE, FALSE, NULL);
#endif
View
107 ext/ffi_c/Thread.c
@@ -180,6 +180,113 @@ rbffi_thread_blocking_region(VALUE (*func)(void *), void *data1, void (*ubf)(voi
}
#else
+/* win32 implementation */
+
+struct BlockingThread {
+ HANDLE tid;
+ VALUE (*fn)(void *);
+ void *data;
+ void (*ubf)(void *);
+ void *data2;
+ VALUE retval;
+ int wrfd;
+ int rdfd;
+};
+
+static DWORD __stdcall
+rbffi_blocking_thread(LPVOID args)
+{
+ struct BlockingThread* thr = (struct BlockingThread *) args;
+ char c = 1;
+ VALUE retval;
+
+ retval = (*thr->fn)(thr->data);
+ thr->retval = retval;
+
+ write(thr->wrfd, &c, sizeof(c));
+
+ return 0;
+}
+
+static VALUE
+wait_for_thread(void *data)
+{
+ struct BlockingThread* thr = (struct BlockingThread *) data;
+ char c, res;
+ fd_set rfds;
+
+ FD_ZERO(&rfds);
+ FD_SET(thr->rdfd, &rfds);
+ rb_thread_select(thr->rdfd + 1, &rfds, NULL, NULL, NULL);
+ read(thr->rdfd, &c, 1);
+ return Qnil;
+}
+
+static VALUE
+cleanup_blocking_thread(void *data, VALUE exc)
+{
+ struct BlockingThread* thr = (struct BlockingThread *) data;
+
+ if (thr->ubf != (void (*)(void *)) -1) {
+ (*thr->ubf)(thr->data2);
+ } else {
+ TerminateThread(thr->tid, 0);
+ }
+
+ return exc;
+}
+
+VALUE
+rbffi_thread_blocking_region(VALUE (*func)(void *), void *data1, void (*ubf)(void *), void *data2)
+{
+ struct BlockingThread* thr;
+ int fd[2];
+ VALUE exc;
+ DWORD state;
+ DWORD res;
+
+ if (_pipe(fd, 1024, O_BINARY) == -1) {
+ rb_raise(rb_eSystemCallError, "_pipe() failed");
+ return Qnil;
+ }
+
+ thr = ALLOC_N(struct BlockingThread, 1);
+ thr->rdfd = fd[0];
+ thr->wrfd = fd[1];
+ thr->fn = func;
+ thr->data = data1;
+ thr->ubf = ubf;
+ thr->data2 = data2;
+ thr->retval = Qnil;
+
+ thr->tid = CreateThread(NULL, 0, rbffi_blocking_thread, thr, 0, NULL);
+ if (!thr->tid) {
+ close(fd[0]);
+ close(fd[1]);
+ xfree(thr);
+ rb_raise(rb_eSystemCallError, "CreateThread() failed");
+ return Qnil;
+ }
+
+ exc = rb_rescue2(wait_for_thread, (VALUE) thr, cleanup_blocking_thread, (VALUE) thr,
+ rb_eException);
+
+ /* The thread should be finished, already. */
+ WaitForSingleObject(thr->tid, INFINITE);
+ CloseHandle(thr->tid);
+ close(fd[1]);
+ close(fd[0]);
+ xfree(thr);
+
+ if (exc != Qnil) {
+ rb_exc_raise(exc);
+ }
+
+ return thr->retval;
+}
+
+
+#if 0
/*
* FIXME: someone needs to implement something similar to the posix pipe based
Please sign in to comment.
Something went wrong with that request. Please try again.