Skip to content

Commit

Permalink
Add options for MQTT QoS and timestamp (#371)
Browse files Browse the repository at this point in the history
  • Loading branch information
aehrisch authored and andig committed Nov 17, 2019
1 parent d18bb07 commit f27adad
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 7 deletions.
2 changes: 2 additions & 0 deletions etc/vzlogger.conf
Expand Up @@ -51,6 +51,8 @@
"pass": "", // optional password for the mqtt server
"retain": false, // optional use retain message flag
"rawAndAgg": false, // optional publish raw values even if agg mode is used
"qos": 0, // optional quality of service, default is 0
"timestamp": false // optional whether to include a timestamp in the payload
},


Expand Down
2 changes: 2 additions & 0 deletions include/mqtt.hpp
Expand Up @@ -45,6 +45,8 @@ class MqttClient
bool _retain = false;
bool _rawAndAgg = false;
std::string _topic;
int _qos = 0;
bool _timestamp = false;

bool _isConnected = false;

Expand Down
46 changes: 39 additions & 7 deletions src/mqtt.cpp
Expand Up @@ -82,6 +82,22 @@ MqttClient::MqttClient(struct json_object *option) : _enabled(false)
_topic = json_object_get_string(local_value);
// todo check for no $ at start and no / at end
}
else if (strcmp(key, "qos") == 0 && local_type == json_type_int)
{
int qos = json_object_get_int(local_value);
if (qos >= 0 && qos <= 2)
{
_qos = qos;
}
else
{
print(log_alert, "Ignoring invalid QoS value %d, assuming default", NULL, qos);
}
}
else if (strcmp(key, "timestamp") == 0 && local_type == json_type_boolean)
{
_timestamp = json_object_get_boolean(local_value);
}
else
{
print(log_alert, "Ignoring invalid field or type: %s=%s", NULL, key,
Expand Down Expand Up @@ -309,9 +325,9 @@ void MqttClient::publish(Channel::Ptr ch, Reading &rds, bool aggregate)
{
std::string name = entry._announceName + v.first;
int res = mosquitto_publish(_mcs, 0,
name.c_str(),
v.second.length(),
v.second.c_str(), 0, _retain);
name.c_str(),
v.second.length(),
v.second.c_str(), _qos, _retain);
if (res != MOSQ_ERR_SUCCESS)
{
print(log_finest, "mosquitto_publish announce %s returned %d", "mqtt", name.c_str(), res);
Expand All @@ -327,15 +343,31 @@ void MqttClient::publish(Channel::Ptr ch, Reading &rds, bool aggregate)
if ((entry._sendAgg and aggregate) or (entry._sendRaw && !aggregate))
{
lock.unlock(); // we can unlock here already
std::string payload = std::to_string(rds.value());
std::string payload;
struct json_object* payload_obj = NULL;

if (_timestamp) {
payload_obj = json_object_new_object();
json_object_object_add(payload_obj, "timestamp", json_object_new_int64(rds.time_ms()));
json_object_object_add(payload_obj, "value", json_object_new_double(rds.value()));
payload = json_object_to_json_string(payload_obj);
}
else {
payload = std::to_string(rds.value());
}

print(log_finest, "publish %s=%s", "mqtt", topic.c_str(), payload.c_str());

int res = mosquitto_publish(_mcs, 0,
topic.c_str(), payload.length(), payload.c_str(), 0, _retain);
int res = mosquitto_publish(_mcs, 0,
topic.c_str(), payload.length(), payload.c_str(), _qos, _retain);
if (res != MOSQ_ERR_SUCCESS)
{
print(log_finest, "mosquitto_publish returned %d", "mqtt", res);
}
if (payload_obj != NULL)
{
json_object_put(payload_obj);
}
}
}

Expand Down Expand Up @@ -398,4 +430,4 @@ void *mqtt_client_thread(void *arg)
void end_mqtt_client_thread()
{
endMqttClientThread = true;
}
}

0 comments on commit f27adad

Please sign in to comment.