Skip to content

Commit

Permalink
Implement RW as YARP periodic threads
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterBowman committed Jan 8, 2020
1 parent cb704cc commit d2d1da5
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 83 deletions.
91 changes: 30 additions & 61 deletions libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,9 @@ void CanReaderWriterThread::afterStart(bool success)

// -----------------------------------------------------------------------------

void CanReaderWriterThread::onStop()
bool CanReaderWriterThread::setPeriod(double period)
{
CD_INFO("Stopping CanBusControlboard %s thread %s.\n", type.c_str(), id.c_str());
}

// -----------------------------------------------------------------------------

void CanReaderWriterThread::setDelay(double delay)
{
this->delay = delay <= 0.0 ? std::numeric_limits<double>::min() : delay;
return yarp::os::PeriodicThread::setPeriod(period <= 0.0 ? std::numeric_limits<double>::min() : period);
}

// -----------------------------------------------------------------------------
Expand All @@ -63,39 +56,31 @@ void CanReaderThread::registerHandle(ICanBusSharer * p)
void CanReaderThread::run()
{
unsigned int read;
bool ok;

while (!isStopping())
{
//-- Lend CPU time to write threads.
// https://github.com/roboticslab-uc3m/yarp-devices/issues/191
yarp::os::Time::delay(delay);
//-- Return immediately if there is nothing to be read (non-blocking call), return false on errors.
bool ok = iCanBus->canRead(canBuffer, bufferSize, &read);

//-- Return immediately if there is nothing to be read (non-blocking call), return false on errors.
ok = iCanBus->canRead(canBuffer, bufferSize, &read);
//-- All debugging messages should be contained in canRead, so just loop again.
if (!ok || read == 0) return;

//-- All debugging messages should be contained in canRead, so just loop again.
if (!ok || read == 0) continue;
for (int i = 0; i < read; i++)
{
const yarp::dev::CanMessage & msg = canBuffer[i];
const int canId = msg.getId() & 0x7F;
auto it = canIdToHandle.find(canId);

for (int i = 0; i < read; i++)
if (it == canIdToHandle.end()) //-- Can ID not found
{
const yarp::dev::CanMessage & msg = canBuffer[i];
const int canId = msg.getId() & 0x7F;
auto it = canIdToHandle.find(canId);

if (it == canIdToHandle.end()) //-- Can ID not found
//-- Intercept 700h 0 msg that just indicates presence.
if (msg.getId() - canId == 0x700)
{
//-- Intercept 700h 0 msg that just indicates presence.
if (msg.getId() - canId == 0x700)
{
CD_SUCCESS("Device %d indicating presence.\n", canId);
}

continue;
CD_SUCCESS("Device %d indicating presence.\n", canId);
}

it->second->interpretMessage(msg);
continue;
}

it->second->interpretMessage(msg);
}
}

Expand All @@ -116,42 +101,26 @@ CanWriterThread::~CanWriterThread()

// -----------------------------------------------------------------------------

void CanWriterThread::flush()
void CanWriterThread::run()
{
unsigned int sent;
std::lock_guard<std::mutex> lock(bufferMutex);

//-- Nothing to write, exit.
//-- Nothing to write, just loop again.
if (preparedMessages == 0) return;

//-- Write as many bytes as it can, return false on errors.
bool ok = iCanBus->canWrite(canBuffer, preparedMessages, &sent);

//-- Something bad happened, try again on the next call.
if (!ok) return;

//-- Some messages could not be sent, preserve them for later.
if (sent != preparedMessages)
{
CD_WARNING("Partial write! Prepared: %d, sent: %d.\n", preparedMessages, sent);
handlePartialWrite(sent);
}

preparedMessages -= sent;
}

// -----------------------------------------------------------------------------
unsigned int sent;

void CanWriterThread::run()
{
while (!isStopping())
//-- Write as many bytes as it can, return false on errors.
if (iCanBus->canWrite(canBuffer, preparedMessages, &sent))
{
//-- Lend CPU time to read threads.
// https://github.com/roboticslab-uc3m/yarp-devices/issues/191
yarp::os::Time::delay(delay);
//-- Some messages could not be sent, preserve them for later.
if (sent != preparedMessages)
{
CD_WARNING("Partial write! Prepared: %d, sent: %d.\n", preparedMessages, sent);
handlePartialWrite(sent);
}

//-- Send everything and reset the queue.
flush();
preparedMessages -= sent;
}
}

Expand Down
34 changes: 16 additions & 18 deletions libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <string>
#include <unordered_map>

#include <yarp/os/Thread.h>
#include <yarp/os/PeriodicThread.h>
#include <yarp/dev/CanBusInterface.h>

#include "CanSenderDelegate.hpp"
Expand All @@ -21,19 +21,29 @@ namespace roboticslab
* @brief Base class for a thread that attends CAN reads or writes.
*
* Child classes take advantage of CAN message buffers to perform bulk reads
* and writes.
* and writes. Non-zero wait periods aim to lend CPU time to other threads, see
* <a href="https://github.com/roboticslab-uc3m/yarp-devices/issues/191">#191</a>.
*/
class CanReaderWriterThread : public yarp::os::Thread
class CanReaderWriterThread : public yarp::os::PeriodicThread
{
public:
//! Constructor.
CanReaderWriterThread(const std::string & type, const std::string & id)
: iCanBus(nullptr), iCanBufferFactory(nullptr), type(type), id(id), bufferSize(0), delay(0.0)
: yarp::os::PeriodicThread(0.0),
iCanBus(nullptr), iCanBufferFactory(nullptr), type(type), id(id), bufferSize(0), delay(0.0)
{ }

//! Virtual destructor.
virtual ~CanReaderWriterThread() = default;

//! Configure CAN interface handles.
virtual void setCanHandles(yarp::dev::ICanBus * iCanBus, yarp::dev::ICanBufferFactory * iCanBufferFactory, unsigned int bufferSize)
{ this->iCanBus = iCanBus; this->iCanBufferFactory = iCanBufferFactory; this->bufferSize = bufferSize; }

//! Configure a delay (in seconds) before each read/write.
bool setPeriod(double period);

protected:
//! Invoked by the thread right before it is started.
virtual bool threadInit() override
{ canBuffer = iCanBufferFactory->createBuffer(bufferSize); return true; }
Expand All @@ -48,20 +58,9 @@ class CanReaderWriterThread : public yarp::os::Thread
//! Invoked by the caller right after the thread is started.
virtual void afterStart(bool success) override;

//! Callback on thread stop.
virtual void onStop() override;

//! The thread will invoke this once.
virtual void run() override = 0;

//! Configure CAN interface handles.
virtual void setCanHandles(yarp::dev::ICanBus * iCanBus, yarp::dev::ICanBufferFactory * iCanBufferFactory, unsigned int bufferSize)
{ this->iCanBus = iCanBus; this->iCanBufferFactory = iCanBufferFactory; this->bufferSize = bufferSize; }

//! Configure a delay (in seconds) before each read/write.
void setDelay(double delay);

protected:
yarp::dev::ICanBus * iCanBus;
yarp::dev::ICanBufferFactory * iCanBufferFactory;
yarp::dev::CanBuffer canBuffer;
Expand Down Expand Up @@ -89,6 +88,7 @@ class CanReaderThread : public CanReaderWriterThread
//! Map CAN node ids with handles.
void registerHandle(ICanBusSharer * p);

protected:
virtual void run() override;

private:
Expand All @@ -114,11 +114,9 @@ class CanWriterThread : public CanReaderWriterThread
//! Retrieve a handle to the CAN sender delegate.
CanSenderDelegate * getDelegate();

//! Send awaiting messages and clear the queue.
void flush();

virtual void setCanHandles(yarp::dev::ICanBus * iCanBus, yarp::dev::ICanBufferFactory * iCanBufferFactory, unsigned int bufferSize) override;

protected:
virtual void run() override;

private:
Expand Down
8 changes: 4 additions & 4 deletions libraries/YarpPlugins/CanBusControlboard/DeviceDriverImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ bool CanBusControlboard::open(yarp::os::Searchable & config)

CanReaderThread * reader = new CanReaderThread(canBus);
reader->setCanHandles(iCanBus, iCanBufferFactory, rxBufferSize);
reader->setDelay(rxDelay);
reader->setPeriod(rxDelay);

CanWriterThread * writer = new CanWriterThread(canBus);
writer->setCanHandles(iCanBus, iCanBufferFactory, txBufferSize);
writer->setDelay(txDelay);
writer->setPeriod(txDelay);

canThreads.push_back({canBus, reader, writer});
}
Expand Down Expand Up @@ -296,14 +296,14 @@ bool CanBusControlboard::close()
{
if (bundle.reader && bundle.reader->isRunning())
{
ok &= bundle.reader->stop();
bundle.reader->stop();
}

delete bundle.reader;

if (bundle.writer && bundle.writer->isRunning())
{
ok &= bundle.writer->stop();
bundle.writer->stop();
}

delete bundle.writer;
Expand Down

0 comments on commit d2d1da5

Please sign in to comment.