From 152fc830f55d1b3d3af97b48e57af439c302a9ed Mon Sep 17 00:00:00 2001 From: Luis Alves Date: Mon, 24 Jan 2022 22:07:34 +0000 Subject: [PATCH] mqttlib: merge fixes from https://github.com/LiamBindle/MQTT-C/ --- src/mqttlib.c | 311 +++++++++++++++++++++++++++++--------------------- src/mqttlib.h | 51 +++++++-- 2 files changed, 221 insertions(+), 141 deletions(-) diff --git a/src/mqttlib.c b/src/mqttlib.c index 74e6501..94ade7a 100644 --- a/src/mqttlib.c +++ b/src/mqttlib.c @@ -47,17 +47,18 @@ static ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, char* buf, size_t buf /* successfully read bytes from the socket */ buf += rv; bufsz -= rv; - } else if (rv < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + } else if (rv == 0 || (rv < 0 && errno != EAGAIN && errno != EWOULDBLOCK)) { /* an error occurred that wasn't "nothing to read". */ return MQTT_ERROR_SOCKET_ERROR; } - } while (rv > 0); + } while (rv > 0 && bufsz > 0); return buf - start; } enum MQTTErrors mqtt_sync(struct mqtt_client *client) { /* Recover from any errors */ + enum MQTTErrors err; MQTT_PAL_MUTEX_LOCK(&client->mutex); if (client->error != MQTT_OK && client->reconnect_callback != NULL) { client->reconnect_callback(client, &client->reconnect_state); @@ -67,7 +68,6 @@ enum MQTTErrors mqtt_sync(struct mqtt_client *client) { } /* Call inspector callback if necessary */ - enum MQTTErrors err; if (client->inspector_callback != NULL) { MQTT_PAL_MUTEX_LOCK(&client->mutex); err = client->inspector_callback(client); @@ -85,12 +85,13 @@ enum MQTTErrors mqtt_sync(struct mqtt_client *client) { } uint16_t __mqtt_next_pid(struct mqtt_client *client) { + int pid_exists = 0; if (client->pid_lfsr == 0) { client->pid_lfsr = 163u; } /* LFSR taps taken from: https://en.wikipedia.org/wiki/Linear-feedback_shift_register */ - int pid_exists = 0; do { + struct mqtt_queued_message *curr; unsigned lsb = client->pid_lfsr & 1; (client->pid_lfsr) >>= 1; if (lsb) { @@ -99,7 +100,6 @@ uint16_t __mqtt_next_pid(struct mqtt_client *client) { /* check that the PID is unique */ pid_exists = 0; - struct mqtt_queued_message *curr; for(curr = mqtt_mq_get(&(client->mq), 0); curr >= client->mq.queue_tail; --curr) { if (curr->packet_id == client->pid_lfsr) { pid_exists = 1; @@ -140,12 +140,12 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, client->number_of_keep_alives = 0; client->typical_response_time = -1.0; client->publish_response_callback = publish_response_callback; + client->pid_lfsr = 0; + client->send_offset = 0; client->inspector_callback = NULL; client->reconnect_callback = NULL; client->reconnect_state = NULL; - - client->pid_lfsr = 0; return MQTT_OK; } @@ -173,6 +173,7 @@ void mqtt_init_reconnect(struct mqtt_client *client, client->number_of_keep_alives = 0; client->typical_response_time = -1.0; client->publish_response_callback = publish_response_callback; + client->send_offset = 0; client->inspector_callback = NULL; client->reconnect_callback = reconnect; @@ -273,10 +274,11 @@ enum MQTTErrors mqtt_publish(struct mqtt_client *client, size_t application_message_size, uint8_t publish_flags) { - MQTT_PAL_MUTEX_LOCK(&client->mutex); - uint16_t packet_id = __mqtt_next_pid(client); - ssize_t rv; struct mqtt_queued_message *msg; + ssize_t rv; + uint16_t packet_id; + MQTT_PAL_MUTEX_LOCK(&client->mutex); + packet_id = __mqtt_next_pid(client); /* try to pack the message */ MQTT_CLIENT_TRY_PACK( @@ -387,10 +389,11 @@ enum MQTTErrors mqtt_subscribe(struct mqtt_client *client, const char* topic_name, int max_qos_level) { - MQTT_PAL_MUTEX_LOCK(&client->mutex); - uint16_t packet_id = __mqtt_next_pid(client); ssize_t rv; + uint16_t packet_id; struct mqtt_queued_message *msg; + MQTT_PAL_MUTEX_LOCK(&client->mutex); + packet_id = __mqtt_next_pid(client); /* try to pack the message */ MQTT_CLIENT_TRY_PACK( @@ -400,7 +403,7 @@ enum MQTTErrors mqtt_subscribe(struct mqtt_client *client, packet_id, topic_name, max_qos_level, - NULL + (const char*) NULL ), 1 ); @@ -415,10 +418,10 @@ enum MQTTErrors mqtt_subscribe(struct mqtt_client *client, enum MQTTErrors mqtt_unsubscribe(struct mqtt_client *client, const char* topic_name) { - MQTT_PAL_MUTEX_LOCK(&client->mutex); uint16_t packet_id = __mqtt_next_pid(client); ssize_t rv; struct mqtt_queued_message *msg; + MQTT_PAL_MUTEX_LOCK(&client->mutex); /* try to pack the message */ MQTT_CLIENT_TRY_PACK( @@ -427,7 +430,7 @@ enum MQTTErrors mqtt_unsubscribe(struct mqtt_client *client, client->mq.curr, client->mq.curr_sz, packet_id, topic_name, - NULL + (const char*)NULL ), 1 ); @@ -469,9 +472,9 @@ enum MQTTErrors __mqtt_ping(struct mqtt_client *client) enum MQTTErrors mqtt_disconnect(struct mqtt_client *client) { - MQTT_PAL_MUTEX_LOCK(&client->mutex); ssize_t rv; struct mqtt_queued_message *msg; + MQTT_PAL_MUTEX_LOCK(&client->mutex); /* try to pack the message */ MQTT_CLIENT_TRY_PACK( @@ -490,8 +493,11 @@ enum MQTTErrors mqtt_disconnect(struct mqtt_client *client) ssize_t __mqtt_send(struct mqtt_client *client) { - MQTT_PAL_MUTEX_LOCK(&client->mutex); uint8_t inspected; + ssize_t len; + int inflight_qos2 = 0; + int i = 0; + MQTT_PAL_MUTEX_LOCK(&client->mutex); if (client->error < 0 && client->error != MQTT_ERROR_SEND_BUFFER_IS_FULL) { MQTT_PAL_MUTEX_UNLOCK(&client->mutex); @@ -499,9 +505,8 @@ ssize_t __mqtt_send(struct mqtt_client *client) } /* loop through all messages in the queue */ - int len = mqtt_mq_length(&client->mq); - int inflight_qos2 = 0; - for(int i = 0; i < len; ++i) { + len = mqtt_mq_length(&client->mq); + for(; i < len; ++i) { struct mqtt_queued_message *msg = mqtt_mq_get(&client->mq, i); int resend = 0; if (msg->state == MQTT_QUEUED_UNSENT) { @@ -512,6 +517,7 @@ ssize_t __mqtt_send(struct mqtt_client *client) if (MQTT_PAL_TIME() > msg->time_sent + client->response_timeout) { resend = 1; client->number_of_timeouts += 1; + client->send_offset = 0; } } @@ -534,11 +540,22 @@ ssize_t __mqtt_send(struct mqtt_client *client) } /* we're sending the message */ - ssize_t tmp = mqtt_pal_sendall(client->socketfd, (char*) msg->start, msg->size, 0); - if (tmp < 0) { + { + ssize_t tmp = mqtt_pal_sendall(client->socketfd, (char*) msg->start + client->send_offset, msg->size - client->send_offset, 0); + if (tmp < 0) { client->error = (enum MQTTErrors) tmp; MQTT_PAL_MUTEX_UNLOCK(&client->mutex); return tmp; + } else { + client->send_offset += tmp; + if(client->send_offset < msg->size) { + /* partial sent. Await additional calls */ + break; + } else { + /* whole message has been sent */ + client->send_offset = 0; + } + } } /* update timeout watcher */ @@ -570,13 +587,13 @@ ssize_t __mqtt_send(struct mqtt_client *client) msg->state = MQTT_QUEUED_COMPLETE; break; case MQTT_CONTROL_PUBLISH: - inspected = 0x03 & ((msg->start[0]) >> 1); /* qos */ + inspected = ( MQTT_PUBLISH_QOS_MASK & (msg->start[0]) ) >> 1; /* qos */ if (inspected == 0) { msg->state = MQTT_QUEUED_COMPLETE; } else if (inspected == 1) { msg->state = MQTT_QUEUED_AWAITING_ACK; - /*set DUP flag for subsequent sends */ - msg->start[1] |= MQTT_PUBLISH_DUP; + /*set DUP flag for subsequent sends [Spec MQTT-3.3.1-1] */ + msg->start[0] |= MQTT_PUBLISH_DUP; } else { msg->state = MQTT_QUEUED_AWAITING_ACK; } @@ -597,13 +614,15 @@ ssize_t __mqtt_send(struct mqtt_client *client) } /* check for keep-alive */ - mqtt_pal_time_t keep_alive_timeout = client->time_of_last_send + (mqtt_pal_time_t) ((float) (client->keep_alive) * 0.75); - if (MQTT_PAL_TIME() > keep_alive_timeout) { - ssize_t rv = __mqtt_ping(client); - if (rv != MQTT_OK) { + { + mqtt_pal_time_t keep_alive_timeout = client->time_of_last_send + (mqtt_pal_time_t) ((float) (client->keep_alive) * 0.75); + if (MQTT_PAL_TIME() > keep_alive_timeout) { + ssize_t rv = __mqtt_ping(client); + if (rv != MQTT_OK) { client->error = (enum MQTTErrors) rv; MQTT_PAL_MUTEX_UNLOCK(&client->mutex); return rv; + } } } @@ -611,15 +630,17 @@ ssize_t __mqtt_send(struct mqtt_client *client) return MQTT_OK; } -int __mqtt_recv(struct mqtt_client *client) +int __mqtt_recv(struct mqtt_client *client) { - MQTT_PAL_MUTEX_LOCK(&client->mutex); struct mqtt_response response; + ssize_t mqtt_recv_ret = MQTT_OK; + MQTT_PAL_MUTEX_LOCK(&client->mutex); - /* read until there is nothing left to read */ - while(1) { + /* read until there is nothing left to read, or there was an error */ + while(mqtt_recv_ret == MQTT_OK) { /* read in as many bytes as possible */ ssize_t rv, consumed; + struct mqtt_queued_message *msg = NULL; rv = mqtt_pal_recvall(client->socketfd, (char*) client->recv_buffer.curr, client->recv_buffer.curr_sz, 0); if (rv < 0) { @@ -653,7 +674,6 @@ int __mqtt_recv(struct mqtt_client *client) } /* response was unpacked successfully */ - struct mqtt_queued_message *msg = NULL; /* The switch statement below manages how the client responds to messages from the broker. @@ -689,17 +709,22 @@ int __mqtt_recv(struct mqtt_client *client) msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_CONNECT, NULL); if (msg == NULL) { client->error = MQTT_ERROR_ACK_OF_UNKNOWN; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return MQTT_ERROR_ACK_OF_UNKNOWN; + mqtt_recv_ret = MQTT_ERROR_ACK_OF_UNKNOWN; + break; } msg->state = MQTT_QUEUED_COMPLETE; /* initialize typical response time */ client->typical_response_time = (double) (MQTT_PAL_TIME() - msg->time_sent); /* check that connection was successful */ if (response.decoded.connack.return_code != MQTT_CONNACK_ACCEPTED) { - client->error = MQTT_ERROR_CONNECTION_REFUSED; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return MQTT_ERROR_CONNECTION_REFUSED; + if (response.decoded.connack.return_code == MQTT_CONNACK_REFUSED_IDENTIFIER_REJECTED) { + client->error = MQTT_ERROR_CONNECT_CLIENT_ID_REFUSED; + mqtt_recv_ret = MQTT_ERROR_CONNECT_CLIENT_ID_REFUSED; + } else { + client->error = MQTT_ERROR_CONNECTION_REFUSED; + mqtt_recv_ret = MQTT_ERROR_CONNECTION_REFUSED; + } + break; } break; case MQTT_CONTROL_PUBLISH: @@ -708,8 +733,8 @@ int __mqtt_recv(struct mqtt_client *client) rv = __mqtt_puback(client, response.decoded.publish.packet_id); if (rv != MQTT_OK) { client->error = (enum MQTTErrors) rv; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return rv; + mqtt_recv_ret = rv; + break; } } else if (response.decoded.publish.qos_level == 2) { /* check if this is a duplicate */ @@ -720,8 +745,8 @@ int __mqtt_recv(struct mqtt_client *client) rv = __mqtt_pubrec(client, response.decoded.publish.packet_id); if (rv != MQTT_OK) { client->error = (enum MQTTErrors) rv; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return rv; + mqtt_recv_ret = rv; + break; } } /* call publish callback */ @@ -732,8 +757,8 @@ int __mqtt_recv(struct mqtt_client *client) msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBLISH, &response.decoded.puback.packet_id); if (msg == NULL) { client->error = MQTT_ERROR_ACK_OF_UNKNOWN; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return MQTT_ERROR_ACK_OF_UNKNOWN; + mqtt_recv_ret = MQTT_ERROR_ACK_OF_UNKNOWN; + break; } msg->state = MQTT_QUEUED_COMPLETE; /* update response time */ @@ -748,8 +773,8 @@ int __mqtt_recv(struct mqtt_client *client) msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBLISH, &response.decoded.pubrec.packet_id); if (msg == NULL) { client->error = MQTT_ERROR_ACK_OF_UNKNOWN; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return MQTT_ERROR_ACK_OF_UNKNOWN; + mqtt_recv_ret = MQTT_ERROR_ACK_OF_UNKNOWN; + break; } msg->state = MQTT_QUEUED_COMPLETE; /* update response time */ @@ -758,8 +783,8 @@ int __mqtt_recv(struct mqtt_client *client) rv = __mqtt_pubrel(client, response.decoded.pubrec.packet_id); if (rv != MQTT_OK) { client->error = (enum MQTTErrors) rv; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return rv; + mqtt_recv_ret = rv; + break; } break; case MQTT_CONTROL_PUBREL: @@ -767,8 +792,8 @@ int __mqtt_recv(struct mqtt_client *client) msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREC, &response.decoded.pubrel.packet_id); if (msg == NULL) { client->error = MQTT_ERROR_ACK_OF_UNKNOWN; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return MQTT_ERROR_ACK_OF_UNKNOWN; + mqtt_recv_ret = MQTT_ERROR_ACK_OF_UNKNOWN; + break; } msg->state = MQTT_QUEUED_COMPLETE; /* update response time */ @@ -777,8 +802,8 @@ int __mqtt_recv(struct mqtt_client *client) rv = __mqtt_pubcomp(client, response.decoded.pubrec.packet_id); if (rv != MQTT_OK) { client->error = (enum MQTTErrors) rv; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return rv; + mqtt_recv_ret = rv; + break; } break; case MQTT_CONTROL_PUBCOMP: @@ -786,8 +811,8 @@ int __mqtt_recv(struct mqtt_client *client) msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREL, &response.decoded.pubcomp.packet_id); if (msg == NULL) { client->error = MQTT_ERROR_ACK_OF_UNKNOWN; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return MQTT_ERROR_ACK_OF_UNKNOWN; + mqtt_recv_ret = MQTT_ERROR_ACK_OF_UNKNOWN; + break; } msg->state = MQTT_QUEUED_COMPLETE; /* update response time */ @@ -798,8 +823,8 @@ int __mqtt_recv(struct mqtt_client *client) msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_SUBSCRIBE, &response.decoded.suback.packet_id); if (msg == NULL) { client->error = MQTT_ERROR_ACK_OF_UNKNOWN; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return MQTT_ERROR_ACK_OF_UNKNOWN; + mqtt_recv_ret = MQTT_ERROR_ACK_OF_UNKNOWN; + break; } msg->state = MQTT_QUEUED_COMPLETE; /* update response time */ @@ -807,8 +832,8 @@ int __mqtt_recv(struct mqtt_client *client) /* check that subscription was successful (not currently only one subscribe at a time) */ if (response.decoded.suback.return_codes[0] == MQTT_SUBACK_FAILURE) { client->error = MQTT_ERROR_SUBSCRIBE_FAILED; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return MQTT_ERROR_SUBSCRIBE_FAILED; + mqtt_recv_ret = MQTT_ERROR_SUBSCRIBE_FAILED; + break; } break; case MQTT_CONTROL_UNSUBACK: @@ -816,8 +841,8 @@ int __mqtt_recv(struct mqtt_client *client) msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_UNSUBSCRIBE, &response.decoded.unsuback.packet_id); if (msg == NULL) { client->error = MQTT_ERROR_ACK_OF_UNKNOWN; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return MQTT_ERROR_ACK_OF_UNKNOWN; + mqtt_recv_ret = MQTT_ERROR_ACK_OF_UNKNOWN; + break; } msg->state = MQTT_QUEUED_COMPLETE; /* update response time */ @@ -828,8 +853,8 @@ int __mqtt_recv(struct mqtt_client *client) msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PINGREQ, NULL); if (msg == NULL) { client->error = MQTT_ERROR_ACK_OF_UNKNOWN; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return MQTT_ERROR_ACK_OF_UNKNOWN; + mqtt_recv_ret = MQTT_ERROR_ACK_OF_UNKNOWN; + break; } msg->state = MQTT_QUEUED_COMPLETE; /* update response time */ @@ -837,22 +862,24 @@ int __mqtt_recv(struct mqtt_client *client) break; default: client->error = MQTT_ERROR_MALFORMED_RESPONSE; - MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - return MQTT_ERROR_MALFORMED_RESPONSE; + mqtt_recv_ret = MQTT_ERROR_MALFORMED_RESPONSE; + break; } - /* we've handled the response, now clean the buffer */ - void *dest = client->recv_buffer.mem_start; - void *src = client->recv_buffer.mem_start + consumed; - size_t n = client->recv_buffer.curr - client->recv_buffer.mem_start - consumed; - memmove(dest, src, n); - client->recv_buffer.curr -= consumed; - client->recv_buffer.curr_sz += consumed; + { + /* we've handled the response, now clean the buffer */ + void *dest = client->recv_buffer.mem_start; + void *src = client->recv_buffer.mem_start + consumed; + size_t n = client->recv_buffer.curr - client->recv_buffer.mem_start - consumed; + memmove(dest, src, n); + client->recv_buffer.curr -= consumed; + client->recv_buffer.curr_sz += consumed; + } } - /* never hit (always return once there's nothing left. */ - //MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - //return MQTT_OK; + /* In case there was some error handling the (well formed) message, we end up here */ + MQTT_PAL_MUTEX_UNLOCK(&client->mutex); + return mqtt_recv_ret; } /* FIXED HEADER */ @@ -920,7 +947,7 @@ struct { } }; -ssize_t mqtt_fixed_header_rule_violation(const struct mqtt_fixed_header *fixed_header) { +static ssize_t mqtt_fixed_header_rule_violation(const struct mqtt_fixed_header *fixed_header) { uint8_t control_type; uint8_t control_flags; uint8_t required_flags; @@ -1058,7 +1085,7 @@ ssize_t mqtt_pack_fixed_header(uint8_t *buf, size_t bufsz, const struct mqtt_fix } /* CONNECT */ -ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz, +ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz, const char* client_id, const char* will_topic, const void* will_message, @@ -1069,10 +1096,9 @@ ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz, uint16_t keep_alive) { struct mqtt_fixed_header fixed_header; - uint32_t remaining_length; + size_t remaining_length; const uint8_t *const start = buf; ssize_t rv; - uint8_t temp; /* pack the fixed headr */ fixed_header.control_type = MQTT_CONTROL_CONNECT; @@ -1083,14 +1109,17 @@ ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz, remaining_length = 10; /* size of variable header */ if (client_id == NULL) { - /* client_id is a mandatory parameter */ - return MQTT_ERROR_CONNECT_NULL_CLIENT_ID; - } else { - /* mqtt_string length is strlen + 2 */ - remaining_length += __mqtt_packed_cstrlen(client_id); + client_id = ""; } - + /* For an empty client_id, a clean session is required */ + if (client_id[0] == '\0' && !(connect_flags & MQTT_CONNECT_CLEAN_SESSION)) { + return MQTT_ERROR_CLEAN_SESSION_IS_REQUIRED; + } + /* mqtt_string length is strlen + 2 */ + remaining_length += __mqtt_packed_cstrlen(client_id); + if (will_topic != NULL) { + uint8_t temp; /* there is a will */ connect_flags |= MQTT_CONNECT_WILL_FLAG; remaining_length += __mqtt_packed_cstrlen(will_topic); @@ -1156,15 +1185,13 @@ ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz, *buf++ = (uint8_t) 'T'; *buf++ = MQTT_PROTOCOL_LEVEL; *buf++ = connect_flags; - *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(keep_alive); - buf += 2; + buf += __mqtt_pack_uint16(buf, keep_alive); /* pack the payload */ buf += __mqtt_pack_str(buf, client_id); if (connect_flags & MQTT_CONNECT_WILL_FLAG) { buf += __mqtt_pack_str(buf, will_topic); - *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(will_message_size); - buf += 2; + buf += __mqtt_pack_uint16(buf, will_message_size); memcpy(buf, will_message, will_message_size); buf += will_message_size; } @@ -1236,7 +1263,7 @@ ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz, const uint8_t *const start = buf; ssize_t rv; struct mqtt_fixed_header fixed_header; - uint16_t remaining_length; + uint32_t remaining_length; uint8_t inspected_qos; /* check for null pointers */ @@ -1245,25 +1272,25 @@ ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz, } /* inspect QoS level */ - inspected_qos = (publish_flags & 0x06) >> 1; /* mask */ + inspected_qos = (publish_flags & MQTT_PUBLISH_QOS_MASK) >> 1; /* mask */ /* build the fixed header */ fixed_header.control_type = MQTT_CONTROL_PUBLISH; /* calculate remaining length */ - remaining_length = __mqtt_packed_cstrlen(topic_name); + remaining_length = (uint32_t)__mqtt_packed_cstrlen(topic_name); if (inspected_qos > 0) { remaining_length += 2; } - remaining_length += application_message_size; + remaining_length += (uint32_t)application_message_size; fixed_header.remaining_length = remaining_length; - /* force dup to 0 if qos is 0 */ + /* force dup to 0 if qos is 0 [Spec MQTT-3.3.1-2] */ if (inspected_qos == 0) { publish_flags &= ~MQTT_PUBLISH_DUP; } - /* make sure that qos is not 3 */ + /* make sure that qos is not 3 [Spec MQTT-3.3.1-4] */ if (inspected_qos == 3) { return MQTT_ERROR_PUBLISH_FORBIDDEN_QOS; } @@ -1286,8 +1313,7 @@ ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz, /* pack variable header */ buf += __mqtt_pack_str(buf, topic_name); if (inspected_qos > 0) { - *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id); - buf += 2; + buf += __mqtt_pack_uint16(buf, packet_id); } /* pack payload */ @@ -1308,7 +1334,7 @@ ssize_t mqtt_unpack_publish_response(struct mqtt_response *mqtt_response, const /* get flags */ response->dup_flag = (fixed_header->control_flags & MQTT_PUBLISH_DUP) >> 3; - response->qos_level = (fixed_header->control_flags & 0x06) >> 1; + response->qos_level = (fixed_header->control_flags & MQTT_PUBLISH_QOS_MASK) >> 1; response->retain_flag = fixed_header->control_flags & MQTT_PUBLISH_RETAIN; /* make sure that remaining length is valid */ @@ -1317,13 +1343,13 @@ ssize_t mqtt_unpack_publish_response(struct mqtt_response *mqtt_response, const } /* parse variable header */ - response->topic_name_size = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf); + response->topic_name_size = __mqtt_unpack_uint16(buf); buf += 2; response->topic_name = buf; buf += response->topic_name_size; if (response->qos_level > 0) { - response->packet_id = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf); + response->packet_id = __mqtt_unpack_uint16(buf); buf += 2; } @@ -1371,8 +1397,7 @@ ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz, return 0; } - *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id); - buf += 2; + buf += __mqtt_pack_uint16(buf, packet_id); return buf - start; } @@ -1388,7 +1413,7 @@ ssize_t mqtt_unpack_pubxxx_response(struct mqtt_response *mqtt_response, const u } /* parse packet_id */ - packet_id = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf); + packet_id = __mqtt_unpack_uint16(buf); buf += 2; if (mqtt_response->fixed_header.control_type == MQTT_CONTROL_PUBACK) { @@ -1415,7 +1440,7 @@ ssize_t mqtt_unpack_suback_response (struct mqtt_response *mqtt_response, const } /* unpack packet_id */ - mqtt_response->decoded.suback.packet_id = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf); + mqtt_response->decoded.suback.packet_id = __mqtt_unpack_uint16(buf); buf += 2; remaining_length -= 2; @@ -1428,7 +1453,7 @@ ssize_t mqtt_unpack_suback_response (struct mqtt_response *mqtt_response, const } /* SUBSCRIBE */ -ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, uint16_t packet_id, ...) { +ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, unsigned int packet_id, ...) { va_list args; const uint8_t *const start = buf; ssize_t rv; @@ -1447,6 +1472,7 @@ ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, uint16_t packet_ break; } if (num_subs >= MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS) { + va_end(args); return MQTT_ERROR_SUBSCRIBE_TOO_MANY_TOPICS; } max_qos[num_subs] = (uint8_t) va_arg(args, unsigned int); @@ -1477,8 +1503,7 @@ ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, uint16_t packet_ } /* pack variable header */ - *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id); - buf += 2; + buf += __mqtt_pack_uint16(buf, packet_id); /* pack payload */ @@ -1500,14 +1525,14 @@ ssize_t mqtt_unpack_unsuback_response(struct mqtt_response *mqtt_response, const } /* parse packet_id */ - mqtt_response->decoded.unsuback.packet_id = (uint16_t) MQTT_PAL_NTOHS(*(uint16_t*) buf); + mqtt_response->decoded.unsuback.packet_id = __mqtt_unpack_uint16(buf); buf += 2; return buf - start; } /* UNSUBSCRIBE */ -ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, uint16_t packet_id, ...) { +ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, unsigned int packet_id, ...) { va_list args; const uint8_t *const start = buf; ssize_t rv; @@ -1527,6 +1552,7 @@ ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, uint16_t packe ++num_subs; if (num_subs >= MQTT_UNSUBSCRIBE_REQUEST_MAX_NUM_TOPICS) { + va_end(args); return MQTT_ERROR_UNSUBSCRIBE_TOO_MANY_TOPICS; } } @@ -1555,8 +1581,7 @@ ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, uint16_t packe } /* pack variable header */ - *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(packet_id); - buf += 2; + buf += __mqtt_pack_uint16(buf, packet_id); /* pack payload */ @@ -1570,11 +1595,14 @@ ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, uint16_t packe /* MESSAGE QUEUE */ void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz) { - mq->mem_start = buf; - mq->mem_end = (uint8_t*)buf + bufsz; - mq->curr = buf; - mq->queue_tail = mq->mem_end; - mq->curr_sz = mqtt_mq_currsz(mq); + if(buf != NULL) + { + mq->mem_start = buf; + mq->mem_end = (unsigned char*)buf + bufsz; + mq->curr = buf; + mq->queue_tail = mq->mem_end; + mq->curr_sz = mqtt_mq_currsz(mq); + } } struct mqtt_queued_message* mqtt_mq_register(struct mqtt_message_queue *mq, size_t nbytes) @@ -1611,19 +1639,26 @@ void mqtt_mq_clean(struct mqtt_message_queue *mq) { } /* move buffered data */ - size_t n = mq->curr - new_head->start; - size_t removing = new_head->start - (uint8_t*) mq->mem_start; - memmove(mq->mem_start, new_head->start, n); - mq->curr = (uint8_t*)mq->mem_start + n; + { + size_t n = mq->curr - new_head->start; + size_t removing = new_head->start - (uint8_t*) mq->mem_start; + memmove(mq->mem_start, new_head->start, n); + mq->curr = (uint8_t*)mq->mem_start + n; - /* move queue */ - ssize_t new_tail_idx = new_head - mq->queue_tail; - memmove(mqtt_mq_get(mq, new_tail_idx), mq->queue_tail, sizeof(struct mqtt_queued_message) * (new_tail_idx + 1)); - mq->queue_tail = mqtt_mq_get(mq, new_tail_idx); - - /* bump back start's */ - for(ssize_t i = 0; i < new_tail_idx + 1; ++i) { - mqtt_mq_get(mq, i)->start -= removing; + /* move queue */ + { + ssize_t new_tail_idx = new_head - mq->queue_tail; + memmove(mqtt_mq_get(mq, new_tail_idx), mq->queue_tail, sizeof(struct mqtt_queued_message) * (new_tail_idx + 1)); + mq->queue_tail = mqtt_mq_get(mq, new_tail_idx); + + { + /* bump back start's */ + ssize_t i = 0; + for(; i < new_tail_idx + 1; ++i) { + mqtt_mq_get(mq, i)->start -= removing; + } + } + } } /* get curr_sz */ @@ -1688,15 +1723,29 @@ ssize_t mqtt_unpack_response(struct mqtt_response* response, const uint8_t *buf, } /* EXTRA DETAILS */ +ssize_t __mqtt_pack_uint16(uint8_t *buf, uint16_t integer) +{ + uint16_t integer_htons = MQTT_PAL_HTONS(integer); + memcpy(buf, &integer_htons, 2); + return 2; +} + +uint16_t __mqtt_unpack_uint16(const uint8_t *buf) +{ + uint16_t integer_htons; + memcpy(&integer_htons, buf, 2); + return MQTT_PAL_NTOHS(integer_htons); +} + ssize_t __mqtt_pack_str(uint8_t *buf, const char* str) { uint16_t length = strlen(str); + int i = 0; /* pack string length */ - *(uint16_t*) buf = (uint16_t) MQTT_PAL_HTONS(length); - buf += 2; + buf += __mqtt_pack_uint16(buf, length); /* pack string */ - for(int i = 0; i < length; ++i) { + for(; i < length; ++i) { *(buf++) = str[i]; } diff --git a/src/mqttlib.h b/src/mqttlib.h index 9079ba4..c6fda35 100644 --- a/src/mqttlib.h +++ b/src/mqttlib.h @@ -189,7 +189,7 @@ struct mqtt_fixed_header { enum MQTTControlPacketType control_type; /** The packets control flags.*/ - uint8_t control_flags: 4; + uint32_t control_flags: 4; /** The remaining size of the packet in bytes (i.e. the size of variable header and payload).*/ uint32_t remaining_length; @@ -214,7 +214,7 @@ struct mqtt_fixed_header { MQTT_ERROR(MQTT_ERROR_CONTROL_FORBIDDEN_TYPE) \ MQTT_ERROR(MQTT_ERROR_CONTROL_INVALID_FLAGS) \ MQTT_ERROR(MQTT_ERROR_CONTROL_WRONG_TYPE) \ - MQTT_ERROR(MQTT_ERROR_CONNECT_NULL_CLIENT_ID) \ + MQTT_ERROR(MQTT_ERROR_CONNECT_CLIENT_ID_REFUSED) \ MQTT_ERROR(MQTT_ERROR_CONNECT_NULL_WILL_MESSAGE) \ MQTT_ERROR(MQTT_ERROR_CONNECT_FORBIDDEN_WILL_QOS) \ MQTT_ERROR(MQTT_ERROR_CONNACK_FORBIDDEN_FLAGS) \ @@ -235,7 +235,8 @@ struct mqtt_fixed_header { MQTT_ERROR(MQTT_ERROR_SUBSCRIBE_FAILED) \ MQTT_ERROR(MQTT_ERROR_CONNECTION_CLOSED) \ MQTT_ERROR(MQTT_ERROR_INITIAL_RECONNECT) \ - MQTT_ERROR(MQTT_ERROR_INVALID_REMAINING_LENGTH) + MQTT_ERROR(MQTT_ERROR_INVALID_REMAINING_LENGTH) \ + MQTT_ERROR(MQTT_ERROR_CLEAN_SESSION_IS_REQUIRED) /* todo: add more connection refused errors */ @@ -276,6 +277,29 @@ enum MQTTErrors { */ const char* mqtt_error_str(enum MQTTErrors error); +/** + * @brief Pack a MQTT 16 bit integer, given a native 16 bit integer . + * + * @param[out] buf the buffer that the MQTT integer will be written to. + * @param[in] integer the native integer to be written to \p buf. + * + * @warning This function provides no error checking. + * + * @returns 2 +*/ +ssize_t __mqtt_pack_uint16(uint8_t *buf, uint16_t integer); + +/** + * @brief Unpack a MQTT 16 bit integer to a native 16 bit integer. + * + * @param[in] buf the buffer that the MQTT integer will be read from. + * + * @warning This function provides no error checking and does not modify \p buf. + * + * @returns The native integer +*/ +uint16_t __mqtt_unpack_uint16(const uint8_t *buf); + /** * @brief Pack a MQTT string, given a c-string \p str. * @@ -686,7 +710,7 @@ enum MQTTConnectFlags { MQTT_CONNECT_WILL_QOS_2 = (2u & 0x03) << 3, MQTT_CONNECT_WILL_RETAIN = 32u, MQTT_CONNECT_PASSWORD = 64u, - MQTT_CONNECT_USER_NAME = 128u, + MQTT_CONNECT_USER_NAME = 128u }; /** @@ -695,8 +719,8 @@ enum MQTTConnectFlags { * * @param[out] buf the buffer to pack the connection request packet into. * @param[in] bufsz the number of bytes left in \p buf. - * @param[in] client_id the ID that identifies the local client. \p client_id is a required - * parameter. + * @param[in] client_id the ID that identifies the local client. \p client_id can be NULL or an empty + * string for Anonymous clients. * @param[in] will_topic the topic under which the local client's will message will be published. * Set to \c NULL for no will message. If \p will_topic is not \c NULL a * \p will_message must also be provided. @@ -710,7 +734,7 @@ enum MQTTConnectFlags { * @param[in] password the password to be used to connect to the broker with. Set to \c NULL if * no password is required. * @param[in] connect_flags additional MQTTConnectFlags to be set. The only flags that need to be - * set manually are \c MQTT_CONNECT_CLEAN_SESSION, + * set manually are \c MQTT_CONNECT_CLEAN_SESSION, * \c MQTT_CONNECT_WILL_QOS_X (for \c X ∈ {0, 1, 2}), and * \c MQTT_CONNECT_WILL_RETAIN. Set to 0 if no additional flags are * required. @@ -848,7 +872,7 @@ ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz, * packet, a negative value if there was a protocol violation. */ ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, - uint16_t packet_id, + unsigned int packet_id, ...); /* null terminated */ /** @@ -882,7 +906,7 @@ ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, * packet, a negative value if there was a protocol violation. */ ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, - uint16_t packet_id, + unsigned int packet_id, ...); /* null terminated */ /** @@ -1111,6 +1135,13 @@ struct mqtt_client { */ int number_of_keep_alives; + /** + * @brief The current sent offset. + * + * This is used to allow partial send commands. + */ + size_t send_offset; + /** * @brief The timestamp of the last message sent to the buffer. * @@ -1415,7 +1446,7 @@ void mqtt_reinit(struct mqtt_client* client, * @pre mqtt_init must have been called. * * @param[in,out] client The MQTT client. - * @param[in] client_id The unique name identifying the client. + * @param[in] client_id The unique name identifying the client. (or NULL) * @param[in] will_topic The topic name of client's \p will_message. If no will message is * desired set to \c NULL. * @param[in] will_message The application message (data) to be published in the event the