From 80ad3af6133cc4de27d1e1efb0dbd077f0ab443a Mon Sep 17 00:00:00 2001 From: Pascal Friederich Date: Thu, 25 Mar 2010 14:10:36 +0100 Subject: [PATCH] a mutex per node --- examples/rabbitmq_queues_rpc.rb | 4 ++-- src/mod.c | 5 ++--- src/node.c | 36 ++++++++++++++++++--------------- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/examples/rabbitmq_queues_rpc.rb b/examples/rabbitmq_queues_rpc.rb index 7347772..e0c69c7 100644 --- a/examples/rabbitmq_queues_rpc.rb +++ b/examples/rabbitmq_queues_rpc.rb @@ -1,7 +1,7 @@ `make clean; make all` require File.expand_path('../../lib/intruder', __FILE__) -hostname = `hostname`.chomp - +#hostname = `hostname`.chomp +hostname = 'nb-pfriederich' n = Intruder::Node.new('fooz', File.read(File.expand_path('~/.erlang.cookie'))) puts "--- rabbit call ---\n" diff --git a/src/mod.c b/src/mod.c index e810ad9..617c4de 100644 --- a/src/mod.c +++ b/src/mod.c @@ -3,7 +3,6 @@ VALUE IntruderMod = Qnil; extern VALUE IntruderModule; extern VALUE IntruderTerm; -extern pthread_mutex_t mutex; VALUE intruder_mod_init(VALUE self, VALUE modname, VALUE node) { rb_iv_set(self, "@node", node); @@ -46,9 +45,9 @@ VALUE intruder_mod_rpc(VALUE self, VALUE args) { /* RPC call */ DEBUG("\nrpc call to %s:%s\n", mod, RSTRING_PTR(fun)); - pthread_mutex_lock(&mutex); + lock_node(inode); ret = ei_rpc(inode->cnode, inode->fd, mod, RSTRING_PTR(fun), rpcargs.buff, rpcargs.index, &result); - pthread_mutex_unlock(&mutex); + unlock_node(inode); if (ret < 0) raise_rException_for_erl_errno(); diff --git a/src/node.c b/src/node.c index bf9a81d..ca9a664 100644 --- a/src/node.c +++ b/src/node.c @@ -14,19 +14,18 @@ VALUE IntruderNode = Qnil; VALUE IntruderNodeException = Qnil; int node_count = 0; pthread_t alive_thread; -INTRUDER_NODE **connectlist = (INTRUDER_NODE **)malloc(sizeof(INTRUDER_NODE*) * CONBUFFSIZE); +INTRUDER_NODE **connectlist; fd_set socks; int highsock; int readsocks; int connectlist_inited = 0; -pthread_mutex_t mutex; /* internal methods */ static void declare_attr_accessors(); static void free_class_struct(void *class_struct); -void lock_node(INTRUDER_NODE node); -void unlock_node(INTRUDER_NODE node); +void lock_node(INTRUDER_NODE *node); +void unlock_node(INTRUDER_NODE *node); void read_socks(); void *aliveloop(); void build_select_list(); @@ -36,6 +35,8 @@ VALUE intruder_node_alloc(VALUE class){ INTRUDER_NODE *class_struct = malloc(sizeof(INTRUDER_NODE)); class_struct->cnode = malloc(sizeof(ei_cnode)); class_struct->status = INTRUDER_DISCONNECTED; + class_struct->mutex = malloc(sizeof(pthread_mutex_t)); + pthread_mutex_init(class_struct->mutex, NULL); VALUE obj = Data_Wrap_Struct(class, 0, free_class_struct, class_struct); return obj; @@ -43,6 +44,7 @@ VALUE intruder_node_alloc(VALUE class){ static void free_class_struct(void *class_struct) { + pthread_mutex_destroy(((INTRUDER_NODE *)class_struct)->mutex); free(((INTRUDER_NODE *)class_struct)->cnode); free(class_struct); } @@ -53,9 +55,9 @@ VALUE intruder_node_init(VALUE self, VALUE sname, VALUE cookie){ rb_iv_set(self, "@cookie", cookie); if (!connectlist_inited) { + connectlist = (INTRUDER_NODE **)malloc(sizeof(INTRUDER_NODE*) * CONBUFFSIZE); printf("initializing connectlist\n"); connectlist_inited = 1; - memset((char *) &connectlist, 0, sizeof(connectlist)); } /* initialize the node */ @@ -84,7 +86,6 @@ VALUE intruder_node_pid(VALUE self){ VALUE intruder_node_connect(VALUE self, VALUE remote_node){ INTRUDER_NODE *class_struct; Data_Get_Struct(self, INTRUDER_NODE, class_struct); - int fd; if((class_struct->fd = ei_connect(class_struct->cnode, RSTRING_PTR(remote_node))) < 0) raise_rException_for_erl_errno(); @@ -92,7 +93,7 @@ VALUE intruder_node_connect(VALUE self, VALUE remote_node){ class_struct->status = INTRUDER_CONNECTED; /* printf("setting node %d on the connectlist\n", node_count); */ - *(connectlist[node_count]) = class_struct; + connectlist[node_count] = class_struct; node_count++; if (alive_thread == NULL) { @@ -138,9 +139,9 @@ void build_select_list() { for (listnum = 0; listnum < 5; listnum++) { if (connectlist[listnum] != 0) { /* printf("adding fd %d (%d) to connectlist\n", connectlist[listnum], listnum); */ - FD_SET(connectlist[listnum],&socks); - if (connectlist[listnum] > highsock) - highsock = connectlist[listnum]; + FD_SET(connectlist[listnum]->fd, &socks); + if (connectlist[listnum]->fd > highsock) + highsock = connectlist[listnum]->fd; } } } @@ -153,13 +154,16 @@ void read_socks() { /* printf("reading open sockets\n"); */ printf("Nodecount = %d\n", node_count); for (listnum = 0; listnum < node_count; listnum++) { - printf("checking socket %d (%d)\n", connectlist[listnum], listnum); - if (connectlist[listnum] != 0 && FD_ISSET(connectlist[listnum], &socks)) + printf("checking socket %d (%d)\n", connectlist[listnum]->fd, listnum); + if (connectlist[listnum] != 0 && FD_ISSET(connectlist[listnum]->fd, &socks)) { printf("erl_receive\n"); - got = erl_receive(connectlist[listnum], buf, 400); - if (got == ERL_TICK) { - printf("keeping alive!!\n"); - continue; + got = erl_receive(connectlist[listnum]->fd, buf, 400); + if (got == ERL_TICK) { + printf("keeping alive!!\n"); + continue; + } else { + rb_raise(rb_eRuntimeError, "Keep Alive thread cought a message other than ERL_TICK"); + } } } }