Skip to content

Commit

Permalink
[issue #869] Added 'LWT' to last will topic and improved CPU load (#883)
Browse files Browse the repository at this point in the history
* [issue #869] Added 'LWT' to last will topic and improved CPU load

See #869 for discussion on Last Will Topic.
Also changed the way it tried to reconnect to make it return a lot faster when connection is not (yet) possible and call the PubSubClient::loop() at a much slower pace to reduce CPU usage. (See #847)
This higher CPU load was probably introduced when fixing #683.

* [MQTT] Fix error reporting success status with longer payloads

Applied PR https://github.com/knolleary/pubsubclient/pull/360/files

* made MQTT_CALLBACK_SIGNATURE for esp32 functional

Applied PR knolleary/pubsubclient#336
  • Loading branch information
TD-er authored and psy0rz committed Feb 16, 2018
1 parent 0e0e7be commit a93a078
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 49 deletions.
2 changes: 1 addition & 1 deletion lib/pubsubclient/src/PubSubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig

lastOutActivity = millis();

return rc == tlen + 4 + plength;
return rc == tlen + 3 + llen + plength;
}

boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
Expand Down
2 changes: 1 addition & 1 deletion lib/pubsubclient/src/PubSubClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
#define MQTTQOS1 (1 << 1)
#define MQTTQOS2 (2 << 1)

#ifdef ESP8266
#if defined(ESP8266) || defined(ESP32)
#include <functional>
#define MQTT_CALLBACK_SIGNATURE std::function<void(char*, uint8_t*, unsigned int)> callback
#else
Expand Down
81 changes: 39 additions & 42 deletions src/Controller.ino
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ void callback(char* c_topic, byte* b_payload, unsigned int length) {
/*********************************************************************************************\
* Connect to MQTT message broker
\*********************************************************************************************/
void MQTTConnect(int controller_idx)
bool MQTTConnect(int controller_idx)
{
if (!WiFiConnected(100)) return;
if (!WiFiConnected(100)) return false;
if (MQTTclient.connected())
MQTTclient.disconnect();
ControllerSettingsStruct ControllerSettings;
Expand All @@ -137,73 +137,70 @@ void MQTTConnect(int controller_idx)
// MQTT needs a unique clientname to subscribe to broker
String clientid = F("ESPClient_");
clientid += WiFi.macAddress();
String subscribeTo = "";

String LWTTopic = ControllerSettings.Subscribe;
LWTTopic.replace(F("/#"), F("/status"));
LWTTopic.replace(F("%sysname%"), Settings.Name);
LWTTopic += F("/LWT"); // Extend the topic for status updates of connected/disconnected status.

for (byte x = 1; x < 3; x++)
{
String log = "";
boolean MQTTresult = false;

if ((SecuritySettings.ControllerUser[controller_idx] != 0) && (SecuritySettings.ControllerPassword[controller_idx] != 0))
MQTTresult = MQTTclient.connect(clientid.c_str(), SecuritySettings.ControllerUser[controller_idx], SecuritySettings.ControllerPassword[controller_idx], LWTTopic.c_str(), 0, 0, "Connection Lost");
else
MQTTresult = MQTTclient.connect(clientid.c_str(), LWTTopic.c_str(), 0, 0, "Connection Lost");
yield();
boolean MQTTresult = false;
uint8_t willQos = 0;
boolean willRetain = true;

if (MQTTresult)
{
MQTTclient_should_reconnect = false;
log = F("MQTT : Connected to broker with client ID: ");
log += clientid;
addLog(LOG_LEVEL_INFO, log);
subscribeTo = ControllerSettings.Subscribe;
subscribeTo.replace(F("%sysname%"), Settings.Name);
MQTTclient.subscribe(subscribeTo.c_str());
log = F("Subscribed to: ");
log += subscribeTo;
addLog(LOG_LEVEL_INFO, log);

MQTTclient.publish(LWTTopic.c_str(), "Connected");

statusLED(true);
return; // end loop if succesfull
}
else
{
log = F("MQTT : Failed to connect to broker");
addLog(LOG_LEVEL_ERROR, log);
}
if ((SecuritySettings.ControllerUser[controller_idx] != 0) && (SecuritySettings.ControllerPassword[controller_idx] != 0)) {
MQTTresult = MQTTclient.connect(clientid.c_str(), SecuritySettings.ControllerUser[controller_idx], SecuritySettings.ControllerPassword[controller_idx],
LWTTopic.c_str(), willQos, willRetain, "Connection Lost");
} else {
MQTTresult = MQTTclient.connect(clientid.c_str(), LWTTopic.c_str(), willQos, willRetain, "Connection Lost");
}
yield();

delay(500);
if (!MQTTresult) {
addLog(LOG_LEVEL_ERROR, F("MQTT : Failed to connect to broker"));
return false;
}
MQTTclient_should_reconnect = false;
String log = F("MQTT : Connected to broker with client ID: ");
log += clientid;
addLog(LOG_LEVEL_INFO, log);
String subscribeTo = ControllerSettings.Subscribe;
subscribeTo.replace(F("%sysname%"), Settings.Name);
MQTTclient.subscribe(subscribeTo.c_str());
log = F("Subscribed to: ");
log += subscribeTo;
addLog(LOG_LEVEL_INFO, log);

if (MQTTclient.publish(LWTTopic.c_str(), "Connected")) {
statusLED(true);
return true; // end loop if succesfull
}
return false;
}


/*********************************************************************************************\
* Check connection MQTT message broker
\*********************************************************************************************/
void MQTTCheck(int controller_idx)
bool MQTTCheck(int controller_idx)
{
byte ProtocolIndex = getProtocolIndex(Settings.Protocol[controller_idx]);
if (Protocol[ProtocolIndex].usesMQTT)
{
if (MQTTclient_should_reconnect || !WiFiConnected(100) || !MQTTclient.connected())
if (MQTTclient_should_reconnect || !WiFiConnected(10) || !MQTTclient.connected())
{
if (MQTTclient_should_reconnect) {
addLog(LOG_LEVEL_ERROR, F("MQTT : Intentional reconnect"));
} else {
addLog(LOG_LEVEL_ERROR, F("MQTT : Connection lost"));
connectionFailures += 2;
}
MQTTConnect(controller_idx);
}
else if (connectionFailures)
return MQTTConnect(controller_idx);
} else if (connectionFailures) {
connectionFailures--;
}
}
// When no MQTT protocol is enabled, all is fine.
return true;
}


Expand Down
23 changes: 18 additions & 5 deletions src/ESPEasy.ino
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ unsigned long timer100ms;
unsigned long timer20ms;
unsigned long timer1s;
unsigned long timerwd;
unsigned long timermqtt;
unsigned long lastSend;
unsigned int NC_Count = 0;
unsigned int C_Count = 0;
Expand Down Expand Up @@ -1104,6 +1105,7 @@ void setup()
timer100ms = 0; // timer for periodic actions 10 x per/sec
timer1s = 0; // timer for periodic actions once per/sec
timerwd = 0; // timer for watchdog once per 30 sec
timermqtt = 0; // Timer for the MQTT keep alive loop.

PluginInit();
CPluginInit();
Expand All @@ -1119,12 +1121,14 @@ void setup()
if (Settings.UDPPort != 0)
portUDP.begin(Settings.UDPPort);

/*
// Setup MQTT Client
// ToDo TD-er: Controller index is forced to the first enabled MQTT controller.
int enabledMqttController = firstEnabledMQTTController();
if (enabledMqttController >= 0) {
MQTTConnect(enabledMqttController);
}
*/

sendSysInfoUDP(3);

Expand Down Expand Up @@ -1206,12 +1210,21 @@ void loop()

if (timeOutReached(timer1s))
runOncePerSecond();
}

//dont do this in backgroundtasks(), otherwise causes crashes. (https://github.com/letscontrolit/ESPEasy/issues/683)
int enabledMqttController = firstEnabledMQTTController();
if (enabledMqttController >= 0) {
MQTTclient.loop();
if (timeOutReached(timermqtt)) {
// MQTT_KEEPALIVE = 15 seconds.
timermqtt = millis() + 250;
//dont do this in backgroundtasks(), otherwise causes crashes. (https://github.com/letscontrolit/ESPEasy/issues/683)
int enabledMqttController = firstEnabledMQTTController();
if (enabledMqttController >= 0) {
if (!MQTTclient.loop()) {
if (!MQTTCheck(enabledMqttController)) {
// Check failed, no need to retry it immediately.
timermqtt = millis() + 500;
}
}
}
}
}

backgroundtasks();
Expand Down

6 comments on commit a93a078

@Superpiffer
Copy link

@Superpiffer Superpiffer commented on a93a078 Feb 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LWT message are correctly sended after a broker restart but they haven't the retain flag enabled, even if I enable the option "MQTT Retain Msg" in Tools -> Advanced:

1519297091: Received PUBLISH from ESPClient_XX:XX:XX:XX:XX (d0, q0, r0, m0, '/esp_bedroom/status/LWT', ... (9 bytes))
1519297091: Sending PUBLISH to ESPClient_XX:XX:XX:XX:XX (d0, q0, r0, m0, '/esp_bedroom/status/LWT', ... (9 bytes))

Edit: it works for "Lost Connection" message, but not with "Connected" message:

Sending PUBLISH to ESPClient_XX:XX:XX:XX:XX:XX (d0, q0, r1, m0, '/esp_bedroom/status/LWT', ... (15 bytes))

@TD-er
Copy link
Member Author

@TD-er TD-er commented on a93a078 Feb 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it something like this issue?
If I'm correct, the retained flag is only between publisher and broker. Not between broker en subscriber.

And I override the 'retained' flag (set to true) for connect and last will message, since those are status flags which I think should be retained.

@Superpiffer
Copy link

@Superpiffer Superpiffer commented on a93a078 Feb 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, like this pull request. I meant that I received this line:

Received PUBLISH from ESPClient_XX:XX:XX:XX:XX:XX (d0, q0, r0, m0, '/esp_bedroom/status/LWT', ... (9 bytes))

with r0 when the message is "Connected" (9 bytes) and r1 when the message is "Lost Connection" (15 bytes).
I think that the line 173 has to be:

if (MQTTclient.publish(LWTTopic.c_str(), "Connected", 1))

Right now, if I subscribe to the topic after a broker restart, I receive nothing or "Lost Connection" but never "Connected".

@TD-er
Copy link
Member Author

@TD-er TD-er commented on a93a078 Feb 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I now get it.

Will look into it. Not sure if I have time for it this afternoon.

@TD-er
Copy link
Member Author

@TD-er TD-er commented on a93a078 Feb 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a PR (#925 ) for this.
Can you test it, when released?

@Superpiffer
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, no problem. For the time being I can easily access my setup, but at some point I have to install it in a rollershutter box so I'm looking for a stable enough firmware or at the possibility to upgrade via OTA (#303)
Thanks for the PR, I would have done myself if I had the chance to test it at least.

Please sign in to comment.