Skip to content

Commit

Permalink
Change the semantics of rb_postponed_job_register
Browse files Browse the repository at this point in the history
Our current implementation of rb_postponed_job_register suffers from
some safety issues that can lead to interpreter crashes (see bug #1991).
Essentially, the issue is that jobs can be called with the wrong
arguments.

We made two attempts to fix this whilst keeping the promised semantics,
but:
  * The first one involved masking/unmasking when flushing jobs, which
    was believed to be too expensive
  * The second one involved a lock-free, multi-producer, single-consumer
    ringbuffer, which was too complex

The critical insight behind this third solution is that essentially the
only users of these APIs are a) internal, or b) profiling gems.

For a), none of the usages actually require variable data; they will
work just fine with the preregistration interface.

For b), generally profiling gems only call a single callback with a
single piece of data (which is actually usually just zero) for the life
of the program. The ringbuffer is complex because it needs to support
multi-word inserts of job & data (which can't be atomic); but nobody
actually even needs that functionality, really.

So, this commit:
  * Introduces a pre-registration API for jobs, with a GVL-requiring
    rb_postponed_job_preregister, which returns a handle which can be
    used with an async-signal-safe rb_postponed_job_trigger.
  * Deprecates rb_postponed_job_register (and re-implements it on top of
    the preregister function for compatibility)
  * Moves all the internal usages of postponed job register to
    pre-registration
  • Loading branch information
KJTsanaktsidis authored and ko1 committed Dec 10, 2023
1 parent aecbd66 commit f8effa2
Show file tree
Hide file tree
Showing 14 changed files with 556 additions and 169 deletions.
51 changes: 51 additions & 0 deletions common.mk

Large diffs are not rendered by default.

94 changes: 94 additions & 0 deletions ext/-test-/postponed_job/postponed_job.c
@@ -1,6 +1,29 @@
#include "ruby.h"
#include "ruby/debug.h"

// We're testing deprecated things, don't print the compiler warnings
#if 0

#elif defined(_MSC_VER)
#pragma warning(disable : 4996)

#elif defined(__INTEL_COMPILER)
#pragma warning(disable : 1786)

#elif defined(__clang__)
#pragma clang diagnostic ignored "-Wdeprecated-declarations"

#elif defined(__GNUC__)
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"

#elif defined(__SUNPRO_CC)
#pragma error_messages (off,symdeprecated)

#else
// :FIXME: improve here for your compiler.

#endif

static int counter;

static void
Expand Down Expand Up @@ -58,6 +81,22 @@ pjob_call_direct(VALUE self, VALUE obj)
return self;
}

/*
 * Postponed-job callback that intentionally does nothing. Used by the tests
 * below that only care about registration results, not about the job body.
 */
static void
pjob_noop_callback(void *data)
{
    (void)data; /* unused */
}

/*
 * Bug.postponed_job_register_one_same
 *
 * Starts a GC, then calls rb_postponed_job_register_one() three times in a
 * row with the same no-op callback and NULL data. Returns an Array holding
 * the three integer return codes, in call order.
 */
static VALUE
pjob_register_one_same(VALUE self)
{
    enum { ATTEMPTS = 3 };
    int codes[ATTEMPTS];
    int i;

    rb_gc_start();

    /* Perform every registration before the result Array is allocated. */
    for (i = 0; i < ATTEMPTS; i++) {
        codes[i] = rb_postponed_job_register_one(0, pjob_noop_callback, NULL);
    }

    VALUE results = rb_ary_new();
    for (i = 0; i < ATTEMPTS; i++) {
        rb_ary_push(results, INT2FIX(codes[i]));
    }
    return results;
}

#ifdef HAVE_PTHREAD_H
#include <pthread.h>

Expand Down Expand Up @@ -86,15 +125,70 @@ pjob_register_in_c_thread(VALUE self, VALUE obj)
}
#endif

static void
pjob_preregistered_callback(void *data)
{
VALUE ary = (VALUE)data;
Check_Type(ary, T_ARRAY);
rb_ary_push(ary, INT2FIX(counter));
}

