Permalink
Browse files

Add keepalive_spec and stream handling tests

  • Loading branch information...
1 parent 3930b15 commit c37e03e2a8500d76d2b1eceff08d68700ece037e @knolleary committed Feb 8, 2014
@@ -29,21 +29,21 @@ PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,u
this->stream = NULL;
}
-PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream *stream) {
+PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream& stream) {
this->_client = &client;
this->callback = callback;
this->ip = ip;
this->port = port;
this->domain = NULL;
- this->stream = stream;
+ this->stream = &stream;
}
-PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream *stream) {
+PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream& stream) {
this->_client = &client;
this->callback = callback;
this->domain = domain;
this->port = port;
- this->stream = stream;
+ this->stream = &stream;
}
boolean PubSubClient::connect(char *id) {
@@ -144,6 +144,7 @@ uint8_t PubSubClient::readByte() {
uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint16_t len = 0;
buffer[len++] = readByte();
+ bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
uint32_t multiplier = 1;
uint16_t length = 0;
uint8_t digit = 0;
@@ -158,7 +159,7 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
} while ((digit & 128) != 0);
*lengthLength = len-1;
- if ((buffer[0]&0xF0) == MQTTPUBLISH) {
+ if (isPublish) {
// Read in topic length to calculate bytes to skip over for Stream writing
buffer[len++] = readByte();
buffer[len++] = readByte();
@@ -172,18 +173,21 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
for (uint16_t i = start;i<length;i++) {
digit = readByte();
- if(this->stream && ((buffer[0]&0xF0) == MQTTPUBLISH) && len-*lengthLength-2>skip) {
- this->stream->write(digit);
+ if (this->stream) {
+ if (isPublish && len-*lengthLength-2>skip) {
+ this->stream->write(digit);
+ }
}
if (len < MQTT_MAX_PACKET_SIZE) {
- buffer[len++] = digit;
- } else if (!this->stream) {
- len = 0; // This will cause the packet to be ignored.
+ buffer[len] = digit;
}
+ len++;
+ }
+
+ if (!this->stream && len > MQTT_MAX_PACKET_SIZE) {
+ len = 0; // This will cause the packet to be ignored.
}
- // If a stream has been provided, indicate that we wrote the whole length,
- // else return 0 if the length exceed the max packet size
return len;
}
@@ -220,11 +224,18 @@ boolean PubSubClient::loop() {
}
topic[tl] = 0;
// msgId only present for QOS>0
- if (buffer[0]&MQTTQOS1) {
+ if ((buffer[0]&0x06) == MQTTQOS1) {
msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
payload = buffer+llen+3+tl+2;
callback(topic,payload,len-llen-3-tl-2);
- puback(msgId);
+
+ buffer[0] = MQTTPUBACK;
+ buffer[1] = 2;
+ buffer[2] = (msgId >> 8);
+ buffer[3] = (msgId & 0xFF);
+ _client->write(buffer,4);
+ lastOutActivity = t;
+
} else {
payload = buffer+llen+3+tl;
callback(topic,payload,len-llen-3-tl);
@@ -366,17 +377,6 @@ boolean PubSubClient::subscribe(char* topic, uint8_t qos) {
return false;
}
-boolean PubSubClient::puback(uint16_t msgId) {
- if(connected()) {
- // Leave room in the buffer for header and variable length field
- uint16_t length = 5;
- buffer[length++] = (msgId >> 8);
- buffer[length++] = (msgId & 0xFF);
- return write(MQTTPUBACK,buffer,length-5);
- }
- return false;
-}
-
boolean PubSubClient::unsubscribe(char* topic) {
if (connected()) {
uint16_t length = 5;
@@ -51,17 +51,16 @@ class PubSubClient {
uint8_t readByte();
boolean write(uint8_t header, uint8_t* buf, uint16_t length);
uint16_t writeString(char* string, uint8_t* buf, uint16_t pos);
- boolean puback(uint16_t msgId);
uint8_t *ip;
char* domain;
uint16_t port;
Stream* stream;
public:
PubSubClient();
PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client);
- PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream*);
+ PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream&);
PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client);
- PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream*);
+ PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream&);
boolean connect(char *);
boolean connect(char *, char *, char *);
boolean connect(char *, char *, uint8_t, uint8_t, char *);
@@ -35,7 +35,7 @@ void callback(char* topic, byte* payload, unsigned int length) {
}
EthernetClient ethClient;
-PubSubClient client(server, 1883, callback, ethClient, &sram);
+PubSubClient client(server, 1883, callback, ethClient, sram);
void setup()
{
@@ -0,0 +1,177 @@
+#include "PubSubClient.h"
+#include "ShimClient.h"
+#include "Buffer.h"
+#include "BDDTest.h"
+#include "trace.h"
+
+
+byte server[] = { 172, 16, 0, 2 };
+
+void callback(char* topic, byte* payload, unsigned int length) {
+ // handle message arrived
+}
+
+
+int test_keepalive_pings_idle() {
+ IT("keeps an idle connection alive");
+
+ ShimClient shimClient;
+ shimClient.setAllowConnect(true);
+
+ byte connack[] = { 0x20, 0x02, 0x00, 0x00 };
+ shimClient.respond(connack,4);
+
+ PubSubClient client(server, 1883, callback, shimClient);
+ int rc = client.connect((char*)"client_test1");
+ IS_TRUE(rc);
+
+ byte pingreq[] = { 0xC0,0x0 };
+ shimClient.expect(pingreq,2);
+ byte pingresp[] = { 0xD0,0x0 };
+ shimClient.respond(pingresp,2);
+
+ for (int i = 0; i < 50; i++) {
+ sleep(1);
+ rc = client.loop();
+ IS_TRUE(rc);
+ }
+
+ IS_FALSE(shimClient.error());
+
+ END_IT
+}
+
+int test_keepalive_pings_with_outbound_qos0() {
+ IT("keeps a connection alive that only sends qos0");
+
+ ShimClient shimClient;
+ shimClient.setAllowConnect(true);
+
+ byte connack[] = { 0x20, 0x02, 0x00, 0x00 };
+ shimClient.respond(connack,4);
+
+ PubSubClient client(server, 1883, callback, shimClient);
+ int rc = client.connect((char*)"client_test1");
+ IS_TRUE(rc);
+
+ byte publish[] = {0x30,0xe,0x0,0x5,0x74,0x6f,0x70,0x69,0x63,0x70,0x61,0x79,0x6c,0x6f,0x61,0x64};
+
+ for (int i = 0; i < 50; i++) {
+ TRACE(i<<":");
+ shimClient.expect(publish,16);
+ rc = client.publish((char*)"topic",(char*)"payload");
+ IS_TRUE(rc);
+ IS_FALSE(shimClient.error());
+ sleep(1);
+ if ( i == 15 || i == 31 || i == 47) {
+ byte pingreq[] = { 0xC0,0x0 };
+ shimClient.expect(pingreq,2);
+ byte pingresp[] = { 0xD0,0x0 };
+ shimClient.respond(pingresp,2);
+ }
+ rc = client.loop();
+ IS_TRUE(rc);
+ IS_FALSE(shimClient.error());
+ }
+
+ END_IT
+}
+
+int test_keepalive_pings_with_inbound_qos0() {
+ IT("keeps a connection alive that only receives qos0");
+
+ ShimClient shimClient;
+ shimClient.setAllowConnect(true);
+
+ byte connack[] = { 0x20, 0x02, 0x00, 0x00 };
+ shimClient.respond(connack,4);
+
+ PubSubClient client(server, 1883, callback, shimClient);
+ int rc = client.connect((char*)"client_test1");
+ IS_TRUE(rc);
+
+ byte publish[] = {0x30,0xe,0x0,0x5,0x74,0x6f,0x70,0x69,0x63,0x70,0x61,0x79,0x6c,0x6f,0x61,0x64};
+
+ for (int i = 0; i < 50; i++) {
+ TRACE(i<<":");
+ sleep(1);
+ if ( i == 15 || i == 31 || i == 47) {
+ byte pingreq[] = { 0xC0,0x0 };
+ shimClient.expect(pingreq,2);
+ byte pingresp[] = { 0xD0,0x0 };
+ shimClient.respond(pingresp,2);
+ }
+ shimClient.respond(publish,16);
+ rc = client.loop();
+ IS_TRUE(rc);
+ IS_FALSE(shimClient.error());
+ }
+
+ END_IT
+}
+
+int test_keepalive_no_pings_inbound_qos1() {
+ IT("does not send pings for connections with inbound qos1");
+
+ ShimClient shimClient;
+ shimClient.setAllowConnect(true);
+
+ byte connack[] = { 0x20, 0x02, 0x00, 0x00 };
+ shimClient.respond(connack,4);
+
+ PubSubClient client(server, 1883, callback, shimClient);
+ int rc = client.connect((char*)"client_test1");
+ IS_TRUE(rc);
+
+ byte publish[] = {0x32,0x10,0x0,0x5,0x74,0x6f,0x70,0x69,0x63,0x12,0x34,0x70,0x61,0x79,0x6c,0x6f,0x61,0x64};
+ byte puback[] = {0x40,0x2,0x12,0x34};
+
+ for (int i = 0; i < 50; i++) {
+ shimClient.respond(publish,18);
+ shimClient.expect(puback,4);
+ sleep(1);
+ rc = client.loop();
+ IS_TRUE(rc);
+ IS_FALSE(shimClient.error());
+ }
+
+ END_IT
+}
+
+int test_keepalive_disconnects_hung() {
+ IT("disconnects a hung connection");
+
+ ShimClient shimClient;
+ shimClient.setAllowConnect(true);
+
+ byte connack[] = { 0x20, 0x02, 0x00, 0x00 };
+ shimClient.respond(connack,4);
+
+ PubSubClient client(server, 1883, callback, shimClient);
+ int rc = client.connect((char*)"client_test1");
+ IS_TRUE(rc);
+
+ byte pingreq[] = { 0xC0,0x0 };
+ shimClient.expect(pingreq,2);
+
+ for (int i = 0; i < 32; i++) {
+ sleep(1);
+ rc = client.loop();
+ }
+ IS_FALSE(rc);
+
+ IS_FALSE(shimClient.error());
+
+ END_IT
+}
+
+int main()
+{
+ test_keepalive_pings_idle();
+ test_keepalive_pings_with_outbound_qos0();
+ test_keepalive_pings_with_inbound_qos0();
+ test_keepalive_no_pings_inbound_qos1();
+ test_keepalive_disconnects_hung();
+
+ FINISH
+}
@@ -0,0 +1,39 @@
+#include "Stream.h"
+#include "trace.h"
+#include <iostream>
+#include <Arduino.h>
+
+Stream::Stream() {
+ this->expectBuffer = new Buffer();
+ this->_error = false;
+ this->_written = 0;
+}
+
+size_t Stream::write(uint8_t b) {
+ this->_written++;
+ TRACE(std::hex << (unsigned int)b);
+ if (this->expectBuffer->available()) {
+ uint8_t expected = this->expectBuffer->next();
+ if (expected != b) {
+ this->_error = true;
+ TRACE("!=" << (unsigned int)expected);
+ }
+ } else {
+ this->_error = true;
+ }
+ TRACE("\n"<< std::dec);
+ return 1;
+}
+
+
+bool Stream::error() {
+ return this->_error;
+}
+
+void Stream::expect(uint8_t *buf, size_t size) {
+ this->expectBuffer->add(buf,size);
+}
+
+uint16_t Stream::length() {
+ return this->_written;
+}
Oops, something went wrong.

0 comments on commit c37e03e

Please sign in to comment.