Skip to content

Commit

Permalink
* Fix for non-blocking publish with payload larger than maximum TX bu…
Browse files Browse the repository at this point in the history
…ffer. ZD 16769

* Fix to make sure ctrl+c is honored during a want read/write case.
* Fix the firmware update example to properly synchronize publish and use a unique topic name.
* Fix for write position on cancel of message in progress.
* Improved handling for partial write use-cases.
* Improve multi-thread test message to use larger size (> tx but) and in test mode check for actual message.
* Improve remote test done logic.
  • Loading branch information
dgarske committed Oct 27, 2023
1 parent 02782ea commit cea5a27
Show file tree
Hide file tree
Showing 21 changed files with 354 additions and 187 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/fsanitize-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: fsanitize check Test

on:
push:
branches: [ '*' ]
branches: [ 'master', 'main', 'release/**' ]
pull_request:
branches: [ '*' ]

Expand Down Expand Up @@ -88,4 +88,4 @@ jobs:
- name: Show logs on failure
if: failure() || cancelled()
run: |
more test-suite.log
cat test-suite.log
2 changes: 1 addition & 1 deletion .github/workflows/macos-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: macOS Build Test

on:
push:
branches: [ '*' ]
branches: [ 'master', 'main', 'release/**' ]
pull_request:
branches: [ '*' ]

Expand Down
16 changes: 11 additions & 5 deletions .github/workflows/ubuntu-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Ubuntu Build Test

on:
push:
branches: [ '*' ]
branches: [ 'master', 'main', 'release/**' ]
pull_request:
branches: [ '*' ]

Expand Down Expand Up @@ -59,34 +59,40 @@ jobs:
run: ./configure
- name: wolfmqtt make
run: make
# Note: this will run the external tests for this CI only
- name: wolfmqtt make check
run: make check

env:
WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1

- name: wolfmqtt configure with SN Enabled
env:
WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1
run: ./configure --enable-sn
- name: wolfmqtt make
run: make
- name: wolfmqtt make check
run: make check

- name: wolfmqtt configure with Non-Block
env:
WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1
run: ./configure --enable-nonblock CFLAGS="-DWOLFMQTT_TEST_NONBLOCK"
- name: wolfmqtt make
run: make
- name: wolfmqtt make check
run: make check

- name: wolfmqtt configure with Non-Block and Multi-threading
env:
WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1
run: ./configure --enable-mt --enable-nonblock CFLAGS="-DWOLFMQTT_TEST_NONBLOCK"
- name: wolfmqtt make
run: make
- name: wolfmqtt make check
run: make check

- name: configure with Multi-threading and WOLFMQTT_DYN_PROP
env:
WOLFMQTT_NO_EXTERNAL_BROKER_TESTS: 1
run: ./configure --enable-mt CFLAGS="-DWOLFMQTT_DYN_PROP"
- name: make
run: make
Expand All @@ -97,4 +103,4 @@ jobs:
- name: Show logs on failure
if: failure() || cancelled()
run: |
more test-suite.log
cat test-suite.log
2 changes: 1 addition & 1 deletion .github/workflows/windows-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Windows Build Test

on:
push:
branches: [ '*' ]
branches: [ 'master', 'main', 'release/**' ]
pull_request:
branches: [ '*' ]

Expand Down
30 changes: 19 additions & 11 deletions examples/aws/awsiot.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@

/* Locals */
static int mStopRead = 0;
static int mTestDone = 0;

/* Configuration */
#define APP_HARDWARE "wolf_aws_iot_demo"
Expand All @@ -66,8 +67,9 @@ static int mStopRead = 0;
#define AWSIOT_KEEP_ALIVE_SEC DEFAULT_KEEP_ALIVE_SEC
#define AWSIOT_CMD_TIMEOUT_MS DEFAULT_CMD_TIMEOUT_MS

