Skip to content

Commit

Permalink
mutex madness
Browse files Browse the repository at this point in the history
  • Loading branch information
paukul committed Mar 25, 2010
1 parent 80ad3af commit 2f89231
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 27 deletions.
4 changes: 3 additions & 1 deletion examples/echoserver.rb
Expand Up @@ -8,7 +8,9 @@
include Intruder

n = Node.new('fazy', File.read(File.expand_path('~/.erlang.cookie')))
n.connect("snaps@#{`hostname`.chomp}")
#hostname = `hostname`.chomp
hostname = 'nb-pfriederich'
n.connect("snaps@#{hostname}")

test = n.mod('test')
p test.say(Intruder::Term.encode([:a]))
Expand Down
24 changes: 13 additions & 11 deletions src/mod.c
Expand Up @@ -47,18 +47,20 @@ VALUE intruder_mod_rpc(VALUE self, VALUE args) {
DEBUG("\nrpc call to %s:%s\n", mod, RSTRING_PTR(fun));
lock_node(inode);
ret = ei_rpc(inode->cnode, inode->fd, mod, RSTRING_PTR(fun), rpcargs.buff, rpcargs.index, &result);
unlock_node(inode);
if (ret < 0)
if (ret < 0) {
raise_rException_for_erl_errno();

ETERM *tuplep;
tuplep = erl_decode(result.buff);
ei_x_free(&rpcargs);
ei_x_free(&result);
/* printf("result: \n"); */
/* erl_print_term(stdout, tuplep); */
/* fflush(stdout); */
return rb_value_from_eterm(tuplep);
return Qnil;
} else {
ETERM *tuplep;
tuplep = erl_decode(result.buff);
ei_x_free(&rpcargs);
ei_x_free(&result);
unlock_node(inode);
/* printf("result: \n"); */
/* erl_print_term(stdout, tuplep); */
/* fflush(stdout); */
return rb_value_from_eterm(tuplep);
}
}

void Init_intruder_mod() {
Expand Down
57 changes: 42 additions & 15 deletions src/node.c
Expand Up @@ -14,15 +14,18 @@ VALUE IntruderNode = Qnil;
VALUE IntruderNodeException = Qnil;
int node_count = 0;
pthread_t alive_thread;
pthread_mutex_t **mutexes_locked_for_keep_alive;
INTRUDER_NODE **connectlist;
fd_set socks;
int highsock;
int readsocks;
int connectlist_inited = 0;
unsigned int tmo = 200;

/* internal methods */
static void declare_attr_accessors();
static void free_class_struct(void *class_struct);
static void release_locks();

void lock_node(INTRUDER_NODE *node);
void unlock_node(INTRUDER_NODE *node);
Expand Down Expand Up @@ -56,6 +59,7 @@ VALUE intruder_node_init(VALUE self, VALUE sname, VALUE cookie){

if (!connectlist_inited) {
connectlist = (INTRUDER_NODE **)malloc(sizeof(INTRUDER_NODE*) * CONBUFFSIZE);
mutexes_locked_for_keep_alive = (pthread_mutex_t **)malloc(sizeof(pthread_mutex_t *) * CONBUFFSIZE);
printf("initializing connectlist\n");
connectlist_inited = 1;
}
Expand Down Expand Up @@ -87,12 +91,12 @@ VALUE intruder_node_connect(VALUE self, VALUE remote_node){
INTRUDER_NODE *class_struct;
Data_Get_Struct(self, INTRUDER_NODE, class_struct);

if((class_struct->fd = ei_connect(class_struct->cnode, RSTRING_PTR(remote_node))) < 0)
if((class_struct->fd = ei_connect_tmo(class_struct->cnode, RSTRING_PTR(remote_node), tmo)) < 0)
raise_rException_for_erl_errno();

class_struct->status = INTRUDER_CONNECTED;

/* printf("setting node %d on the connectlist\n", node_count); */
/* printf("setting node %d on the connectlist\n", node_count); */
connectlist[node_count] = class_struct;
node_count++;

Expand Down Expand Up @@ -133,15 +137,21 @@ static void declare_attr_accessors(){
}

void build_select_list() {
int listnum;
int listnum, locks = 0;
FD_ZERO(&socks);
/* printf("building select list\n"); */
for (listnum = 0; listnum < 5; listnum++) {
if (connectlist[listnum] != 0) {
/* printf("adding fd %d (%d) to connectlist\n", connectlist[listnum], listnum); */
FD_SET(connectlist[listnum]->fd, &socks);
if (connectlist[listnum]->fd > highsock)
highsock = connectlist[listnum]->fd;

for (listnum = 0; listnum < CONBUFFSIZE; listnum++) {
if (connectlist[listnum] != NULL) {
printf("may add fd %d (%d) to connectlist\n", connectlist[listnum]->fd, listnum);
if (pthread_mutex_trylock(connectlist[listnum]->mutex)) {
mutexes_locked_for_keep_alive[locks++] = connectlist[listnum]->mutex;
FD_SET(connectlist[listnum]->fd, &socks);
if (connectlist[listnum]->fd > highsock)
highsock = connectlist[listnum]->fd;
}
else {
printf("fd %d is locked, skipping\n", connectlist[listnum]->fd);
}
}
}
}
Expand All @@ -151,7 +161,7 @@ void read_socks() {
int got;
char buf[400];

/* printf("reading open sockets\n"); */
/* printf("reading open sockets\n"); */
printf("Nodecount = %d\n", node_count);
for (listnum = 0; listnum < node_count; listnum++) {
printf("checking socket %d (%d)\n", connectlist[listnum]->fd, listnum);
Expand All @@ -162,7 +172,7 @@ void read_socks() {
printf("keeping alive!!\n");
continue;
} else {
rb_raise(rb_eRuntimeError, "Keep Alive thread cought a message other than ERL_TICK");
/* rb_raise(rb_eRuntimeError, "Keep Alive thread cought a message other than ERL_TICK"); */
}
}
}
Expand All @@ -176,7 +186,7 @@ void *aliveloop() {
timeout.tv_usec = 0;

build_select_list();
/* printf("waiting for sockets\n"); */
/* printf("waiting for sockets\n"); */
readsocks = select(highsock+1, &socks, (fd_set *) 0, (fd_set *) 0, &timeout);

if (readsocks < 0) {
Expand All @@ -186,10 +196,27 @@ void *aliveloop() {
if (readsocks == 0) {
/* Nothing ready to read, just show that
we're alive */
/* printf("."); */
/* fflush(stdout); */
/* printf("."); */
/* fflush(stdout); */
} else
read_socks();
release_locks(); /* TODO: maybe return the locklist from build_select_list and pass in as an argument here */
}
}

static void release_locks() {
int i;
pthread_mutex_t *mutex;
printf("cleaning up locks");
for (i = 0; i < CONBUFFSIZE; i++) {
if ((mutex = mutexes_locked_for_keep_alive[i]) != NULL) {
printf(" %d", i);
pthread_mutex_unlock(mutex);
mutexes_locked_for_keep_alive[i] = NULL;
} else {
printf("\n");
break;
}
}
}

Expand Down

0 comments on commit 2f89231

Please sign in to comment.