Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exchange_server] Remove send messages queue, and send message directly #886

Merged
merged 4 commits into from
Mar 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
127 changes: 27 additions & 100 deletions src/mqtt/protocol/exchange/exchange_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ struct exchange_node_s {
exchange_t *ex;
exchange_sock_t *sock;
exchange_pipe_t *pipe;
nni_aio saio;
bool isBusy;
nni_mtx mtx;
nni_lmq send_messages;
};

struct exchange_sock_s {
Expand All @@ -74,7 +71,6 @@ static void exchange_sock_fini(void *arg);
static void exchange_sock_open(void *arg);
static void exchange_sock_send(void *arg, nni_aio *aio);
static void exchange_sock_recv(void *arg, nni_aio *aio);
static void exchange_send_cb(void *arg);

static int
exchange_add_ex(exchange_sock_t *s, exchange_t *ex)
Expand All @@ -99,10 +95,6 @@ exchange_add_ex(exchange_sock_t *s, exchange_t *ex)
node->ex = ex;
node->sock = s;

nni_aio_init(&node->saio, exchange_send_cb, node);
nni_mtx_init(&node->mtx);
nni_lmq_init(&node->send_messages, NANO_MAX_MQ_BUFFER_LEN);

s->ex_node = node;
nni_mtx_unlock(&s->mtx);
return 0;
Expand All @@ -128,24 +120,10 @@ exchange_sock_init(void *arg, nni_sock *sock)
static void
exchange_sock_fini(void *arg)
{
nni_msg *msg;
nni_aio *aio;
exchange_sock_t *s = arg;
exchange_node_t *ex_node;

ex_node = s->ex_node;
while (nni_lmq_get(&ex_node->send_messages, &msg) == 0) {
aio = nni_msg_get_proto_data(msg);
if (aio != NULL) {
nni_aio_finish_error(aio, NNG_ECLOSED);
}

nni_msg_free(msg);
}

nni_aio_fini(&ex_node->saio);
nni_mtx_fini(&ex_node->mtx);
nni_lmq_fini(&ex_node->send_messages);

nni_pollable_fini(&s->writable);
nni_pollable_fini(&s->readable);
Expand All @@ -172,10 +150,8 @@ static void
exchange_sock_close(void *arg)
{
exchange_sock_t *s = arg;
exchange_node_t *ex_node = s->ex_node;

nni_atomic_set_bool(&s->closed, true);
nni_aio_close(&ex_node->saio);

return;
}
Expand Down Expand Up @@ -234,6 +210,32 @@ exchange_client_handle_msg(exchange_node_t *ex_node, nni_msg *msg, nni_aio *aio)
return 0;
}

static inline void
exchange_do_send(exchange_node_t *ex_node, nni_msg *msg, nni_aio *user_aio)
{
int ret = 0;

exchange_sock_t *s = ex_node->sock;
if (nni_atomic_get_bool(&s->closed)) {
// This occurs if the mqtt_pipe_close has been called.
// In that case we don't want any more processing.
log_error("exchange sock is closed!");
nni_aio_finish_error(user_aio, NNG_EINVAL);
return;
}

ret = exchange_client_handle_msg(ex_node, msg, user_aio);
if (ret != 0) {
log_error(
"exchange_client_handle cached msg failed!\n");
nni_aio_finish_error(user_aio, NNG_EINVAL);
} else {
nni_aio_finish(user_aio, 0, 0);
}

return;
}

static void
exchange_sock_send(void *arg, nni_aio *aio)
{
Expand Down Expand Up @@ -264,23 +266,7 @@ exchange_sock_send(void *arg, nni_aio *aio)
}

ex_node = s->ex_node;
nni_mtx_lock(&ex_node->mtx); // Too complex lock, performance lost
/* Store aio in msg proto data */
nni_msg_set_proto_data(msg, NULL, (void *)aio);
if (!ex_node->isBusy) {
ex_node->isBusy = true;
nni_aio_set_msg(&ex_node->saio, msg);
nni_mtx_unlock(&ex_node->mtx);
// kick off
nni_aio_finish(&ex_node->saio, 0, nni_msg_len(msg));
} else {
if (nni_lmq_put(&ex_node->send_messages, msg) != 0) {
log_error("nni_lmq_put failed! msg lost\n");
nni_msg_free(msg);
}
nni_mtx_unlock(&ex_node->mtx);
/* don't finish user aio here, finish user aio in send_cb */
}
exchange_do_send(ex_node, msg, aio);
nni_mtx_unlock(&s->mtx);
return;
}
Expand Down Expand Up @@ -378,65 +364,6 @@ exchange_sock_recv(void *arg, nni_aio *aio)
return;
}


static void
exchange_send_cb(void *arg)
{
exchange_node_t *ex_node = arg;
nni_msg *msg = NULL;
nni_aio *user_aio = NULL;
int ret = 0;

if (ex_node == NULL) {
return;
}

exchange_sock_t *s = ex_node->sock;
if (nni_atomic_get_bool(&s->closed)) {
// This occurs if the mqtt_pipe_close has been called.
// In that case we don't want any more processing.
return;
}

if (nni_aio_result(&ex_node->saio) != 0) {
return;
}

nni_mtx_lock(&ex_node->mtx);
// send cached msg first
while (nni_lmq_get(&ex_node->send_messages, &msg) == 0) {
user_aio = (nni_aio *) nni_msg_get_proto_data(msg);
if (user_aio == NULL) {
log_error("user_aio is NULL\n");
break;
}
// make sure msg is in order
ret = exchange_client_handle_msg(ex_node, msg, user_aio);
if (ret != 0) {
log_error(
"exchange_client_handle cached msg failed!\n");
nni_aio_finish_error(user_aio, NNG_EINVAL);
} else {
nni_aio_finish(user_aio, 0, 0);
}
}
// check msg in aio & send
if ((msg = nni_aio_get_msg(&ex_node->saio)) != NULL) {
user_aio = (nni_aio *) nni_msg_get_proto_data(msg);
nni_aio_set_msg(&ex_node->saio, NULL);
ret = exchange_client_handle_msg(ex_node, msg, user_aio);
if (ret != 0) {
log_error("exchange_client_handle_msg failed!\n");
nni_aio_finish_error(user_aio, NNG_EINVAL);
} else {
nni_aio_finish(user_aio, 0, 0);
}
}
ex_node->isBusy = false;
nni_mtx_unlock(&ex_node->mtx);
return;
}

static int
exchange_sock_bind_exchange(void *arg, const void *v, size_t sz, nni_opt_type t)
{
Expand Down