Skip to content

Commit

Permalink
Improve multi-thread test message and in test mode check for actual m…
Browse files Browse the repository at this point in the history
…essage.
  • Loading branch information
dgarske committed Oct 24, 2023
1 parent 50b807f commit 0951553
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 44 deletions.
64 changes: 40 additions & 24 deletions examples/mqttexample.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,40 +154,56 @@ static int mqtt_get_rand(byte* data, word32 len)
return ret;
}

int mqtt_fill_random_hexstr(char* buf, word32 bufLen)
{
int rc = 0;
word32 pos = 0, sz, i;
const char kHexChar[] = { '0', '1', '2', '3', '4', '5', '6', '7',
'8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
byte rndBytes[32]; /* fill up to x bytes at a time */

while (rc == 0 && pos < bufLen) {
sz = bufLen - pos;
if (sz > (int)sizeof(rndBytes))
sz = (int)sizeof(rndBytes);
sz /= 2; /* 1 byte expands to 2 bytes */

rc = mqtt_get_rand(rndBytes, sz);
if (rc == 0) {
/* Convert random to hex string */
for (i=0; i<sz; i++) {
byte in = rndBytes[i];
buf[pos + (i*2)] = kHexChar[in >> 4];
buf[pos + (i*2)+1] = kHexChar[in & 0xf];
}
pos += sz*2;
}
}
return rc;
}

#ifndef TEST_RAND_SZ
#define TEST_RAND_SZ 4
#endif
static char* mqtt_append_random(const char* inStr, word32 inLen)
{
int rc;
const char kHexChar[] = { '0', '1', '2', '3', '4', '5', '6', '7',
'8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
byte rndBytes[TEST_RAND_SZ], rndHexStr[TEST_RAND_SZ*2];
char *tmp = NULL;
int rc = 0;
char *tmp;

rc = mqtt_get_rand(rndBytes, (word32)sizeof(rndBytes));
if (rc == 0) {
/* Convert random to hex string */
int i;
for (i=0; i<(int)sizeof(rndBytes); i++) {
byte in = rndBytes[i];
rndHexStr[(i*2)] = kHexChar[in >> 4];
rndHexStr[(i*2)+1] = kHexChar[in & 0xf];
}
}
if (rc == 0) {
/* Allocate topic name and client id */
tmp = (char*)WOLFMQTT_MALLOC(inLen + 1 + sizeof(rndHexStr) + 1);
if (tmp == NULL) {
rc = MQTT_CODE_ERROR_MEMORY;
}
tmp = (char*)WOLFMQTT_MALLOC(inLen + 1 + (TEST_RAND_SZ*2) + 1);
if (tmp == NULL) {
rc = MQTT_CODE_ERROR_MEMORY;
}
if (rc == 0) {
/* Format: inStr + `_` randhex + null term */
XMEMCPY(tmp, inStr, inLen);
tmp[inLen] = '_';
XMEMCPY(tmp + inLen + 1, rndHexStr, sizeof(rndHexStr));
tmp[inLen + 1 + sizeof(rndHexStr)] = '\0';
rc = mqtt_fill_random_hexstr(tmp + inLen + 1, (TEST_RAND_SZ*2));
tmp[inLen + 1 + (TEST_RAND_SZ*2)] = '\0'; /* null term */
}
if (rc != 0) {
WOLFMQTT_FREE(tmp);
tmp = NULL;
}
return tmp;
}
Expand Down Expand Up @@ -407,7 +423,7 @@ int mqtt_parse_args(MQTTCtx* mqttCtx, int argc, char** argv)

/* Remove SNI functionality for sn-client */
if(!XSTRNCMP(mqttCtx->app_name, "sn-client", 10)){
#ifdef HAVE_SNI
#ifdef HAVE_SNI
useSNI=0;
#endif
}
Expand Down
2 changes: 2 additions & 0 deletions examples/mqttexample.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ word16 mqtt_get_packetid(void);
int mqtt_check_timeout(int rc, word32* start_sec, word32 timeout_sec);
#endif

int mqtt_fill_random_hexstr(char* buf, word32 bufLen);

int mqtt_file_load(const char* filePath, byte** fileBuf, int *fileLen);

#ifdef WOLFSSL_ENCRYPTED_KEYS
Expand Down
35 changes: 15 additions & 20 deletions examples/multithread/multithread.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,9 @@
#define MAX_BUFFER_SIZE 1024

/* Total size of test message to build */
#define TEST_MESSAGE_SIZE 4680
#define TEST_MESSAGE_SIZE 1048 /* span more than one max packet */

/* Locals */
static const char testMessageBase[] = {
'a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z',
'A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z',
'0','1','2','3','4','5','6','7','8','9',
' ', '!', '"', '#', '$', '%', '&', '\'', '(', ')', '*', '+', ',', '-','.', '/',
':', ';', '<', '=', '>', '?', '@', '[', '\\', ']', '^', '_','`', '{', '|', '}'
};
static char mTestMessage[TEST_MESSAGE_SIZE];
static int mStopRead = 0;
static int mNumMsgsRecvd;
Expand Down Expand Up @@ -181,11 +174,6 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,
/* Print incoming message */
PRINTF("MQTT Message: Topic %s, Qos %d, Id %d, Len %u, %u, %u",
buf, msg->qos, msg->packet_id, msg->total_len, msg->buffer_len, msg->buffer_pos);

/* for test mode: count the number of messages received */
if (mqttCtx->test_mode) {
mNumMsgsRecvd++;
}
}

/* Print message payload */
Expand All @@ -199,6 +187,17 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,
msg->buffer_pos, msg->buffer_pos + msg->buffer_len, len, buf);

if (msg_done) {
/* for test mode: count the number of messages received */
if (mqttCtx->test_mode) {
if (msg->buffer_pos + msg->buffer_len ==
(word32)sizeof(mTestMessage) &&
XMEMCMP(&mTestMessage[msg->buffer_pos], msg->buffer,
msg->buffer_len) == 0)
{
mNumMsgsRecvd++;
}
}

PRINTF("MQTT Message: Done");
}
wm_SemUnlock(&mtLock);
Expand Down Expand Up @@ -689,16 +688,12 @@ static int unsubscribe_do(MQTTCtx *mqttCtx)
int multithread_test(MQTTCtx *mqttCtx)
{
int rc = 0, i, threadCount = 0;
size_t msgSz;
THREAD_T threadList[NUM_PUB_TASKS+3];

/* Build test message */
for (msgSz=0; msgSz<TEST_MESSAGE_SIZE; ) {
size_t x = sizeof(testMessageBase);
if (msgSz + x > TEST_MESSAGE_SIZE)
x = TEST_MESSAGE_SIZE - msgSz;
XMEMCPY(&mTestMessage[msgSz], testMessageBase, x);
msgSz += x;
rc = mqtt_fill_random_hexstr(mTestMessage, (word32)sizeof(mTestMessage));
if (rc != 0) {
return rc;
}

rc = multithread_test_init(mqttCtx);
Expand Down

0 comments on commit 0951553

Please sign in to comment.