Skip to content

Commit

Permalink
track changes only per data sink (#1226)
Browse files Browse the repository at this point in the history
  • Loading branch information
john30 committed Apr 10, 2024
1 parent d9628fa commit c70d8ff
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 25 deletions.
5 changes: 4 additions & 1 deletion src/ebusd/datahandler.cpp
Expand Up @@ -75,8 +75,11 @@ bool datahandler_register(UserInfo* userInfo, BusHandler* busHandler, MessageMap
return success;
}

void DataSink::notifyUpdate(Message* message) {
void DataSink::notifyUpdate(Message* message, bool changed) {
if (message && message->hasLevel(m_levels)) {
if (m_changedOnly && !changed) {
return;
}
m_updatedMessages[message->getKey()]++;
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/ebusd/datahandler.h
Expand Up @@ -143,8 +143,9 @@ class DataSink : virtual public DataHandler {
* Constructor.
* @param userInfo the @a UserInfo instance.
* @param user the user name for determining the allowed access levels (fall back to default levels).
* @param changedOnly whether to handle changed messages only in the updates.
*/
DataSink(const UserInfo* userInfo, const string& user) {
DataSink(const UserInfo* userInfo, const string& user, bool changedOnly) : m_changedOnly(changedOnly) {
m_levels = userInfo->getLevels(userInfo->hasUser(user) ? user : "");
}

Expand All @@ -159,8 +160,9 @@ class DataSink : virtual public DataHandler {
/**
* Notify the sink of an updated @a Message (not necessarily changed though).
* @param message the updated @a Message.
* @param changed whether the message data changed since the last notification.
*/
virtual void notifyUpdate(Message* message);
virtual void notifyUpdate(Message* message, bool changed);

/**
* Notify the sink of the latest update check result.
Expand All @@ -178,6 +180,9 @@ class DataSink : virtual public DataHandler {
/** the allowed access levels. */
string m_levels;

/** whether to handle changed messages only in the updates. */
bool m_changedOnly;

/** a map of updated @p Message keys. */
map<uint64_t, int> m_updatedMessages;
};
Expand Down
14 changes: 2 additions & 12 deletions src/ebusd/knxhandler.cpp
Expand Up @@ -163,7 +163,7 @@ bool knxhandler_register(UserInfo* userInfo, BusHandler* busHandler, MessageMap*
}

KnxHandler::KnxHandler(UserInfo* userInfo, BusHandler* busHandler, MessageMap* messages)
: DataSink(userInfo, "knx"), DataSource(busHandler), WaitThread(), m_messages(messages),
: DataSink(userInfo, "knx", true), DataSource(busHandler), WaitThread(), m_messages(messages),
m_start(0), m_lastUpdateCheckResult("."),
m_lastScanStatus(SCAN_STATUS_NONE), m_scanFinishReceived(false), m_lastErrorLogTime(0) {
m_con = KnxConnection::create(g_url);
Expand Down Expand Up @@ -709,7 +709,7 @@ void KnxHandler::handleGroupTelegram(knx_addr_t src, knx_addr_t dest, int len, c
#define UPTIME_INTERVAL 3600

void KnxHandler::run() {
time_t lastTaskRun, now, lastSignal = 0, lastUptime = 0, lastUpdates = 0;
time_t lastTaskRun, now, lastSignal = 0, lastUptime = 0;
bool signal = false;
result_t result = RESULT_OK;
time(&now);
Expand Down Expand Up @@ -899,7 +899,6 @@ void KnxHandler::run() {
if (!m_updatedMessages.empty()) {
m_messages->lock();
if (m_con->isConnected()) {
time_t maxUpdates = 0;
for (auto it = m_updatedMessages.begin(); it != m_updatedMessages.end(); ) {
const vector<Message*>* messages = m_messages->getByKey(it->first);
if (!messages) {
Expand All @@ -911,18 +910,10 @@ void KnxHandler::run() {
if (changeTime <= 0) {
continue;
}
if (changeTime > lastUpdates && changeTime > maxUpdates) {
maxUpdates = changeTime;
}
const auto mit = m_subscribedMessages.find(message->getKey());
if (mit == m_subscribedMessages.cend()) {
continue;
}
if (!(message->getDataHandlerState()&2)) {
message->setDataHandlerState(2, true); // first update still needed
} else if (changeTime <= lastUpdates) {
continue;
}
for (auto destFlags : mit->second) {
auto sit = m_subscribedGroups.find(destFlags);
if (sit == m_subscribedGroups.end()) {
Expand All @@ -941,7 +932,6 @@ void KnxHandler::run() {
}
it = m_updatedMessages.erase(it);
}
lastUpdates = maxUpdates == 0 || lastUpdates > maxUpdates ? now : maxUpdates + 1;
} else {
m_updatedMessages.clear();
}
Expand Down
3 changes: 2 additions & 1 deletion src/ebusd/mainloop.cpp
Expand Up @@ -352,8 +352,9 @@ void MainLoop::run() {
m_messages->lock();
m_messages->findAll("", "", "*", false, true, true, true, true, true, sinkSince, now, false, &messages);
for (const auto message : messages) {
bool changed = message->getLastChangeTime() >= sinkSince;
for (const auto dataSink : dataSinks) {
dataSink->notifyUpdate(message);
dataSink->notifyUpdate(message, changed);
}
}
m_messages->unlock();
Expand Down
13 changes: 4 additions & 9 deletions src/ebusd/mqtthandler.cpp
Expand Up @@ -368,7 +368,8 @@ string removeTrailingNonTopicPart(const string& str) {
}

MqttHandler::MqttHandler(UserInfo* userInfo, BusHandler* busHandler, MessageMap* messages)
: DataSink(userInfo, "mqtt"), DataSource(busHandler), WaitThread(), m_messages(messages), m_connected(false),
: DataSink(userInfo, "mqtt", g_onlyChanges), DataSource(busHandler), WaitThread(),
m_messages(messages), m_connected(false),
m_lastUpdateCheckResult("."), m_lastScanStatus(SCAN_STATUS_NONE) {
m_definitionsSince = 0;
m_client = nullptr;
Expand Down Expand Up @@ -696,7 +697,7 @@ void splitFields(const string& str, vector<string>* row) {
}

void MqttHandler::run() {
time_t lastTaskRun, now, start, lastSignal = 0, lastUpdates = 0;
time_t lastTaskRun, now, start, lastSignal = 0;
bool signal = false;
bool globalHasName = m_globalTopic.has("name");
string signalTopic = m_globalTopic.get("", "signal");
Expand Down Expand Up @@ -1068,27 +1069,21 @@ void MqttHandler::run() {
if (!m_updatedMessages.empty()) {
m_messages->lock();
if (m_connected) {
time_t maxUpdates = 0;
for (auto it = m_updatedMessages.begin(); it != m_updatedMessages.end(); ) {
const vector<Message*>* messages = m_messages->getByKey(it->first);
if (messages) {
for (const auto& message : *messages) {
time_t changeTime = message->getLastChangeTime();
if (changeTime > 0 && message->isAvailable()
&& (!g_onlyChanges || changeTime > lastUpdates)) {
if (changeTime > 0 && message->isAvailable()) {
updates.str("");
updates.clear();
updates << dec;
publishMessage(message, &updates);
}
if (changeTime > lastUpdates && changeTime > maxUpdates) {
maxUpdates = changeTime;
}
}
}
it = m_updatedMessages.erase(it);
}
lastUpdates = maxUpdates == 0 || lastUpdates > maxUpdates ? now : maxUpdates + 1;
} else {
m_updatedMessages.clear();
}
Expand Down

0 comments on commit c70d8ff

Please sign in to comment.