/*
 * Bug.postponed_job_preregister_and_call_with_sleep(ary)
 *
 * Preregisters pjob_preregistered_callback with `obj` as its data, then
 * performs three rounds of: bump `counter`, trigger the job, and call
 * rb_thread_sleep(0).
 *
 * NOTE(review): the zero-length sleep presumably gives the interpreter a
 * chance to run the pending job between triggers -- confirm against the
 * Ruby-side test expectations.
 */
static VALUE
pjob_preregister_and_call_with_sleep(VALUE self, VALUE obj)
{
    rb_postponed_job_handle_t handle;
    int round;

    counter = 0;
    handle = rb_postponed_job_preregister(pjob_preregistered_callback, (void *)obj);

    for (round = 0; round < 3; round++) {
        counter++;
        rb_postponed_job_trigger(handle);
        rb_thread_sleep(0);
    }

    return self;
}

/*
 * Bug.postponed_job_preregister_and_call_without_sleep(ary)
 *
 * Preregisters pjob_preregistered_callback with `obj` as its data, sets
 * `counter` to 3, and then triggers the job three times back to back with
 * nothing in between the triggers.
 */
static VALUE
pjob_preregister_and_call_without_sleep(VALUE self, VALUE obj)
{
    rb_postponed_job_handle_t handle;
    int fired;

    counter = 0;
    handle = rb_postponed_job_preregister(pjob_preregistered_callback, (void *)obj);
    counter = 3;

    for (fired = 0; fired < 3; fired++) {
        rb_postponed_job_trigger(handle);
    }

    return self;
}

/*
 * Bug.postponed_job_preregister_multiple_times
 *
 * Calls rb_postponed_job_preregister() three times with the same no-op
 * callback and NULL data, and returns an Array holding the three results
 * in call order.
 *
 * Each returned handle is stored in a plain `int` before being boxed with
 * INT2FIX, so the values reported to Ruby are the int-converted handles.
 */
static VALUE
pjob_preregister_multiple_times(VALUE self)
{
    enum { ATTEMPTS = 3 };
    int handles[ATTEMPTS];
    int i;

    /* Perform every preregistration before the result Array is allocated. */
    for (i = 0; i < ATTEMPTS; i++) {
        handles[i] = rb_postponed_job_preregister(pjob_noop_callback, NULL);
    }

    VALUE results = rb_ary_new();
    for (i = 0; i < ATTEMPTS; i++) {
        rb_ary_push(results, INT2FIX(handles[i]));
    }
    return results;
}

/*
 * Extension entry point: defines the Bug module and exposes each of the
 * postponed-job test helpers in this file as a module function on it.
 */
void
Init_postponed_job(VALUE self)
{
    VALUE bug_module = rb_define_module("Bug");

    /* Helpers built on the rb_postponed_job_register* functions. */
    rb_define_module_function(bug_module, "postponed_job_register",
                              pjob_register, 1);
    rb_define_module_function(bug_module, "postponed_job_register_one",
                              pjob_register_one, 1);
    rb_define_module_function(bug_module, "postponed_job_call_direct",
                              pjob_call_direct, 1);
    rb_define_module_function(bug_module, "postponed_job_register_one_same",
                              pjob_register_one_same, 0);
#ifdef HAVE_PTHREAD_H
    /* Only available when pthread.h was found at build time. */
    rb_define_module_function(bug_module, "postponed_job_register_in_c_thread",
                              pjob_register_in_c_thread, 1);
#endif

    /* Helpers built on rb_postponed_job_preregister / rb_postponed_job_trigger. */
    rb_define_module_function(bug_module, "postponed_job_preregister_and_call_with_sleep",
                              pjob_preregister_and_call_with_sleep, 1);
    rb_define_module_function(bug_module, "postponed_job_preregister_and_call_without_sleep",
                              pjob_preregister_and_call_without_sleep, 1);
    rb_define_module_function(bug_module, "postponed_job_preregister_multiple_times",
                              pjob_preregister_multiple_times, 0);
}

