Skip to content

Commit

Permalink
Added MQTT_QueuePublishWithCommand
Browse files Browse the repository at this point in the history
  • Loading branch information
iprak committed Oct 20, 2022
1 parent 05befd5 commit 23b5d04
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
27 changes: 22 additions & 5 deletions src/mqtt/new_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -933,9 +933,9 @@ void MQTT_init()

mqtt_initialised = 1;

CMD_RegisterCommand("publish", "", MQTT_PublishCommand, "Sqqq", NULL);
CMD_RegisterCommand("publishAll", "", MQTT_PublishAll, "Sqqq", NULL);
CMD_RegisterCommand("publishChannels", "", MQTT_PublishChannels, "Sqqq", NULL);
CMD_RegisterCommand(MQTT_COMMAND_PUBLISH, "", MQTT_PublishCommand, "Sqqq", NULL);
CMD_RegisterCommand(MQTT_COMMAND_PUBLISH_ALL, "", MQTT_PublishAll, "Sqqq", NULL);
CMD_RegisterCommand(MQTT_COMMAND_PUBLISH_CHANNELS, "", MQTT_PublishChannels, "Sqqq", NULL);
}

OBK_Publish_Result MQTT_DoItemPublishString(const char* sChannel, const char* valueStr)
Expand Down Expand Up @@ -1182,12 +1182,13 @@ MqttPublishItem_t* find_queue_reusable_item(MqttPublishItem_t* head) {
return head;
}

/// @brief Queue an entry for publish.
/// @brief Queue an entry for publish and execute a command after the publish.
/// @param topic
/// @param channel
/// @param value
/// @param flags
void MQTT_QueuePublish(char* topic, char* channel, char* value, int flags) {
/// @param command Command to execute after the publish
void MQTT_QueuePublishWithCommand(char* topic, char* channel, char* value, int flags, const char* command) {
if (g_MqttPublishItemsQueued >= MQTT_MAX_QUEUE_SIZE) {
addLogAdv(LOG_ERROR, LOG_FEATURE_MQTT, "Unable to queue! %i items already present\r\n", g_MqttPublishItemsQueued);
return;
Expand Down Expand Up @@ -1223,19 +1224,31 @@ void MQTT_QueuePublish(char* topic, char* channel, char* value, int flags) {
os_strcpy(newItem->topic, topic);
os_strcpy(newItem->channel, channel);
os_strcpy(newItem->value, value);
newItem->command = command;
newItem->flags = flags;

g_MqttPublishItemsQueued++;
addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "Queued topic=%s/%s %i, items queued", newItem->topic, newItem->channel, g_MqttPublishItemsQueued);
}

/// @brief Queue an entry for publish.
/// @param topic
/// @param channel
/// @param value
/// @param flags
void MQTT_QueuePublish(char* topic, char* channel, char* value, int flags) {
MQTT_QueuePublishWithCommand(topic, channel, value, flags, NULL);
}


/// @brief Publish MQTT_QUEUED_ITEMS_PUBLISHED_AT_ONCE queued items.
/// @return
OBK_Publish_Result PublishQueuedItems() {
OBK_Publish_Result result = OBK_PUBLISH_WAS_NOT_REQUIRED;

int count = 0;
MqttPublishItem_t* head = g_MqttPublishQueueHead;
char* command;

//The next actionable item might not be at the front. The queue size is limited to MQTT_QUEUED_ITEMS_PUBLISHED_AT_ONCE
//so this traversal is fast.
Expand All @@ -1249,6 +1262,10 @@ OBK_Publish_Result PublishQueuedItems() {

//Stop if last publish failed
if (result != OBK_PUBLISH_OK) break;

if (head->command != NULL) {
CMD_ExecuteCommand(head->command, COMMAND_FLAG_SOURCE_MQTT);
}
}
else {
//addLogAdv(LOG_INFO,LOG_FEATURE_MQTT,"PublishQueuedItems item skipped reusable");
Expand Down
7 changes: 7 additions & 0 deletions src/mqtt/new_mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,14 @@ typedef struct MqttPublishItem
char value[MQTT_PUBLISH_ITEM_VALUE_LENGTH];
int flags;
struct MqttPublishItem* next;
const char* command;
} MqttPublishItem_t;

#define MQTT_COMMAND_PUBLISH "publish"
#define MQTT_COMMAND_PUBLISH_ALL "publishAll"
#define MQTT_COMMAND_PUBLISH_CHANNELS "publishChannels"


// Count of queued items published at once.
#define MQTT_QUEUED_ITEMS_PUBLISHED_AT_ONCE 3
#define MQTT_MAX_QUEUE_SIZE 7
Expand Down Expand Up @@ -82,6 +88,7 @@ OBK_Publish_Result MQTT_PublishMain_StringString(const char* sChannel, const cha
OBK_Publish_Result MQTT_ChannelChangeCallback(int channel, int iVal);
void MQTT_PublishOnlyDeviceChannelsIfPossible();
void MQTT_QueuePublish(char* topic, char* channel, char* value, int flags);
void MQTT_QueuePublishWithCommand(char* topic, char* channel, char* value, int flags, const char* command);
OBK_Publish_Result MQTT_Publish(char* sTopic, char* sChannel, char* value, int flags);
bool MQTT_IsReady();

Expand Down

0 comments on commit 23b5d04

Please sign in to comment.