Skip to content

Commit

Permalink
Prefer rb_thread_call_without_gvl
Browse files Browse the repository at this point in the history
  • Loading branch information
methodmissing committed Dec 24, 2014
1 parent adb703c commit eec394a
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 30 deletions.
18 changes: 9 additions & 9 deletions ext/rbczmq/beacon.c
Expand Up @@ -24,7 +24,7 @@ static void rb_czmq_free_beacon_gc(void *ptr)
{
zmq_beacon_wrapper *beacon = (zmq_beacon_wrapper *)ptr;
if (beacon) {
rb_thread_blocking_region(rb_czmq_nogvl_beacon_destroy, beacon, RUBY_UBF_IO, 0);
rb_thread_call_without_gvl(rb_czmq_nogvl_beacon_destroy, beacon, RUBY_UBF_IO, 0);
xfree(beacon);
}
}
Expand Down Expand Up @@ -57,7 +57,7 @@ static VALUE rb_czmq_beacon_s_new(VALUE beacon, VALUE port)
Check_Type(port, T_FIXNUM);
beacon = Data_Make_Struct(rb_cZmqBeacon, zmq_beacon_wrapper, 0, rb_czmq_free_beacon_gc, bcn);
prt = FIX2INT(port);
bcn->beacon = (zbeacon_t*)rb_thread_blocking_region(rb_czmq_nogvl_new_beacon, (void *)prt, RUBY_UBF_IO, 0);
bcn->beacon = (zbeacon_t*)rb_thread_call_without_gvl(rb_czmq_nogvl_new_beacon, (void *)prt, RUBY_UBF_IO, 0);
ZmqAssertObjOnAlloc(bcn->beacon, bcn);
rb_obj_call_init(beacon, 0, NULL);
return beacon;
Expand All @@ -77,7 +77,7 @@ static VALUE rb_czmq_beacon_s_new(VALUE beacon, VALUE port)
static VALUE rb_czmq_beacon_destroy(VALUE obj)
{
GetZmqBeacon(obj);
rb_thread_blocking_region(rb_czmq_nogvl_beacon_destroy, beacon, RUBY_UBF_IO, 0);
rb_thread_call_without_gvl(rb_czmq_nogvl_beacon_destroy, beacon, RUBY_UBF_IO, 0);
return Qnil;
}

Expand Down Expand Up @@ -127,7 +127,7 @@ static VALUE rb_czmq_beacon_set_interval(VALUE obj, VALUE interval)
Check_Type(interval, T_FIXNUM);
args.beacon = beacon;
args.interval = FIX2INT(interval);
rb_thread_blocking_region(rb_czmq_nogvl_set_interval, (void *)&args, RUBY_UBF_IO, 0);
rb_thread_call_without_gvl(rb_czmq_nogvl_set_interval, (void *)&args, RUBY_UBF_IO, 0);
return Qnil;
}

Expand Down Expand Up @@ -156,7 +156,7 @@ static VALUE rb_czmq_nogvl_noecho(void *ptr)
static VALUE rb_czmq_beacon_noecho(VALUE obj)
{
GetZmqBeacon(obj);
rb_thread_blocking_region(rb_czmq_nogvl_noecho, (void *)beacon, RUBY_UBF_IO, 0);
rb_thread_call_without_gvl(rb_czmq_nogvl_noecho, (void *)beacon, RUBY_UBF_IO, 0);
return Qnil;
}

Expand Down Expand Up @@ -191,7 +191,7 @@ static VALUE rb_czmq_beacon_publish(VALUE obj, VALUE transmit)
args.beacon = beacon;
args.transmit = RSTRING_PTR(transmit);
args.length = (int)RSTRING_LEN(transmit);
rb_thread_blocking_region(rb_czmq_nogvl_publish, (void *)&args, RUBY_UBF_IO, 0);
rb_thread_call_without_gvl(rb_czmq_nogvl_publish, (void *)&args, RUBY_UBF_IO, 0);
return Qnil;
}

