Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for disconnect callback #69

Merged
merged 2 commits into from
May 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,19 @@ fi
AM_CONDITIONAL([BUILD_STDINCAP], [test "x$ENABLED_STDINCAP" = "xyes"])


# Disconnect callback
AC_ARG_ENABLE([discb],
[AS_HELP_STRING([--enable-discb],[Enable disconnect callback (default: enabled)])],
[ ENABLED_DISCB=$enableval ],
[ ENABLED_DISCB=yes ]
)

if test "x$ENABLED_DISCB" = "xyes"
then
AM_CPPFLAGS="$AM_CPPFLAGS -DWOLFMQTT_DISCONNECT_CB"
fi



# HARDEN FLAGS
AX_HARDEN_CC_COMPILER_FLAGS
Expand Down Expand Up @@ -296,4 +309,5 @@ echo " * Non-Blocking: $ENABLED_NONBLOCK"
echo " * Examples: $ENABLED_EXAMPLES"
echo " * STDIN Capture: $ENABLED_STDINCAP"
echo " * Error Strings: $ENABLED_ERROR_STRINGS"
echo " * Disconnect Callback $ENABLED_DISCB"

19 changes: 19 additions & 0 deletions examples/mqttclient/mqttclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ static int mStopRead = 0;
#define TEST_MESSAGE "test"


#ifdef WOLFMQTT_DISCONNECT_CB
static int mqtt_disconnect_cb(MqttClient* client, int error_code, void* ctx)
{
(void)client;
(void)ctx;
PRINTF("Disconnect (error %d)", error_code);
return 0;
}
#endif

static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,
byte msg_new, byte msg_done)
{
Expand Down Expand Up @@ -141,6 +151,15 @@ int mqttclient_test(MQTTCtx *mqttCtx)
}
mqttCtx->client.ctx = mqttCtx;

#ifdef WOLFMQTT_DISCONNECT_CB
/* setup disconnect callback */
rc = MqttClient_SetDisconnectCallback(&mqttCtx->client,
mqtt_disconnect_cb, NULL);
if (rc != MQTT_CODE_SUCCESS) {
goto exit;
}
#endif

FALL_THROUGH;
}

Expand Down
14 changes: 14 additions & 0 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,20 @@ int MqttClient_Init(MqttClient *client, MqttNet* net,
return rc;
}

#ifdef WOLFMQTT_DISCONNECT_CB
int MqttClient_SetDisconnectCallback(MqttClient *client, MqttDisconnectCb cb,
void* ctx)
{
if (client == NULL)
return MQTT_CODE_ERROR_BAD_ARG;

client->disconnect_cb = cb;
client->disconnect_ctx = ctx;

return MQTT_CODE_SUCCESS;
}
#endif

int MqttClient_Connect(MqttClient *client, MqttConnect *connect)
{
int rc, len;
Expand Down
30 changes: 23 additions & 7 deletions src/mqtt_packet.c
Original file line number Diff line number Diff line change
Expand Up @@ -694,12 +694,28 @@ int MqttEncode_Disconnect(byte *tx_buf, int tx_buf_len)
return header_len;
}

static int MqttPacket_HandleNetError(MqttClient *client, int rc)
{
(void)client;
#ifdef WOLFMQTT_DISCONNECT_CB
if (rc < 0 &&
rc != MQTT_CODE_CONTINUE &&
rc != MQTT_CODE_STDIN_WAKE)
{
/* don't use return code for now - future use */
if (client->disconnect_cb)
client->disconnect_cb(client, rc, client->disconnect_ctx);
}
#endif
return rc;
}

int MqttPacket_Write(MqttClient *client, byte* tx_buf, int tx_buf_len)
{
int rc;
rc = MqttSocket_Write(client, tx_buf, tx_buf_len, client->cmd_timeout_ms);
return rc;

return MqttPacket_HandleNetError(client, rc);
}

