Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix conn_param leaking issue which discovered by wangha #553

Merged
merged 8 commits into from
May 6, 2023
29 changes: 18 additions & 11 deletions src/mqtt/protocol/mqtt/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ struct mqtt_sock_s {
#endif
};


/******************************************************************************
* Sock Implementation *
******************************************************************************/
Expand Down Expand Up @@ -315,11 +316,12 @@ mqtt_pipe_fini(void *arg)
nni_lmq_fini(&p->send_messages);
}

static inline void
static inline int
mqtt_pipe_recv_msgq_putq(mqtt_pipe_t *p, nni_msg *msg)
{
if (0 != nni_lmq_put(&p->recv_messages, msg)) {
// resize to ensure we do not lost messages or just lose it?
int rv = nni_lmq_put(&p->recv_messages, msg);
if (rv != 0) {
// resize to ensure we do not lost messages or just let it go?
// add option to drop messages
// if (0 !=
// nni_lmq_resize(&p->recv_messages,
Expand All @@ -331,6 +333,7 @@ mqtt_pipe_recv_msgq_putq(mqtt_pipe_t *p, nni_msg *msg)
// nni_lmq_put(&p->recv_messages, msg);
nni_msg_free(msg);
}
return rv;
}

// Should be called with mutex lock hold. and it will unlock mtx.
Expand Down Expand Up @@ -513,8 +516,8 @@ mqtt_pipe_close(void *arg)
mqtt_sock_get_sqlite_option(s), &p->send_messages);
}
#endif

nni_lmq_flush(&p->recv_messages);
// particular for NanoSDK in bridging
nni_lmq_flush_cp(&p->recv_messages, true);
nni_lmq_flush(&p->send_messages);

