Skip to content

Commit

Permalink
Refactor ExecutionEngine operation processing for slave components
Browse files Browse the repository at this point in the history
The patch reverts most of the previous patches applied to fix the issue that `OwnThread` operations provided by components running in a SlaveActivity have only been executed when the slave was updated, but never asynchronously like for normal activities. This caused dead-locks in certain situations when calling operations of slaves that are never updated before the call returns. The history of the reverted patches started with #35, with follow-ups in https://github.com/orocos-toolchain/rtt/issue/59, #60 and #71.

The `slave_test` added in one of the original PRs was not reverted and still succeeds with the new patch.

The new solution to the original problem is much less intrusive: Instead of forwarding the operations calls itself and making the ExecutionEngine aware of the existence of masters and slaves, the SlaveActivity that gets triggered (e.g. as a consequence of an operation call) enqueues a message in the master's ExecutionEngine that processes the triggers in the slave's ExecutionEngine `work(Trigger)` callback, which then processes all the pending messages (and port callbacks) of the slave. This also works across multiple levels of master/slave relations. The difference to the previous patch is the pending messages are still enqueued in the engine of the actual runner and explicit calls to `Slave.update()` from the master thread will also process the pending operations of the slave - but not of the master.

Signed-off-by: Johannes Meyer <johannes@intermodalics.eu>
  • Loading branch information
meyerj committed May 8, 2019
1 parent 47cdabe commit c1a5e75
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 94 deletions.
47 changes: 1 addition & 46 deletions rtt/ExecutionEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
#include "internal/MWSRQueue.hpp"
#include "TaskContext.hpp"
#include "internal/CatchConfig.hpp"
#include "extras/SlaveActivity.hpp"

#include <boost/bind.hpp>
#include <algorithm>
Expand All @@ -69,8 +68,7 @@ namespace RTT
: taskc(owner),
mqueue(new MWSRQueue<DisposableInterface*>(ORONUM_EE_MQUEUE_SIZE) ),
port_queue(new MWSRQueue<PortInterface*>(ORONUM_EE_MQUEUE_SIZE) ),
f_queue( new MWSRQueue<ExecutableInterface*>(ORONUM_EE_MQUEUE_SIZE) ),
mmaster(0)
f_queue( new MWSRQueue<ExecutableInterface*>(ORONUM_EE_MQUEUE_SIZE) )
{
}

Expand Down Expand Up @@ -245,11 +243,6 @@ namespace RTT
if (taskc && taskc->mTaskState == TaskCore::FatalError )
return false;

// forward message to master ExecutionEngine if available
if (mmaster) {
return mmaster->process(c);
}

if ( c && this->getActivity() ) {
bool result = mqueue->enqueue( c );
this->getActivity()->trigger();
Expand All @@ -265,11 +258,6 @@ namespace RTT
if (taskc && taskc->mTaskState == TaskCore::FatalError )
return false;

// forward port callback to the master ExecutionEngine if available
if (mmaster) {
return mmaster->process(port);
}

if ( port && this->getActivity() ) {
bool result = port_queue->enqueue( port );
this->getActivity()->trigger();
Expand All @@ -280,51 +268,19 @@ namespace RTT

void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred)
{
// forward the call to the master ExecutionEngine which is processing messages for us...
if (mmaster) {
mmaster->waitForMessages(pred);
return;
}

if (isSelf())
waitAndProcessMessages(pred);
else
waitForMessagesInternal(pred);
}

void ExecutionEngine::setMaster(ExecutionEngine *master)
{
mmaster = master;
}

void ExecutionEngine::setActivity( base::ActivityInterface* task )
{
extras::SlaveActivity *slave_activity = dynamic_cast<extras::SlaveActivity *>(task);
if (slave_activity && slave_activity->getMaster()) {
ExecutionEngine *master = dynamic_cast<ExecutionEngine *>(slave_activity->getMaster()->getRunner());
setMaster(master);
} else {
setMaster(0);
}
RTT::base::RunnableInterface::setActivity(task);
}

os::ThreadInterface* ExecutionEngine::getThread() const {
// forward to the master ExecutionEngine if available
if (mmaster) {
return mmaster->getThread();
}
return base::RunnableInterface::getThread();
}

bool ExecutionEngine::isSelf() const {
os::ThreadInterface *thread = this->getThread();
return (thread && thread->isSelf());
}

void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred)
{
assert( mmaster == 0 );
if ( pred() )
return;
// only to be called from the thread not executing step().
Expand All @@ -337,7 +293,6 @@ namespace RTT

void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred)
{
assert( mmaster == 0 );
// optimization for the case the predicate is already true
if ( pred() )
return;
Expand Down
31 changes: 0 additions & 31 deletions rtt/ExecutionEngine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,30 +164,6 @@ namespace RTT
*/
void setExceptionTask();

/**
* Set the master ExecutionEngine.
* If set, all incoming messages are forwarded to the master.
*
* @param master The new master ExecutionEngine.
*/
void setMaster(ExecutionEngine *master);

/**
* Overridden version of RTT::base::RunnableInterface::setActivity().
* This version will also set the master ExecutionEngine if the new activity is a SlaveActivity that runs an ExecutionEngine.
*
* @param task The ActivityInterface running this interface.
*/
virtual void setActivity( base::ActivityInterface* task );

/**
* Get the thread that processes messages send to this engine.
* @sa reimplementation of base::RunnableInterface::getThread()
*
* @return a pointer to the thread, or 0 if there is no activity assigned.
*/
virtual os::ThreadInterface* getThread() const;

/**
* Check if the thread that processes messages send to this engine is the same as the calling thread.
* This method is typically used to check if operation or function calls can be inlined or even must
Expand Down Expand Up @@ -250,12 +226,6 @@ namespace RTT
os::Mutex msg_lock;
os::Condition msg_cond;

/**
* A master ExecutionEngine which should process our messages.
* This is used for ExecutionEngines running in a SlaveActivity which forward incoming messages to their master engine.
*/
ExecutionEngine *mmaster;

void processMessages();
void processPortCallbacks();
void processFunctions();
Expand All @@ -276,7 +246,6 @@ namespace RTT
virtual void finalize();

virtual bool hasWork();

};

}
Expand Down
48 changes: 33 additions & 15 deletions rtt/extras/SlaveActivity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,53 @@


