diff --git a/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.cpp b/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.cpp index 1002c241e..366cfcffa 100644 --- a/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.cpp +++ b/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.cpp @@ -28,16 +28,9 @@ void CanReaderWriterThread::afterStart(bool success) // ----------------------------------------------------------------------------- -void CanReaderWriterThread::onStop() +bool CanReaderWriterThread::setPeriod(double period) { - CD_INFO("Stopping CanBusControlboard %s thread %s.\n", type.c_str(), id.c_str()); -} - -// ----------------------------------------------------------------------------- - -void CanReaderWriterThread::setDelay(double delay) -{ - this->delay = delay <= 0.0 ? std::numeric_limits::min() : delay; + return yarp::os::PeriodicThread::setPeriod(period <= 0.0 ? std::numeric_limits::min() : period); } // ----------------------------------------------------------------------------- @@ -63,39 +56,31 @@ void CanReaderThread::registerHandle(ICanBusSharer * p) void CanReaderThread::run() { unsigned int read; - bool ok; - while (!isStopping()) - { - //-- Lend CPU time to write threads. - // https://github.com/roboticslab-uc3m/yarp-devices/issues/191 - yarp::os::Time::delay(delay); + //-- Return immediately if there is nothing to be read (non-blocking call), return false on errors. + bool ok = iCanBus->canRead(canBuffer, bufferSize, &read); - //-- Return immediately if there is nothing to be read (non-blocking call), return false on errors. - ok = iCanBus->canRead(canBuffer, bufferSize, &read); + //-- All debugging messages should be contained in canRead, so just loop again. + if (!ok || read == 0) return; - //-- All debugging messages should be contained in canRead, so just loop again. - if (!ok || read == 0) continue; + for (int i = 0; i < read; i++) + { + const yarp::dev::CanMessage & msg = canBuffer[i]; + const int canId = msg.getId() & 0x7F; + auto it = canIdToHandle.find(canId); - for (int i = 0; i < read; i++) + if (it == canIdToHandle.end()) //-- Can ID not found { - const yarp::dev::CanMessage & msg = canBuffer[i]; - const int canId = msg.getId() & 0x7F; - auto it = canIdToHandle.find(canId); - - if (it == canIdToHandle.end()) //-- Can ID not found + //-- Intercept 700h 0 msg that just indicates presence. + if (msg.getId() - canId == 0x700) { - //-- Intercept 700h 0 msg that just indicates presence. - if (msg.getId() - canId == 0x700) - { - CD_SUCCESS("Device %d indicating presence.\n", canId); - } - - continue; + CD_SUCCESS("Device %d indicating presence.\n", canId); } - it->second->interpretMessage(msg); + continue; } + + it->second->interpretMessage(msg); } } @@ -116,42 +101,26 @@ CanWriterThread::~CanWriterThread() // ----------------------------------------------------------------------------- -void CanWriterThread::flush() +void CanWriterThread::run() { - unsigned int sent; std::lock_guard lock(bufferMutex); - //-- Nothing to write, exit. + //-- Nothing to write, just loop again. if (preparedMessages == 0) return; - //-- Write as many bytes as it can, return false on errors. - bool ok = iCanBus->canWrite(canBuffer, preparedMessages, &sent); - - //-- Something bad happened, try again on the next call. - if (!ok) return; - - //-- Some messages could not be sent, preserve them for later. - if (sent != preparedMessages) - { - CD_WARNING("Partial write! Prepared: %d, sent: %d.\n", preparedMessages, sent); - handlePartialWrite(sent); - } - - preparedMessages -= sent; -} - -// ----------------------------------------------------------------------------- + unsigned int sent; -void CanWriterThread::run() -{ - while (!isStopping()) + //-- Write as many bytes as it can, return false on errors. + if (iCanBus->canWrite(canBuffer, preparedMessages, &sent)) { - //-- Lend CPU time to read threads. - // https://github.com/roboticslab-uc3m/yarp-devices/issues/191 - yarp::os::Time::delay(delay); + //-- Some messages could not be sent, preserve them for later. + if (sent != preparedMessages) + { + CD_WARNING("Partial write! Prepared: %d, sent: %d.\n", preparedMessages, sent); + handlePartialWrite(sent); + } - //-- Send everything and reset the queue. - flush(); + preparedMessages -= sent; } } diff --git a/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.hpp b/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.hpp index 8fb910ba2..d60149fb7 100644 --- a/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.hpp +++ b/libraries/YarpPlugins/CanBusControlboard/CanRxTxThreads.hpp @@ -7,7 +7,7 @@ #include #include -#include +#include #include #include "CanSenderDelegate.hpp" @@ -21,19 +21,29 @@ namespace roboticslab * @brief Base class for a thread that attends CAN reads or writes. * * Child classes take advantage of CAN message buffers to perform bulk reads - * and writes. + * and writes. Non-zero wait periods aim to lend CPU time to other threads, see + * #191. */ -class CanReaderWriterThread : public yarp::os::Thread +class CanReaderWriterThread : public yarp::os::PeriodicThread { public: //! Constructor. CanReaderWriterThread(const std::string & type, const std::string & id) - : iCanBus(nullptr), iCanBufferFactory(nullptr), type(type), id(id), bufferSize(0), delay(0.0) + : yarp::os::PeriodicThread(0.0), + iCanBus(nullptr), iCanBufferFactory(nullptr), type(type), id(id), bufferSize(0), delay(0.0) { } //! Virtual destructor. virtual ~CanReaderWriterThread() = default; + //! Configure CAN interface handles. + virtual void setCanHandles(yarp::dev::ICanBus * iCanBus, yarp::dev::ICanBufferFactory * iCanBufferFactory, unsigned int bufferSize) + { this->iCanBus = iCanBus; this->iCanBufferFactory = iCanBufferFactory; this->bufferSize = bufferSize; } + + //! Configure a delay (in seconds) before each read/write. + bool setPeriod(double period); + +protected: //! Invoked by the thread right before it is started. virtual bool threadInit() override { canBuffer = iCanBufferFactory->createBuffer(bufferSize); return true; } @@ -48,20 +58,9 @@ class CanReaderWriterThread : public yarp::os::Thread //! Invoked by the caller right after the thread is started. virtual void afterStart(bool success) override; - //! Callback on thread stop. - virtual void onStop() override; - //! The thread will invoke this once. virtual void run() override = 0; - //! Configure CAN interface handles. - virtual void setCanHandles(yarp::dev::ICanBus * iCanBus, yarp::dev::ICanBufferFactory * iCanBufferFactory, unsigned int bufferSize) - { this->iCanBus = iCanBus; this->iCanBufferFactory = iCanBufferFactory; this->bufferSize = bufferSize; } - - //! Configure a delay (in seconds) before each read/write. - void setDelay(double delay); - -protected: yarp::dev::ICanBus * iCanBus; yarp::dev::ICanBufferFactory * iCanBufferFactory; yarp::dev::CanBuffer canBuffer; @@ -89,6 +88,7 @@ class CanReaderThread : public CanReaderWriterThread //! Map CAN node ids with handles. void registerHandle(ICanBusSharer * p); +protected: virtual void run() override; private: @@ -114,11 +114,9 @@ class CanWriterThread : public CanReaderWriterThread //! Retrieve a handle to the CAN sender delegate. CanSenderDelegate * getDelegate(); - //! Send awaiting messages and clear the queue. - void flush(); - virtual void setCanHandles(yarp::dev::ICanBus * iCanBus, yarp::dev::ICanBufferFactory * iCanBufferFactory, unsigned int bufferSize) override; +protected: virtual void run() override; private: diff --git a/libraries/YarpPlugins/CanBusControlboard/DeviceDriverImpl.cpp b/libraries/YarpPlugins/CanBusControlboard/DeviceDriverImpl.cpp index 704edef9f..e25326604 100644 --- a/libraries/YarpPlugins/CanBusControlboard/DeviceDriverImpl.cpp +++ b/libraries/YarpPlugins/CanBusControlboard/DeviceDriverImpl.cpp @@ -99,11 +99,11 @@ bool CanBusControlboard::open(yarp::os::Searchable & config) CanReaderThread * reader = new CanReaderThread(canBus); reader->setCanHandles(iCanBus, iCanBufferFactory, rxBufferSize); - reader->setDelay(rxDelay); + reader->setPeriod(rxDelay); CanWriterThread * writer = new CanWriterThread(canBus); writer->setCanHandles(iCanBus, iCanBufferFactory, txBufferSize); - writer->setDelay(txDelay); + writer->setPeriod(txDelay); canThreads.push_back({canBus, reader, writer}); } @@ -296,14 +296,14 @@ bool CanBusControlboard::close() { if (bundle.reader && bundle.reader->isRunning()) { - ok &= bundle.reader->stop(); + bundle.reader->stop(); } delete bundle.reader; if (bundle.writer && bundle.writer->isRunning()) { - ok &= bundle.writer->stop(); + bundle.writer->stop(); } delete bundle.writer;