Skip to content

Commit

Permalink
do less stuff when we're running non-threaded
Browse files Browse the repository at this point in the history
Don't use a self-pipe for the event queue, we're not blocking
  • Loading branch information
slyphon committed May 15, 2012
1 parent 11c538b commit a503cb0
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 52 deletions.
92 changes: 40 additions & 52 deletions ext/event_lib.c
Expand Up @@ -21,7 +21,6 @@ slyphon@gmail.com
*/

#include "ruby.h"
#include "event_lib.h"
#include "c-client-src/zookeeper.h"
#include <errno.h>
#include <stdio.h>
Expand All @@ -30,82 +29,59 @@ slyphon@gmail.com
#include <unistd.h>
#include <inttypes.h>
#include "common.h"
#include "event_lib.h"
#include "dbg.h"

#ifndef THREADED
#define USE_XMALLOC
#endif

#define GET_SYM(str) ID2SYM(rb_intern(str))

int ZKRBDebugging;

#ifdef THREADED

#if THREADED
pthread_mutex_t zkrb_q_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif

inline static int global_mutex_lock() {
int rv = pthread_mutex_lock(&zkrb_q_mutex);
int rv=0;
#if THREADED
rv = pthread_mutex_lock(&zkrb_q_mutex);
if (rv != 0) log_err("global_mutex_lock error");
#endif
return rv;
}

inline static int global_mutex_unlock() {
int rv = pthread_mutex_unlock(&zkrb_q_mutex);
int rv=0;
#if THREADED
rv = pthread_mutex_unlock(&zkrb_q_mutex);
if (rv != 0) log_err("global_mutex_unlock error");
#endif
return rv;
}

void atfork_prepare() {
global_mutex_lock();
}

void atfork_parent() {
global_mutex_unlock();
}

void atfork_child() {
global_mutex_unlock();
}

// set up handlers to make sure the thread being forked holds the lock at the
// time the process is copied, then immediately unlock the mutex in both parent
// and children
/*pthread_atfork(atfork_prepare, atfork_parent, atfork_child);*/

// delegates to the system malloc (we can't use xmalloc in the threaded case,
// as we can't touch the interpreter)

inline static void* zk_malloc(size_t size) {
return malloc(size);
}

inline static void zk_free(void *ptr) {
free(ptr);
}

#else

inline static int global_mutex_lock() {
return 0;
}

inline static int global_mutex_unlock() {
return 0;
}

// we can use the ruby xmalloc/xfree that will raise errors
// in the case of a failure to allocate memory, and can cycle
// the garbage collector in some cases.

inline static void* zk_malloc(size_t size) {
#ifdef USE_XMALLOC
return xmalloc(size);
#else
return malloc(size);
#endif
}

inline static void zk_free(void *ptr) {
#ifdef USE_XMALLOC
xfree(ptr);
#else
free(ptr);
#endif
}


#endif /* THREADED */


void zkrb_enqueue(zkrb_queue_t *q, zkrb_event_t *elt) {
if (q == NULL) {
zkrb_debug("zkrb_enqueue, queue ptr was NULL");
Expand All @@ -125,13 +101,15 @@ void zkrb_enqueue(zkrb_queue_t *q, zkrb_event_t *elt) {
q->tail->event = NULL;
q->tail->next = NULL;

ssize_t ret = write(q->pipe_write, "0", 1); /* Wake up Ruby listener */

global_mutex_unlock();

if (ret < 0) {
#if THREADED
ssize_t ret = write(q->pipe_write, "0", 1); /* Wake up Ruby listener */

if (ret < 0)
log_err("write to queue (%p) pipe failed!\n", q);
}
#endif

}

// NOTE: the zkrb_event_t* returned *is* the same pointer that's part of the
Expand Down Expand Up @@ -182,8 +160,10 @@ void zkrb_signal(zkrb_queue_t *q) {

global_mutex_lock();

#if THREADED
if (!write(q->pipe_write, "0", 1)) /* Wake up Ruby listener */
log_err("zkrb_signal: write to pipe failed, could not wake");
#endif

global_mutex_unlock();
}
Expand All @@ -200,10 +180,12 @@ zkrb_event_ll_t *zkrb_event_ll_t_alloc(void) {
}

zkrb_queue_t *zkrb_queue_alloc(void) {
int pfd[2];
zkrb_queue_t *rq = NULL;

#if THREADED
int pfd[2];
check(pipe(pfd) == 0, "creating the signal pipe failed");
#endif

rq = zk_malloc(sizeof(zkrb_queue_t));
check_mem(rq);
Expand All @@ -214,8 +196,11 @@ zkrb_queue_t *zkrb_queue_alloc(void) {
check_mem(rq->head);

rq->tail = rq->head;

#if THREADED
rq->pipe_read = pfd[0];
rq->pipe_write = pfd[1];
#endif

return rq;

Expand All @@ -233,8 +218,11 @@ void zkrb_queue_free(zkrb_queue_t *queue) {
}

zk_free(queue->head);

#if THREADED
close(queue->pipe_read);
close(queue->pipe_write);
#endif

zk_free(queue);
}
Expand Down
2 changes: 2 additions & 0 deletions ext/zkrb.c
Expand Up @@ -722,12 +722,14 @@ static VALUE method_zkrb_get_next_event_st(VALUE self) {
rval = zkrb_event_to_ruby(event);
zkrb_event_free(event);

#if THREADED
int fd = zk->queue->pipe_read;

// we don't care in this case. this is just until i can remove the self
// pipe from the queue
char b[128];
while(read(fd, b, sizeof(b)) == sizeof(b)){}
#endif
}

return rval;
Expand Down

0 comments on commit a503cb0

Please sign in to comment.