nni_id_map_foreach(&p->sent_unack, mqtt_close_unack_aio_cb);
Expand Down Expand Up @@ -550,9 +553,10 @@ mqtt_pipe_close(void *arg)
count++;
}
if (count == 0) {
nni_println("disconnect msg of bridging is lost due to no ctx "
log_warn("disconnect msg of bridging is lost due to no ctx "
"on receving");
nni_msg_free(tmsg);
conn_param_free(s->cparam);
}
#endif
nni_mtx_unlock(&s->mtx);
Expand Down Expand Up @@ -730,7 +734,8 @@ mqtt_recv_cb(void *arg)
if ((ctx = nni_list_first(&s->recv_queue)) == NULL) {
// No one waiting to receive yet, putting msg
// into lmq
mqtt_pipe_recv_msgq_putq(p, msg);
if (mqtt_pipe_recv_msgq_putq(p, msg) != 0)
conn_param_free(s->cparam);
nni_mtx_unlock(&s->mtx);
log_warn("Warning: no ctx found!! create more "
"ctxs!");
Expand Down Expand Up @@ -795,9 +800,10 @@ mqtt_recv_cb(void *arg)
if ((ctx = nni_list_first(&s->recv_queue)) == NULL) {
// No one waiting to receive yet, putting msg
// into lmq
mqtt_pipe_recv_msgq_putq(p, cached_msg);
if (mqtt_pipe_recv_msgq_putq(p, cached_msg) != 0)
conn_param_free(s->cparam);
nni_mtx_unlock(&s->mtx);
// nni_println("ERROR: no ctx found!! create more ctxs!");
log_warn("ERROR: no ctx found! msg queue full! QoS2 msg lost!");
return;
}
nni_list_remove(&s->recv_queue, ctx);
Expand All @@ -820,7 +826,8 @@ mqtt_recv_cb(void *arg)
if ((ctx = nni_list_first(&s->recv_queue)) == NULL) {
// No one waiting to receive yet, putting msg
// into lmq
mqtt_pipe_recv_msgq_putq(p, msg);
if (mqtt_pipe_recv_msgq_putq(p, msg) != 0)
conn_param_free(s->cparam);
nni_mtx_unlock(&s->mtx);
// nni_println("ERROR: no ctx found!! create more ctxs!");
return;
Expand All @@ -833,7 +840,6 @@ mqtt_recv_cb(void *arg)
nni_aio_finish(user_aio, 0, 0);
return;
} else {
//TODO check if this packetid already there
packet_id = nni_mqtt_msg_get_publish_packet_id(msg);
if ((cached_msg = nni_id_get(
&p->recv_unack, packet_id)) != NULL) {
Expand All @@ -843,6 +849,7 @@ mqtt_recv_cb(void *arg)
log_error(
"packet id %d duplicates in", packet_id);
nni_msg_free(cached_msg);
conn_param_free(s->cparam);
// nni_id_remove(&pipe->nano_qos_db,
// pid);
}
Expand Down
44 changes: 19 additions & 25 deletions src/mqtt/protocol/mqtt/mqttv5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "supplemental/mqtt/mqtt_qos_db_api.h"
#include "nng/protocol/mqtt/mqtt_parser.h"
#include "nng/mqtt/mqtt_client.h"
#include "nng/protocol/mqtt/mqtt_parser.h"

// MQTT client implementation.
//
Expand Down Expand Up @@ -314,24 +313,6 @@ mqtt_pipe_fini(void *arg)
nni_lmq_fini(&p->send_messages);
}

static inline void
mqtt_pipe_recv_msgq_putq(mqtt_pipe_t *p, nni_msg *msg)
{
if (0 != nni_lmq_put(&p->recv_messages, msg)) {
// resize to ensure we do not lost messages or just lose it?
// add option to drop messages
// if (0 !=
// nni_lmq_resize(&p->recv_messages,
// nni_lmq_len(&p->recv_messages) * 2)) {
// // drop the message when no memory available
// nni_msg_free(msg);
// return;
// }
// nni_lmq_put(&p->recv_messages, msg);
nni_msg_free(msg);
}
}

// Should be called with mutex lock hold. and it will unlock mtx.
// flag indicates if need to skip msg in sqlite 1: check sqlite 0: only aio
static inline void
Expand Down Expand Up @@ -504,7 +485,8 @@ mqtt_pipe_close(void *arg)
}
#endif

nni_lmq_flush(&p->recv_messages);
// particular for NanoSDK in bridging
nni_lmq_flush_cp(&p->recv_messages, true);
nni_lmq_flush(&p->send_messages);

nni_id_map_foreach(&p->sent_unack, mqtt_close_unack_aio_cb);
Expand Down Expand Up @@ -540,6 +522,7 @@ mqtt_pipe_close(void *arg)
if (count == 0) {
log_info("disconnect msg of bridging is lost due to no ctx on receving");
nni_msg_free(tmsg);
conn_param_free(s->cparam);
}
#endif
nni_mtx_unlock(&s->mtx);
Expand Down Expand Up @@ -745,7 +728,10 @@ mqtt_recv_cb(void *arg)
if ((ctx = nni_list_first(&s->recv_queue)) == NULL) {
// No one waiting to receive yet, putting msg
// into lmq
mqtt_pipe_recv_msgq_putq(p, msg);
if (0 != nni_lmq_put(&p->recv_messages, msg)) {
nni_msg_free(msg);
conn_param_free(s->cparam);
}
nni_mtx_unlock(&s->mtx);
log_warn("Warning: no ctx found!! create more "
"ctxs!");
Expand Down Expand Up @@ -810,9 +796,13 @@ mqtt_recv_cb(void *arg)
if ((ctx = nni_list_first(&s->recv_queue)) == NULL) {
// No one waiting to receive yet, putting msg
// into lmq
mqtt_pipe_recv_msgq_putq(p, cached_msg);
if (0 != nni_lmq_put(&p->recv_messages, cached_msg)) {
nni_msg_free(cached_msg);
conn_param_free(s->cparam);
}
nni_mtx_unlock(&s->mtx);
// nni_println("ERROR: no ctx found!! create more ctxs!");
// nni_println("ERROR: no ctx found!! create more
// ctxs!");
return;
}
nni_list_remove(&s->recv_queue, ctx);
Expand All @@ -834,9 +824,13 @@ mqtt_recv_cb(void *arg)
if ((ctx = nni_list_first(&s->recv_queue)) == NULL) {
// No one waiting to receive yet, putting msg
// into lmq
mqtt_pipe_recv_msgq_putq(p, msg);
if (0 != nni_lmq_put(&p->recv_messages, msg)) {
nni_msg_free(msg);
conn_param_free(s->cparam);
}
nni_mtx_unlock(&s->mtx);
// nni_println("ERROR: no ctx found!! create more ctxs!");
// nni_println("ERROR: no ctx found!! create
// more ctxs!");
return;
}
nni_list_remove(&s->recv_queue, ctx);
Expand Down
26 changes: 18 additions & 8 deletions src/sp/protocol/mqtt/nmq_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct nano_sock {
nni_atomic_int ttl;
nni_id_map pipes;
nni_id_map cached_sessions;
nni_lmq waitlmq;
nni_lmq waitlmq; // this is for receving
nni_list recvpipes; // list of pipes with data to receive
nni_list recvq;
nano_ctx ctx; // base socket
Expand Down Expand Up @@ -86,7 +86,7 @@ struct nano_pipe {
// been triggered
uint16_t ka_refresh;
conn_param *conn_param;
nni_lmq rlmq;
nni_lmq rlmq; // only for sending cache
void *nano_qos_db; // 'sqlite' or 'nni_id_hash_map'
};

Expand All @@ -99,18 +99,23 @@ nmq_close_unack_msg_cb(void *key, void *val)
nni_msg_free(msg);
}

// Flush lmq and conn_param
void
nano_nni_lmq_flush(nni_lmq *lmq)
nano_nni_lmq_flush(nni_lmq *lmq, bool cp)
{
while (lmq->lmq_len > 0) {
nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++];
lmq->lmq_get &= lmq->lmq_mask;
lmq->lmq_len--;
if (cp)
conn_param_free(nni_msg_get_conn_param(msg));
nni_msg_free(msg);
}
}

int

// only use for sending lmq
static int
nano_nni_lmq_resize(nni_lmq *lmq, size_t cap)
{
nng_msg *msg;
Expand All @@ -134,7 +139,7 @@ nano_nni_lmq_resize(nni_lmq *lmq, size_t cap)
}

// Flush anything left over.
nano_nni_lmq_flush(lmq);
nano_nni_lmq_flush(lmq, false);

nni_free(lmq->lmq_msgs, lmq->lmq_alloc * sizeof(nng_msg *));
lmq->lmq_msgs = newq;
Expand Down Expand Up @@ -487,6 +492,8 @@ nano_sock_fini(void *arg)
#endif
nni_id_map_fini(&s->pipes);
nni_id_map_fini(&s->cached_sessions);
// flush msg and conn params in waitlmq
nano_nni_lmq_flush(&s->waitlmq, true);
nni_lmq_fini(&s->waitlmq);
nano_ctx_fini(&s->ctx);
nni_pollable_fini(&s->writable);
Expand Down Expand Up @@ -764,7 +771,7 @@ close_pipe(nano_pipe *p)
if (nni_list_active(&s->recvpipes, p)) {
nni_list_remove(&s->recvpipes, p);
}
nano_nni_lmq_flush(&p->rlmq);
nano_nni_lmq_flush(&p->rlmq, false);
nni_mtx_unlock(&p->lk);
nni_id_remove(&s->pipes, nni_pipe_id(p->pipe));
}
Expand Down Expand Up @@ -806,7 +813,7 @@ nano_pipe_close(void *arg)
if (nni_list_active(&s->recvpipes, p)) {
nni_list_remove(&s->recvpipes, p);
}
nano_nni_lmq_flush(&p->rlmq);
nano_nni_lmq_flush(&p->rlmq, false);
nni_mtx_unlock(&s->lk);
nni_mtx_unlock(&p->lk);
return;
Expand Down Expand Up @@ -840,10 +847,13 @@ nano_pipe_close(void *arg)
return;
} else {
// no enough ctx, so cache to waitlmq
// free conn param when discard waitlmq
if (nni_lmq_full(&s->waitlmq)) {
if (nni_lmq_resize(&s->waitlmq,
nni_lmq_cap(&s->waitlmq) * 2) != 0) {
log_error("wait lmq resize failed.");
conn_param_free(p->conn_param);
nni_msg_free(msg);
}
}
nni_lmq_put(&s->waitlmq, msg);
Expand Down Expand Up @@ -1109,8 +1119,8 @@ nano_pipe_recv_cb(void *arg)
nni_list_append(&s->recvpipes, p);
nni_pollable_raise(&s->readable);
nni_mtx_unlock(&s->lk);
// this gonna cause broker lagging
log_warn("no ctx found!! create more ctxs!");
// nni_println("ERROR: no ctx found!! create more ctxs!");
return;
}

