Skip to content

Commit

Permalink
a mutex per node
Browse files Browse the repository at this point in the history
  • Loading branch information
paukul committed Mar 25, 2010
1 parent 0c7531a commit 80ad3af
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 21 deletions.
4 changes: 2 additions & 2 deletions examples/rabbitmq_queues_rpc.rb
@@ -1,7 +1,7 @@
`make clean; make all`
require File.expand_path('../../lib/intruder', __FILE__)
hostname = `hostname`.chomp

#hostname = `hostname`.chomp
hostname = 'nb-pfriederich'
n = Intruder::Node.new('fooz', File.read(File.expand_path('~/.erlang.cookie')))

puts "--- rabbit call ---\n"
Expand Down
5 changes: 2 additions & 3 deletions src/mod.c
Expand Up @@ -3,7 +3,6 @@
VALUE IntruderMod = Qnil;
extern VALUE IntruderModule;
extern VALUE IntruderTerm;
extern pthread_mutex_t mutex;

VALUE intruder_mod_init(VALUE self, VALUE modname, VALUE node) {
rb_iv_set(self, "@node", node);
Expand Down Expand Up @@ -46,9 +45,9 @@ VALUE intruder_mod_rpc(VALUE self, VALUE args) {

/* RPC call */
DEBUG("\nrpc call to %s:%s\n", mod, RSTRING_PTR(fun));
pthread_mutex_lock(&mutex);
lock_node(inode);
ret = ei_rpc(inode->cnode, inode->fd, mod, RSTRING_PTR(fun), rpcargs.buff, rpcargs.index, &result);
pthread_mutex_unlock(&mutex);
unlock_node(inode);
if (ret < 0)
raise_rException_for_erl_errno();

Expand Down
36 changes: 20 additions & 16 deletions src/node.c
Expand Up @@ -14,19 +14,18 @@ VALUE IntruderNode = Qnil;
VALUE IntruderNodeException = Qnil;
int node_count = 0;
pthread_t alive_thread;
INTRUDER_NODE **connectlist = (INTRUDER_NODE **)malloc(sizeof(INTRUDER_NODE*) * CONBUFFSIZE);
INTRUDER_NODE **connectlist;
fd_set socks;
int highsock;
int readsocks;
int connectlist_inited = 0;
pthread_mutex_t mutex;

/* internal methods */
static void declare_attr_accessors();
static void free_class_struct(void *class_struct);

void lock_node(INTRUDER_NODE node);
void unlock_node(INTRUDER_NODE node);
void lock_node(INTRUDER_NODE *node);
void unlock_node(INTRUDER_NODE *node);
void read_socks();
void *aliveloop();
void build_select_list();
Expand All @@ -36,13 +35,16 @@ VALUE intruder_node_alloc(VALUE class){
INTRUDER_NODE *class_struct = malloc(sizeof(INTRUDER_NODE));
class_struct->cnode = malloc(sizeof(ei_cnode));
class_struct->status = INTRUDER_DISCONNECTED;
class_struct->mutex = malloc(sizeof(pthread_mutex_t));
pthread_mutex_init(class_struct->mutex, NULL);

VALUE obj = Data_Wrap_Struct(class, 0, free_class_struct, class_struct);
return obj;
}

static void free_class_struct(void *class_struct)
{
pthread_mutex_destroy(((INTRUDER_NODE *)class_struct)->mutex);
free(((INTRUDER_NODE *)class_struct)->cnode);
free(class_struct);
}
Expand All @@ -53,9 +55,9 @@ VALUE intruder_node_init(VALUE self, VALUE sname, VALUE cookie){
rb_iv_set(self, "@cookie", cookie);

if (!connectlist_inited) {
connectlist = (INTRUDER_NODE **)malloc(sizeof(INTRUDER_NODE*) * CONBUFFSIZE);
printf("initializing connectlist\n");
connectlist_inited = 1;
memset((char *) &connectlist, 0, sizeof(connectlist));
}

/* initialize the node */
Expand Down Expand Up @@ -84,15 +86,14 @@ VALUE intruder_node_pid(VALUE self){
VALUE intruder_node_connect(VALUE self, VALUE remote_node){
INTRUDER_NODE *class_struct;
Data_Get_Struct(self, INTRUDER_NODE, class_struct);
int fd;

if((class_struct->fd = ei_connect(class_struct->cnode, RSTRING_PTR(remote_node))) < 0)
raise_rException_for_erl_errno();

class_struct->status = INTRUDER_CONNECTED;

/* printf("setting node %d on the connectlist\n", node_count); */
*(connectlist[node_count]) = class_struct;
connectlist[node_count] = class_struct;
node_count++;

if (alive_thread == NULL) {
Expand Down Expand Up @@ -138,9 +139,9 @@ void build_select_list() {
for (listnum = 0; listnum < 5; listnum++) {
if (connectlist[listnum] != 0) {
/* printf("adding fd %d (%d) to connectlist\n", connectlist[listnum], listnum); */
FD_SET(connectlist[listnum],&socks);
if (connectlist[listnum] > highsock)
highsock = connectlist[listnum];
FD_SET(connectlist[listnum]->fd, &socks);
if (connectlist[listnum]->fd > highsock)
highsock = connectlist[listnum]->fd;
}
}
}
Expand All @@ -153,13 +154,16 @@ void read_socks() {
/* printf("reading open sockets\n"); */
printf("Nodecount = %d\n", node_count);
for (listnum = 0; listnum < node_count; listnum++) {
printf("checking socket %d (%d)\n", connectlist[listnum], listnum);
if (connectlist[listnum] != 0 && FD_ISSET(connectlist[listnum], &socks))
printf("checking socket %d (%d)\n", connectlist[listnum]->fd, listnum);
if (connectlist[listnum] != 0 && FD_ISSET(connectlist[listnum]->fd, &socks)) {
printf("erl_receive\n");
got = erl_receive(connectlist[listnum], buf, 400);
if (got == ERL_TICK) {
printf("keeping alive!!\n");
continue;
got = erl_receive(connectlist[listnum]->fd, buf, 400);
if (got == ERL_TICK) {
printf("keeping alive!!\n");
continue;
} else {
rb_raise(rb_eRuntimeError, "Keep Alive thread cought a message other than ERL_TICK");
}
}
}
}
Expand Down

0 comments on commit 80ad3af

Please sign in to comment.