diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index 7e776ab72..036a81ee3 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -84,6 +84,9 @@ struct mqtt_pipe_s { bool busy; uint8_t pingcnt; nni_msg *pingmsg; +#ifdef NNG_HAVE_MQTT_BROKER + conn_param *cparam; +#endif }; // A mqtt_sock_s is our per-socket protocol private structure. @@ -103,10 +106,6 @@ struct mqtt_sock_s { property *dis_prop; // disconnect property nni_mqtt_sqlite_option *sqlite_opt; - -#ifdef NNG_HAVE_MQTT_BROKER - conn_param *cparam; -#endif }; /****************************************************************************** @@ -135,9 +134,7 @@ mqtt_sock_init(void *arg, nni_sock *sock) s->retry = NNI_SECOND * 5; s->keepalive = NNI_SECOND * 10; // default mqtt keepalive s->timeleft = NNI_SECOND * 10; -#ifdef NNG_HAVE_MQTT_BROKER - s->cparam = NULL; -#endif + nni_mtx_init(&s->mtx); mqtt_ctx_init(&s->master, s); @@ -319,6 +316,10 @@ mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s) nni_lmq_init(&p->recv_messages, NNG_MAX_RECV_LMQ); nni_lmq_init(&p->send_messages, NNG_MAX_SEND_LMQ); +#ifdef NNG_HAVE_MQTT_BROKER + p->cparam = NULL; +#endif + return (0); } @@ -547,7 +548,7 @@ mqtt_pipe_close(void *arg) #ifdef NNG_HAVE_MQTT_BROKER nni_aio *user_aio; - if (s->cparam == NULL) { + if (p->cparam == NULL) { nni_mtx_unlock(&s->mtx); return 0; } @@ -555,11 +556,11 @@ mqtt_pipe_close(void *arg) // Return disconnect event to broker, only when compiled with nanomq uint16_t count = 0; mqtt_ctx_t *ctx; - nni_msg *tmsg = nano_msg_notify_disconnect(s->cparam, SERVER_SHUTTING_DOWN); + nni_msg *tmsg = nano_msg_notify_disconnect(p->cparam, SERVER_SHUTTING_DOWN); nni_msg_set_cmd_type(tmsg, CMD_DISCONNECT_EV); // clone once for DISCONNECT_EV state - conn_param_clone(s->cparam); - nni_msg_set_conn_param(tmsg, s->cparam); + conn_param_clone(p->cparam); + nni_msg_set_conn_param(tmsg, p->cparam); // return error to all receving aio // emulate disconnect notify msg as a normal publish while ((ctx = nni_list_first(&s->recv_queue)) != NULL) { @@ -577,7 +578,7 @@ mqtt_pipe_close(void *arg) if (count == 0) { log_warn("disconnect msg of bridging is lost due to no ctx on receving"); nni_msg_free(tmsg); - conn_param_free(s->cparam); + conn_param_free(p->cparam); } // particular for NanoSDK in bridging nni_lmq_flush_cp(&p->recv_messages, true); @@ -845,10 +846,10 @@ mqtt_recv_cb(void *arg) // return CONNACK to APP when working with broker #ifdef NNG_HAVE_MQTT_BROKER nng_msg_set_cmd_type(msg, CMD_CONNACK); - s->cparam = nni_msg_get_conn_param(msg); + p->cparam = nni_msg_get_conn_param(msg); // add connack msg to app layer only for notify in broker // bridge - if (s->cparam != NULL) { + if (p->cparam != NULL) { // Get IPv4 ADDR of client nng_sockaddr addr; uint8_t *arr; @@ -856,7 +857,7 @@ mqtt_recv_cb(void *arg) nng_pipe.id = nni_pipe_id(p->pipe); // Set keepalive - s->keepalive = conn_param_get_keepalive(s->cparam) * 1000; + s->keepalive = conn_param_get_keepalive(p->cparam) * 1000; s->timeleft = s->keepalive; rv = nng_pipe_get_addr( @@ -866,11 +867,11 @@ mqtt_recv_cb(void *arg) if (arr == NULL) { log_warn("Fail to get IP addr from client pipe!"); } else { - sprintf(s->cparam->ip_addr_v4, + sprintf(p->cparam->ip_addr_v4, "%d.%d.%d.%d", arr[0], arr[1], arr[2], arr[3]); log_debug("client connected! addr [%s] port [%d]\n", - s->cparam->ip_addr_v4, addr.s_in.sa_port); + p->cparam->ip_addr_v4, addr.s_in.sa_port); } nni_mqtt_msg_set_packet_type(msg, NNG_MQTT_CONNACK); @@ -886,12 +887,12 @@ mqtt_recv_cb(void *arg) if (rv != MQTT_SUCCESS) { nni_plat_printf("Error in encoding CONNACK.\n"); } - conn_param_clone(s->cparam); + conn_param_clone(p->cparam); if ((ctx = nni_list_first(&s->recv_queue)) == NULL) { // No one waiting to receive yet, putting msg // into lmq if (mqtt_pipe_recv_msgq_putq(p, msg) != 0) - conn_param_free(s->cparam); + conn_param_free(p->cparam); nni_mtx_unlock(&s->mtx); log_warn("Warning: no ctx found!! create more " "ctxs!"); @@ -958,7 +959,7 @@ mqtt_recv_cb(void *arg) // into lmq if (mqtt_pipe_recv_msgq_putq(p, cached_msg) != 0) { #ifdef NNG_HAVE_MQTT_BROKER - conn_param_free(s->cparam); + conn_param_free(p->cparam); #endif log_warn("ERROR: no ctx found! msg queue full! QoS2 msg lost!"); } @@ -979,7 +980,7 @@ mqtt_recv_cb(void *arg) qos = nni_mqtt_msg_get_publish_qos(msg); #ifdef NNG_HAVE_MQTT_BROKER // clone for bridging - conn_param_clone(s->cparam); + conn_param_clone(p->cparam); #endif nng_msg_set_cmd_type(msg, CMD_PUBLISH); if (2 > qos) { @@ -990,7 +991,7 @@ mqtt_recv_cb(void *arg) // into lmq if (mqtt_pipe_recv_msgq_putq(p, msg) != 0) { #ifdef NNG_HAVE_MQTT_BROKER - conn_param_free(s->cparam); + conn_param_free(p->cparam); log_warn("Warning: no ctx found!! PUB msg lost!"); #endif } @@ -1015,7 +1016,7 @@ mqtt_recv_cb(void *arg) "packet id %d duplicates in", packet_id); nni_msg_free(cached_msg); #ifdef NNG_HAVE_MQTT_BROKER - conn_param_free(s->cparam); + conn_param_free(p->cparam); #endif // nni_id_remove(&pipe->nano_qos_db, // pid);