42 changes: 35 additions & 7 deletions ext/-test-/tracepoint/gc_hook.c
Expand Up @@ -2,6 +2,11 @@
#include "ruby/debug.h"

/* Incremented before each hook-proc call in invoke_proc; gc_start_end_i
 * only triggers the postponed job while this is zero. */
static int invoking; /* TODO: should not be global variable */
/* Proc stored by set_gc_hook for RUBY_INTERNAL_EVENT_GC_START. */
static VALUE gc_start_proc;
/* Proc stored by set_gc_hook for any other event (the GC-end hook). */
static VALUE gc_end_proc;
/* Handle of the preregistered invoke_proc job, obtained in Init_gc_hook. */
static rb_postponed_job_handle_t invoking_proc_pjob;
/* Pending flags: set by gc_start_end_i, consumed and cleared by invoke_proc. */
static bool pjob_execute_gc_start_proc_p;
static bool pjob_execute_gc_end_proc_p;

static VALUE
invoke_proc_ensure(VALUE _)
Expand All @@ -17,23 +22,35 @@ invoke_proc_begin(VALUE proc)
}

/*
 * Postponed-job callback. For each pending flag that is set
 * (pjob_execute_gc_start_proc_p / pjob_execute_gc_end_proc_p) it clears the
 * flag, bumps `invoking`, and calls the matching stored proc through
 * rb_ensure() with invoke_proc_ensure as the ensure handler.
 *
 * NOTE(review): this rendering of the diff hunk shows the removed lines (the
 * `void *data` signature and the unconditional rb_ensure on `proc`) next to
 * the added ones; only the flag-driven form belongs to the new version.
 */
static void
invoke_proc(void *data)
invoke_proc(void *unused)
{
VALUE proc = (VALUE)data;
invoking += 1;
rb_ensure(invoke_proc_begin, proc, invoke_proc_ensure, 0);
if (pjob_execute_gc_start_proc_p) {
pjob_execute_gc_start_proc_p = false;
invoking += 1;
rb_ensure(invoke_proc_begin, gc_start_proc, invoke_proc_ensure, 0);
}
if (pjob_execute_gc_end_proc_p) {
pjob_execute_gc_end_proc_p = false;
invoking += 1;
rb_ensure(invoke_proc_begin, gc_end_proc, invoke_proc_ensure, 0);
}
}

/*
 * Tracepoint callback for the GC start/end internal events. When no hook
 * proc is currently being invoked (`invoking == 0`), it records which event
 * fired in the corresponding pending flag and triggers the preregistered
 * job `invoking_proc_pjob`. The `if (0)` branch is disabled debug tracing.
 *
 * NOTE(review): this rendering of the diff hunk shows removed lines (the
 * tparg declaration inside `if (0)` and the rb_postponed_job_register call)
 * next to the added ones.
 */
static void
gc_start_end_i(VALUE tpval, void *data)
{
rb_trace_arg_t *tparg = rb_tracearg_from_tracepoint(tpval);
if (0) {
rb_trace_arg_t *tparg = rb_tracearg_from_tracepoint(tpval);
fprintf(stderr, "trace: %s\n", rb_tracearg_event_flag(tparg) == RUBY_INTERNAL_EVENT_GC_START ? "gc_start" : "gc_end");
}

if (invoking == 0) {
rb_postponed_job_register(0, invoke_proc, data);
/* Remember which hook must run, then request the shared job. */
if (rb_tracearg_event_flag(tparg) == RUBY_INTERNAL_EVENT_GC_START) {
pjob_execute_gc_start_proc_p = true;
} else {
pjob_execute_gc_end_proc_p = true;
}
rb_postponed_job_trigger(invoking_proc_pjob);
}
}

