From 49adf6b697c87c1e0426390511048eeacfe4c826 Mon Sep 17 00:00:00 2001 From: Daniel-Constantin Mierla Date: Tue, 15 Aug 2023 22:49:46 +0200 Subject: [PATCH] htable: option to dmq sync a single htable via rpc --- src/modules/htable/ht_dmq.c | 30 +++++++++++++++++++++++------- src/modules/htable/ht_dmq.h | 3 ++- src/modules/htable/htable.c | 8 +++++++- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/src/modules/htable/ht_dmq.c b/src/modules/htable/ht_dmq.c index f010097ba1a..ea6b97c38ba 100644 --- a/src/modules/htable/ht_dmq.c +++ b/src/modules/htable/ht_dmq.c @@ -57,7 +57,7 @@ dmq_peer_t *ht_dmq_peer = NULL; dmq_resp_cback_t ht_dmq_resp_callback = {&ht_dmq_resp_callback_f, 0}; int ht_dmq_send(str *body, dmq_node_t *node); -int ht_dmq_send_sync(dmq_node_t *node); +int ht_dmq_send_sync(dmq_node_t *node, str *htname); int ht_dmq_handle_sync(srjson_doc_t *jdoc); static int ht_dmq_cell_group_init(void) @@ -217,7 +217,8 @@ int ht_dmq_initialize() } not_peer.callback = ht_dmq_handle_msg; - not_peer.init_callback = (ht_dmq_init_sync ? ht_dmq_request_sync : NULL); + not_peer.init_callback = + (ht_dmq_init_sync ? ht_dmq_request_sync_all : NULL); not_peer.description.s = "htable"; not_peer.description.len = 6; not_peer.peer_id.s = "htable"; @@ -261,7 +262,8 @@ int ht_dmq_handle_msg( int content_length; str body; ht_dmq_action_t action = HT_DMQ_NONE; - str htname, cname; + str htname = str_init(""); + str cname; int type = 0, mode = 0; int_str val; srjson_doc_t jdoc; @@ -306,7 +308,6 @@ int ht_dmq_handle_msg( if(unlikely(strcmp(jdoc.root->child->string, "cells") == 0)) { ht_dmq_handle_sync(&jdoc); } else { - for(it = jdoc.root->child; it; it = it->next) { LM_DBG("found field: %s\n", it->string); if(strcmp(it->string, "action") == 0) { @@ -333,7 +334,7 @@ int ht_dmq_handle_msg( } if(unlikely(action == HT_DMQ_SYNC)) { - ht_dmq_send_sync(dmq_node); + ht_dmq_send_sync(dmq_node, &htname); } else { if(ht_dmq_replay_action(action, &htname, &cname, type, &val, mode) != 0) { @@ -456,7 +457,7 @@ int ht_dmq_replay_action(ht_dmq_action_t action, str *htname, str *cname, } } -int ht_dmq_request_sync() +int ht_dmq_request_sync(str *htname) { srjson_doc_t jdoc; @@ -471,6 +472,10 @@ int ht_dmq_request_sync() } srjson_AddNumberToObject(&jdoc, jdoc.root, "action", HT_DMQ_SYNC); + if(htname != NULL && htname->len > 0) { + srjson_AddStrToObject( + &jdoc, jdoc.root, "htname", htname->s, htname->len); + } jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root); if(jdoc.buf.s == NULL) { LM_ERR("unable to serialize data\n"); @@ -496,7 +501,12 @@ int ht_dmq_request_sync() return -1; } -int ht_dmq_send_sync(dmq_node_t *node) +int ht_dmq_request_sync_all() +{ + return ht_dmq_request_sync(NULL); +} + +int ht_dmq_send_sync(dmq_node_t *node, str *htname) { ht_t *ht; ht_cell_t *it; @@ -518,6 +528,12 @@ int ht_dmq_send_sync(dmq_node_t *node) if(!ht->dmqreplicate) goto skip; + if(htname != NULL && htname->len > 0) { + if(htname->len != ht->name.len + || strncmp(htname->s, ht->name.s, htname->len) != 0) { + goto skip; + } + } for(i = 0; i < ht->htsize; i++) { ht_slot_lock(ht, i); it = ht->entries[i].first; diff --git a/src/modules/htable/ht_dmq.h b/src/modules/htable/ht_dmq.h index 302af8cf5f5..1ae6b784450 100644 --- a/src/modules/htable/ht_dmq.h +++ b/src/modules/htable/ht_dmq.h @@ -49,7 +49,8 @@ int ht_dmq_replicate_action(ht_dmq_action_t action, str *htname, str *cname, int type, int_str *val, int mode); int ht_dmq_replay_action(ht_dmq_action_t action, str *htname, str *cname, int type, int_str *val, int mode); -int ht_dmq_request_sync(); +int ht_dmq_request_sync(str *htname); +int ht_dmq_request_sync_all(); int ht_dmq_resp_callback_f( struct sip_msg *msg, int code, dmq_node_t *node, void *param); diff --git a/src/modules/htable/htable.c b/src/modules/htable/htable.c index 5b73e4fafb6..b12d44f0927 100644 --- a/src/modules/htable/htable.c +++ b/src/modules/htable/htable.c @@ -2105,8 +2105,14 @@ static void htable_rpc_store(rpc_t *rpc, void *c) /*! \brief RPC htable.dmqsync command to sync a hash table via dmq */ static void htable_rpc_dmqsync(rpc_t *rpc, void *c) { + int n = 0; + str htname = str_init(""); - if(ht_dmq_request_sync() < 0) { + n = rpc->scan(c, "*S", &htname); + if(n != 1) { + htname.len = 0; + } + if(ht_dmq_request_sync(&htname) < 0) { rpc->fault(c, 500, "HTable DMQ Sync Failed"); return; }