Expand Down Expand Up @@ -220,7 +220,7 @@ static VALUE rb_czmq_nogvl_silence(void *ptr)
static VALUE rb_czmq_beacon_silence(VALUE obj)
{
GetZmqBeacon(obj);
rb_thread_blocking_region(rb_czmq_nogvl_silence, (void *)beacon, RUBY_UBF_IO, 0);
rb_thread_call_without_gvl(rb_czmq_nogvl_silence, (void *)beacon, RUBY_UBF_IO, 0);
return Qnil;
}

Expand Down Expand Up @@ -260,7 +260,7 @@ static VALUE rb_czmq_beacon_subscribe(VALUE obj, VALUE filter)
args.filter = RSTRING_PTR(filter);
args.length = (int)RSTRING_LEN(filter);
}
rb_thread_blocking_region(rb_czmq_nogvl_subscribe, (void *)&args, RUBY_UBF_IO, 0);
rb_thread_call_without_gvl(rb_czmq_nogvl_subscribe, (void *)&args, RUBY_UBF_IO, 0);
return Qnil;
}

Expand Down Expand Up @@ -289,7 +289,7 @@ static VALUE rb_czmq_nogvl_unsubscribe(void *ptr)
static VALUE rb_czmq_beacon_unsubscribe(VALUE obj)
{
GetZmqBeacon(obj);
rb_thread_blocking_region(rb_czmq_nogvl_unsubscribe, (void *)beacon, RUBY_UBF_IO, 0);
rb_thread_call_without_gvl(rb_czmq_nogvl_unsubscribe, (void *)beacon, RUBY_UBF_IO, 0);
return Qnil;
}

Expand Down
8 changes: 4 additions & 4 deletions ext/rbczmq/context.c
Expand Up @@ -54,7 +54,7 @@ static void rb_czmq_free_ctx(zmq_ctx_wrapper *ctx)
}

// finally, shutdown the context.
rb_thread_blocking_region(rb_czmq_nogvl_zctx_destroy, ctx, RUBY_UBF_IO, 0);
rb_thread_call_without_gvl(rb_czmq_nogvl_zctx_destroy, ctx, RUBY_UBF_IO, 0);

ctx->ctx = NULL;
rb_hash_aset(ctx_map, ctx->pidValue, Qnil);
Expand Down Expand Up @@ -108,7 +108,7 @@ void rb_czmq_context_destroy_socket(zmq_sock_wrapper* socket)
zlist_remove(ctx->sockets, socket);

if (socket->socket) {
rb_thread_blocking_region(rb_czmq_nogvl_zsocket_destroy, socket, RUBY_UBF_IO, 0);
rb_thread_call_without_gvl(rb_czmq_nogvl_zsocket_destroy, socket, RUBY_UBF_IO, 0);
}
}
}
Expand Down Expand Up @@ -190,7 +190,7 @@ static VALUE rb_czmq_ctx_s_new(int argc, VALUE *argv, VALUE context)
rb_raise(rb_eZmqError, "single ZMQ context per process allowed (previous context created at %s:%d)", ctx->file, ctx->line);
}
context = Data_Make_Struct(rb_cZmqContext, zmq_ctx_wrapper, rb_czmq_mark_ctx_gc, rb_czmq_free_ctx_gc, ctx);
ctx->ctx = (zctx_t*)rb_thread_blocking_region(rb_czmq_nogvl_zctx_new, NULL, RUBY_UBF_IO, 0);
ctx->ctx = (zctx_t*)rb_thread_call_without_gvl(rb_czmq_nogvl_zctx_new, NULL, RUBY_UBF_IO, 0);
ZmqAssertObjOnAlloc(ctx->ctx, ctx);
ctx->flags = 0;
ctx->pid = getpid();
Expand Down Expand Up @@ -386,7 +386,7 @@ static VALUE rb_czmq_ctx_socket(VALUE obj, VALUE type)