Expand All @@ -54,8 +71,13 @@ set_gc_hook(VALUE module, VALUE proc, rb_event_flag_t event, const char *tp_str,
if (!rb_obj_is_proc(proc)) {
rb_raise(rb_eTypeError, "trace_func needs to be Proc");
}
if (event == RUBY_INTERNAL_EVENT_GC_START) {
gc_start_proc = proc;
} else {
gc_end_proc = proc;
}

tpval = rb_tracepoint_new(0, event, gc_start_end_i, (void *)proc);
tpval = rb_tracepoint_new(0, event, gc_start_end_i, 0);
rb_ivar_set(module, tp_key, tpval);
rb_tracepoint_enable(tpval);
}
Expand All @@ -82,4 +104,10 @@ Init_gc_hook(VALUE module)
{
rb_define_module_function(module, "after_gc_start_hook=", set_after_gc_start, 1);
rb_define_module_function(module, "after_gc_exit_hook=", start_after_gc_exit, 1);
rb_gc_register_address(&gc_start_proc);
rb_gc_register_address(&gc_end_proc);
invoking_proc_pjob = rb_postponed_job_preregister(invoke_proc, NULL);
if (invoking_proc_pjob == POSTPONED_JOB_HANDLE_INVALID) {
rb_raise(rb_eStandardError, "could not preregister invoke_proc");
}
}
1 change: 1 addition & 0 deletions ext/ripper/depend
Expand Up @@ -566,6 +566,7 @@ ripper.o: $(hdrdir)/ruby/st.h
ripper.o: $(hdrdir)/ruby/subst.h
ripper.o: $(hdrdir)/ruby/thread_native.h
ripper.o: $(hdrdir)/ruby/util.h
ripper.o: $(hdrdir)/ruby/version.h
ripper.o: $(top_srcdir)/ccan/check_type/check_type.h
ripper.o: $(top_srcdir)/ccan/container_of/container_of.h
ripper.o: $(top_srcdir)/ccan/list/list.h
Expand Down
15 changes: 15 additions & 0 deletions ext/socket/depend
Expand Up @@ -186,6 +186,7 @@ ancdata.o: $(hdrdir)/ruby/subst.h
ancdata.o: $(hdrdir)/ruby/thread.h
ancdata.o: $(hdrdir)/ruby/thread_native.h
ancdata.o: $(hdrdir)/ruby/util.h
ancdata.o: $(hdrdir)/ruby/version.h
ancdata.o: $(top_srcdir)/ccan/check_type/check_type.h
ancdata.o: $(top_srcdir)/ccan/container_of/container_of.h
ancdata.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -394,6 +395,7 @@ basicsocket.o: $(hdrdir)/ruby/subst.h
basicsocket.o: $(hdrdir)/ruby/thread.h
basicsocket.o: $(hdrdir)/ruby/thread_native.h
basicsocket.o: $(hdrdir)/ruby/util.h
basicsocket.o: $(hdrdir)/ruby/version.h
basicsocket.o: $(top_srcdir)/ccan/check_type/check_type.h
basicsocket.o: $(top_srcdir)/ccan/container_of/container_of.h
basicsocket.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -602,6 +604,7 @@ constants.o: $(hdrdir)/ruby/subst.h
constants.o: $(hdrdir)/ruby/thread.h
constants.o: $(hdrdir)/ruby/thread_native.h
constants.o: $(hdrdir)/ruby/util.h
constants.o: $(hdrdir)/ruby/version.h
constants.o: $(top_srcdir)/ccan/check_type/check_type.h
constants.o: $(top_srcdir)/ccan/container_of/container_of.h
constants.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -811,6 +814,7 @@ ifaddr.o: $(hdrdir)/ruby/subst.h
ifaddr.o: $(hdrdir)/ruby/thread.h
ifaddr.o: $(hdrdir)/ruby/thread_native.h
ifaddr.o: $(hdrdir)/ruby/util.h
ifaddr.o: $(hdrdir)/ruby/version.h
ifaddr.o: $(top_srcdir)/ccan/check_type/check_type.h
ifaddr.o: $(top_srcdir)/ccan/container_of/container_of.h
ifaddr.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -1019,6 +1023,7 @@ init.o: $(hdrdir)/ruby/subst.h
init.o: $(hdrdir)/ruby/thread.h
init.o: $(hdrdir)/ruby/thread_native.h
init.o: $(hdrdir)/ruby/util.h
init.o: $(hdrdir)/ruby/version.h
init.o: $(top_srcdir)/ccan/check_type/check_type.h
init.o: $(top_srcdir)/ccan/container_of/container_of.h
init.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -1227,6 +1232,7 @@ ipsocket.o: $(hdrdir)/ruby/subst.h
ipsocket.o: $(hdrdir)/ruby/thread.h
ipsocket.o: $(hdrdir)/ruby/thread_native.h
ipsocket.o: $(hdrdir)/ruby/util.h
ipsocket.o: $(hdrdir)/ruby/version.h
ipsocket.o: $(top_srcdir)/ccan/check_type/check_type.h
ipsocket.o: $(top_srcdir)/ccan/container_of/container_of.h
ipsocket.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -1435,6 +1441,7 @@ option.o: $(hdrdir)/ruby/subst.h
option.o: $(hdrdir)/ruby/thread.h
option.o: $(hdrdir)/ruby/thread_native.h
option.o: $(hdrdir)/ruby/util.h
option.o: $(hdrdir)/ruby/version.h
option.o: $(top_srcdir)/ccan/check_type/check_type.h
option.o: $(top_srcdir)/ccan/container_of/container_of.h
option.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -1643,6 +1650,7 @@ raddrinfo.o: $(hdrdir)/ruby/subst.h
raddrinfo.o: $(hdrdir)/ruby/thread.h
raddrinfo.o: $(hdrdir)/ruby/thread_native.h
raddrinfo.o: $(hdrdir)/ruby/util.h
raddrinfo.o: $(hdrdir)/ruby/version.h
raddrinfo.o: $(top_srcdir)/ccan/check_type/check_type.h
raddrinfo.o: $(top_srcdir)/ccan/container_of/container_of.h
raddrinfo.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -1851,6 +1859,7 @@ socket.o: $(hdrdir)/ruby/subst.h
socket.o: $(hdrdir)/ruby/thread.h
socket.o: $(hdrdir)/ruby/thread_native.h
socket.o: $(hdrdir)/ruby/util.h
socket.o: $(hdrdir)/ruby/version.h
socket.o: $(top_srcdir)/ccan/check_type/check_type.h
socket.o: $(top_srcdir)/ccan/container_of/container_of.h
socket.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -2059,6 +2068,7 @@ sockssocket.o: $(hdrdir)/ruby/subst.h
sockssocket.o: $(hdrdir)/ruby/thread.h
sockssocket.o: $(hdrdir)/ruby/thread_native.h
sockssocket.o: $(hdrdir)/ruby/util.h
sockssocket.o: $(hdrdir)/ruby/version.h
sockssocket.o: $(top_srcdir)/ccan/check_type/check_type.h
sockssocket.o: $(top_srcdir)/ccan/container_of/container_of.h
sockssocket.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -2267,6 +2277,7 @@ tcpserver.o: $(hdrdir)/ruby/subst.h
tcpserver.o: $(hdrdir)/ruby/thread.h
tcpserver.o: $(hdrdir)/ruby/thread_native.h
tcpserver.o: $(hdrdir)/ruby/util.h
tcpserver.o: $(hdrdir)/ruby/version.h
tcpserver.o: $(top_srcdir)/ccan/check_type/check_type.h
tcpserver.o: $(top_srcdir)/ccan/container_of/container_of.h
tcpserver.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -2475,6 +2486,7 @@ tcpsocket.o: $(hdrdir)/ruby/subst.h
tcpsocket.o: $(hdrdir)/ruby/thread.h
tcpsocket.o: $(hdrdir)/ruby/thread_native.h
tcpsocket.o: $(hdrdir)/ruby/util.h
tcpsocket.o: $(hdrdir)/ruby/version.h
tcpsocket.o: $(top_srcdir)/ccan/check_type/check_type.h
tcpsocket.o: $(top_srcdir)/ccan/container_of/container_of.h
tcpsocket.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -2683,6 +2695,7 @@ udpsocket.o: $(hdrdir)/ruby/subst.h
udpsocket.o: $(hdrdir)/ruby/thread.h
udpsocket.o: $(hdrdir)/ruby/thread_native.h
udpsocket.o: $(hdrdir)/ruby/util.h
udpsocket.o: $(hdrdir)/ruby/version.h
udpsocket.o: $(top_srcdir)/ccan/check_type/check_type.h
udpsocket.o: $(top_srcdir)/ccan/container_of/container_of.h
udpsocket.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -2891,6 +2904,7 @@ unixserver.o: $(hdrdir)/ruby/subst.h
unixserver.o: $(hdrdir)/ruby/thread.h
unixserver.o: $(hdrdir)/ruby/thread_native.h
unixserver.o: $(hdrdir)/ruby/util.h
unixserver.o: $(hdrdir)/ruby/version.h
unixserver.o: $(top_srcdir)/ccan/check_type/check_type.h
unixserver.o: $(top_srcdir)/ccan/container_of/container_of.h
unixserver.o: $(top_srcdir)/ccan/list/list.h
Expand Down Expand Up @@ -3099,6 +3113,7 @@ unixsocket.o: $(hdrdir)/ruby/subst.h
unixsocket.o: $(hdrdir)/ruby/thread.h
unixsocket.o: $(hdrdir)/ruby/thread_native.h
unixsocket.o: $(hdrdir)/ruby/util.h
unixsocket.o: $(hdrdir)/ruby/version.h
unixsocket.o: $(top_srcdir)/ccan/check_type/check_type.h
unixsocket.o: $(top_srcdir)/ccan/container_of/container_of.h
unixsocket.o: $(top_srcdir)/ccan/list/list.h
Expand Down
12 changes: 9 additions & 3 deletions gc.c
Expand Up @@ -952,6 +952,7 @@ typedef struct rb_objspace {
#endif

rb_darray(VALUE *) weak_references;
rb_postponed_job_handle_t finalize_deferred_pjob;
} rb_objspace_t;


