Skip to content

Commit

Permalink
WIP: threading fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jacquesg committed Feb 19, 2018
1 parent b129e43 commit 0be2ee7
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 25 deletions.
23 changes: 6 additions & 17 deletions Raw.xs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ typedef struct
int dummy;
} zmq_raw_proxy;

#define MAX_CONTEXT_COUNT 64
typedef struct my_cxt_t
{
zmq_raw_context *contexts;
zmq_raw_context contexts[MAX_CONTEXT_COUNT];
zmq_raw_timers *timers;
zmq_raw_mutex *context_mutex;
} my_cxt_t;

STATIC PerlIO *zmq_get_socket_io (SV *sv)
Expand Down Expand Up @@ -289,20 +292,8 @@ STATIC void *zmq_sv_to_ptr (const char *type, SV *sv, const char *file, int line
} \
} STMT_END

static zmq_raw_timers *timers;
static zmq_raw_mutex *timers_mutex;
STATIC void zmq_raw_timers_cleanup (void)
{
#ifndef _WIN32
zmq_raw_timers_destroy (timers);
#endif

zmq_raw_mutex_destroy (timers_mutex);
}

#define MY_CXT_KEY "ZMQ::Raw::_guts"
#define MAX_CONTEXT_COUNT 64
static zmq_raw_context contexts[MAX_CONTEXT_COUNT];

START_MY_CXT

Expand All @@ -311,10 +302,8 @@ MODULE = ZMQ::Raw PACKAGE = ZMQ::Raw
BOOT:
{
MY_CXT_INIT;
MY_CXT.contexts = contexts;

timers_mutex = zmq_raw_mutex_create();
assert (timers_mutex);
MY_CXT.timers = NULL;
MY_CXT.context_mutex = zmq_raw_mutex_create();
}

INCLUDE: const-xs-constant.inc
Expand Down
19 changes: 19 additions & 0 deletions deps/libzmqraw/timers.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ struct zmq_raw_timers
{
zmq_raw_mutex *mutex;
void *timers;
int timer_count;
void *thread;
int running;

Expand Down Expand Up @@ -70,12 +71,14 @@ zmq_raw_timers *zmq_raw_timers_create()
timers->wakeup_item.events = ZMQ_POLLIN;
timers->wakeup_item.socket = timers->wakeup_recv;
timers->mutex = zmq_raw_mutex_create();
timers->timer_count = 0;
return timers;
}

void zmq_raw_timers_destroy (zmq_raw_timers *timers)
{
assert (timers);
assert (timers->timer_count == 0);

zmq_raw_mutex_lock (timers->mutex);
timers->running = 0;
Expand All @@ -94,6 +97,17 @@ void zmq_raw_timers_destroy (zmq_raw_timers *timers)
free (timers);
}

int zmq_raw_timers_count (zmq_raw_timers *timers)
{
int count;

assert (timers);
zmq_raw_mutex_lock (timers->mutex);
count = timers->timer_count;
zmq_raw_mutex_unlock (timers->mutex);
return count;
}

static zmq_raw_timer *zmq_raw_timer_create (void *context, int after, int interval)
{
int rc;
Expand Down Expand Up @@ -141,6 +155,10 @@ static void zmq_raw_timer_destroy (zmq_raw_timer *timer)
if (timer->recv && !timer->recv_sv)
zmq_close (timer->recv);

zmq_raw_mutex_lock (timer->timers->mutex);
--timer->timers->timer_count;
zmq_raw_mutex_unlock (timer->timers->mutex);

free (timer);
}

