Skip to content

Commit

Permalink
Eliminated some deadlocks in CodecAwareConduit by serializing most me…
Browse files Browse the repository at this point in the history
…thod calls with a single mutex (seems to resolve ticket #23)
  • Loading branch information
cstawarz committed Jun 5, 2012
1 parent 63c5ba7 commit fb8d3df
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 69 deletions.
139 changes: 82 additions & 57 deletions core/Core/Communications/Conduits/CodecAwareConduit.cpp
Expand Up @@ -11,85 +11,106 @@
#include "SystemEventFactory.h"
#include "StandardVariables.h"

CodecAwareConduit::CodecAwareConduit(shared_ptr<EventTransport> _transport) : SimpleConduit(_transport){
local_codec_code_counter = 0;
}


void CodecAwareConduit::transmitCodecEvent(){
// This is called internally from methods that already hold conduit_mutex

//std::cerr << "transmitting codec" << endl;
//mw::Datum codec_datum(local_codec);
mw::Datum codec_datum = construct_simple_codec_datum_from_map(local_codec);
sendData(RESERVED_CODEC_CODE, codec_datum);
}


void CodecAwareConduit::receiveCodecEvent(shared_ptr<Event> evt){
{
boost::mutex::scoped_lock lock(remote_codec_lock);

remote_codec = extract_simple_codec_map(evt);
remote_reverse_codec = reverse_simple_codec_map(remote_codec);

rebuildEventCallbacks();
}
// This is called from handleCallbacks, which already holds conduit_mutex

remote_codec = extract_simple_codec_map(evt);
remote_reverse_codec = reverse_simple_codec_map(remote_codec);

rebuildEventCallbacks();

remote_codec_cond.notify_all();
}


void CodecAwareConduit::receiveControlEvent(shared_ptr<Event> evt){
// This is called from handleCallbacks, which already holds conduit_mutex

Datum payload_type = evt->getData().getElement(M_SYSTEM_PAYLOAD_TYPE);
if((int)payload_type == M_REQUEST_CODEC){
transmitCodecEvent();
}
}


void CodecAwareConduit::addEventCallback(const std::string &evt_name, EventCallback cb) {
// This is called internally from methods that already hold conduit_mutex

int incoming_code = M_UNDEFINED_EVENT_CODE;

if (remote_reverse_codec.find(evt_name) != remote_reverse_codec.end()) {
incoming_code = remote_reverse_codec[evt_name];
} else {
//std::cerr << "Cannot register callback for unknown name: " << evt_name << " (maybe codec hasn't been received yet?)" << endl;
// send a request, just in case no one else has
// sendData(SystemEventFactory::requestCodecControl());
return;
}

//std::cerr << "registering callback for code: " << incoming_code << " to name: " << evt_name << " in: " << this << std::endl;

// find out if that event name has a different code on this side of the conduit
// if so, we'll want to create a translation callback
int local_code = M_UNDEFINED_EVENT_CODE;

if (local_reverse_codec.find(evt_name) != local_reverse_codec.end()) {
local_code = local_reverse_codec[evt_name];
}

if (M_UNDEFINED_EVENT_CODE == local_code) {
registerCallback(incoming_code, cb, string(name_defined_callback_key));
} else {
EventCallback cb2 = boost::bind(&CodecAwareConduit::codeTranslatedCallback, _1, cb, local_code);
registerCallback(incoming_code, cb2, string(name_defined_callback_key));
}

// send a request to the other side of the conduit to begin forwarding this event
sendData(SystemEventFactory::setEventForwardingControl(evt_name, true));
}


void CodecAwareConduit::rebuildEventCallbacks(){
// This is called internally from methods that already hold conduit_mutex

//std::cerr << "rebuildEventCallbacks called in: " << this << std::endl;

unregisterCallbacks(name_defined_callback_key);

map<string, EventCallback>::iterator i;
for(i = callbacks_by_name.begin(); i != callbacks_by_name.end(); ++i){
for (map<string, EventCallback>::iterator i = callbacks_by_name.begin(); i != callbacks_by_name.end(); ++i) {
string evt_name = (*i).first;
EventCallback cb = (*i).second;

int incoming_code;
if(remote_reverse_codec.find(evt_name) != remote_reverse_codec.end()){
incoming_code = remote_reverse_codec[evt_name];
} else {
//std::cerr << "Cannot register callback for unknown name: " << evt_name << " (maybe codec hasn't been received yet?)" << endl;
// send a request, just in case no one else has
// sendData(SystemEventFactory::requestCodecControl());
continue;
}

//std::cerr << "registering callback for code: " << incoming_code << " to name: " << evt_name << " in: " << this << std::endl;


// find out if that event name has a different code on this side of the conduit
// if so, we'll want to create a translation callback
if(local_reverse_codec.find(evt_name) != local_reverse_codec.end()){
int local_code = local_reverse_codec[evt_name];
EventCallback cb2 = boost::bind(&CodecAwareConduit::codeTranslatedCallback, shared_from_this(), _1, cb, local_code);
registerCallback(incoming_code, cb2, string(name_defined_callback_key));
} else {
registerCallback(incoming_code, cb, string(name_defined_callback_key));
}

// send a request to the other side of the conduit to begin forwarding this event
sendData(SystemEventFactory::setEventForwardingControl(evt_name, true));
addEventCallback(evt_name, cb);
}

}


void CodecAwareConduit::codeTranslatedCallback(shared_ptr<Event> evt, EventCallback cb, int new_code){
// This is a static method that does not need to hold conduit_mutex

//std::cerr << "calling code translated callback; new_code: " << new_code << " (old code: " << evt->getEventCode() << ")" << endl;
evt->setEventCode(new_code);
return cb(evt);
}


void CodecAwareConduit::handleCallbacks(shared_ptr<Event> evt) {
boost::mutex::scoped_lock lock(conduit_mutex);
SimpleConduit::handleCallbacks(evt);
}


bool CodecAwareConduit::initialize(){
//std::cerr << "initializing: " << this << std::endl;
//EventCallback cb = boost::bind(&CodecAwareConduit::receiveCodecEvent, this, _1);
Expand All @@ -105,23 +126,26 @@ bool CodecAwareConduit::initialize(){
return okayp;
}


void CodecAwareConduit::registerCallbackByName(string event_name, EventCallback cb){
boost::mutex::scoped_lock lock(remote_codec_lock);
boost::mutex::scoped_lock lock(conduit_mutex);

if(event_name.size() == 0){
throw SimpleException("Attempt to register callback for empty event name");
}
callbacks_by_name[event_name] = cb;

rebuildEventCallbacks();
callbacks_by_name[event_name] = cb;
addEventCallback(event_name, cb);

//std::cerr << "Sending event forwarding event for " << event_name << std::endl;
sendData(SystemEventFactory::setEventForwardingControl(event_name, true));
}


void CodecAwareConduit::registerLocalEventCode(int code, string event_name){
boost::mutex::scoped_lock lock(conduit_mutex);

//std::cerr << "registering local name: " << event_name << " to code: " << code << endl;
boost::mutex::scoped_lock lock(local_codec_lock);
local_codec[code] = event_name;

local_reverse_codec = reverse_simple_codec_map(local_codec);
Expand All @@ -130,6 +154,20 @@ void CodecAwareConduit::registerLocalEventCode(int code, string event_name){
}


map<int, string> CodecAwareConduit::getRemoteCodec() {
boost::mutex::scoped_lock lock(conduit_mutex);
waitForRemoteCodec(lock);
return remote_codec;
}


map<string, int> CodecAwareConduit::getRemoteReverseCodec() {
boost::mutex::scoped_lock lock(conduit_mutex);
waitForRemoteCodec(lock);
return remote_reverse_codec;
}


void CodecAwareConduit::waitForRemoteCodec(boost::mutex::scoped_lock &lock) {
if (remote_codec.empty()) {
sendData(SystemEventFactory::requestCodecControl());
Expand All @@ -144,19 +182,6 @@ void CodecAwareConduit::waitForRemoteCodec(boost::mutex::scoped_lock &lock) {
}


map<int, string> CodecAwareConduit::getRemoteCodec() {
boost::mutex::scoped_lock lock(remote_codec_lock);
waitForRemoteCodec(lock);
return remote_codec;
}

map<string, int> CodecAwareConduit::getRemoteReverseCodec() {
boost::mutex::scoped_lock lock(remote_codec_lock);
waitForRemoteCodec(lock);
return remote_reverse_codec;
}





Expand Down
24 changes: 12 additions & 12 deletions core/Core/Communications/Conduits/CodecAwareConduit.h
Expand Up @@ -26,33 +26,31 @@ using namespace mw;
class CodecAwareConduit : public SimpleConduit, public enable_shared_from_this<CodecAwareConduit> {

protected:
boost::mutex conduit_mutex;

// A local codec mapping local codes to event names
// when changed, this will get transmitted over the
boost::mutex local_codec_lock;
map<int, string> local_codec;
map<string, int> local_reverse_codec;
int local_codec_code_counter;

boost::mutex remote_codec_lock;
boost::condition_variable remote_codec_cond;
map<int, string> remote_codec;
map<string, int> remote_reverse_codec;
boost::condition_variable remote_codec_cond;

map<string, EventCallback> callbacks_by_name;

void transmitCodecEvent();
void addEventCallback(const std::string &name, EventCallback cb);
void rebuildEventCallbacks();
void waitForRemoteCodec(boost::mutex::scoped_lock &lock);

public:

CodecAwareConduit(shared_ptr<EventTransport> _transport);
CodecAwareConduit(shared_ptr<EventTransport> _transport) : SimpleConduit(_transport) { }
virtual ~CodecAwareConduit(){
// grab these locks, so that we can ensure that
// anyone else who had them is done
boost::mutex::scoped_lock l1(local_codec_lock);
boost::mutex::scoped_lock l2(remote_codec_lock);
// grab the lock, so that we can ensure that
// anyone else who had it is done
boost::mutex::scoped_lock lock(conduit_mutex);
}


Expand All @@ -63,10 +61,12 @@ class CodecAwareConduit : public SimpleConduit, public enable_shared_from_this<C
void receiveCodecEvent(shared_ptr<Event> evt);
void receiveControlEvent(shared_ptr<Event> evt);

void codeTranslatedCallback(shared_ptr<Event> evt, EventCallback cb, int new_code);
static void codeTranslatedCallback(shared_ptr<Event> evt, EventCallback cb, int new_code);

virtual void handleCallbacks(shared_ptr<Event> evt);

map<int, string> getLocalCodec(){ return local_codec; }
map<string, int> getLocalReverseCodec(){ return local_reverse_codec; }
//map<int, string> getLocalCodec(){ return local_codec; }
//map<string, int> getLocalReverseCodec(){ return local_reverse_codec; }
map<int, string> getRemoteCodec();
map<string, int> getRemoteReverseCodec();

Expand Down

0 comments on commit fb8d3df

Please sign in to comment.