Skip to content

Commit

Permalink
Added cancellation points to meters
Browse files Browse the repository at this point in the history
* Defined cancellation macros in threads.h
* Called macros in loops in meters where it looks safe
  • Loading branch information
maxberger authored and r00t- committed Feb 8, 2023
1 parent 54e2bbc commit d0d27ea
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 19 deletions.
20 changes: 19 additions & 1 deletion include/threads.h
Expand Up @@ -26,9 +26,27 @@
#ifndef _THREADS_H_
#define _THREADS_H_

void logging_thread_cleanup(void *arg);
#include <pthread.h>
#include <unistd.h>

void *logging_thread(void *arg);
void *reading_thread(void *arg);

// vzlogger uses pthread_cancel() to stop threads, which is not safe for C++ code that might invoke
// destructors, this macro is to be placed around any code that your meter spends significant
// amounts of time in, but which may not contain C++ code that might destroy objects.
// See https://blog.memzero.de/pthread-cancel-noexcept/ for details.

#define CANCELLABLE(...) \
do { \
int oldstate; \
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); \
__VA_ARGS__; \
pthread_setcancelstate(oldstate, NULL); \
} while (0)

inline void _safe_to_cancel() { CANCELLABLE(pthread_testcancel()); }

inline void _cancellable_sleep(int seconds) { CANCELLABLE(sleep(seconds)); }

#endif /* _THREADS_H_ */
3 changes: 3 additions & 0 deletions src/protocols/MeterD0.cpp
Expand Up @@ -43,6 +43,8 @@
#include <netdb.h>
#include <sys/socket.h>

#include "threads.h"

#include "protocols/MeterD0.hpp"
#include <VZException.hpp>

Expand Down Expand Up @@ -470,6 +472,7 @@ ssize_t MeterD0::read(std::vector<Reading> &rds, size_t max_readings) {
}

while (1) {
_safe_to_cancel();
// check for timeout
time(&end_time);
if (difftime(end_time, start_time) > _read_timeout_s) {
Expand Down
3 changes: 3 additions & 0 deletions src/protocols/MeterFile.cpp
Expand Up @@ -32,6 +32,8 @@
#include <sys/time.h>
#include <unistd.h>

#include "threads.h"

#include "Options.hpp"
#include "protocols/MeterFile.hpp"
#include <VZException.hpp>
Expand Down Expand Up @@ -237,6 +239,7 @@ ssize_t MeterFile::read(std::vector<Reading> &rds, size_t n) {
print(log_debug, "MeterFile::read: %d, %d", "", rds.size(), n);

while (i < n && fgets(line, 256, _fd)) {
_safe_to_cancel();
char *nl;
if ((nl = strrchr(line, '\n')))
*nl = '\0'; // remove trailing newlines
Expand Down
3 changes: 3 additions & 0 deletions src/protocols/MeterFluksoV2.cpp
Expand Up @@ -29,6 +29,8 @@
#include <sys/types.h>
#include <unistd.h>

#include "threads.h"

#include "Options.hpp"
#include "protocols/MeterFluksoV2.hpp"
#include <VZException.hpp>
Expand Down Expand Up @@ -89,6 +91,7 @@ ssize_t MeterFluksoV2::read(std::vector<Reading> &rds, size_t n) {
time.tv_usec = 0; /* no millisecond resolution available */

while (cursor) {
_safe_to_cancel();
int channel =
atoi(strsep(&cursor, " \t")) + 1; /* increment by 1 to distinguish between +0 and -0 */

Expand Down
5 changes: 3 additions & 2 deletions src/protocols/MeterS0.cpp
Expand Up @@ -33,6 +33,8 @@
#include <time.h>
#include <unistd.h>

#include "threads.h"

#include "Options.hpp"
#include "protocols/MeterS0.hpp"
#include <VZException.hpp>
Expand Down Expand Up @@ -357,8 +359,7 @@ ssize_t MeterS0::read(std::vector<Reading> &rds, size_t n) {
bool is_zero = true;
do {
req.tv_sec += 1;
while (EINTR == clock_nanosleep(CLOCK_REALTIME, TIMER_ABSTIME, &req, NULL))
;
CANCELLABLE(while (EINTR == clock_nanosleep(CLOCK_REALTIME, TIMER_ABSTIME, &req, NULL)));
// check from counter_thread the current impulses:
t_imp = _impulses;
t_imp_neg = _impulses_neg;
Expand Down
8 changes: 5 additions & 3 deletions src/protocols/MeterSML.cpp
Expand Up @@ -43,6 +43,8 @@
#include <netdb.h>
#include <sys/socket.h>

#include "threads.h"

/* sml stuff */
#include <sml/sml_file.h>
#include <sml/sml_transport.h>
Expand Down Expand Up @@ -248,7 +250,7 @@ ssize_t MeterSML::read(std::vector<Reading> &rds, size_t n) {
if (_fd < 0) {
if (!reopen()) {
// sleep a little bit to prevent busy looping
sleep(1);
_cancellable_sleep(1);
return 0;
}
}
Expand All @@ -260,12 +262,12 @@ ssize_t MeterSML::read(std::vector<Reading> &rds, size_t n) {
}

/* wait until we receive a new datagram from the meter (blocking read) */
bytes = sml_transport_read(_fd, buffer, SML_BUFFER_LEN);
CANCELLABLE(bytes = sml_transport_read(_fd, buffer, SML_BUFFER_LEN));

if (0 == bytes) {
// try to reopen. see issue #362
if (reopen()) {
bytes = sml_transport_read(_fd, buffer, SML_BUFFER_LEN);
CANCELLABLE(bytes = sml_transport_read(_fd, buffer, SML_BUFFER_LEN));
print(log_info, "sml_transport_read returned len=%d after reopen", name().c_str(),
bytes);
}
Expand Down
3 changes: 3 additions & 0 deletions src/protocols/MeterW1therm.cpp
Expand Up @@ -12,6 +12,8 @@
* @author Matthias Behr <mbehr (a) mcbehr.de>
* */

#include "threads.h"

#include "protocols/MeterW1therm.hpp"
#include <glob.h>

Expand Down Expand Up @@ -138,6 +140,7 @@ ssize_t MeterW1therm::read(std::vector<Reading> &rds, size_t n) {

for (std::list<std::string>::const_iterator it = list.cbegin();
it != list.cend() && static_cast<size_t>(ret) < n; ++it) {
_safe_to_cancel();
double value;
if (_hwif->readTemp(*it, value)) {
print(log_finest, "reading w1 device %s returned %f", name().c_str(), (*it).c_str(),
Expand Down
13 changes: 0 additions & 13 deletions src/threads.cpp
Expand Up @@ -44,19 +44,6 @@

extern Config_Options options;

inline void _safe_to_cancel() {
// see https://blog.memzero.de/pthread-cancel-noexcept/
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_testcancel();
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
}

inline void _cancellable_sleep(int seconds) {
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
sleep(seconds);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
}

void *reading_thread(void *arg) {
MeterMap *mapping = static_cast<MeterMap *>(arg);
Meter::Ptr mtr = mapping->meter();
Expand Down

0 comments on commit d0d27ea

Please sign in to comment.