diff --git a/src/srnetmqtt.cc b/src/srnetmqtt.cc index f3e26a7..a09646a 100644 --- a/src/srnetmqtt.cc +++ b/src/srnetmqtt.cc @@ -126,6 +126,79 @@ int SrNetMqtt::connect(bool clean, char nflag) return ret == 1 && rc == 0 ? 0 : -1; } +static int _findPuback(unsigned char *respbuf, const int len, const unsigned short msgid) +{ + int i = 0; + for (int n = 0; i < len; i += n) + { + unsigned char* const buf = respbuf + i; + const char type = ((*buf) & 0xf0) >> 4; + + switch (type) + { + case 3: // publish + { + MQTTString topics = MQTTString_initializer; + unsigned char dup = 0, retain = 0, *payload = NULL; + int payloadlen = 0, qos = 0; + unsigned short mypacketid = 0; + if (MQTTDeserialize_publish(&dup, &qos, &retain, &mypacketid, &topics, &payload, &payloadlen, buf, len - i) != 1) + { + return -1; + } + n = payload - buf + payloadlen; + break; + } + + case 4: // puback + { + unsigned short mypacketid = 0; + unsigned char dup = 0, type = 0; + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, buf, 4) != 1) { + n = 4; + break; + } else if (mypacketid != msgid) { + n = 4; + break; + } else { + return 0; + } + break; + } + + case 2: // connack + case 5: // pubrec + case 6: // pubrel + case 7: // pubcomp + case 11: // unsuback + { + n = 4; + break; + } + + case 9: // suback + { + ++i; + i += MQTTPacket_decodeBuf(buf + 1, &n); + break; + } + + case 13: // pingresp + { + n = 2; + break; + } + + default: + { + srWarning("MQTT recv: type " + to_string((int) type)); + n = 1; + } + } + } + return -1; +} + int SrNetMqtt::publish(const string &topic, const string &msg, char nflag) { const int qos = (nflag >> 1) & 3; @@ -177,20 +250,13 @@ int SrNetMqtt::publish(const string &topic, const string &msg, char nflag) return -1; } - unsigned short mypacketid; - unsigned char dup, type; - unsigned char* const readbuf = (unsigned char*) resp.c_str() + offset; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, recvlen) != 1) { - return -1; - } else if (mypacketid != msgid) { - srError("MQTT: Unexpected message ID " + to_string(mypacketid)); - return -1; - } else if (type != 4) { // PUBACK - srError("MQTT: Unexpected message type " + to_string(type)); + unsigned char* const respbuf = (unsigned char*) resp.c_str() + offset; + if (_findPuback(respbuf, recvlen, msgid) < 0) { + srError("MQTT pub: No PUBACK received after publishing"); return -1; - } else { - return 0; } + + return 0; } static int sub(SrNetMqtt *mqtt, MQTTString *ts, int *qos, int n, char *errbuf)