Skip to content
Permalink
Browse files

net: mqtt: add mqtt_readall_publish_payload()

This function uses mqtt_read_publish_payload_blocking to perform a
blocking read of the specified number of bytes.

When reading out a payload, the normal use case is to read the
entire payload. This function facilitates that use case.

Signed-off-by: Håkon Øye Amundsen <haakon.amundsen@nordicsemi.no>
  • Loading branch information...
hakonfam authored and jukkar committed Jun 18, 2019
1 parent e1f0b61 commit 05cd3420ac0c68a57d6368bc3888ffdfa5eb3353
Showing with 48 additions and 19 deletions.
  1. +15 −0 include/net/mqtt.h
  2. +22 −0 subsys/net/lib/mqtt/mqtt.c
  3. +11 −19 tests/net/lib/mqtt_pubsub/src/test_mqtt_pubsub.c
@@ -715,6 +715,21 @@ int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,
int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer,
size_t length);

/**
* @brief Blocking version of @ref mqtt_read_publish_payload function which
* runs until the required number of bytes are read.
*
* @param[in] client Client instance for which the procedure is requested.
* Shall not be NULL.
* @param[out] buffer Buffer where payload should be stored.
* @param[in] length Number of bytes to read.
*
* @return 0 if success, otherwise a negative error code (errno.h) indicating
* reason of failure.
*/
int mqtt_readall_publish_payload(struct mqtt_client *client, u8_t *buffer,
size_t length);

#ifdef __cplusplus
}
#endif
@@ -647,3 +647,25 @@ int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer,
{
return read_publish_payload(client, buffer, length, true);
}

int mqtt_readall_publish_payload(struct mqtt_client *client, u8_t *buffer,
size_t length)
{
u8_t *end = buffer + length;

while (buffer < end) {
int ret = mqtt_read_publish_payload_blocking(client, buffer,
end - buffer);

if (ret < 0) {
return ret;
} else if (ret == 0) {
return -EIO;
}

buffer += ret;
}

return 0;
}

@@ -101,8 +101,7 @@ void publish_handler(struct mqtt_client *const client,
const struct mqtt_evt *evt)
{
int rc;
u8_t buf[16];
u32_t offset = 0U;
static u8_t buf[sizeof(payload_long)];

if (evt->result != 0) {
TC_PRINT("MQTT PUBLISH error: %d\n", evt->result);
@@ -115,26 +114,19 @@ void publish_handler(struct mqtt_client *const client,
goto error;
}

while (payload_left > 0) {
rc = mqtt_read_publish_payload_blocking(client, buf,
sizeof(buf));
if (rc <= 0) {
TC_PRINT("Failed to receive payload, err: %d\n", -rc);
if (rc == -EAGAIN) {
continue;
}
goto error;
}

if (memcmp(payload + offset, buf, rc) != 0) {
TC_PRINT("Invalid payload content\n");
goto error;
}
rc = mqtt_readall_publish_payload(client, buf, payload_left);
if (rc != 0) {
TC_PRINT("Error while reading publish payload\n");
goto error;
}

payload_left -= rc;
offset += rc;
if (memcmp(payload, buf, evt->param.publish.message.payload.len != 0)) {
TC_PRINT("Invalid payload content\n");
goto error;
}

payload_left = 0;

return;

error:

0 comments on commit 05cd342

Please sign in to comment.
You can’t perform that action at this time.