Skip to content

Commit

Permalink
dmq: improve debug logs related to locks
Browse files Browse the repository at this point in the history
* dmq_node_list->lock
* dmq_worker->lock unreleased and some info inside worker_loop()
  • Loading branch information
linuxmaniac committed Mar 23, 2024
1 parent 0d240e4 commit a03dd2f
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 3 deletions.
14 changes: 11 additions & 3 deletions src/modules/dmq/dmq_funcs.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ int is_from_remote_node(sip_msg_t *msg)
int result = -1;

ip = &msg->rcv.src_ip;

LM_DBG("trying to acquire dmq_node_list->lock\n");
lock_get(&dmq_node_list->lock);
LM_DBG("acquired dmq_node_list->lock\n");
node = dmq_node_list->nodes;

while(node) {
Expand All @@ -153,6 +154,7 @@ int is_from_remote_node(sip_msg_t *msg)
}
done:
lock_release(&dmq_node_list->lock);
LM_DBG("released dmq_node_list->lock\n");
return result;
}

Expand All @@ -169,8 +171,9 @@ int bcast_dmq_message1(dmq_peer_t *peer, str *body, dmq_node_t *except,
int incl_inactive)
{
dmq_node_t *node;

LM_DBG("trying to acquire dmq_node_list->lock\n");
lock_get(&dmq_node_list->lock);
LM_DBG("acquired dmq_node_list->lock\n");
node = dmq_node_list->nodes;
while(node) {
/* we do not send the message to the following:
Expand All @@ -193,9 +196,11 @@ int bcast_dmq_message1(dmq_peer_t *peer, str *body, dmq_node_t *except,
node = node->next;
}
lock_release(&dmq_node_list->lock);
LM_DBG("released dmq_node_list->lock\n");
return 0;
error:
lock_release(&dmq_node_list->lock);
LM_DBG("released dmq_node_list->lock\n");
return -1;
}

Expand Down Expand Up @@ -443,8 +448,9 @@ int ki_dmq_t_replicate_mode(struct sip_msg *msg, int mode)
if(sock) {
set_force_socket(msg, sock);
}

LM_DBG("trying to acquire dmq_node_list->lock\n");
lock_get(&dmq_node_list->lock);
LM_DBG("acquired dmq_node_list->lock\n");
node = dmq_node_list->nodes;
while(node) {
/* we do not send the message to the following:
Expand Down Expand Up @@ -476,9 +482,11 @@ int ki_dmq_t_replicate_mode(struct sip_msg *msg, int mode)
node = node->next;
}
lock_release(&dmq_node_list->lock);
LM_DBG("released dmq_node_list->lock\n");
return 0;
error:
lock_release(&dmq_node_list->lock);
LM_DBG("released dmq_node_list->lock\n");
return -1;
}

Expand Down
11 changes: 11 additions & 0 deletions src/modules/dmq/dmqnode.c
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,9 @@ void pkg_free_node(dmq_node_t *node)
int dmq_node_del_filter(dmq_node_list_t *list, dmq_node_t *node, int filter)
{
dmq_node_t *cur, **prev;
LM_DBG("trying to acquire dmq_node_list->lock\n");
lock_get(&list->lock);
LM_DBG("acquired dmq_node_list->lock\n");
cur = list->nodes;
prev = &list->nodes;
while(cur) {
Expand All @@ -373,12 +375,14 @@ int dmq_node_del_filter(dmq_node_list_t *list, dmq_node_t *node, int filter)
destroy_dmq_node(cur, 1);
}
lock_release(&list->lock);
LM_DBG("released dmq_node_list->lock\n");
return 1;
}
prev = &cur->next;
cur = cur->next;
}
lock_release(&list->lock);
LM_DBG("released dmq_node_list->lock\n");
return 0;
}

Expand Down Expand Up @@ -419,11 +423,14 @@ dmq_node_t *add_dmq_node(dmq_node_list_t *list, str *uri)
goto error;
}
LM_DBG("dmq node successfully created\n");
LM_DBG("trying to acquire dmq_node_list->lock\n");
lock_get(&list->lock);
LM_DBG("acquired dmq_node_list->lock\n");
newnode->next = list->nodes;
list->nodes = newnode;
list->count++;
lock_release(&list->lock);
LM_DBG("released dmq_node_list->lock\n");
return newnode;
error:
return NULL;
Expand All @@ -435,17 +442,21 @@ dmq_node_t *add_dmq_node(dmq_node_list_t *list, str *uri)
int update_dmq_node_status(dmq_node_list_t *list, dmq_node_t *node, int status)
{
dmq_node_t *cur;
LM_DBG("trying to acquire dmq_node_list->lock\n");
lock_get(&list->lock);
LM_DBG("acquired dmq_node_list->lock\n");
cur = list->nodes;
while(cur) {
if(cmp_dmq_node(cur, node)) {
cur->status = status;
lock_release(&list->lock);
LM_DBG("released dmq_node_list->lock\n");
return 1;
}
cur = cur->next;
}
lock_release(&list->lock);
LM_DBG("released dmq_node_list->lock\n");
return 0;
}

Expand Down
8 changes: 8 additions & 0 deletions src/modules/dmq/notification_peer.c
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,9 @@ int extract_node_list(dmq_node_list_t *update_list, struct sip_msg *msg)
end = body.s + body.len;

/* acquire big list lock */
LM_DBG("trying to acquire dmq_node_list->lock\n");
lock_get(&update_list->lock);
LM_DBG("acquired dmq_node_list->lock\n");
while(tmp < end) {
match = q_memchr(tmp, '\n', end - tmp);
if(match) {
Expand Down Expand Up @@ -441,9 +443,11 @@ int extract_node_list(dmq_node_list_t *update_list, struct sip_msg *msg)

/* release big list lock */
lock_release(&update_list->lock);
LM_DBG("released dmq_node_list->lock\n");
return total_nodes;
error:
lock_release(&update_list->lock);
LM_DBG("released dmq_node_list->lock\n");
return -1;
}

Expand Down Expand Up @@ -549,7 +553,9 @@ str *build_notification_body()
return NULL;
}
/* we add each server to the body - each on a different line */
LM_DBG("trying to acquire dmq_node_list->lock\n");
lock_get(&dmq_node_list->lock);
LM_DBG("acquired dmq_node_list->lock\n");
cur_node = dmq_node_list->nodes;
while(cur_node) {
if(cur_node->local || cur_node->status == DMQ_NODE_ACTIVE) {
Expand All @@ -568,10 +574,12 @@ str *build_notification_body()
cur_node = cur_node->next;
}
lock_release(&dmq_node_list->lock);
LM_DBG("released dmq_node_list->lock\n");
body->len = clen;
return body;
error:
lock_release(&dmq_node_list->lock);
LM_DBG("released dmq_node_list->lock\n");
pkg_free(body->s);
pkg_free(body);
return NULL;
Expand Down
3 changes: 3 additions & 0 deletions src/modules/dmq/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ void worker_loop(int id)
LM_ERR("running job failed\n");
goto nextjob;
}
LM_DBG("running job executed\n");
/* add the body to the reply */
if(peer_response.body.s) {
if(set_reply_body(current_job->msg, &peer_response.body,
Expand All @@ -106,6 +107,7 @@ void worker_loop(int id)
LM_WARN("no reply sent\n");
}
worker->jobs_processed++;
LM_DBG("jobs_processed:%d\n", worker->jobs_processed);

nextjob:
/* if body given, free the lumps and free the body */
Expand Down Expand Up @@ -184,6 +186,7 @@ int add_dmq_job(struct sip_msg *msg, dmq_peer_t *peer)
}
if(dmq_worker_usleep <= 0) {
lock_release(&worker->lock);
LM_DBG("dmq_worker [%d %d] lock released\n", i, worker->pid);
}
return 0;
error:
Expand Down

0 comments on commit a03dd2f

Please sign in to comment.