diff --git a/libraries/standard/mqtt/src/iot_mqtt_operation.c b/libraries/standard/mqtt/src/iot_mqtt_operation.c index 8bf96297b3..e9a8ab3551 100644 --- a/libraries/standard/mqtt/src/iot_mqtt_operation.c +++ b/libraries/standard/mqtt/src/iot_mqtt_operation.c @@ -1045,11 +1045,12 @@ void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool, * since a network response could modify the status. */ if( networkPending == false ) { + /* Operations that are not waiting for a network response either failed or + * completed successfully. Check that a status was set. */ + IotMqtt_Assert( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING ); + /* Notify of operation completion if this job set a status. */ - if( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING ) - { - _IotMqtt_Notify( pOperation ); - } + _IotMqtt_Notify( pOperation ); } } } @@ -1060,6 +1061,7 @@ void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool, IotTaskPoolJob_t pOperationJob, void * pContext ) { + bool destroyOperation = false; _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext; IotMqttCallbackParam_t callbackParam = { 0 }; @@ -1067,6 +1069,7 @@ void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool, * are disabled. */ ( void ) pTaskPool; ( void ) pOperationJob; + ( void ) destroyOperation; IotMqtt_Assert( pOperationJob == pOperation->job ); /* The operation's callback function and status must be set. */ @@ -1082,11 +1085,11 @@ void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool, pOperation->u.operation.notify.callback.function( pOperation->u.operation.notify.callback.pCallbackContext, &callbackParam ); - /* Attempt to destroy the operation once the user callback returns. */ - if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true ) - { - _IotMqtt_DestroyOperation( pOperation ); - } + /* Decrement the operation reference count. This function is at the end of the + * operation lifecycle, so the operation must be destroyed here. */ + destroyOperation = _IotMqtt_DecrementOperationReferences( pOperation, false ); + IotMqtt_Assert( destroyOperation == true ); + _IotMqtt_DestroyOperation( pOperation ); } /*-----------------------------------------------------------*/ @@ -1289,17 +1292,18 @@ void _IotMqtt_Notify( _mqttOperation_t * pOperation ) } else { + /* Only waitable operations will have a reference count greater than 1. + * Non-waitable operations will not reach this block. */ + IotMqtt_Assert( waitable == true ); + /* Post to a waitable operation's semaphore. */ - if( waitable == true ) - { - IotLogDebug( "(MQTT connection %p, %s operation %p) Waitable operation " - "notified of completion.", - pOperation->pMqttConnection, - IotMqtt_OperationType( pOperation->u.operation.type ), - pOperation ); + IotLogDebug( "(MQTT connection %p, %s operation %p) Waitable operation " + "notified of completion.", + pOperation->pMqttConnection, + IotMqtt_OperationType( pOperation->u.operation.type ), + pOperation ); - IotSemaphore_Post( &( pOperation->u.operation.notify.waitSemaphore ) ); - } + IotSemaphore_Post( &( pOperation->u.operation.notify.waitSemaphore ) ); } } else diff --git a/libraries/standard/mqtt/test/unit/iot_tests_mqtt_api.c b/libraries/standard/mqtt/test/unit/iot_tests_mqtt_api.c index 9a0ff50981..106ea0aef7 100644 --- a/libraries/standard/mqtt/test/unit/iot_tests_mqtt_api.c +++ b/libraries/standard/mqtt/test/unit/iot_tests_mqtt_api.c @@ -132,7 +132,12 @@ * @brief Length of an arbitrary packet for testing. A buffer will be allocated * for it, but its contents don't matter. */ -#define PACKET_LENGTH ( 1 ) +#define PACKET_LENGTH ( 32 ) + +/** + * @brief How many operations to use for the OperationFindMatch test. + */ +#define OPERATION_COUNT (2) /*-----------------------------------------------------------*/ @@ -558,8 +563,8 @@ static IotMqttError_t _getNextByte( IotNetworkConnection_t pNetworkInterface, /** * @brief A PINGREQ serializer that attempts to allocate memory (unlike the default). */ -IotMqttError_t _serializePingreq( uint8_t ** pPingreqPacket, - size_t * pPacketSize ) +static IotMqttError_t _serializePingreq( uint8_t ** pPingreqPacket, + size_t * pPacketSize ) { IotMqttError_t status = IOT_MQTT_SUCCESS; @@ -582,6 +587,18 @@ IotMqttError_t _serializePingreq( uint8_t ** pPingreqPacket, /*-----------------------------------------------------------*/ +/** + * @brief A completion callback that does nothing. + */ +static void _completionCallback( void * pContext, + IotMqttCallbackParam_t * pCallbackParam ) +{ + ( void ) pContext; + ( void ) pCallbackParam; +} + +/*-----------------------------------------------------------*/ + /** * @brief Test group for MQTT API tests. */ @@ -635,6 +652,8 @@ TEST_GROUP_RUNNER( MQTT_Unit_API ) RUN_TEST_CASE( MQTT_Unit_API, StringCoverage ); RUN_TEST_CASE( MQTT_Unit_API, OperationCreateDestroy ); RUN_TEST_CASE( MQTT_Unit_API, OperationWaitTimeout ); + RUN_TEST_CASE( MQTT_Unit_API, OperationFindMatch ); + RUN_TEST_CASE( MQTT_Unit_API, OperationLists ); RUN_TEST_CASE( MQTT_Unit_API, ConnectParameters ); RUN_TEST_CASE( MQTT_Unit_API, ConnectMallocFail ); RUN_TEST_CASE( MQTT_Unit_API, ConnectRestoreSessionMallocFail ); @@ -643,6 +662,7 @@ TEST_GROUP_RUNNER( MQTT_Unit_API ) RUN_TEST_CASE( MQTT_Unit_API, PublishQoS0Parameters ); RUN_TEST_CASE( MQTT_Unit_API, PublishQoS0MallocFail ); RUN_TEST_CASE( MQTT_Unit_API, PublishQoS1 ); + RUN_TEST_CASE( MQTT_Unit_API, PublishRetryPeriod ); RUN_TEST_CASE( MQTT_Unit_API, PublishDuplicates ); RUN_TEST_CASE( MQTT_Unit_API, SubscribeUnsubscribeParameters ); RUN_TEST_CASE( MQTT_Unit_API, SubscribeMallocFail ); @@ -946,6 +966,107 @@ TEST( MQTT_Unit_API, OperationWaitTimeout ) /*-----------------------------------------------------------*/ +/** + * @brief Test edge cases when searching for operations. + */ +TEST( MQTT_Unit_API, OperationFindMatch ) +{ + int32_t i = 0; + uint16_t packetIdentifier = 0; + IotMqttError_t status = IOT_MQTT_STATUS_PENDING; + _mqttOperation_t * pMatchedOperation = NULL; + _mqttOperation_t * pOperation[ OPERATION_COUNT ] = { NULL, NULL }; + + /* Create a new MQTT connection. */ + _pMqttConnection = IotTestMqtt_createMqttConnection( AWS_IOT_MQTT_SERVER, + &_networkInfo, + 0 ); + TEST_ASSERT_NOT_NULL( _pMqttConnection ); + + /* Set up operations. */ + for( i = 0; i < OPERATION_COUNT; i++ ) + { + status = _IotMqtt_CreateOperation( _pMqttConnection, 0, NULL, &( pOperation[ i ] ) ); + TEST_ASSERT_EQUAL( IOT_MQTT_SUCCESS, status ); + + TEST_ASSERT_EQUAL( IOT_TASKPOOL_SUCCESS, IotTaskPool_CreateJob( _IotMqtt_ProcessCompletedOperation, + pOperation[ i ], + &( pOperation[ i ]->jobStorage ), + &( pOperation[ i ]->job ) ) ); + + IotListDouble_Remove( &( pOperation[ i ]->link ) ); + IotListDouble_InsertHead( &( _pMqttConnection->pendingResponse ), &( pOperation[ i ]->link ) ); + + pOperation[ i ]->u.operation.packetIdentifier = ( uint16_t ) ( i + 1 ); + pOperation[ i ]->u.operation.periodic.retry.nextPeriodMs = DUP_CHECK_RETRY_MS; + pOperation[ i ]->u.operation.periodic.retry.limit = DUP_CHECK_RETRY_LIMIT; + } + + pOperation[ 0 ]->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER; + pOperation[ 1 ]->u.operation.type = IOT_MQTT_SUBSCRIBE; + + /* Set one operation's job to an invalid state, then try to find it. The invalid state + * will cause that job to be ignored. */ + packetIdentifier = 1; + pOperation[ 0 ]->jobStorage.status = IOT_TASKPOOL_STATUS_COMPLETED; + pMatchedOperation = _IotMqtt_FindOperation( _pMqttConnection, + IOT_MQTT_PUBLISH_TO_SERVER, + &packetIdentifier ); + TEST_ASSERT_NULL( pMatchedOperation ); + + /* Clean up operations. */ + for( i = 0; i < OPERATION_COUNT; i++ ) + { + TEST_ASSERT_EQUAL_INT( true, _IotMqtt_DecrementOperationReferences( pOperation[ i ], false ) ); + _IotMqtt_DestroyOperation( pOperation[ i ] ); + } + + /* Disconnect the MQTT connection. */ + IotMqtt_Disconnect( _pMqttConnection, IOT_MQTT_FLAG_CLEANUP_ONLY ); +} + +/*-----------------------------------------------------------*/ + +/** + * @brief Tests the behavior of send and notify with different link statuses. + */ +TEST( MQTT_Unit_API, OperationLists ) +{ + _mqttOperation_t * pOperation = NULL; + IotMqttCallbackInfo_t callbackInfo = IOT_MQTT_CALLBACK_INFO_INITIALIZER; + + /* Create a new MQTT connection. */ + _networkInterface.send = _sendSuccess; + _pMqttConnection = IotTestMqtt_createMqttConnection( AWS_IOT_MQTT_SERVER, + &_networkInfo, + 0 ); + TEST_ASSERT_NOT_NULL( _pMqttConnection ); + + /* Create a new MQTT operation. */ + callbackInfo.function = _completionCallback; + TEST_ASSERT_EQUAL( IOT_MQTT_SUCCESS, _IotMqtt_CreateOperation( _pMqttConnection, + 0, + &callbackInfo, + &pOperation ) ); + TEST_ASSERT_NOT_NULL( pOperation ); + pOperation->u.operation.pMqttPacket = IotMqtt_MallocMessage( PACKET_LENGTH ); + pOperation->u.operation.packetSize = PACKET_LENGTH; + + /* Process a send with operation unlinked. Check that operation gets linked afterwards. */ + IotListDouble_Remove( &( pOperation->link ) ); + _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation ); + TEST_ASSERT_EQUAL_INT( true, IotLink_IsLinked( &( pOperation->link ) ) ); + + /* Notify with the operation linked. */ + pOperation->u.operation.status = IOT_MQTT_SUCCESS; + _IotMqtt_Notify( pOperation ); + + /* Disconnect the MQTT connection. */ + IotMqtt_Disconnect( _pMqttConnection, IOT_MQTT_FLAG_CLEANUP_ONLY ); +} + +/*-----------------------------------------------------------*/ + /** * @brief Tests the behavior of @ref mqtt_function_connect with various * invalid parameters. @@ -1396,6 +1517,63 @@ TEST( MQTT_Unit_API, PublishQoS1 ) /*-----------------------------------------------------------*/ +/** + * @brief Tests that PUBLISH retry periods are calculated correctly. + */ +TEST( MQTT_Unit_API, PublishRetryPeriod ) +{ + _mqttOperation_t * pOperation = NULL; + uint32_t periodMs = IOT_MQTT_RETRY_MS_CEILING / 2; + + /* Create a new MQTT connection. */ + _networkInterface.send = _sendSuccess; + _pMqttConnection = IotTestMqtt_createMqttConnection( false, + &_networkInfo, + 0 ); + TEST_ASSERT_NOT_NULL( _pMqttConnection ); + + /* Create a PUBLISH with retry operation. */ + TEST_ASSERT_EQUAL( IOT_MQTT_SUCCESS, _IotMqtt_CreateOperation( _pMqttConnection, + IOT_MQTT_FLAG_WAITABLE, + NULL, + &pOperation ) ); + TEST_ASSERT_NOT_NULL( pOperation ); + pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER; + pOperation->u.operation.pMqttPacket = IotMqtt_MallocMessage( PACKET_LENGTH ); + pOperation->u.operation.packetSize = PACKET_LENGTH; + pOperation->u.operation.periodic.retry.limit = DUP_CHECK_RETRY_LIMIT; + pOperation->u.operation.periodic.retry.nextPeriodMs = periodMs; + IotListDouble_Remove( &( pOperation->link ) ); + + /* Simulate send of PUBLISH. */ + _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation ); + + /* Immediately cancel retried PUBLISH, then check statuses set by send. */ + TEST_ASSERT_EQUAL( IOT_TASKPOOL_SUCCESS, IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL, + pOperation->job, + NULL ) ); + TEST_ASSERT_EQUAL( IOT_MQTT_STATUS_PENDING, pOperation->u.operation.status ); + TEST_ASSERT_EQUAL( 1, pOperation->u.operation.periodic.retry.count ); + TEST_ASSERT_EQUAL( 2 * periodMs, pOperation->u.operation.periodic.retry.nextPeriodMs ); + + /* Simulate another send. Check that the retry ceiling is respected. */ + _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation ); + + /* Immediately cancel retried PUBLISH, then check statuses set by send. */ + TEST_ASSERT_EQUAL( IOT_TASKPOOL_SUCCESS, IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL, + pOperation->job, + NULL ) ); + TEST_ASSERT_EQUAL( IOT_MQTT_STATUS_PENDING, pOperation->u.operation.status ); + TEST_ASSERT_EQUAL( 2, pOperation->u.operation.periodic.retry.count ); + TEST_ASSERT_EQUAL( IOT_MQTT_RETRY_MS_CEILING, pOperation->u.operation.periodic.retry.nextPeriodMs ); + + /* Clean up. */ + TEST_ASSERT_EQUAL_INT( false, _IotMqtt_DecrementOperationReferences( pOperation, false ) ); + IotMqtt_Disconnect( _pMqttConnection, IOT_MQTT_FLAG_CLEANUP_ONLY ); +} + +/*-----------------------------------------------------------*/ + /** * @brief Tests that duplicate QoS 1 PUBLISH packets are different from the * original. diff --git a/libraries/standard/mqtt/test/unit/iot_tests_mqtt_platform.c b/libraries/standard/mqtt/test/unit/iot_tests_mqtt_platform.c index 63b31e4a9b..b656d7c650 100644 --- a/libraries/standard/mqtt/test/unit/iot_tests_mqtt_platform.c +++ b/libraries/standard/mqtt/test/unit/iot_tests_mqtt_platform.c @@ -31,6 +31,9 @@ /* Standard includes. */ #include +/* Platform threads include. */ +#include "platform/iot_threads.h" + /* SDK initialization include. */ #include "iot_init.h" @@ -74,6 +77,12 @@ ( ( void ( * )( void *, \ IotMqttCallbackParam_t * ) ) 0x1 ) +/** + * @brief Length of an arbitrary packet for testing. A buffer will be allocated + * for it, but its contents don't matter. + */ +#define PACKET_LENGTH ( 1 ) + /*-----------------------------------------------------------*/ /** @@ -180,8 +189,8 @@ static IotNetworkError_t _networkDestroy( IotNetworkConnection_t pConnection ) * @brief Serializer override for PUBACK that always fails. */ static IotMqttError_t _serializePuback( uint16_t packetIdentifier, - uint8_t ** pPubackPacket, - size_t * pPacketSize ) + uint8_t ** pPubackPacket, + size_t * pPacketSize ) { ( void ) packetIdentifier; ( void ) pPubackPacket; @@ -248,9 +257,12 @@ TEST_GROUP_RUNNER( MQTT_Unit_Platform ) RUN_TEST_CASE( MQTT_Unit_Platform, ConnectNetworkFailure ); RUN_TEST_CASE( MQTT_Unit_Platform, ConnectScheduleFailure ); RUN_TEST_CASE( MQTT_Unit_Platform, DisconnectNetworkFailure ); + RUN_TEST_CASE( MQTT_Unit_Platform, PingreqSendFailure ); RUN_TEST_CASE( MQTT_Unit_Platform, PublishScheduleFailure ); + RUN_TEST_CASE( MQTT_Unit_Platform, PublishRetryScheduleFailure ); RUN_TEST_CASE( MQTT_Unit_Platform, PubackScheduleSerializeFailure ); RUN_TEST_CASE( MQTT_Unit_Platform, SubscriptionScheduleFailure ); + RUN_TEST_CASE( MQTT_Unit_Platform, NotifyScheduleFailure ); } /*-----------------------------------------------------------*/ @@ -346,6 +358,22 @@ TEST( MQTT_Unit_Platform, DisconnectNetworkFailure ) /*-----------------------------------------------------------*/ +/** + * @brief Tests the behavior when a PINGREQ cannot be sent. + */ +TEST( MQTT_Unit_Platform, PingreqSendFailure ) +{ + _mqttConnection_t * pMqttConnection = NULL; + + pMqttConnection = IotTestMqtt_createMqttConnection( false, &_networkInfo, 100 ); + TEST_ASSERT_NOT_NULL( pMqttConnection ); + + _sendStatus = IOT_NETWORK_FAILURE; + _IotMqtt_ProcessKeepAlive( IOT_SYSTEM_TASKPOOL, pMqttConnection->pingreq.job, pMqttConnection ); +} + +/*-----------------------------------------------------------*/ + /** * @brief Tests the behavior of @ref mqtt_function_publishasync when scheduling fails. */ @@ -390,9 +418,57 @@ TEST( MQTT_Unit_Platform, PublishScheduleFailure ) /*-----------------------------------------------------------*/ +/** + * @brief Tests the behavior when a client-to-server PUBLISH retry fails to + * schedule. + */ +TEST( MQTT_Unit_Platform, PublishRetryScheduleFailure ) +{ + IotMqttConnection_t pMqttConnection = IOT_MQTT_CONNECTION_INITIALIZER; + _mqttOperation_t * pOperation = NULL; + IotTaskPool_t taskPool = IOT_SYSTEM_TASKPOOL; + uint32_t maxThreads = 0; + + /* Create a new MQTT connection. */ + pMqttConnection = IotTestMqtt_createMqttConnection( false, &_networkInfo, 0 ); + TEST_ASSERT_NOT_NULL( pMqttConnection ); + + /* Create a new PUBLISH with retry operation. */ + TEST_ASSERT_EQUAL( IOT_MQTT_SUCCESS, _IotMqtt_CreateOperation( pMqttConnection, + IOT_MQTT_FLAG_WAITABLE, + NULL, + &pOperation ) ); + TEST_ASSERT_NOT_NULL( pOperation ); + pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER; + pOperation->u.operation.periodic.retry.limit = 3; + pOperation->u.operation.periodic.retry.nextPeriodMs = TIMEOUT_MS; + pOperation->u.operation.pMqttPacket = IotMqtt_MallocMessage( PACKET_LENGTH ); + pOperation->u.operation.packetSize = PACKET_LENGTH; + + /* Set the task pool to an invalid state and cause all further scheduling to fail. */ + maxThreads = taskPool->maxThreads; + taskPool->maxThreads = 0; + + /* Send the MQTT PUBLISH. Retry will fail to schedule. */ + _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation ); + TEST_ASSERT_EQUAL( IOT_MQTT_SCHEDULING_ERROR, pOperation->u.operation.status ); + TEST_ASSERT_EQUAL_UINT32( 1, IotSemaphore_GetCount( &( pOperation->u.operation.notify.waitSemaphore ) ) ); + TEST_ASSERT_EQUAL_UINT32( 1, pOperation->u.operation.periodic.retry.count ); + + /* Restore the task pool to a valid state. */ + taskPool->maxThreads = maxThreads; + + /* Clean up. */ + TEST_ASSERT_EQUAL_INT( true, _IotMqtt_DecrementOperationReferences( pOperation, false ) ); + _IotMqtt_DestroyOperation( pOperation ); + IotMqtt_Disconnect( pMqttConnection, IOT_MQTT_FLAG_CLEANUP_ONLY ); +} + +/*-----------------------------------------------------------*/ + /** * @brief Tests the behavior of the client-to-server PUBACK when scheduling and - * serializing fail. + * serializing fail. */ TEST( MQTT_Unit_Platform, PubackScheduleSerializeFailure ) { @@ -470,3 +546,37 @@ TEST( MQTT_Unit_Platform, SubscriptionScheduleFailure ) } /*-----------------------------------------------------------*/ + +/** + * @brief Tests the behavior of #_IotMqtt_Notify when scheduling fails. + */ +TEST( MQTT_Unit_Platform, NotifyScheduleFailure ) +{ + IotMqttConnection_t pMqttConnection = IOT_MQTT_CONNECTION_INITIALIZER; + _mqttOperation_t * pOperation = NULL; + IotTaskPool_t taskPool = IOT_SYSTEM_TASKPOOL; + uint32_t maxThreads = 0; + + /* Create a new MQTT connection. */ + pMqttConnection = IotTestMqtt_createMqttConnection( false, &_networkInfo, 0 ); + TEST_ASSERT_NOT_NULL( pMqttConnection ); + + /* Create a new MQTT operation. */ + TEST_ASSERT_EQUAL( IOT_MQTT_SUCCESS, _IotMqtt_CreateOperation( pMqttConnection, 0, NULL, &pOperation ) ); + TEST_ASSERT_NOT_NULL( pOperation ); + pOperation->u.operation.notify.callback.function = SUBSCRIPTION_CALLBACK_FUNCTION; + + /* Set the task pool to an invalid state and cause all further scheduling to fail. */ + maxThreads = taskPool->maxThreads; + taskPool->maxThreads = 0; + + _IotMqtt_Notify( pOperation ); + + /* Restore the task pool to a valid state. */ + taskPool->maxThreads = maxThreads; + + /* Clean up. */ + IotMqtt_Disconnect( pMqttConnection, IOT_MQTT_FLAG_CLEANUP_ONLY ); +} + +/*-----------------------------------------------------------*/