Expand Down
16 changes: 16 additions & 0 deletions src/supplemental/mqtt/mqtt_msg.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "nng/protocol/mqtt/mqtt_parser.h"
#include "mqtt_msg.h"
#include <stdlib.h>
#include <string.h>
Expand Down Expand Up @@ -1012,3 +1013,18 @@ nni_mqtt_msg_get_connect_property(nni_msg *msg)
nni_mqtt_proto_data *proto_data = nni_msg_get_proto_data(msg);
return proto_data->var_header.connect.properties;
}

// peculiar API for SDK to free Conn_param
void
nni_lmq_flush_cp(nni_lmq *lmq, bool cp)
{
while (lmq->lmq_len > 0) {
nng_msg *msg = lmq->lmq_msgs[lmq->lmq_get++];
lmq->lmq_get &= lmq->lmq_mask;
lmq->lmq_len--;
uint8_t packet_type = nni_msg_get_type(msg);
if (cp && (packet_type == CMD_PUBLISH || packet_type == CMD_CONNACK))
conn_param_free(nni_msg_get_conn_param(msg));
JaylinYu marked this conversation as resolved.
Show resolved Hide resolved
nni_msg_free(msg);
}
}
1 change: 1 addition & 0 deletions src/supplemental/mqtt/mqtt_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ NNG_DECL property_type_enum property_get_value_type(uint8_t prop_id);
NNG_DECL property_data * property_get_value(property *prop, uint8_t prop_id);
NNG_DECL void property_append(property *prop_list, property *last);
NNG_DECL int property_value_copy(property *dest,const property *src);
NNG_DECL void nni_lmq_flush_cp(nni_lmq *lmq, bool cp);

