From b8494d9a51ca1113442823f23e1f479adc83c4a4 Mon Sep 17 00:00:00 2001 From: Robert Lubos Date: Tue, 5 Feb 2019 12:52:40 +0100 Subject: [PATCH] net: lib: mqtt: Enable blocking PUBLISH payload readout It is convenient to have a blocking version of `mqtt_read_publish_payload` function, for cases when it is called from the event handler. Therefore, extend the 'mqtt_read_publish_payload' argument list with information whether the call should block or not. Signed-off-by: Robert Lubos --- include/net/mqtt.h | 14 +++++++++++++ subsys/net/lib/mqtt/mqtt.c | 20 +++++++++++++++---- subsys/net/lib/mqtt/mqtt_rx.c | 2 +- subsys/net/lib/mqtt/mqtt_transport.c | 10 ++++++---- subsys/net/lib/mqtt/mqtt_transport.h | 6 ++++-- .../net/lib/mqtt/mqtt_transport_socket_tcp.c | 11 ++++++++-- .../net/lib/mqtt/mqtt_transport_socket_tls.c | 11 ++++++++-- .../lib/mqtt_pubsub/src/test_mqtt_pubsub.c | 4 ++-- 8 files changed, 61 insertions(+), 17 deletions(-) diff --git a/include/net/mqtt.h b/include/net/mqtt.h index e1ada204fafc01..65cc36db4089bc 100644 --- a/include/net/mqtt.h +++ b/include/net/mqtt.h @@ -698,6 +698,20 @@ int mqtt_input(struct mqtt_client *client); int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, size_t length); +/** + * @brief Blocking version of @ref mqtt_read_publish_payload function. + * + * @param[in] client Client instance for which the procedure is requested. + * Shall not be NULL. + * @param[out] buffer Buffer where payload should be stored. + * @param[in] length Length of the buffer, in bytes. + * + * @return Number of bytes read or a negative error code (errno.h) indicating + * reason of failure. + */ +int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer, + size_t length); + #ifdef __cplusplus } #endif diff --git a/subsys/net/lib/mqtt/mqtt.c b/subsys/net/lib/mqtt/mqtt.c index be7523ace121e1..77653d2a66e643 100644 --- a/subsys/net/lib/mqtt/mqtt.c +++ b/subsys/net/lib/mqtt/mqtt.c @@ -596,8 +596,8 @@ int mqtt_input(struct mqtt_client *client) return err_code; } -int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, - size_t length) +static int read_publish_payload(struct mqtt_client *client, void *buffer, + size_t length, bool shall_block) { int ret; @@ -614,8 +614,8 @@ int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, length = client->internal.remaining_payload; } - ret = mqtt_transport_read(client, buffer, length); - if (ret == -EAGAIN) { + ret = mqtt_transport_read(client, buffer, length, shall_block); + if (!shall_block && ret == -EAGAIN) { goto exit; } @@ -635,3 +635,15 @@ int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, return ret; } + +int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, + size_t length) +{ + return read_publish_payload(client, buffer, length, false); +} + +int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer, + size_t length) +{ + return read_publish_payload(client, buffer, length, true); +} diff --git a/subsys/net/lib/mqtt/mqtt_rx.c b/subsys/net/lib/mqtt/mqtt_rx.c index b4c4da377027dc..6f8b29a6445673 100644 --- a/subsys/net/lib/mqtt/mqtt_rx.c +++ b/subsys/net/lib/mqtt/mqtt_rx.c @@ -158,7 +158,7 @@ static int mqtt_read_message_chunk(struct mqtt_client *client, return -ENOMEM; } - len = mqtt_transport_read(client, buf->end, remaining); + len = mqtt_transport_read(client, buf->end, remaining, false); if (len < 0) { MQTT_TRC("[CID %p]: Transport read error: %d", client, len); return len; diff --git a/subsys/net/lib/mqtt/mqtt_transport.c b/subsys/net/lib/mqtt/mqtt_transport.c index ebf33e65ed982d..c7b31ce39fa4a7 100644 --- a/subsys/net/lib/mqtt/mqtt_transport.c +++ b/subsys/net/lib/mqtt/mqtt_transport.c @@ -16,7 +16,7 @@ extern int mqtt_client_tcp_connect(struct mqtt_client *client); extern int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data, u32_t datalen); extern int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, - u32_t buflen); + u32_t buflen, bool shall_block); extern int mqtt_client_tcp_disconnect(struct mqtt_client *client); #if defined(CONFIG_MQTT_LIB_TLS) @@ -25,7 +25,7 @@ extern int mqtt_client_tls_connect(struct mqtt_client *client); extern int mqtt_client_tls_write(struct mqtt_client *client, const u8_t *data, u32_t datalen); extern int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data, - u32_t buflen); + u32_t buflen, bool shall_block); extern int mqtt_client_tls_disconnect(struct mqtt_client *client); #endif /* CONFIG_MQTT_LIB_TLS */ @@ -72,9 +72,11 @@ int mqtt_transport_write(struct mqtt_client *client, const u8_t *data, datalen); } -int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen) +int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen, + bool shall_block) { - return transport_fn[client->transport.type].read(client, data, buflen); + return transport_fn[client->transport.type].read(client, data, buflen, + shall_block); } int mqtt_transport_disconnect(struct mqtt_client *client) diff --git a/subsys/net/lib/mqtt/mqtt_transport.h b/subsys/net/lib/mqtt/mqtt_transport.h index 6f295a717af3f7..a2f29a200522cc 100644 --- a/subsys/net/lib/mqtt/mqtt_transport.h +++ b/subsys/net/lib/mqtt/mqtt_transport.h @@ -27,7 +27,7 @@ typedef int (*transport_write_handler_t)(struct mqtt_client *client, /**@brief Transport read handler. */ typedef int (*transport_read_handler_t)(struct mqtt_client *client, u8_t *data, - u32_t buflen); + u32_t buflen, bool shall_block); /**@brief Transport disconnect handler. */ typedef int (*transport_disconnect_handler_t)(struct mqtt_client *client); @@ -79,11 +79,13 @@ int mqtt_transport_write(struct mqtt_client *client, const u8_t *data, * @param[in] client Identifies the client on which the procedure is requested. * @param[in] data Pointer where read data is to be fetched. * @param[in] buflen Size of memory provided for the operation. + * @param[in] shall_block Information whether the call should block or not. * * @retval Number of bytes read or an error code indicating reason for failure. * 0 if connection was closed. */ -int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen); +int mqtt_transport_read(struct mqtt_client *client, u8_t *data, u32_t buflen, + bool shall_block); /**@brief Handles transport disconnection requests on configured transport. * diff --git a/subsys/net/lib/mqtt/mqtt_transport_socket_tcp.c b/subsys/net/lib/mqtt/mqtt_transport_socket_tcp.c index 035917edb12d2c..6aa9c7c992d8b2 100644 --- a/subsys/net/lib/mqtt/mqtt_transport_socket_tcp.c +++ b/subsys/net/lib/mqtt/mqtt_transport_socket_tcp.c @@ -86,15 +86,22 @@ int mqtt_client_tcp_write(struct mqtt_client *client, const u8_t *data, * @param[in] client Identifies the client on which the procedure is requested. * @param[in] data Pointer where read data is to be fetched. * @param[in] buflen Size of memory provided for the operation. + * @param[in] shall_block Information whether the call should block or not. * * @retval Number of bytes read or an error code indicating reason for failure. * 0 if connection was closed. */ -int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, u32_t buflen) +int mqtt_client_tcp_read(struct mqtt_client *client, u8_t *data, u32_t buflen, + bool shall_block) { + int flags = 0; int ret; - ret = recv(client->transport.tcp.sock, data, buflen, MSG_DONTWAIT); + if (!shall_block) { + flags |= MSG_DONTWAIT; + } + + ret = recv(client->transport.tcp.sock, data, buflen, flags); if (ret < 0) { return -errno; } diff --git a/subsys/net/lib/mqtt/mqtt_transport_socket_tls.c b/subsys/net/lib/mqtt/mqtt_transport_socket_tls.c index 57267b3d9953d0..e3fc5bb097b7c5 100644 --- a/subsys/net/lib/mqtt/mqtt_transport_socket_tls.c +++ b/subsys/net/lib/mqtt/mqtt_transport_socket_tls.c @@ -125,15 +125,22 @@ int mqtt_client_tls_write(struct mqtt_client *client, const u8_t *data, * @param[in] client Identifies the client on which the procedure is requested. * @param[in] data Pointer where read data is to be fetched. * @param[in] buflen Size of memory provided for the operation. + * @param[in] shall_block Information whether the call should block or not. * * @retval Number of bytes read or an error code indicating reason for failure. * 0 if connection was closed. */ -int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data, u32_t buflen) +int mqtt_client_tls_read(struct mqtt_client *client, u8_t *data, u32_t buflen, + bool shall_block) { + int flags = 0; int ret; - ret = recv(client->transport.tls.sock, data, buflen, MSG_DONTWAIT); + if (!shall_block) { + flags |= MSG_DONTWAIT; + } + + ret = recv(client->transport.tls.sock, data, buflen, flags); if (ret < 0) { return -errno; } diff --git a/tests/net/lib/mqtt_pubsub/src/test_mqtt_pubsub.c b/tests/net/lib/mqtt_pubsub/src/test_mqtt_pubsub.c index 2ad781578b7e7c..5d06b8e3802937 100644 --- a/tests/net/lib/mqtt_pubsub/src/test_mqtt_pubsub.c +++ b/tests/net/lib/mqtt_pubsub/src/test_mqtt_pubsub.c @@ -116,8 +116,8 @@ void publish_handler(struct mqtt_client *const client, } while (payload_left > 0) { - wait(APP_SLEEP_MSECS); - rc = mqtt_read_publish_payload(client, buf, sizeof(buf)); + rc = mqtt_read_publish_payload_blocking(client, buf, + sizeof(buf)); if (rc <= 0) { TC_PRINT("Failed to receive payload, err: %d\n", -rc); if (rc == -EAGAIN) {