Skip to content

Commit

Permalink
Cleanup single event subscriptions after reconnect.
Browse files Browse the repository at this point in the history
  • Loading branch information
Matteo Gmür committed May 23, 2024
1 parent 0601dc6 commit 0cff437
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/Array.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class Array {
/// @brief Removes the element at the given iterator, has to move all element one to the left if the index is not at the end of the array
/// @param iterator Iterator the element should be removed at from the underlying data container
void erase(T const * const iterator) {
size_t const index = Helper::distance(this->cbegin(), iterator);
size_t const index = Helper::distance(cbegin(), iterator);
// Check if the given index is bigger or equal than the actual amount of elements if it is we can not erase that element because it does not exist
if (index < m_size) {
// Move all elements after the index one position to the left
Expand Down
10 changes: 10 additions & 0 deletions src/Espressif_MQTT_Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,21 @@ bool Espressif_MQTT_Client::publish(const char *topic, const uint8_t *payload, c
}

bool Espressif_MQTT_Client::subscribe(const char *topic) {
// The esp_mqtt_client_subscribe method does not return false, if we send a subscribe request while not being connected to a broker,
// so we have to check for that case to ensure the end user is informed that their subscribe request could not be sent and has been ignored.
if (!connected()) {
return false;
}
const int message_id = esp_mqtt_client_subscribe(m_mqtt_client, topic, 0U);
return message_id > MQTT_FAILURE_MESSAGE_ID;
}

bool Espressif_MQTT_Client::unsubscribe(const char *topic) {
// The esp_mqtt_client_unsubscribe method does not return false, if we send a unsubscribe request while not being connected to a broker,
// so we have to check for that case to ensure the end user is informed that their unsubscribe request could not be sent and has been ignored.
if (!connected()) {
return false;
}
const int message_id = esp_mqtt_client_unsubscribe(m_mqtt_client, topic);
return message_id > MQTT_FAILURE_MESSAGE_ID;
}
Expand Down
49 changes: 27 additions & 22 deletions src/ThingsBoard.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ char constexpr COLON PROGMEM = ':';
char constexpr COMMA[] PROGMEM = ",";
char constexpr NO_KEYS_TO_REQUEST[] PROGMEM = "No keys to request were given";
char constexpr RPC_METHOD_NULL[] PROGMEM = "RPC methodName is NULL";
char constexpr SUBSCRIBE_TOPIC_FAILED[] PROGMEM = "Subscribing the given topic failed";
char constexpr SUBSCRIBE_TOPIC_FAILED[] PROGMEM = "Subscribing the given topic (%s) failed";
#if THINGSBOARD_ENABLE_DEBUG
char constexpr NO_RPC_PARAMS_PASSED[] PROGMEM = "No parameters passed with RPC, passing null JSON";
char constexpr NOT_FOUND_ATT_UPDATE[] PROGMEM = "Shared attribute update key not found";
Expand Down Expand Up @@ -161,7 +161,7 @@ char constexpr COLON = ':';
char constexpr COMMA[] = ",";
char constexpr NO_KEYS_TO_REQUEST[] = "No keys to request were given";
char constexpr RPC_METHOD_NULL[] = "RPC methodName is NULL";
char constexpr SUBSCRIBE_TOPIC_FAILED[] = "Subscribing the given topic failed";
char constexpr SUBSCRIBE_TOPIC_FAILED[] = "Subscribing the given topic (%s) failed";
#if THINGSBOARD_ENABLE_DEBUG
char constexpr NO_RPC_PARAMS_PASSED[] = "No parameters passed with RPC, passing null JSON";
char constexpr NOT_FOUND_ATT_UPDATE[] = "Shared attribute update key not found";
Expand Down Expand Up @@ -430,20 +430,20 @@ class ThingsBoardSized {
/// therefore there is no need anymore to discard all previously subscribed callbacks and letting the user resubscribe
void Cleanup_Subscriptions() {
// Cleanup all server-side RPC subscriptions
this->RPC_Unsubscribe();
RPC_Unsubscribe();
// Cleanup all client-side RPC requests
this->RPC_Request_Unsubscribe();
RPC_Request_Unsubscribe();
// Cleanup all shared attributes subscriptions
this->Shared_Attributes_Unsubscribe();
Shared_Attributes_Unsubscribe();
// Cleanup all client-side or shared attributes requests
this->Attributes_Request_Unsubscribe();
Attributes_Request_Unsubscribe();
// Cleanup all provision requests
this->Provision_Unsubscribe();
Provision_Unsubscribe();
// Stop any ongoing Firmware update,
// which will in turn cleanup the internal member variables of the OTAHandler class
// as well as all firmware subscriptions
// and inform the user of the failed firmware update
this->Stop_Firmware_Update();
Stop_Firmware_Update();
}

/// @brief Connects to the specified ThingsBoard server over the given port as the given device.
Expand Down Expand Up @@ -779,7 +779,7 @@ class ThingsBoardSized {
}
#endif // !THINGSBOARD_ENABLE_DYNAMIC
if (!m_client.subscribe(RPC_SUBSCRIBE_TOPIC)) {
Logger::println(SUBSCRIBE_TOPIC_FAILED);
Logger::printfln(SUBSCRIBE_TOPIC_FAILED, RPC_SUBSCRIBE_TOPIC);
return false;
}

Expand All @@ -801,7 +801,7 @@ class ThingsBoardSized {
}
#endif // !THINGSBOARD_ENABLE_DYNAMIC
if (!m_client.subscribe(RPC_SUBSCRIBE_TOPIC)) {
Logger::println(SUBSCRIBE_TOPIC_FAILED);
Logger::printfln(SUBSCRIBE_TOPIC_FAILED, RPC_SUBSCRIBE_TOPIC);
return false;
}

Expand Down Expand Up @@ -999,7 +999,7 @@ class ThingsBoardSized {
}
#endif // !THINGSBOARD_ENABLE_DYNAMIC
if (!m_client.subscribe(ATTRIBUTE_TOPIC)) {
Logger::println(SUBSCRIBE_TOPIC_FAILED);
Logger::printfln(SUBSCRIBE_TOPIC_FAILED, ATTRIBUTE_TOPIC);
return false;
}

Expand All @@ -1025,7 +1025,7 @@ class ThingsBoardSized {
}
#endif // !THINGSBOARD_ENABLE_DYNAMIC
if (!m_client.subscribe(ATTRIBUTE_TOPIC)) {
Logger::println(SUBSCRIBE_TOPIC_FAILED);
Logger::printfln(SUBSCRIBE_TOPIC_FAILED, ATTRIBUTE_TOPIC);
return false;
}

Expand Down Expand Up @@ -1234,7 +1234,7 @@ class ThingsBoardSized {
/// @return Whether requesting the given callback was successful or not
bool Provision_Subscribe(Provision_Callback const & callback) {
if (!m_client.subscribe(PROV_RESPONSE_TOPIC)) {
Logger::println(SUBSCRIBE_TOPIC_FAILED);
Logger::printfln(SUBSCRIBE_TOPIC_FAILED, PROV_RESPONSE_TOPIC);
return false;
}
m_provision_callback = callback;
Expand Down Expand Up @@ -1273,7 +1273,7 @@ class ThingsBoardSized {
/// @return Whether subscribing to the firmware response topic was successful or not
bool Firmware_OTA_Subscribe() {
if (!m_client.subscribe(FIRMWARE_RESPONSE_SUBSCRIBE_TOPIC)) {
Logger::println(SUBSCRIBE_TOPIC_FAILED);
Logger::printfln(SUBSCRIBE_TOPIC_FAILED, FIRMWARE_RESPONSE_SUBSCRIBE_TOPIC);
Firmware_Send_State(FW_STATE_FAILED, SUBSCRIBE_TOPIC_FAILED);
return false;
}
Expand Down Expand Up @@ -1408,15 +1408,20 @@ class ThingsBoardSized {

/// @brief Resubscribes to topics that establish a permanent connection with MQTT, meaning they may receive more than one event over their lifetime,
/// whereas other events that are only ever called once and then deleted after they have been handled are not resubscribed.
/// This is done, because the chance of disconnecting the moment when a request event (provisioning, attribute request, client-side rpc) was sent
/// and then reconnecting and resubscribing to that topic fast enough to still receive the message is not feasible
/// Only the topics that establish a permanent connection are resubscribed, because all not yet received data is discard on the MQTT broker,
// once we establish a connection again. This is the case because we connect with the cleanSession attribute set to true.
// Therefore we can also clear the buffer of all non-permanent topics.
void Resubscribe_Topics() {
if (!m_rpc_callbacks.empty()) {
m_client.subscribe(RPC_SUBSCRIBE_TOPIC);
if (!m_rpc_callbacks.empty() && !m_client.subscribe(RPC_SUBSCRIBE_TOPIC)) {
Logger::printfln(SUBSCRIBE_TOPIC_FAILED, RPC_SUBSCRIBE_TOPIC);
}
if (!m_shared_attribute_update_callbacks.empty()) {
m_client.subscribe(ATTRIBUTE_TOPIC);
if (!m_shared_attribute_update_callbacks.empty() && !m_client.subscribe(ATTRIBUTE_TOPIC)) {
Logger::printfln(SUBSCRIBE_TOPIC_FAILED, ATTRIBUTE_TOPIC);
}
// Clean up any not yet answered single event subscriptions
RPC_Request_Unsubscribe();
Attributes_Request_Unsubscribe();
Provision_Unsubscribe();
}

/// @brief Subscribes to the client-side RPC response topic
Expand All @@ -1431,7 +1436,7 @@ class ThingsBoardSized {
}
#endif // !THINGSBOARD_ENABLE_DYNAMIC
if (!m_client.subscribe(RPC_RESPONSE_SUBSCRIBE_TOPIC)) {
Logger::println(SUBSCRIBE_TOPIC_FAILED);
Logger::printfln(SUBSCRIBE_TOPIC_FAILED, RPC_RESPONSE_SUBSCRIBE_TOPIC);
return false;
}

Expand Down Expand Up @@ -1465,7 +1470,7 @@ class ThingsBoardSized {
}
#endif // !THINGSBOARD_ENABLE_DYNAMIC
if (!m_client.subscribe(ATTRIBUTE_RESPONSE_SUBSCRIBE_TOPIC)) {
Logger::println(SUBSCRIBE_TOPIC_FAILED);
Logger::printfln(SUBSCRIBE_TOPIC_FAILED, ATTRIBUTE_RESPONSE_SUBSCRIBE_TOPIC);
return false;
}

Expand Down
2 changes: 1 addition & 1 deletion src/Vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class Vector {
/// @brief Removes the element at the given iterator, has to move all element one to the left if the index is not at the end of the array
/// @param iterator Iterator the element should be removed at from the underlying data container
void erase(T const * const iterator) {
size_t const index = Helper::distance(this->cbegin(), iterator);
size_t const index = Helper::distance(cbegin(), iterator);
// Check if the given index is bigger or equal than the actual amount of elements if it is we can not erase that element because it does not exist
if (index < m_size) {
// Move all elements after the index one position to the left
Expand Down

0 comments on commit 0cff437

Please sign in to comment.