/* Read return code is length when > 0 */
Expand All @@ -719,10 +735,10 @@ int MqttPacket_Read(MqttClient *client, byte* rx_buf, int rx_buf_len,
/* Read fix header portion */
rc = MqttSocket_Read(client, rx_buf, client->packet.header_len, timeout_ms);
if (rc < 0) {
return rc;
return MqttPacket_HandleNetError(client, rc);
}
else if (rc != client->packet.header_len) {
return MQTT_CODE_ERROR_NETWORK;
return MqttPacket_HandleNetError(client, MQTT_CODE_ERROR_NETWORK);
}

FALL_THROUGH;
Expand All @@ -736,7 +752,7 @@ int MqttPacket_Read(MqttClient *client, byte* rx_buf, int rx_buf_len,
/* Try and decode remaining length */
rc = MqttDecode_RemainLen(header, client->packet.header_len, &client->packet.remain_len);
if (rc < 0) { /* Indicates error */
return rc;
return MqttPacket_HandleNetError(client, rc);
}
/* Indicates decode success and rc is len of header */
else if (rc > 0) {
Expand All @@ -748,10 +764,10 @@ int MqttPacket_Read(MqttClient *client, byte* rx_buf, int rx_buf_len,
len = 1;
rc = MqttSocket_Read(client, &rx_buf[client->packet.header_len], len, timeout_ms);
if (rc < 0) {
return rc;
return MqttPacket_HandleNetError(client, rc);
}
else if (rc != len) {
return MQTT_CODE_ERROR_NETWORK;
return MqttPacket_HandleNetError(client, MQTT_CODE_ERROR_NETWORK);
}
client->packet.header_len += len;

Expand All @@ -774,7 +790,7 @@ int MqttPacket_Read(MqttClient *client, byte* rx_buf, int rx_buf_len,
rc = MqttSocket_Read(client, &rx_buf[client->packet.header_len],
client->packet.remain_len, timeout_ms);
if (rc <= 0) {
return rc;
return MqttPacket_HandleNetError(client, rc);
}
remain_read = rc;
}
Expand Down
33 changes: 29 additions & 4 deletions wolfmqtt/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ typedef struct _MqttSk {
int len;
} MqttSk;

#ifdef WOLFMQTT_DISCONNECT_CB
typedef int (*MqttDisconnectCb)(struct _MqttClient* client, int error_code, void* ctx);
#endif

/* Client structure */
typedef struct _MqttClient {
word32 flags; /* MqttClientFlags */
Expand All @@ -110,10 +114,16 @@ typedef struct _MqttClient {
* Used for MqttClient_Ping and MqttClient_WaitType */

void* ctx; /* user supplied context for publish callbacks */

#ifdef WOLFMQTT_DISCONNECT_CB
MqttDisconnectCb disconnect_cb;
void *disconnect_ctx;
#endif
} MqttClient;


/* Application Interfaces */

/*! \brief Initializes the MqttClient structure
* \param client Pointer to MqttClient structure
(uninitialized is okay)
Expand All @@ -137,9 +147,24 @@ WOLFMQTT_API int MqttClient_Init(
byte *rx_buf, int rx_buf_len,
int cmd_timeout_ms);

#ifdef WOLFMQTT_DISCONNECT_CB
/*! \brief Sets a disconnect callback with custom context
* \param client Pointer to MqttClient structure
(uninitialized is okay)
* \param disCb Pointer to disconnect callback function
* \param ctx Pointer to your own context
* \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_BAD_ARG
(see enum MqttPacketResponseCodes)
*/
WOLFMQTT_API int MqttClient_SetDisconnectCallback(
MqttClient *client,
MqttDisconnectCb cb,
void* ctx);
#endif


/*! \brief Encodes and sends the MQTT Connect packet and waits for the
Connect Acknowledgement packet
Connect Acknowledgment packet
* \discussion This is a blocking function that will wait for MqttNet.read
* \param client Pointer to MqttClient structure
* \param connect Pointer to MqttConnect structure initialized
Expand Down Expand Up @@ -169,7 +194,7 @@ WOLFMQTT_API int MqttClient_Publish(
MqttPublish *publish);

/*! \brief Encodes and sends the MQTT Subscribe packet and waits for the
Subscribe Acknowledgement packet
Subscribe Acknowledgment packet
* \discussion This is a blocking function that will wait for MqttNet.read
* \param client Pointer to MqttClient structure
* \param subscribe Pointer to MqttSubscribe structure initialized with
Expand All @@ -182,7 +207,7 @@ WOLFMQTT_API int MqttClient_Subscribe(
MqttSubscribe *subscribe);

/*! \brief Encodes and sends the MQTT Unsubscribe packet and waits for the
Unsubscribe Acknowledgement packet
Unsubscribe Acknowledgment packet
* \discussion This is a blocking function that will wait for MqttNet.read
* \param client Pointer to MqttClient structure
* \param unsubscribe Pointer to MqttUnsubscribe structure initialized
Expand Down Expand Up @@ -215,7 +240,7 @@ WOLFMQTT_API int MqttClient_Disconnect(
MqttClient *client);


/*! \brief Waits for packets to arrive. Incomming publish messages
/*! \brief Waits for packets to arrive. Incoming publish messages
will arrive via callback provided in MqttClient_Init.
* \discussion This is a blocking function that will wait for MqttNet.read
* \param client Pointer to MqttClient structure
Expand Down