Skip to content

Commit

Permalink
* FIX [protocol/mqtt] fix nanomq/nanomq#1762
Browse files Browse the repository at this point in the history
Signed-off-by: jaylin <jaylin@emqx.io>
  • Loading branch information
JaylinYu committed Apr 22, 2024
1 parent a388f7c commit 525560c
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions src/mqtt/protocol/mqtt/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ struct mqtt_pipe_s {
bool busy;
uint8_t pingcnt;
nni_msg *pingmsg;
#ifdef NNG_HAVE_MQTT_BROKER
conn_param *cparam;
#endif
};

// A mqtt_sock_s is our per-socket protocol private structure.
Expand All @@ -103,10 +106,6 @@ struct mqtt_sock_s {
property *dis_prop; // disconnect property

nni_mqtt_sqlite_option *sqlite_opt;

#ifdef NNG_HAVE_MQTT_BROKER
conn_param *cparam;
#endif
};

/******************************************************************************
Expand Down Expand Up @@ -135,9 +134,7 @@ mqtt_sock_init(void *arg, nni_sock *sock)
s->retry = NNI_SECOND * 5;
s->keepalive = NNI_SECOND * 10; // default mqtt keepalive
s->timeleft = NNI_SECOND * 10;
#ifdef NNG_HAVE_MQTT_BROKER
s->cparam = NULL;
#endif


nni_mtx_init(&s->mtx);
mqtt_ctx_init(&s->master, s);
Expand Down Expand Up @@ -319,6 +316,10 @@ mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s)
nni_lmq_init(&p->recv_messages, NNG_MAX_RECV_LMQ);
nni_lmq_init(&p->send_messages, NNG_MAX_SEND_LMQ);

#ifdef NNG_HAVE_MQTT_BROKER
p->cparam = NULL;
#endif

return (0);
}

Expand Down Expand Up @@ -547,19 +548,19 @@ mqtt_pipe_close(void *arg)
#ifdef NNG_HAVE_MQTT_BROKER
nni_aio *user_aio;

if (s->cparam == NULL) {
if (p->cparam == NULL) {
nni_mtx_unlock(&s->mtx);
return 0;
}

// Return disconnect event to broker, only when compiled with nanomq
uint16_t count = 0;
mqtt_ctx_t *ctx;
nni_msg *tmsg = nano_msg_notify_disconnect(s->cparam, SERVER_SHUTTING_DOWN);
nni_msg *tmsg = nano_msg_notify_disconnect(p->cparam, SERVER_SHUTTING_DOWN);
nni_msg_set_cmd_type(tmsg, CMD_DISCONNECT_EV);
// clone once for DISCONNECT_EV state
conn_param_clone(s->cparam);
nni_msg_set_conn_param(tmsg, s->cparam);
conn_param_clone(p->cparam);
nni_msg_set_conn_param(tmsg, p->cparam);
// return error to all receving aio
// emulate disconnect notify msg as a normal publish
while ((ctx = nni_list_first(&s->recv_queue)) != NULL) {
Expand All @@ -577,7 +578,7 @@ mqtt_pipe_close(void *arg)
if (count == 0) {
log_warn("disconnect msg of bridging is lost due to no ctx on receving");
nni_msg_free(tmsg);
conn_param_free(s->cparam);
conn_param_free(p->cparam);
}
// particular for NanoSDK in bridging
nni_lmq_flush_cp(&p->recv_messages, true);
Expand Down Expand Up @@ -845,18 +846,18 @@ mqtt_recv_cb(void *arg)
// return CONNACK to APP when working with broker
#ifdef NNG_HAVE_MQTT_BROKER
nng_msg_set_cmd_type(msg, CMD_CONNACK);
s->cparam = nni_msg_get_conn_param(msg);
p->cparam = nni_msg_get_conn_param(msg);
// add connack msg to app layer only for notify in broker
// bridge
if (s->cparam != NULL) {
if (p->cparam != NULL) {
// Get IPv4 ADDR of client
nng_sockaddr addr;
uint8_t *arr;
nng_pipe nng_pipe;
nng_pipe.id = nni_pipe_id(p->pipe);

// Set keepalive
s->keepalive = conn_param_get_keepalive(s->cparam) * 1000;
s->keepalive = conn_param_get_keepalive(p->cparam) * 1000;
s->timeleft = s->keepalive;

rv = nng_pipe_get_addr(
Expand All @@ -866,11 +867,11 @@ mqtt_recv_cb(void *arg)
if (arr == NULL) {
log_warn("Fail to get IP addr from client pipe!");
} else {
sprintf(s->cparam->ip_addr_v4,
sprintf(p->cparam->ip_addr_v4,
"%d.%d.%d.%d", arr[0], arr[1], arr[2],
arr[3]);
log_debug("client connected! addr [%s] port [%d]\n",
s->cparam->ip_addr_v4, addr.s_in.sa_port);
p->cparam->ip_addr_v4, addr.s_in.sa_port);
}

nni_mqtt_msg_set_packet_type(msg, NNG_MQTT_CONNACK);
Expand All @@ -886,12 +887,12 @@ mqtt_recv_cb(void *arg)
if (rv != MQTT_SUCCESS) {
nni_plat_printf("Error in encoding CONNACK.\n");
}
conn_param_clone(s->cparam);
conn_param_clone(p->cparam);
if ((ctx = nni_list_first(&s->recv_queue)) == NULL) {
// No one waiting to receive yet, putting msg
// into lmq
if (mqtt_pipe_recv_msgq_putq(p, msg) != 0)
conn_param_free(s->cparam);
conn_param_free(p->cparam);
nni_mtx_unlock(&s->mtx);
log_warn("Warning: no ctx found!! create more "
"ctxs!");
Expand Down Expand Up @@ -958,7 +959,7 @@ mqtt_recv_cb(void *arg)
// into lmq
if (mqtt_pipe_recv_msgq_putq(p, cached_msg) != 0) {
#ifdef NNG_HAVE_MQTT_BROKER
conn_param_free(s->cparam);
conn_param_free(p->cparam);
#endif
log_warn("ERROR: no ctx found! msg queue full! QoS2 msg lost!");
}
Expand All @@ -979,7 +980,7 @@ mqtt_recv_cb(void *arg)
qos = nni_mqtt_msg_get_publish_qos(msg);
#ifdef NNG_HAVE_MQTT_BROKER
// clone for bridging
conn_param_clone(s->cparam);
conn_param_clone(p->cparam);
#endif
nng_msg_set_cmd_type(msg, CMD_PUBLISH);
if (2 > qos) {
Expand All @@ -990,7 +991,7 @@ mqtt_recv_cb(void *arg)
// into lmq
if (mqtt_pipe_recv_msgq_putq(p, msg) != 0) {
#ifdef NNG_HAVE_MQTT_BROKER
conn_param_free(s->cparam);
conn_param_free(p->cparam);
log_warn("Warning: no ctx found!! PUB msg lost!");
#endif
}
Expand All @@ -1015,7 +1016,7 @@ mqtt_recv_cb(void *arg)
"packet id %d duplicates in", packet_id);
nni_msg_free(cached_msg);
#ifdef NNG_HAVE_MQTT_BROKER
conn_param_free(s->cparam);
conn_param_free(p->cparam);
#endif
// nni_id_remove(&pipe->nano_qos_db,
// pid);
Expand Down

0 comments on commit 525560c

Please sign in to comment.