diff --git a/src/modules/htable/doc/htable_admin.xml b/src/modules/htable/doc/htable_admin.xml
index e67e7cc74b2..708973a0452 100644
--- a/src/modules/htable/doc/htable_admin.xml
+++ b/src/modules/htable/doc/htable_admin.xml
@@ -647,6 +647,28 @@ modparam("htable", "db_expires", 1)
...
modparam("htable", "enable_dmq", 1)
...
+
+
+
+
+ dmq_init_sync (integer)
+
+ If set to 1, will request synchronization from other nodes at startup. It applies
+ to all tables having the "dmqreplicate" parameter set. As above, it is important to
+ ensure the definition (size, autoexpire etc.) of replicated tables is identical
+ across all instances.
+
+
+
+ Default value is 0.
+
+
+
+ Set dmq_init_sync parameter
+
+...
+modparam("htable", "dmq_init_sync", 1)
+...
diff --git a/src/modules/htable/ht_dmq.c b/src/modules/htable/ht_dmq.c
index 975daf09bfb..f1fe069f177 100644
--- a/src/modules/htable/ht_dmq.c
+++ b/src/modules/htable/ht_dmq.c
@@ -24,11 +24,6 @@
#include "ht_dmq.h"
#include "ht_api.h"
-static str ht_dmq_content_type = str_init("application/json");
-static str dmq_200_rpl = str_init("OK");
-static str dmq_400_rpl = str_init("Bad Request");
-static str dmq_500_rpl = str_init("Server Internal Error");
-
typedef struct _ht_dmq_repdata {
int action;
str htname;
@@ -39,10 +34,155 @@ typedef struct _ht_dmq_repdata {
int expire;
} ht_dmq_repdata_t;
+typedef struct _ht_dmq_jdoc_cell_group {
+ int count;
+ int size;
+ srjson_doc_t jdoc;
+ srjson_t *jdoc_cells;
+} ht_dmq_jdoc_cell_group_t;
+
+static str ht_dmq_content_type = str_init("application/json");
+static str dmq_200_rpl = str_init("OK");
+static str dmq_400_rpl = str_init("Bad Request");
+static str dmq_500_rpl = str_init("Server Internal Error");
+static int dmq_cell_group_empty_size = 12; // {"cells":[]}
+static int dmq_cell_group_max_size = 60000;
+static ht_dmq_jdoc_cell_group_t ht_dmq_jdoc_cell_group;
+int ht_dmq_init_sync;
+
dmq_api_t ht_dmqb;
dmq_peer_t* ht_dmq_peer = NULL;
dmq_resp_cback_t ht_dmq_resp_callback = {&ht_dmq_resp_callback_f, 0};
+int ht_dmq_send(str* body, dmq_node_t* node);
+int ht_dmq_send_sync(dmq_node_t* node);
+int ht_dmq_handle_sync(srjson_doc_t* jdoc);
+
+static int ht_dmq_cell_group_init(void) {
+
+ if (ht_dmq_jdoc_cell_group.jdoc.root)
+ return 0; // already initialised
+
+ ht_dmq_jdoc_cell_group.count = 0;
+ ht_dmq_jdoc_cell_group.size = dmq_cell_group_empty_size;
+
+ srjson_InitDoc(&ht_dmq_jdoc_cell_group.jdoc, NULL);
+
+ ht_dmq_jdoc_cell_group.jdoc.root = srjson_CreateObject(&ht_dmq_jdoc_cell_group.jdoc);
+ if (ht_dmq_jdoc_cell_group.jdoc.root==NULL) {
+ LM_ERR("cannot create json root object! \n");
+ return -1;
+ }
+
+ ht_dmq_jdoc_cell_group.jdoc_cells = srjson_CreateArray(&ht_dmq_jdoc_cell_group.jdoc);
+ if (ht_dmq_jdoc_cell_group.jdoc_cells==NULL) {
+ LM_ERR("cannot create json cells array! \n");
+ srjson_DestroyDoc(&ht_dmq_jdoc_cell_group.jdoc);
+ return -1;
+ }
+
+ return 0;
+}
+
+static int ht_dmq_cell_group_write(str* htname, ht_cell_t* ptr) {
+
+ // jsonify cell and add to array
+
+ str tmp;
+ srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
+ srjson_t *jdoc_cells = ht_dmq_jdoc_cell_group.jdoc_cells;
+ srjson_t * jdoc_cell = srjson_CreateObject(jdoc);
+
+ if(!jdoc_cell) {
+ LM_ERR("cannot create cell json root\n");
+ return -1;
+ }
+
+ // add json overhead
+ if(ptr->flags&AVP_VAL_STR) {
+ ht_dmq_jdoc_cell_group.size += 54; // {"htname":"","cname":"","type":,"strval":"","expire":}
+ } else {
+ ht_dmq_jdoc_cell_group.size += 52; // {"htname":"","cname":"","type":,"intval":,"expire":}
+ }
+
+ srjson_AddStrToObject(jdoc, jdoc_cell, "htname", htname->s, htname->len);
+ ht_dmq_jdoc_cell_group.size += htname->len;
+
+ srjson_AddStrToObject(jdoc, jdoc_cell, "cname", ptr->name.s, ptr->name.len);
+ ht_dmq_jdoc_cell_group.size += ptr->name.len;
+
+ if (ptr->flags&AVP_VAL_STR) {
+ srjson_AddNumberToObject(jdoc, jdoc_cell, "type", AVP_VAL_STR);
+ ht_dmq_jdoc_cell_group.size += 1;
+ srjson_AddStrToObject(jdoc, jdoc_cell, "strval", ptr->value.s.s, ptr->value.s.len);
+ ht_dmq_jdoc_cell_group.size += ptr->value.s.len;
+ } else {
+ srjson_AddNumberToObject(jdoc, jdoc_cell, "type", 0);
+ ht_dmq_jdoc_cell_group.size += 1;
+ srjson_AddNumberToObject(jdoc, jdoc_cell, "intval", ptr->value.n);
+ tmp.s = sint2str((long)ptr->value.n, &tmp.len);
+ ht_dmq_jdoc_cell_group.size += tmp.len;
+ }
+
+ srjson_AddNumberToObject(jdoc, jdoc_cell, "expire", ptr->expire);
+ tmp.s = sint2str((long)ptr->expire, &tmp.len);
+ ht_dmq_jdoc_cell_group.size += tmp.len;
+
+ srjson_AddItemToArray(jdoc, jdoc_cells, jdoc_cell);
+
+ ht_dmq_jdoc_cell_group.count++;
+
+ return 0;
+}
+
+static int ht_dmq_cell_group_flush(dmq_node_t* node) {
+
+ srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
+ srjson_t *jdoc_cells = ht_dmq_jdoc_cell_group.jdoc_cells;
+
+ srjson_AddItemToObject(jdoc, jdoc->root, "cells", jdoc_cells);
+
+ LM_DBG("json[%s]\n", srjson_PrintUnformatted(jdoc, jdoc->root));
+ jdoc->buf.s = srjson_PrintUnformatted(jdoc, jdoc->root);
+ if(jdoc->buf.s==NULL) {
+ LM_ERR("unable to serialize data\n");
+ return -1;
+ }
+ jdoc->buf.len = strlen(jdoc->buf.s);
+
+ LM_DBG("sending serialized data %.*s\n", jdoc->buf.len, jdoc->buf.s);
+ if (ht_dmq_send(&jdoc->buf, node)!=0) {
+ LM_ERR("unable to send data\n");
+ return -1;
+ }
+
+ LM_DBG("jdoc size[%d]\n", ht_dmq_jdoc_cell_group.size);
+
+ srjson_Delete(jdoc, jdoc_cells);
+ ht_dmq_jdoc_cell_group.jdoc_cells = srjson_CreateArray(&ht_dmq_jdoc_cell_group.jdoc);
+ if (ht_dmq_jdoc_cell_group.jdoc_cells==NULL) {
+ LM_ERR("cannot re-create json cells array! \n");
+ return -1;
+ }
+
+ ht_dmq_jdoc_cell_group.count = 0;
+ ht_dmq_jdoc_cell_group.size = dmq_cell_group_empty_size;
+
+ return 0;
+}
+
+static void ht_dmq_cell_group_destroy() {
+
+ srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
+
+ if(jdoc->buf.s!=NULL) {
+ jdoc->free_fn(jdoc->buf.s);
+ jdoc->buf.s = NULL;
+ }
+ srjson_DestroyDoc(jdoc);
+
+}
+
/**
* @brief add notification peer
*/
@@ -59,7 +199,7 @@ int ht_dmq_initialize()
}
not_peer.callback = ht_dmq_handle_msg;
- not_peer.init_callback = NULL;
+ not_peer.init_callback = (ht_dmq_init_sync ? ht_dmq_request_sync : NULL);
not_peer.description.s = "htable";
not_peer.description.len = 6;
not_peer.peer_id.s = "htable";
@@ -76,14 +216,20 @@ int ht_dmq_initialize()
return -1;
}
-int ht_dmq_broadcast(str* body)
-{
+int ht_dmq_send(str* body, dmq_node_t* node) {
if (!ht_dmq_peer) {
LM_ERR("ht_dmq_peer is null!\n");
return -1;
}
- LM_DBG("sending broadcast...\n");
- ht_dmqb.bcast_message(ht_dmq_peer, body, 0, &ht_dmq_resp_callback, 1, &ht_dmq_content_type);
+ if (node) {
+ LM_DBG("sending dmq message ...\n");
+ ht_dmqb.send_message(ht_dmq_peer, body, node,
+ &ht_dmq_resp_callback, 1, &ht_dmq_content_type);
+ } else {
+ LM_DBG("sending dmq broadcast...\n");
+ ht_dmqb.bcast_message(ht_dmq_peer, body, 0,
+ &ht_dmq_resp_callback, 1, &ht_dmq_content_type);
+ }
return 0;
}
@@ -138,35 +284,45 @@ int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq
}
}
- for(it=jdoc.root->child; it; it = it->next)
- {
- LM_DBG("found field: %s\n", it->string);
- if (strcmp(it->string, "action")==0) {
- action = SRJSON_GET_INT(it);
- } else if (strcmp(it->string, "htname")==0) {
- htname.s = it->valuestring;
- htname.len = strlen(htname.s);
- } else if (strcmp(it->string, "cname")==0) {
- cname.s = it->valuestring;
- cname.len = strlen(cname.s);
- } else if (strcmp(it->string, "type")==0) {
- type = SRJSON_GET_INT(it);
- } else if (strcmp(it->string, "strval")==0) {
- val.s.s = it->valuestring;
- val.s.len = strlen(val.s.s);
- } else if (strcmp(it->string, "intval")==0) {
- val.n = SRJSON_GET_INT(it);
- } else if (strcmp(it->string, "mode")==0) {
- mode = SRJSON_GET_INT(it);
+ if (unlikely(strcmp(jdoc.root->child->string, "cells")==0)) {
+ ht_dmq_handle_sync(&jdoc);
+ } else {
+
+ for(it=jdoc.root->child; it; it = it->next)
+ {
+ LM_DBG("found field: %s\n", it->string);
+ if (strcmp(it->string, "action")==0) {
+ action = SRJSON_GET_INT(it);
+ } else if (strcmp(it->string, "htname")==0) {
+ htname.s = it->valuestring;
+ htname.len = strlen(htname.s);
+ } else if (strcmp(it->string, "cname")==0) {
+ cname.s = it->valuestring;
+ cname.len = strlen(cname.s);
+ } else if (strcmp(it->string, "type")==0) {
+ type = SRJSON_GET_INT(it);
+ } else if (strcmp(it->string, "strval")==0) {
+ val.s.s = it->valuestring;
+ val.s.len = strlen(val.s.s);
+ } else if (strcmp(it->string, "intval")==0) {
+ val.n = SRJSON_GET_INT(it);
+ } else if (strcmp(it->string, "mode")==0) {
+ mode = SRJSON_GET_INT(it);
+ } else {
+ LM_ERR("unrecognized field in json object\n");
+ goto invalid;
+ }
+ }
+
+ if (unlikely(action == HT_DMQ_SYNC)) {
+ ht_dmq_send_sync(dmq_node);
} else {
- LM_ERR("unrecognized field in json object\n");
- goto invalid;
+ if (ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)!=0) {
+ LM_ERR("failed to replay action\n");
+ goto error;
+ }
}
- }
- if (ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)!=0) {
- LM_ERR("failed to replay action\n");
- goto error;
}
srjson_DestroyDoc(&jdoc);
@@ -222,7 +378,7 @@ int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int
if(jdoc.buf.s!=NULL) {
jdoc.buf.len = strlen(jdoc.buf.s);
LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
- if (ht_dmq_broadcast(&jdoc.buf)!=0) {
+ if (ht_dmq_send(&jdoc.buf, 0)!=0) {
goto error;
}
jdoc.free_fn(jdoc.buf.s);
@@ -264,9 +420,172 @@ int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int ty
} else if (action==HT_DMQ_RM_CELL_RE) {
return ht_rm_cell_re(&val->s, ht, mode);
} else {
- LM_ERR("unrecognized action");
+ LM_ERR("unrecognized action\n");
+ return -1;
+ }
+}
+
+int ht_dmq_request_sync() {
+
+ srjson_doc_t jdoc;
+
+ LM_DBG("requesting sync from dmq peers\n");
+ srjson_InitDoc(&jdoc, NULL);
+
+ jdoc.root = srjson_CreateObject(&jdoc);
+ if(jdoc.root==NULL) {
+ LM_ERR("cannot create json root\n");
+ goto error;
+ }
+
+ srjson_AddNumberToObject(&jdoc, jdoc.root, "action", HT_DMQ_SYNC);
+ jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
+ if(jdoc.buf.s==NULL) {
+ LM_ERR("unable to serialize data\n");
+ goto error;
+ }
+ jdoc.buf.len = strlen(jdoc.buf.s);
+ LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
+ if (ht_dmq_send(&jdoc.buf, 0)!=0) {
+ goto error;
+ }
+
+ jdoc.free_fn(jdoc.buf.s);
+ jdoc.buf.s = NULL;
+ srjson_DestroyDoc(&jdoc);
+ return 0;
+
+error:
+ if(jdoc.buf.s!=NULL) {
+ jdoc.free_fn(jdoc.buf.s);
+ jdoc.buf.s = NULL;
+ }
+ srjson_DestroyDoc(&jdoc);
+ return -1;
+}
+
+int ht_dmq_send_sync(dmq_node_t* node) {
+ ht_t *ht;
+ ht_cell_t *it;
+ time_t now;
+ int i;
+
+ ht = ht_get_root();
+ if(ht==NULL)
+ {
+ LM_DBG("no htables to sync!\n");
+ return 0;
+ }
+
+ if (ht_dmq_cell_group_init() < 0)
return -1;
+
+ now = time(NULL);
+
+ while (ht != NULL)
+ {
+ if (!ht->dmqreplicate)
+ goto skip;
+
+ for(i=0; ihtsize; i++)
+ {
+ ht_slot_lock(ht, i);
+ it = ht->entries[i].first;
+ while(it)
+ {
+ if(ht->htexpire > 0) {
+ if (it->expire <= now) {
+ LM_DBG("skipping expired entry\n");
+ it = it->next;
+ continue;
+ }
+ }
+
+ if (ht_dmq_cell_group_write(&ht->name, it) < 0) {
+ ht_slot_unlock(ht, i);
+ goto error;
+ }
+
+ if (ht_dmq_jdoc_cell_group.size >= dmq_cell_group_max_size) {
+ LM_DBG("sending group count[%d]size[%d]\n", ht_dmq_jdoc_cell_group.count, ht_dmq_jdoc_cell_group.size);
+ if (ht_dmq_cell_group_flush(node) < 0) {
+ ht_slot_unlock(ht, i);
+ goto error;
+ }
+ }
+
+ it = it->next;
+ }
+ ht_slot_unlock(ht, i);
+ }
+
+skip:
+ ht = ht->next;
}
+
+ if (ht_dmq_cell_group_flush(node) < 0)
+ goto error;
+
+ ht_dmq_cell_group_destroy();
+ return 0;
+
+error:
+ ht_dmq_cell_group_destroy();
+ return -1;
+}
+
+int ht_dmq_handle_sync(srjson_doc_t* jdoc) {
+ LM_DBG("handling sync\n");
+
+ srjson_t* cells;
+ srjson_t* cell;
+ srjson_t* it;
+ str htname;
+ str cname;
+ int type;
+ int_str val;
+ int expire;
+ ht_t* ht;
+ time_t now;
+
+
+ cells = jdoc->root->child;
+ cell = cells->child;
+
+ now = time(NULL);
+
+ while (cell) {
+ for(it=cell->child; it; it = it->next) {
+ if (strcmp(it->string, "htname")==0) {
+ htname.s = it->valuestring;
+ htname.len = strlen(htname.s);
+ } else if (strcmp(it->string, "cname")==0) {
+ cname.s = it->valuestring;
+ cname.len = strlen(cname.s);
+ } else if (strcmp(it->string, "type")==0) {
+ type = SRJSON_GET_INT(it);
+ } else if (strcmp(it->string, "strval")==0) {
+ val.s.s = it->valuestring;
+ val.s.len = strlen(val.s.s);
+ } else if (strcmp(it->string, "intval")==0) {
+ val.n = SRJSON_GET_INT(it);
+ } else if (strcmp(it->string, "expire")==0) {
+ expire = SRJSON_GET_INT(it);
+ } else {
+ LM_WARN("unrecognized field in json object\n");
+ }
+ }
+
+ ht = ht_get_table(&htname);
+ if(ht==NULL)
+ LM_WARN("unable to get table %.*s\n", ht->name.len, ht->name.s);
+
+ if (ht_set_cell_ex(ht, &cname, type, &val, 0, expire - now) < 0)
+ LM_WARN("unable to set cell %.*s in table %.*s\n", cname.len, cname.s, ht->name.len, ht->name.s);
+
+ cell = cell->next;
+ }
+ return 0;
}
/**
diff --git a/src/modules/htable/ht_dmq.h b/src/modules/htable/ht_dmq.h
index 16d3df3cf86..be11d4dc494 100644
--- a/src/modules/htable/ht_dmq.h
+++ b/src/modules/htable/ht_dmq.h
@@ -31,18 +31,22 @@ extern dmq_api_t ht_dmqb;
extern dmq_peer_t* ht_dmq_peer;
extern dmq_resp_cback_t ht_dmq_resp_callback;
+int ht_dmq_init_sync;
+
typedef enum {
HT_DMQ_NONE,
HT_DMQ_SET_CELL,
HT_DMQ_SET_CELL_EXPIRE,
HT_DMQ_DEL_CELL,
- HT_DMQ_RM_CELL_RE
+ HT_DMQ_RM_CELL_RE,
+ HT_DMQ_SYNC
} ht_dmq_action_t;
int ht_dmq_initialize();
int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
+int ht_dmq_request_sync();
int ht_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
#endif
diff --git a/src/modules/htable/htable.c b/src/modules/htable/htable.c
index 1801948e86e..1da1ba79281 100644
--- a/src/modules/htable/htable.c
+++ b/src/modules/htable/htable.c
@@ -52,6 +52,7 @@ MODULE_VERSION
int ht_timer_interval = 20;
int ht_db_expires_flag = 0;
int ht_enable_dmq = 0;
+int ht_dmq_init_sync = 0;
int ht_timer_procs = 0;
static int ht_event_callback_mode = 0;
@@ -153,6 +154,7 @@ static param_export_t params[]={
{"timer_interval", INT_PARAM, &ht_timer_interval},
{"db_expires", INT_PARAM, &ht_db_expires_flag},
{"enable_dmq", INT_PARAM, &ht_enable_dmq},
+ {"dmq_init_sync", INT_PARAM, &ht_dmq_init_sync},
{"timer_procs", PARAM_INT, &ht_timer_procs},
{"event_callback", PARAM_STR, &ht_event_callback},
{"event_callback_mode", PARAM_INT, &ht_event_callback_mode},
@@ -218,7 +220,7 @@ static int mod_init(void)
}
}
- if (ht_enable_dmq>0 && ht_dmq_initialize()!=0) {
+ if (ht_enable_dmq>0 && ht_dmq_initialize(ht_dmq_init_sync)!=0) {
LM_ERR("failed to initialize dmq integration\n");
return -1;
}