/* introduced from mqtt_parser, might be duplicated */
NNG_DECL int nni_mqtt_pubres_decode(nng_msg *msg, uint16_t *packet_id,
Expand Down
12 changes: 5 additions & 7 deletions tests/trantest.h
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,8 @@ trantest_mqtt_sub_pub(trantest *tt)
nng_mqtt_client_free(client, true);
nng_close(tt->repsock);
nng_close(tt->reqsock);
conn_param_free(cp1);
conn_param_free(cp2);
// conn_param_free(cp1);
// conn_param_free(cp2);
});
}

Expand Down Expand Up @@ -718,8 +718,8 @@ trantest_mqttv5_sub_pub(trantest *tt)
nng_mqtt_client_free(client, true);
nng_close(tt->repsock);
nng_close(tt->reqsock);
conn_param_free(cp1);
conn_param_free(cp2);
// conn_param_free(cp1);
// conn_param_free(cp2);
});
}

Expand Down Expand Up @@ -1136,8 +1136,7 @@ trantest_mqtt_broker_send_recv(trantest *tt)
// send CONNACK back to the client.
nng_aio_set_msg(work->aio, rmsg);
nng_ctx_send(work->ctx, work->aio);
// cp is cloned in protocol and app layer, so we free it twice.
conn_param_free(cp);
// cp is cloned in protocol layer, so we free it here
conn_param_free(cp);

// client recv CONNACK msg.
Expand Down Expand Up @@ -1194,7 +1193,6 @@ trantest_mqtt_broker_send_recv(trantest *tt)
// here to aviod heap-use-after-free.
nng_close(tt->repsock);
conn_param_free(rcp);
conn_param_free(rcp);
nng_msg_free(msg);
// for previously pub msg
conn_param_free(cp);
Expand Down
Loading