From 3b2d83d87c2c01ddbafe74dc85a8f3a1751f4c2f Mon Sep 17 00:00:00 2001 From: Moi Ran Date: Tue, 5 Mar 2024 14:43:33 +0800 Subject: [PATCH 1/4] * MDF [exchange_server] Remove exchange_send_cb Signed-off-by: Moi Ran --- src/mqtt/protocol/exchange/exchange_server.c | 60 -------------------- 1 file changed, 60 deletions(-) diff --git a/src/mqtt/protocol/exchange/exchange_server.c b/src/mqtt/protocol/exchange/exchange_server.c index b3570018c..e04de9aba 100644 --- a/src/mqtt/protocol/exchange/exchange_server.c +++ b/src/mqtt/protocol/exchange/exchange_server.c @@ -74,7 +74,6 @@ static void exchange_sock_fini(void *arg); static void exchange_sock_open(void *arg); static void exchange_sock_send(void *arg, nni_aio *aio); static void exchange_sock_recv(void *arg, nni_aio *aio); -static void exchange_send_cb(void *arg); static int exchange_add_ex(exchange_sock_t *s, exchange_t *ex) @@ -378,65 +377,6 @@ exchange_sock_recv(void *arg, nni_aio *aio) return; } - -static void -exchange_send_cb(void *arg) -{ - exchange_node_t *ex_node = arg; - nni_msg *msg = NULL; - nni_aio *user_aio = NULL; - int ret = 0; - - if (ex_node == NULL) { - return; - } - - exchange_sock_t *s = ex_node->sock; - if (nni_atomic_get_bool(&s->closed)) { - // This occurs if the mqtt_pipe_close has been called. - // In that case we don't want any more processing. - return; - } - - if (nni_aio_result(&ex_node->saio) != 0) { - return; - } - - nni_mtx_lock(&ex_node->mtx); - // send cached msg first - while (nni_lmq_get(&ex_node->send_messages, &msg) == 0) { - user_aio = (nni_aio *) nni_msg_get_proto_data(msg); - if (user_aio == NULL) { - log_error("user_aio is NULL\n"); - break; - } - // make sure msg is in order - ret = exchange_client_handle_msg(ex_node, msg, user_aio); - if (ret != 0) { - log_error( - "exchange_client_handle cached msg failed!\n"); - nni_aio_finish_error(user_aio, NNG_EINVAL); - } else { - nni_aio_finish(user_aio, 0, 0); - } - } - // check msg in aio & send - if ((msg = nni_aio_get_msg(&ex_node->saio)) != NULL) { - user_aio = (nni_aio *) nni_msg_get_proto_data(msg); - nni_aio_set_msg(&ex_node->saio, NULL); - ret = exchange_client_handle_msg(ex_node, msg, user_aio); - if (ret != 0) { - log_error("exchange_client_handle_msg failed!\n"); - nni_aio_finish_error(user_aio, NNG_EINVAL); - } else { - nni_aio_finish(user_aio, 0, 0); - } - } - ex_node->isBusy = false; - nni_mtx_unlock(&ex_node->mtx); - return; -} - static int exchange_sock_bind_exchange(void *arg, const void *v, size_t sz, nni_opt_type t) { From 28c45fca0e1c96a2af06a07fb5e86d90e1024c20 Mon Sep 17 00:00:00 2001 From: Moi Ran Date: Tue, 5 Mar 2024 14:44:27 +0800 Subject: [PATCH 2/4] * ADD [exchange_server] New exchange_do_send() Signed-off-by: Moi Ran --- src/mqtt/protocol/exchange/exchange_server.c | 26 ++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/mqtt/protocol/exchange/exchange_server.c b/src/mqtt/protocol/exchange/exchange_server.c index e04de9aba..33c819784 100644 --- a/src/mqtt/protocol/exchange/exchange_server.c +++ b/src/mqtt/protocol/exchange/exchange_server.c @@ -233,6 +233,32 @@ exchange_client_handle_msg(exchange_node_t *ex_node, nni_msg *msg, nni_aio *aio) return 0; } +static void inline +exchange_do_send(exchange_node_t *ex_node, nni_msg *msg, nni_aio *user_aio) +{ + int ret = 0; + + exchange_sock_t *s = ex_node->sock; + if (nni_atomic_get_bool(&s->closed)) { + // This occurs if the mqtt_pipe_close has been called. + // In that case we don't want any more processing. + log_error("exchange sock is closed!"); + nni_aio_finish_error(user_aio, NNG_EINVAL); + return; + } + + ret = exchange_client_handle_msg(ex_node, msg, user_aio); + if (ret != 0) { + log_error( + "exchange_client_handle cached msg failed!\n"); + nni_aio_finish_error(user_aio, NNG_EINVAL); + } else { + nni_aio_finish(user_aio, 0, 0); + } + + return; +} + static void exchange_sock_send(void *arg, nni_aio *aio) { From c17fe6197e46ee8be1369bb1d358b3be6e909bdb Mon Sep 17 00:00:00 2001 From: Moi Ran Date: Tue, 5 Mar 2024 14:46:20 +0800 Subject: [PATCH 3/4] * MDF [exchange_server] Remove unused sendmsg queue and mtx Signed-off-by: Moi Ran --- src/mqtt/protocol/exchange/exchange_server.c | 25 +------------------- 1 file changed, 1 insertion(+), 24 deletions(-) diff --git a/src/mqtt/protocol/exchange/exchange_server.c b/src/mqtt/protocol/exchange/exchange_server.c index 33c819784..5086f4318 100644 --- a/src/mqtt/protocol/exchange/exchange_server.c +++ b/src/mqtt/protocol/exchange/exchange_server.c @@ -52,10 +52,7 @@ struct exchange_node_s { exchange_t *ex; exchange_sock_t *sock; exchange_pipe_t *pipe; - nni_aio saio; bool isBusy; - nni_mtx mtx; - nni_lmq send_messages; }; struct exchange_sock_s { @@ -98,10 +95,6 @@ exchange_add_ex(exchange_sock_t *s, exchange_t *ex) node->ex = ex; node->sock = s; - nni_aio_init(&node->saio, exchange_send_cb, node); - nni_mtx_init(&node->mtx); - nni_lmq_init(&node->send_messages, NANO_MAX_MQ_BUFFER_LEN); - s->ex_node = node; nni_mtx_unlock(&s->mtx); return 0; @@ -127,24 +120,10 @@ exchange_sock_init(void *arg, nni_sock *sock) static void exchange_sock_fini(void *arg) { - nni_msg *msg; - nni_aio *aio; exchange_sock_t *s = arg; exchange_node_t *ex_node; ex_node = s->ex_node; - while (nni_lmq_get(&ex_node->send_messages, &msg) == 0) { - aio = nni_msg_get_proto_data(msg); - if (aio != NULL) { - nni_aio_finish_error(aio, NNG_ECLOSED); - } - - nni_msg_free(msg); - } - - nni_aio_fini(&ex_node->saio); - nni_mtx_fini(&ex_node->mtx); - nni_lmq_fini(&ex_node->send_messages); nni_pollable_fini(&s->writable); nni_pollable_fini(&s->readable); @@ -171,10 +150,8 @@ static void exchange_sock_close(void *arg) { exchange_sock_t *s = arg; - exchange_node_t *ex_node = s->ex_node; nni_atomic_set_bool(&s->closed, true); - nni_aio_close(&ex_node->saio); return; } @@ -233,7 +210,7 @@ exchange_client_handle_msg(exchange_node_t *ex_node, nni_msg *msg, nni_aio *aio) return 0; } -static void inline +static inline void exchange_do_send(exchange_node_t *ex_node, nni_msg *msg, nni_aio *user_aio) { int ret = 0; From fe063f6cdbbc42a6d859406d31a5c27fbe49030e Mon Sep 17 00:00:00 2001 From: Moi Ran Date: Tue, 5 Mar 2024 14:46:47 +0800 Subject: [PATCH 4/4] * MDF [exchange_server] Use exchange_do_send() to send msgs Signed-off-by: Moi Ran --- src/mqtt/protocol/exchange/exchange_server.c | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/src/mqtt/protocol/exchange/exchange_server.c b/src/mqtt/protocol/exchange/exchange_server.c index 5086f4318..8ad0670e1 100644 --- a/src/mqtt/protocol/exchange/exchange_server.c +++ b/src/mqtt/protocol/exchange/exchange_server.c @@ -266,23 +266,7 @@ exchange_sock_send(void *arg, nni_aio *aio) } ex_node = s->ex_node; - nni_mtx_lock(&ex_node->mtx); // Too complex lock, performance lost - /* Store aio in msg proto data */ - nni_msg_set_proto_data(msg, NULL, (void *)aio); - if (!ex_node->isBusy) { - ex_node->isBusy = true; - nni_aio_set_msg(&ex_node->saio, msg); - nni_mtx_unlock(&ex_node->mtx); - // kick off - nni_aio_finish(&ex_node->saio, 0, nni_msg_len(msg)); - } else { - if (nni_lmq_put(&ex_node->send_messages, msg) != 0) { - log_error("nni_lmq_put failed! msg lost\n"); - nni_msg_free(msg); - } - nni_mtx_unlock(&ex_node->mtx); - /* don't finish user aio here, finish user aio in send_cb */ - } + exchange_do_send(ex_node, msg, aio); nni_mtx_unlock(&s->mtx); return; }