Expand Down Expand Up @@ -1425,6 +1426,8 @@ PRINTF_ARGS(static void gc_report_body(int level, rb_objspace_t *objspace, const
static const char *obj_info(VALUE obj);
static const char *obj_type_name(VALUE obj);

static void gc_finalize_deferred(void *dmy);

/*
* 1 - TSC (H/W Time Stamp Counter)
* 2 - getrusage
Expand Down Expand Up @@ -1906,6 +1909,10 @@ rb_objspace_alloc(void)
rb_objspace_t *objspace = calloc1(sizeof(rb_objspace_t));
objspace->flags.measure_gc = 1;
malloc_limit = gc_params.malloc_limit_min;
objspace->finalize_deferred_pjob = rb_postponed_job_preregister(gc_finalize_deferred, objspace);
if (objspace->finalize_deferred_pjob == POSTPONED_JOB_HANDLE_INVALID) {
rb_bug("Could not preregister postponed job for GC");
}

for (int i = 0; i < SIZE_POOL_COUNT; i++) {
rb_size_pool_t *size_pool = &size_pools[i];
Expand Down Expand Up @@ -4527,9 +4534,8 @@ gc_finalize_deferred(void *dmy)
/*
 * Requests a deferred finalizer run by triggering the postponed-job handle
 * stored in objspace->finalize_deferred_pjob.
 *
 * NOTE(review): this rendering of the diff hunk shows the removed
 * rb_postponed_job_register_one()/rb_bug() lines next to the added
 * rb_postponed_job_trigger() call.
 */
static void
gc_finalize_deferred_register(rb_objspace_t *objspace)
{
if (rb_postponed_job_register_one(0, gc_finalize_deferred, objspace) == 0) {
rb_bug("gc_finalize_deferred_register: can't register finalizer.");
}
/* will enqueue a call to gc_finalize_deferred */
rb_postponed_job_trigger(objspace->finalize_deferred_pjob);
}

static int pop_mark_stack(mark_stack_t *stack, VALUE *data);
Expand Down

0 comments on commit f8effa2

Please sign in to comment.