Skip to content

Commit

Permalink
net: lib: mqtt: Enable blocking PUBLISH payload readout
Browse files Browse the repository at this point in the history
It is convenient to have a blocking version of
`mqtt_read_publish_payload` function, for cases when it is called from
the event handler. Therefore, extend the 'mqtt_read_publish_payload'
argument list with information whether the call should block or not.

Signed-off-by: Robert Lubos <robert.lubos@nordicsemi.no>
  • Loading branch information
rlubos authored and nashif committed May 8, 2019
1 parent 6f19d00 commit b8494d9
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 17 deletions.
14 changes: 14 additions & 0 deletions include/net/mqtt.h
Expand Up @@ -698,6 +698,20 @@ int mqtt_input(struct mqtt_client *client);
int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,
size_t length);

/**
* @brief Blocking version of @ref mqtt_read_publish_payload function.
*
* @param[in] client Client instance for which the procedure is requested.
* Shall not be NULL.
* @param[out] buffer Buffer where payload should be stored.
* @param[in] length Length of the buffer, in bytes.
*
* @return Number of bytes read or a negative error code (errno.h) indicating
* reason of failure.
*/
int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer,
size_t length);

#ifdef __cplusplus
}
#endif
Expand Down
20 changes: 16 additions & 4 deletions subsys/net/lib/mqtt/mqtt.c
Expand Up @@ -596,8 +596,8 @@ int mqtt_input(struct mqtt_client *client)
return err_code;
}

int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,
size_t length)
static int read_publish_payload(struct mqtt_client *client, void *buffer,
size_t length, bool shall_block)
{
int ret;

Expand All @@ -614,8 +614,8 @@ int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,
length = client->internal.remaining_payload;
}

ret = mqtt_transport_read(client, buffer, length);
if (ret == -EAGAIN) {
ret = mqtt_transport_read(client, buffer, length, shall_block);
if (!shall_block && ret == -EAGAIN) {
goto exit;
}

Expand All @@ -635,3 +635,15 @@ int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,

return ret;
}

int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,
size_t length)
{
return read_publish_payload(client, buffer, length, false);
}

int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer,
size_t length)
{
return read_publish_payload(client, buffer, length, true);
}
2 changes: 1 addition & 1 deletion subsys/net/lib/mqtt/mqtt_rx.c
Expand Up @@ -158,7 +158,7 @@ static int mqtt_read_message_chunk(struct mqtt_client *client,
return -ENOMEM;
}

len = mqtt_transport_read(client, buf->end, remaining);
len = mqtt_transport_read(client, buf->end, remaining, false);
if (len < 0) {
MQTT_TRC("[CID %p]: Transport read error: %d", client, len);
return len;
Expand Down
10 changes: 6 additions & 4 deletions subsys/net/lib/mqtt/mqtt_transport.c
Expand Up @@ -16,7 +16,7 @@ extern int mqtt_client_tcp_connect(struct mqtt_client *client);
extern int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data,
u32_t datalen);
extern int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data,
u32_t buflen);
u32_t buflen, bool shall_block);
extern int mqtt_client_tcp_disconnect(struct mqtt_client *client);

#if defined(CONFIG_MQTT_LIB_TLS)
Expand All @@ -25,7 +25,7 @@ extern int mqtt_client_tls_connect(struct mqtt_client *client);
extern int mqtt_client_tls_write(struct mqtt_client *client, const u8_t *data,
u32_t datalen);
extern int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data,
u32_t buflen);
u32_t buflen, bool shall_block);
extern int mqtt_client_tls_disconnect(struct mqtt_client *client);
#endif /* CONFIG_MQTT_LIB_TLS */

Expand Down Expand Up @@ -72,9 +72,11 @@ int mqtt_transport_write(struct mqtt_client *client, const u8_t *data,
datalen);
}

int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen)
int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen,
bool shall_block)
{
return transport_fn[client->transport.type].read(client, data, buflen);
return transport_fn[client->transport.type].read(client, data, buflen,
shall_block);
}

