Skip to content

Commit

Permalink
[MTM-33389][fix]srnetmqtt: Added findPuback() to fix an issue when re…
Browse files Browse the repository at this point in the history
…sponse buffer having more than one packet
  • Loading branch information
rina23q committed Jul 21, 2020
1 parent c1a780f commit 171719a
Showing 1 changed file with 78 additions and 12 deletions.
90 changes: 78 additions & 12 deletions src/srnetmqtt.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,79 @@ int SrNetMqtt::connect(bool clean, char nflag)
return ret == 1 && rc == 0 ? 0 : -1;
}

static int _findPuback(unsigned char *respbuf, const int len, const unsigned short msgid)
{
int i = 0;
for (int n = 0; i < len; i += n)
{
unsigned char* const buf = respbuf + i;
const char type = ((*buf) & 0xf0) >> 4;

switch (type)
{
case 3: // publish
{
MQTTString topics = MQTTString_initializer;
unsigned char dup = 0, retain = 0, *payload = NULL;
int payloadlen = 0, qos = 0;
unsigned short mypacketid = 0;
if (MQTTDeserialize_publish(&dup, &qos, &retain, &mypacketid, &topics, &payload, &payloadlen, buf, len - i) != 1)
{
return -1;
}
n = payload - buf + payloadlen;
break;
}

case 4: // puback
{
unsigned short mypacketid = 0;
unsigned char dup = 0, type = 0;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, buf, 4) != 1) {
n = 4;
break;
} else if (mypacketid != msgid) {
n = 4;
break;
} else {
return 0;
}
break;
}

case 2: // connack
case 5: // pubrec
case 6: // pubrel
case 7: // pubcomp
case 11: // unsuback
{
n = 4;
break;
}

case 9: // suback
{
++i;
i += MQTTPacket_decodeBuf(buf + 1, &n);
break;
}

case 13: // pingresp
{
n = 2;
break;
}

default:
{
srWarning("MQTT recv: type " + to_string((int) type));
n = 1;
}
}
}
return -1;
}

int SrNetMqtt::publish(const string &topic, const string &msg, char nflag)
{
const int qos = (nflag >> 1) & 3;
Expand Down Expand Up @@ -177,20 +250,13 @@ int SrNetMqtt::publish(const string &topic, const string &msg, char nflag)
return -1;
}

unsigned short mypacketid;
unsigned char dup, type;
unsigned char* const readbuf = (unsigned char*) resp.c_str() + offset;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, recvlen) != 1) {
return -1;
} else if (mypacketid != msgid) {
srError("MQTT: Unexpected message ID " + to_string(mypacketid));
return -1;
} else if (type != 4) { // PUBACK
srError("MQTT: Unexpected message type " + to_string(type));
unsigned char* const respbuf = (unsigned char*) resp.c_str() + offset;
if (_findPuback(respbuf, recvlen, msgid) < 0) {
srError("MQTT pub: No PUBACK received after publishing");
return -1;
} else {
return 0;
}

return 0;
}

static int sub(SrNetMqtt *mqtt, MQTTString *ts, int *qos, int n, char *errbuf)
Expand Down

0 comments on commit 171719a

Please sign in to comment.