Skip to content

Commit

Permalink
* FIX [exchange_client] Resolve TSAN data race
Browse files Browse the repository at this point in the history
Signed-off-by: Moi Ran <maoyi.ran@emqx.io>
  • Loading branch information
RanMaoyi authored and JaylinYu committed Nov 25, 2023
1 parent 8334db5 commit 583bf19
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 12 deletions.
2 changes: 2 additions & 0 deletions include/nng/exchange/exchange_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ typedef struct exchange_node_s exchange_node_t;

struct exchange_node_s {
exchange_t *ex;
exchange_sock_t *sock;
nni_aio saio;
nni_lmq send_messages;
nni_list_node exnode;
Expand All @@ -22,6 +23,7 @@ struct exchange_node_s {

struct exchange_sock_s {
nni_mtx mtx;
nni_atomic_bool closed;
nni_list ex_queue;
};

Expand Down
53 changes: 41 additions & 12 deletions src/mqtt/protocol/exchange/exchange_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ exchange_add_ex(exchange_sock_t *s, exchange_t *ex)

node->isBusy = false;
node->ex = ex;
node->sock = s;

nni_aio_init(&node->saio, exchange_send_cb, ex);
nni_aio_init(&node->saio, exchange_send_cb, node);
nni_lmq_init(&node->send_messages, 1024);

NNI_LIST_NODE_INIT(&node->exnode);
Expand All @@ -53,6 +54,8 @@ exchange_sock_init(void *arg, nni_sock *sock)
NNI_ARG_UNUSED(sock);
exchange_sock_t *s = arg;

nni_atomic_init_bool(&s->closed);
nni_atomic_set_bool(&s->closed, false);
nni_mtx_init(&s->mtx);
NNI_LIST_INIT(&s->ex_queue, exchange_node_t, exnode);

Expand All @@ -63,6 +66,25 @@ static void
exchange_sock_fini(void *arg)
{
exchange_sock_t *s = arg;
exchange_node_t *ex_node;

NNI_LIST_FOREACH (&s->ex_queue, ex_node) {
nni_aio_fini(&ex_node->saio);
nni_lmq_fini(&ex_node->send_messages);
nni_mtx_fini(&ex_node->mtx);
exchange_release(ex_node->ex);
}

ex_node = NULL;
while (!nni_list_empty(&s->ex_queue)) {
ex_node = nni_list_last(&s->ex_queue);
if (ex_node) {
nni_list_remove(&s->ex_queue, ex_node);
nng_free(ex_node, sizeof(*ex_node));
ex_node = NULL;
}
}

nni_mtx_fini(&s->mtx);
return;
}
Expand All @@ -80,18 +102,9 @@ exchange_sock_close(void *arg)
exchange_sock_t *s = arg;
exchange_node_t *ex_node;

nni_atomic_set_bool(&s->closed, true);
NNI_LIST_FOREACH (&s->ex_queue, ex_node) {
exchange_release(ex_node->ex);
nni_lmq_fini(&ex_node->send_messages);
}

ex_node = NULL;
while (!nni_list_empty(&s->ex_queue)) {
ex_node = nni_list_last(&s->ex_queue);
if (ex_node) {
nni_list_remove(&s->ex_queue, ex_node);
nng_free(ex_node, sizeof(*ex_node));
}
nni_aio_close(&ex_node->saio);
}

return;
Expand Down Expand Up @@ -157,7 +170,23 @@ exchange_send_cb(void *arg)
exchange_node_t *ex_node = arg;
nni_msg * msg = NULL;

if (ex_node == NULL) {
return;
}

exchange_sock_t *s = ex_node->sock;
if (nni_atomic_get_bool(&s->closed)) {
// This occurs if the mqtt_pipe_close has been called.
// In that case we don't want any more processing.
return;
}

nni_mtx_lock(&ex_node->mtx);
if (nni_aio_result(&ex_node->saio) != 0) {
nni_mtx_unlock(&ex_node->mtx);
return;
}

if (nni_lmq_get(&ex_node->send_messages, &msg) == 0) {
nni_mtx_unlock(&ex_node->mtx);
(void)exchange_handle_msg(ex_node->ex, msg);
Expand Down

0 comments on commit 583bf19

Please sign in to comment.