int mqtt_transport_disconnect(struct mqtt_client *client)
Expand Down
6 changes: 4 additions & 2 deletions subsys/net/lib/mqtt/mqtt_transport.h
Expand Up @@ -27,7 +27,7 @@ typedef int (*transport_write_handler_t)(struct mqtt_client *client,

/**@brief Transport read handler. */
typedef int (*transport_read_handler_t)(struct mqtt_client *client, u8_t *data,
u32_t buflen);
u32_t buflen, bool shall_block);

/**@brief Transport disconnect handler. */
typedef int (*transport_disconnect_handler_t)(struct mqtt_client *client);
Expand Down Expand Up @@ -79,11 +79,13 @@ int mqtt_transport_write(struct mqtt_client *client, const u8_t *data,
* @param[in] client Identifies the client on which the procedure is requested.
* @param[in] data Pointer where read data is to be fetched.
* @param[in] buflen Size of memory provided for the operation.
* @param[in] shall_block Information whether the call should block or not.
*
* @retval Number of bytes read or an error code indicating reason for failure.
* 0 if connection was closed.
*/
int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen);
int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen,
bool shall_block);

/**@brief Handles transport disconnection requests on configured transport.
*
Expand Down
11 changes: 9 additions & 2 deletions subsys/net/lib/mqtt/mqtt_transport_socket_tcp.c
Expand Up @@ -86,15 +86,22 @@ int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data,
* @param[in] client Identifies the client on which the procedure is requested.
* @param[in] data Pointer where read data is to be fetched.
* @param[in] buflen Size of memory provided for the operation.
* @param[in] shall_block Information whether the call should block or not.
*
* @retval Number of bytes read or an error code indicating reason for failure.
* 0 if connection was closed.
*/
int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, u32_t buflen)
int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, u32_t buflen,
bool shall_block)
{
int flags = 0;
int ret;

ret = recv(client->transport.tcp.sock, data, buflen, MSG_DONTWAIT);
if (!shall_block) {
flags |= MSG_DONTWAIT;
}

ret = recv(client->transport.tcp.sock, data, buflen, flags);
if (ret < 0) {
return -errno;
}
Expand Down
11 changes: 9 additions & 2 deletions subsys/net/lib/mqtt/mqtt_transport_socket_tls.c
Expand Up @@ -125,15 +125,22 @@ int mqtt_client_tls_write(struct mqtt_client *client, const u8_t *data,
* @param[in] client Identifies the client on which the procedure is requested.
* @param[in] data Pointer where read data is to be fetched.
* @param[in] buflen Size of memory provided for the operation.
* @param[in] shall_block Information whether the call should block or not.
*
* @retval Number of bytes read or an error code indicating reason for failure.
* 0 if connection was closed.
*/
int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data, u32_t buflen)
int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data, u32_t buflen,
bool shall_block)
{
int flags = 0;
int ret;

ret = recv(client->transport.tls.sock, data, buflen, MSG_DONTWAIT);
if (!shall_block) {
flags |= MSG_DONTWAIT;
}

ret = recv(client->transport.tls.sock, data, buflen, flags);
if (ret < 0) {
return -errno;
}
Expand Down
4 changes: 2 additions & 2 deletions tests/net/lib/mqtt_pubsub/src/test_mqtt_pubsub.c
Expand Up @@ -116,8 +116,8 @@ void publish_handler(struct mqtt_client *const client,
}

while (payload_left > 0) {
wait(APP_SLEEP_MSECS);
rc = mqtt_read_publish_payload(client, buf, sizeof(buf));
rc = mqtt_read_publish_payload_blocking(client, buf,
sizeof(buf));
if (rc <= 0) {
TC_PRINT("Failed to receive payload, err: %d\n", -rc);
if (rc == -EAGAIN) {
Expand Down

0 comments on commit b8494d9

Please sign in to comment.