Expand Down Expand Up @@ -188,6 +206,7 @@ zmq_raw_timer *zmq_raw_timers_start (zmq_raw_timers *timers, void *context, int
timer->timers = timers;
zmq_raw_timers__start (timer);

++timer->timers->timer_count;
zmq_raw_mutex_unlock (timers->mutex);

return timer;
Expand Down
1 change: 1 addition & 0 deletions deps/libzmqraw/timers.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ typedef struct zmq_raw_timer zmq_raw_timer;

zmq_raw_timers *zmq_raw_timers_create();
void zmq_raw_timers_destroy (zmq_raw_timers *timers);
int zmq_raw_timers_count (zmq_raw_timers *timers);

zmq_raw_timer *zmq_raw_timers_start (zmq_raw_timers *timers, void *context, int after, int interval);
void zmq_raw_timers_reset (zmq_raw_timer *timer);
Expand Down
13 changes: 13 additions & 0 deletions xs/Context.xs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ new (class)
dMY_CXT;
int i;
zmq_raw_context *ctx = NULL;
zmq_raw_mutex *context_mutex;

CODE:
context_mutex = MY_CXT.context_mutex;
zmq_raw_mutex_lock (context_mutex);

for (i = 0; i < MAX_CONTEXT_COUNT; ++i)
{
zmq_raw_context *tmp = &MY_CXT.contexts[i];
Expand All @@ -21,21 +25,30 @@ new (class)
}

if (ctx == NULL)
{
zmq_raw_mutex_unlock (context_mutex);
croak_usage ("too many contexts created");
}

ctx->context = zmq_ctx_new();
if (ctx->context == NULL)
{
zmq_raw_mutex_unlock (context_mutex);
zmq_raw_check_error (-1);
}

ctx->counter = zmq_atomic_counter_new();
if (ctx->counter == NULL)
{
zmq_raw_mutex_unlock (context_mutex);

zmq_ctx_term (ctx->context);
ctx->context = NULL;

zmq_raw_check_error (-1);
}

zmq_raw_mutex_unlock (context_mutex);
zmq_atomic_counter_inc (ctx->counter);
ZMQ_NEW_OBJ (RETVAL, "ZMQ::Raw::Context", ctx);

Expand Down
30 changes: 22 additions & 8 deletions xs/Timer.xs
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,28 @@ _new (class, context, after, interval)
SV *interval

PREINIT:
dMY_CXT;
zmq_raw_timer *timer;
zmq_raw_context *ctx;
zmq_raw_socket *sock;
zmq_raw_mutex *context_mutex;
SV *sv;

CODE:
ctx = ZMQ_SV_TO_PTR (Context, context);

zmq_raw_mutex_lock (timers_mutex);
if (timers == NULL)
context_mutex = MY_CXT.context_mutex;

zmq_raw_mutex_lock (context_mutex);
if (MY_CXT.timers == NULL)
{
timers = zmq_raw_timers_create();
if (timers == NULL)
MY_CXT.timers = zmq_raw_timers_create();
if (MY_CXT.timers == NULL)
zmq_raw_check_error (-1);

atexit (zmq_raw_timers_cleanup);
}
zmq_raw_mutex_unlock (timers_mutex);
zmq_raw_mutex_unlock (context_mutex);

timer = zmq_raw_timers_start (timers, ctx->context,
timer = zmq_raw_timers_start (MY_CXT.timers, ctx->context,
after, SvIOK (interval) ? SvIV (interval) : 0);
if (timer == NULL)
zmq_raw_check_error (-1);
Expand Down Expand Up @@ -127,10 +129,15 @@ DESTROY (self)
SV *self

PREINIT:
dMY_CXT;
zmq_raw_timer *timer;
zmq_raw_mutex *context_mutex;
SV *recv;

CODE:
context_mutex = MY_CXT.context_mutex;
zmq_raw_mutex_lock (context_mutex);

timer = ZMQ_SV_TO_PTR (Timer, self);
recv = MUTABLE_SV (zmq_raw_timer_get_sv (timer));

Expand All @@ -139,3 +146,10 @@ DESTROY (self)
SvREFCNT_dec (recv);
SvREFCNT_dec (ZMQ_SV_TO_MAGIC (self));

if (zmq_raw_timers_count (MY_CXT.timers) == 0)
{
zmq_raw_timers_destroy (MY_CXT.timers);
MY_CXT.timers = NULL;
}
zmq_raw_mutex_unlock (context_mutex);

0 comments on commit 0be2ee7

Please sign in to comment.