Skip to content

Commit

Permalink
htable: option to dmq sync a single htable via rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
miconda committed Aug 15, 2023
1 parent a053bcc commit 49adf6b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 9 deletions.
30 changes: 23 additions & 7 deletions src/modules/htable/ht_dmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ dmq_peer_t *ht_dmq_peer = NULL;
dmq_resp_cback_t ht_dmq_resp_callback = {&ht_dmq_resp_callback_f, 0};

int ht_dmq_send(str *body, dmq_node_t *node);
int ht_dmq_send_sync(dmq_node_t *node);
int ht_dmq_send_sync(dmq_node_t *node, str *htname);
int ht_dmq_handle_sync(srjson_doc_t *jdoc);

static int ht_dmq_cell_group_init(void)
Expand Down Expand Up @@ -217,7 +217,8 @@ int ht_dmq_initialize()
}

not_peer.callback = ht_dmq_handle_msg;
not_peer.init_callback = (ht_dmq_init_sync ? ht_dmq_request_sync : NULL);
not_peer.init_callback =
(ht_dmq_init_sync ? ht_dmq_request_sync_all : NULL);
not_peer.description.s = "htable";
not_peer.description.len = 6;
not_peer.peer_id.s = "htable";
Expand Down Expand Up @@ -261,7 +262,8 @@ int ht_dmq_handle_msg(
int content_length;
str body;
ht_dmq_action_t action = HT_DMQ_NONE;
str htname, cname;
str htname = str_init("");
str cname;
int type = 0, mode = 0;
int_str val;
srjson_doc_t jdoc;
Expand Down Expand Up @@ -306,7 +308,6 @@ int ht_dmq_handle_msg(
if(unlikely(strcmp(jdoc.root->child->string, "cells") == 0)) {
ht_dmq_handle_sync(&jdoc);
} else {

for(it = jdoc.root->child; it; it = it->next) {
LM_DBG("found field: %s\n", it->string);
if(strcmp(it->string, "action") == 0) {
Expand All @@ -333,7 +334,7 @@ int ht_dmq_handle_msg(
}

if(unlikely(action == HT_DMQ_SYNC)) {
ht_dmq_send_sync(dmq_node);
ht_dmq_send_sync(dmq_node, &htname);
} else {
if(ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)
!= 0) {
Expand Down Expand Up @@ -456,7 +457,7 @@ int ht_dmq_replay_action(ht_dmq_action_t action, str *htname, str *cname,
}
}

int ht_dmq_request_sync()
int ht_dmq_request_sync(str *htname)
{

srjson_doc_t jdoc;
Expand All @@ -471,6 +472,10 @@ int ht_dmq_request_sync()
}

srjson_AddNumberToObject(&jdoc, jdoc.root, "action", HT_DMQ_SYNC);
if(htname != NULL && htname->len > 0) {
srjson_AddStrToObject(
&jdoc, jdoc.root, "htname", htname->s, htname->len);
}
jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
if(jdoc.buf.s == NULL) {
LM_ERR("unable to serialize data\n");
Expand All @@ -496,7 +501,12 @@ int ht_dmq_request_sync()
return -1;
}

int ht_dmq_send_sync(dmq_node_t *node)
int ht_dmq_request_sync_all()
{
return ht_dmq_request_sync(NULL);
}

int ht_dmq_send_sync(dmq_node_t *node, str *htname)
{
ht_t *ht;
ht_cell_t *it;
Expand All @@ -518,6 +528,12 @@ int ht_dmq_send_sync(dmq_node_t *node)
if(!ht->dmqreplicate)
goto skip;

if(htname != NULL && htname->len > 0) {
if(htname->len != ht->name.len
|| strncmp(htname->s, ht->name.s, htname->len) != 0) {
goto skip;
}
}
for(i = 0; i < ht->htsize; i++) {
ht_slot_lock(ht, i);
it = ht->entries[i].first;
Expand Down
3 changes: 2 additions & 1 deletion src/modules/htable/ht_dmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ int ht_dmq_replicate_action(ht_dmq_action_t action, str *htname, str *cname,
int type, int_str *val, int mode);
int ht_dmq_replay_action(ht_dmq_action_t action, str *htname, str *cname,
int type, int_str *val, int mode);
int ht_dmq_request_sync();
int ht_dmq_request_sync(str *htname);
int ht_dmq_request_sync_all();
int ht_dmq_resp_callback_f(
struct sip_msg *msg, int code, dmq_node_t *node, void *param);

Expand Down
8 changes: 7 additions & 1 deletion src/modules/htable/htable.c
Original file line number Diff line number Diff line change
Expand Up @@ -2105,8 +2105,14 @@ static void htable_rpc_store(rpc_t *rpc, void *c)
/*! \brief RPC htable.dmqsync command to sync a hash table via dmq */
static void htable_rpc_dmqsync(rpc_t *rpc, void *c)
{
int n = 0;
str htname = str_init("");

if(ht_dmq_request_sync() < 0) {
n = rpc->scan(c, "*S", &htname);
if(n != 1) {
htname.len = 0;
}
if(ht_dmq_request_sync(&htname) < 0) {
rpc->fault(c, 500, "HTable DMQ Sync Failed");
return;
}
Expand Down

0 comments on commit 49adf6b

Please sign in to comment.