args.ctx = ctx->ctx;
args.type = socket_type;
VALUE socket_object = rb_czmq_socket_alloc(obj, ctx->ctx, (void*)rb_thread_blocking_region(rb_czmq_nogvl_socket_new, (void *)&args, RUBY_UBF_IO, 0));
VALUE socket_object = rb_czmq_socket_alloc(obj, ctx->ctx, (void*)rb_thread_call_without_gvl(rb_czmq_nogvl_socket_new, (void *)&args, RUBY_UBF_IO, 0));
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(socket_object);
zlist_push(ctx->sockets, sock);
Expand Down
2 changes: 1 addition & 1 deletion ext/rbczmq/extconf.rb
Expand Up @@ -124,7 +124,7 @@ def check_heads heads = [], fatal = false

dir_config('rbczmq')

have_func('rb_thread_blocking_region')
have_func('rb_thread_call_without_gvl')
have_func('rb_thread_call_without_gvl')

$INCFLAGS << " -I#{zmq_include_path}" if find_header("zmq.h", zmq_include_path)
Expand Down
2 changes: 1 addition & 1 deletion ext/rbczmq/loop.c
Expand Up @@ -301,7 +301,7 @@ static VALUE rb_czmq_loop_start(VALUE obj)
rb_thread_schedule();
zloop_timer(loop->loop, 1, 1, rb_czmq_loop_started_callback, loop);

rc = (int)rb_thread_blocking_region(rb_czmq_loop_start_nogvl, (void *)loop, rb_czmq_loop_start_ubf, (void*)loop);
rc = (int)rb_thread_call_without_gvl(rb_czmq_loop_start_nogvl, (void *)loop, rb_czmq_loop_start_ubf, (void*)loop);

if (rc > 0) rb_raise(rb_eZmqError, "internal event loop error!");
return INT2NUM(rc);
Expand Down
2 changes: 1 addition & 1 deletion ext/rbczmq/poller.c
Expand Up @@ -157,7 +157,7 @@ VALUE rb_czmq_poller_poll(int argc, VALUE *argv, VALUE obj)
rb_ary_clear(poller->readables);
rb_ary_clear(poller->writables);

rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_poll, (void *)&args, RUBY_UBF_IO, 0);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_poll, (void *)&args, RUBY_UBF_IO, 0);

/* only call ZmqAssert if return code is less than zero since zmq_poll returns the number of pollers on success */
if (rc < 0) {
Expand Down
2 changes: 1 addition & 1 deletion ext/rbczmq/rbczmq_ext.c
Expand Up @@ -223,7 +223,7 @@ static VALUE rb_czmq_m_proxy(int argc, VALUE *argv, ZMQ_UNUSED VALUE klass)
sockets[2] = NULL;
}

rc = (int)rb_thread_blocking_region(rb_czmq_m_proxy_nogvl, (void *)sockets, RUBY_UBF_IO, 0);
rc = (int)rb_thread_call_without_gvl(rb_czmq_m_proxy_nogvl, (void *)sockets, RUBY_UBF_IO, 0);

// int result = zmq_proxy(frontend_socket, backend_socket, capture_socket);
return INT2NUM(rc);
Expand Down
4 changes: 4 additions & 0 deletions ext/rbczmq/ruby2.h
@@ -1,4 +1,8 @@
#ifndef RBCZMQ_RUBY2_H
#define RBCZMQ_RUBY2_H

#ifndef rb_thread_call_without_gvl
#define rb_thread_call_without_gvl rb_thread_blocking_region
#endif