#include "SlaveActivity.hpp"

#include "../base/DisposableInterface.hpp"
#include "../os/MainThread.hpp"
#include "ExecutionEngine.hpp"
#include "Logger.hpp"

namespace RTT {
using namespace extras;
using namespace base;

class TriggerSlaveActivity : public base::DisposableInterface {
public:
SlaveActivity* mslave;
TriggerSlaveActivity(SlaveActivity* act) : mslave(act) {}
virtual void executeAndDispose() {
base::RunnableInterface *runner = mslave->getRunner();
if (runner) {
runner->work(RunnableInterface::Trigger);
} else {
mslave->work(RunnableInterface::Trigger);
}
}
virtual void dispose() {}
};

SlaveActivity::SlaveActivity( ActivityInterface* master, RunnableInterface* run /*= 0*/ )
:ActivityInterface(run), mmaster(master), mperiod( master->getPeriod() ), running(false), active(false)
:ActivityInterface(run), mmaster(master), mperiod( master->getPeriod() ), running(false), active(false),
mtrigger(new TriggerSlaveActivity(this))
{
}

SlaveActivity::SlaveActivity( double period, RunnableInterface* run /*= 0*/ )
:ActivityInterface(run), mmaster(0), mperiod(period), running(false), active(false)
:ActivityInterface(run), mmaster(0), mperiod(period), running(false), active(false),
mtrigger(new TriggerSlaveActivity(this))
{
}

SlaveActivity::SlaveActivity( RunnableInterface* run /*= 0*/ )
:ActivityInterface(run), mmaster(0), mperiod(0.0), running(false), active(false)
:ActivityInterface(run), mmaster(0), mperiod(0.0), running(false), active(false),
mtrigger(new TriggerSlaveActivity(this))
{
}

SlaveActivity::~SlaveActivity()
{
stop();
delete mtrigger;
}

Seconds SlaveActivity::getPeriod() const
Expand Down Expand Up @@ -94,11 +117,6 @@ namespace RTT {
return mmaster ? mmaster->thread() : os::MainThread::Instance();
}

base::ActivityInterface *SlaveActivity::getMaster() const
{
return mmaster;
}

bool SlaveActivity::initialize()
{
return true;
Expand Down Expand Up @@ -187,16 +205,17 @@ namespace RTT {

bool SlaveActivity::trigger()
{
if (mmaster)
return mmaster->trigger();
return false;
if (!mmaster) { return false; }

ExecutionEngine *engine = dynamic_cast<ExecutionEngine *>(mmaster->getRunner());
if (!engine) { return false; }
return engine->process(mtrigger);
}

bool SlaveActivity::timeout()
{
if (mmaster)
return mmaster->timeout();
return false;
if (!mmaster) { return false; }
return mmaster->timeout();
}

bool SlaveActivity::execute()
Expand Down Expand Up @@ -231,5 +250,4 @@ namespace RTT {
return running;
}


}
5 changes: 3 additions & 2 deletions rtt/extras/SlaveActivity.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ namespace RTT

os::ThreadInterface* thread();

base::ActivityInterface *getMaster() const;

virtual bool initialize();
virtual void step();
virtual void work(base::RunnableInterface::WorkReason reason);
Expand Down Expand Up @@ -157,11 +155,14 @@ namespace RTT
bool trigger();

bool timeout();

private:
base::ActivityInterface* mmaster;
double mperiod;
bool running;
bool active;

base::DisposableInterface *mtrigger;
};

}}
Expand Down

0 comments on commit c1a5e75

Please sign in to comment.