#define AWSIOT_SUBSCRIBE_TOPIC "$aws/things/" AWSIOT_DEVICE_ID "/shadow/update/delta"
#define AWSIOT_PUBLISH_TOPIC "$aws/things/" AWSIOT_DEVICE_ID "/shadow/update"
#define AWSIOT_SUBSCRIBE_TOPIC AWSIOT_PUBLISH_TOPIC


#define AWSIOT_PUBLISH_MSG_SZ 400

Expand Down Expand Up @@ -293,6 +295,9 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,

if (msg_done) {
PRINTF("MQTT Message: Done");
if (mqttCtx->test_mode) {
mTestDone = 1;
}
}

/* Return negative to terminate publish processing */
Expand Down Expand Up @@ -626,13 +631,6 @@ int awsiot_test(MQTTCtx *mqttCtx)
mqttCtx->stat = WMQ_WAIT_MSG;

do {
/* check for test mode or stop */
if (mStopRead || mqttCtx->test_mode) {
rc = MQTT_CODE_SUCCESS;
PRINTF("MQTT Exiting...");
break;
}

/* Try and read packet */
rc = MqttClient_WaitMessage(&mqttCtx->client, mqttCtx->cmd_timeout_ms);

Expand All @@ -646,8 +644,17 @@ int awsiot_test(MQTTCtx *mqttCtx)
if (rc == MQTT_CODE_CONTINUE) {
return rc;
}

/* check for test mode or stop */
if (mStopRead || mTestDone) {
rc = MQTT_CODE_SUCCESS;
mqttCtx->stat = WMQ_DISCONNECT;
PRINTF("MQTT Exiting...");
break;
}

#ifdef WOLFMQTT_ENABLE_STDIN_CAP
else if (rc == MQTT_CODE_STDIN_WAKE) {
if (rc == MQTT_CODE_STDIN_WAKE) {
/* Get data from STDIO */
XMEMSET(mqttCtx->rx_buf, 0, MAX_BUFFER_SIZE);
if (XFGETS((char*)mqttCtx->rx_buf, MAX_BUFFER_SIZE - 1, stdin) != NULL) {
Expand All @@ -672,8 +679,9 @@ int awsiot_test(MQTTCtx *mqttCtx)
MqttClient_ReturnCodeToString(rc), rc);
}
}
else
#endif
else if (rc == MQTT_CODE_ERROR_TIMEOUT) {
if (rc == MQTT_CODE_ERROR_TIMEOUT) {
/* Keep Alive */
PRINTF("Keep-alive timeout, sending ping");

Expand Down Expand Up @@ -838,7 +846,7 @@ int awsiot_test(MQTTCtx *mqttCtx)
#ifdef ENABLE_AWSIOT_EXAMPLE
do {
rc = awsiot_test(&mqttCtx);
} while (rc == MQTT_CODE_CONTINUE);
} while (!mStopRead && rc == MQTT_CODE_CONTINUE);

mqtt_free_ctx(&mqttCtx);
#else
Expand Down
70 changes: 45 additions & 25 deletions examples/azure/azureiothub.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@

/* Locals */
static int mStopRead = 0;
static int mTestDone = 0;

/* Configuration */
/* Reference:
Expand Down Expand Up @@ -160,6 +161,9 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,

if (msg_done) {
PRINTF("MQTT Message: Done");
if (mqttCtx->test_mode) {
mTestDone = 1;
}
}

/* Return negative to terminate publish processing */
Expand Down Expand Up @@ -449,13 +453,6 @@ int azureiothub_test(MQTTCtx *mqttCtx)
mqttCtx->stat = WMQ_WAIT_MSG;

do {
/* check for test mode or stop */
if (mStopRead || mqttCtx->test_mode) {
rc = MQTT_CODE_SUCCESS;
PRINTF("MQTT Exiting...");
break;
}

/* Try and read packet */
rc = MqttClient_WaitMessage(&mqttCtx->client, mqttCtx->cmd_timeout_ms);

Expand All @@ -469,8 +466,17 @@ int azureiothub_test(MQTTCtx *mqttCtx)
if (rc == MQTT_CODE_CONTINUE) {
return rc;
}

/* check for test mode or stop */
if (mStopRead || mTestDone) {
rc = MQTT_CODE_SUCCESS;
mqttCtx->stat = WMQ_DISCONNECT;
PRINTF("MQTT Exiting...");
goto disconn;
}

#ifdef WOLFMQTT_ENABLE_STDIN_CAP
else if (rc == MQTT_CODE_STDIN_WAKE) {
if (rc == MQTT_CODE_STDIN_WAKE) {
/* Get data from STDIO */
XMEMSET(mqttCtx->rx_buf, 0, MAX_BUFFER_SIZE);
if (XFGETS((char*)mqttCtx->rx_buf, MAX_BUFFER_SIZE - 1, stdin) != NULL) {
Expand All @@ -493,38 +499,53 @@ int azureiothub_test(MQTTCtx *mqttCtx)
MqttClient_ReturnCodeToString(rc), rc);
}
}
else
#endif
else if (rc == MQTT_CODE_ERROR_TIMEOUT) {
if (rc == MQTT_CODE_ERROR_TIMEOUT) {
/* Keep Alive */
PRINTF("Keep-alive timeout, sending ping");

rc = MqttClient_Ping_ex(&mqttCtx->client, &mqttCtx->ping);
if (rc == MQTT_CODE_CONTINUE) {
return rc;
}
else if (rc != MQTT_CODE_SUCCESS) {
PRINTF("MQTT Ping Keep Alive Error: %s (%d)",
MqttClient_ReturnCodeToString(rc), rc);
break;
}
mqttCtx->stat = WMQ_PING;
break;
}
else if (rc != MQTT_CODE_SUCCESS) {
/* There was an error */
PRINTF("MQTT Message Wait: %s (%d)",
MqttClient_ReturnCodeToString(rc), rc);
break;
goto disconn;
}
} while (1);
}
FALL_THROUGH;

/* Check for error */
if (rc != MQTT_CODE_SUCCESS) {
goto disconn;
case WMQ_PING:
{
mqttCtx->stat = WMQ_PING;

rc = MqttClient_Ping_ex(&mqttCtx->client, &mqttCtx->ping);
if (rc == MQTT_CODE_CONTINUE) {
return rc;
}
else if (rc != MQTT_CODE_SUCCESS) {
PRINTF("MQTT Ping Keep Alive Error: %s (%d)",
MqttClient_ReturnCodeToString(rc), rc);
break;
}

if (mqttCtx->test_mode) {
PRINTF("MQTT Ping Done, exiting for test mode");
mTestDone = 1;
}
else {
mqttCtx->stat = WMQ_WAIT_MSG;
break;
}
}
FALL_THROUGH;

case WMQ_DISCONNECT:
{
mqttCtx->stat = WMQ_DISCONNECT;

/* Disconnect */
rc = MqttClient_Disconnect(&mqttCtx->client);
if (rc == MQTT_CODE_CONTINUE) {
Expand Down Expand Up @@ -559,7 +580,6 @@ int azureiothub_test(MQTTCtx *mqttCtx)
}

case WMQ_UNSUB: /* not used */
case WMQ_PING:
default:
rc = MQTT_CODE_ERROR_STAT;
goto exit;
Expand Down Expand Up @@ -659,7 +679,7 @@ int azureiothub_test(MQTTCtx *mqttCtx)
#ifdef ENABLE_AZUREIOTHUB_EXAMPLE
do {
rc = azureiothub_test(&mqttCtx);
} while (rc == MQTT_CODE_CONTINUE);
} while (!mStopRead && rc == MQTT_CODE_CONTINUE);

mqtt_free_ctx(&mqttCtx);
#else
Expand Down
Loading

0 comments on commit cea5a27

Please sign in to comment.