#endif
26 changes: 13 additions & 13 deletions ext/rbczmq/socket.c
Expand Up @@ -211,7 +211,7 @@ static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
/* ZmqAssert will return false on any non-zero return code. Bind returns the port number */
if (rc < 0) {
ZmqAssert(rc);
Expand Down Expand Up @@ -252,7 +252,7 @@ static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
/* get the endpoint name with any ephemeral ports filled in. */
char* endpoint_string = zsocket_last_endpoint(sock->socket);
Expand Down Expand Up @@ -306,7 +306,7 @@ static VALUE rb_czmq_socket_disconnect(VALUE obj, VALUE endpoint)
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_disconnect, (void *)&args, RUBY_UBF_IO, 0);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_socket_disconnect, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: disconnected \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
Expand Down Expand Up @@ -360,7 +360,7 @@ static VALUE rb_czmq_socket_unbind(VALUE obj, VALUE endpoint)
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_unbind, (void *)&args, RUBY_UBF_IO, 0);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_socket_unbind, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: unbound \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
Expand Down Expand Up @@ -468,7 +468,7 @@ static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
Check_Type(msg, T_STRING);
args.msg = RSTRING_PTR(msg);
args.length = RSTRING_LEN(msg);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: send \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(msg));
Expand Down Expand Up @@ -503,7 +503,7 @@ static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
Check_Type(msg, T_STRING);
args.msg = RSTRING_PTR(msg);
args.length = RSTRING_LEN(msg);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: sendm \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(msg));
Expand Down Expand Up @@ -555,7 +555,7 @@ static VALUE rb_czmq_socket_recv(VALUE obj)
args.socket = sock;
zmq_msg_init(&args.message);

int rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
int rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
if (rc < 0) {
zmq_msg_close(&args.message);
return Qnil;
Expand Down Expand Up @@ -677,7 +677,7 @@ static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
args.socket = sock;
args.frame = frame->frame;
args.flags = flgs;
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if ((flgs & ZFRAME_REUSE) == 0) {
/* frame has been destroyed, clear the owns flag */
Expand Down Expand Up @@ -730,7 +730,7 @@ static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
if (sock->verbose) print_message = zmsg_dup(message->message);
args.socket = sock;
args.message = message->message;
rb_thread_blocking_region(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
rb_thread_call_without_gvl(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
message->flags &= ~ZMQ_MESSAGE_OWNED;
if (sock->verbose) ZmqDumpMessage("send_message", print_message);
return Qnil;
Expand Down Expand Up @@ -774,7 +774,7 @@ static VALUE rb_czmq_socket_recv_frame(VALUE obj)
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
frame = (zframe_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
frame = (zframe_t *)rb_thread_call_without_gvl(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
if (frame == NULL) return Qnil;
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
Expand Down Expand Up @@ -852,7 +852,7 @@ static VALUE rb_czmq_socket_recv_message(VALUE obj)
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
message = (zmsg_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
message = (zmsg_t *)rb_thread_call_without_gvl(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
if (message == NULL) return Qnil;
if (sock->verbose) ZmqDumpMessage("recv_message", message);
return rb_czmq_alloc_message(message);
Expand Down Expand Up @@ -894,7 +894,7 @@ static VALUE rb_czmq_socket_poll(VALUE obj, VALUE timeout)
ZmqSockGuardCrossThread(sock);
args.socket = sock;
args.timeout = FIX2INT(timeout);
readable = (bool)rb_thread_blocking_region(rb_czmq_nogvl_poll, (void *)&args, RUBY_UBF_IO, 0);
readable = (bool)rb_thread_call_without_gvl(rb_czmq_nogvl_poll, (void *)&args, RUBY_UBF_IO, 0);
return (readable == true) ? Qtrue : Qfalse;
}

Expand Down Expand Up @@ -1788,7 +1788,7 @@ static VALUE rb_czmq_socket_monitor_thread(void *arg)
while (1) {
zmq_msg_init (&args.msg_event);
zmq_msg_init (&args.msg_endpoint);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_monitor_recv, (void *)&args, RUBY_UBF_IO, 0);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_monitor_recv, (void *)&args, RUBY_UBF_IO, 0);
if (rc == -1 && (zmq_errno() == ETERM || zmq_errno() == ENOTSOCK || zmq_errno() == EINTR)) break;
if (rc == -1 && (sock->flags & ZMQ_SOCKET_DESTROYED)) break;
assert (rc != -1);
Expand Down

0 comments on commit eec394a

Please sign in to comment.