diff --git a/src/modules/mqueue/Makefile b/src/modules/mqueue/Makefile
index 7121f598f5c..2c52191250f 100644
--- a/src/modules/mqueue/Makefile
+++ b/src/modules/mqueue/Makefile
@@ -6,4 +6,6 @@ auto_gen=
NAME=mqueue.so
LIBS=
+SERLIBPATH=../../lib
+SER_LIBS+=$(SERLIBPATH)/srdb1/srdb1
include ../../Makefile.modules
diff --git a/src/modules/mqueue/doc/mqueue.xml b/src/modules/mqueue/doc/mqueue.xml
index 771d8216341..8a8e23ef6e3 100644
--- a/src/modules/mqueue/doc/mqueue.xml
+++ b/src/modules/mqueue/doc/mqueue.xml
@@ -46,7 +46,7 @@
Elena-Ramona Modroiu (asipto.com)
- 2018
+ 2018-2020
Julien chavanton, Flowroute
diff --git a/src/modules/mqueue/doc/mqueue_admin.xml b/src/modules/mqueue/doc/mqueue_admin.xml
index 42542a392cc..53090980581 100644
--- a/src/modules/mqueue/doc/mqueue_admin.xml
+++ b/src/modules/mqueue/doc/mqueue_admin.xml
@@ -61,6 +61,32 @@
Parameters
+
+ db_url (str)
+
+ The URL to connect to database for loading values in mqueue table at start up and/or saving values at shutdown.
+
+
+
+ Default value is NULL (do not connect).
+
+
+
+ Set db_url parameter
+
+...
+modparam("mqueue", "db_url", "&defaultdb;")
+
+# Example of table in sqlite, you have the set the fields to support the length according to the data that will be present in the mqueue
+CREATE TABLE mqueue_name (
+id INTEGER PRIMARY KEY AUTOINCREMENT,
+key character varying(64) DEFAULT "" NOT NULL,
+val character varying(4096) DEFAULT "" NOT NULL
+);
+...
+
+
+
mqueue (string)
@@ -95,6 +121,16 @@
If not set the queue will be limitless.
+
+
+ dbmode: If set to 1, the content of the queue
+ is written to database table when the SIP server is stopped
+ (i.e., ensure persistency over restarts).
+ If set to 2, it is written at shutdown but not read at startup.
+ If set to 3, it is read at sartup but not written at shutdown.
+ Default value is 0 (no db table interaction).
+
+
diff --git a/src/modules/mqueue/mqueue_api.c b/src/modules/mqueue/mqueue_api.c
index 39042f08a19..93204bba3c7 100644
--- a/src/modules/mqueue/mqueue_api.c
+++ b/src/modules/mqueue/mqueue_api.c
@@ -34,7 +34,7 @@
#include "../../core/fmsg.h"
#include "mqueue_api.h"
-
+#include "mqueue_db.h"
/**
*
@@ -71,6 +71,11 @@ void mq_destroy(void)
mh = _mq_head_list;
while(mh!=NULL)
{
+ if(mh->dbmode == 1 || mh->dbmode == 3)
+ {
+ LM_INFO("mqueue[%.*s] dbmode[%d]\n", mh->name.len, mh->name.s, mh->dbmode);
+ mqueue_db_save_queue(&mh->name);
+ }
mi = mh->ifirst;
while(mi!=NULL)
{
@@ -180,6 +185,27 @@ mq_head_t *mq_head_get(str *name)
return NULL;
}
+/**
+ *
+ */
+int mq_set_dbmode(str *name, int dbmode)
+{
+ mq_head_t *mh = NULL;
+
+ mh = _mq_head_list;
+ while(mh!=NULL)
+ {
+ if(name->len == mh->name.len
+ && strncmp(mh->name.s, name->s, name->len)==0)
+ {
+ mh->dbmode = dbmode;
+ return 0;
+ }
+ mh = mh->next;
+ }
+ return -1;
+}
+
/**
*
*/
diff --git a/src/modules/mqueue/mqueue_api.h b/src/modules/mqueue/mqueue_api.h
index 51d2bfc54ea..843a7d79671 100644
--- a/src/modules/mqueue/mqueue_api.h
+++ b/src/modules/mqueue/mqueue_api.h
@@ -44,6 +44,7 @@ typedef struct _mq_head
str name;
int msize;
int csize;
+ int dbmode;
gen_lock_t lock;
mq_item_t *ifirst;
mq_item_t *ilast;
@@ -61,7 +62,6 @@ typedef struct _mq_pv
} mq_pv_t;
mq_pv_t *mq_pv_get(str *name);
-
int pv_parse_mq_name(pv_spec_p sp, str *in);
int pv_get_mqk(struct sip_msg *msg, pv_param_t *param,
pv_value_t *res);
@@ -79,6 +79,7 @@ void mq_pv_free(str *name);
int mq_item_add(str *qname, str *key, str *val);
int _mq_get_csize(str *);
+int mq_set_dbmode(str *, int dbmode);
#endif
diff --git a/src/modules/mqueue/mqueue_db.c b/src/modules/mqueue/mqueue_db.c
new file mode 100644
index 00000000000..c4e836ed1c8
--- /dev/null
+++ b/src/modules/mqueue/mqueue_db.c
@@ -0,0 +1,324 @@
+/**
+ * Copyright (C) 2020 Julien Chavanton
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ */
+
+#include "../../lib/srdb1/db.h"
+#include "mqueue_api.h"
+
+/** database connection */
+db1_con_t *mqueue_db_con = NULL;
+db_func_t mq_dbf;
+
+/** db parameters */
+str mqueue_db_url = {0, 0};
+str mq_db_key_column = str_init("key");
+str mq_db_val_column = str_init("val");
+str mq_db_id_column = str_init("id");
+
+/**
+ * initialize database connection
+ */
+int mqueue_db_init_con(void)
+{
+ if (mqueue_db_url.len<=0) {
+ LM_ERR("failed to connect to the database, no db url\n");
+ return -1;
+ }
+ /* binding to DB module */
+ if (db_bind_mod(&mqueue_db_url, &mq_dbf))
+ {
+ LM_ERR("database module not found\n");
+ return -1;
+ }
+
+ if (!DB_CAPABILITY(mq_dbf, DB_CAP_ALL))
+ {
+ LM_ERR("database module does not "
+ "implement all functions needed by the module\n");
+ return -1;
+ }
+ return 0;
+}
+
+/**
+ * open database connection
+ */
+int mqueue_db_open_con(void) {
+ if (mqueue_db_init_con()==0) {
+ mqueue_db_con = mq_dbf.init(&mqueue_db_url);
+ if (mqueue_db_con==NULL) {
+ LM_ERR("failed to connect to the database\n");
+ return -1;
+ }
+ LM_DBG("database connection opened successfully\n");
+ return 0;
+ }
+ return 0;
+}
+
+/**
+ * close database connection
+ */
+int mqueue_db_close_con(void)
+{
+ if (mqueue_db_con!=NULL && mq_dbf.close!=NULL)
+ mq_dbf.close(mqueue_db_con);
+ mqueue_db_con=NULL;
+ return 0;
+}
+
+int mqueue_db_load_queue(str *name)
+{
+ int ncols=2;
+ db1_res_t* db_res = NULL;
+ db_key_t db_cols[2] = {&mq_db_key_column, &mq_db_val_column};
+ db_key_t db_ord = &mq_db_id_column;
+ int mq_fetch_rows = 100;
+ int ret = 0;
+ str val = str_init("");
+ str key = str_init("");
+ int i;
+ int cnt=0;
+
+ if (mqueue_db_open_con() != 0) {
+ LM_ERR("no db connection\n");
+ return -1;
+ }
+
+ if (mq_dbf.use_table(mqueue_db_con, name) < 0)
+ {
+ LM_ERR("failed to use_table\n");
+ goto error;
+ }
+
+ LM_INFO("=============== loading queue table [%.*s] from database\n",
+ name->len, name->s);
+
+ if (DB_CAPABILITY(mq_dbf, DB_CAP_FETCH)) {
+ if(mq_dbf.query(mqueue_db_con,0,0,0,db_cols,0,ncols,db_ord,0) < 0)
+ {
+ LM_ERR("Error while querying db\n");
+ goto error;
+ }
+ if(mq_dbf.fetch_result(mqueue_db_con, &db_res, mq_fetch_rows)<0)
+ {
+ LM_ERR("Error while fetching result\n");
+ if (db_res)
+ mq_dbf.free_result(mqueue_db_con, db_res);
+ goto error;
+ } else {
+ if(RES_ROW_N(db_res)==0)
+ {
+ mq_dbf.free_result(mqueue_db_con, db_res);
+ LM_DBG("Nothing to be loaded in queue\n");
+ mqueue_db_close_con();
+ return 0;
+ }
+ }
+ } else {
+ if((ret=mq_dbf.query(mqueue_db_con, NULL, NULL, NULL, db_cols,
+ 0, ncols, 0, &db_res))!=0
+ || RES_ROW_N(db_res)<=0 )
+ {
+ if(ret==0)
+ {
+ mq_dbf.free_result(mqueue_db_con, db_res);
+ mqueue_db_close_con();
+ return 0;
+ } else {
+ goto error;
+ }
+ }
+ }
+
+ do {
+ for(i=0; ilen, name->s, i);
+ goto error;
+ }
+ if(VAL_NULL(&RES_ROWS(db_res)[i].values[1])) {
+ LM_ERR("mqueue [%.*s] row [%d] has NULL value string\n",
+ name->len, name->s, i);
+ goto error;
+ }
+ switch(RES_ROWS(db_res)[i].values[0].type) {
+ case DB1_STR:
+ key.s = (RES_ROWS(db_res)[i].values[0].val.str_val.s);
+ if(key.s==NULL) {
+ LM_ERR("mqueue [%.*s] row [%d] has NULL key\n",
+ name->len, name->s, i);
+ goto error;
+ }
+ key.len = (RES_ROWS(db_res)[i].values[0].val.str_val.len);
+ break;
+ case DB1_BLOB:
+ key.s = (RES_ROWS(db_res)[i].values[0].val.blob_val.s);
+ if(key.s==NULL) {
+ LM_ERR("mqueue [%.*s] row [%d] has NULL key\n",
+ name->len, name->s, i);
+ goto error;
+ }
+ key.len = (RES_ROWS(db_res)[i].values[0].val.blob_val.len);
+ break;
+ case DB1_STRING:
+ key.s = (char*)(RES_ROWS(db_res)[i].values[0].val.string_val);
+ if(key.s==NULL) {
+ LM_ERR("mqueue [%.*s] row [%d] has NULL key\n",
+ name->len, name->s, i);
+ goto error;
+ }
+ key.len = strlen(key.s);
+ break;
+ default:
+ LM_ERR("key type must be string (type=%d)\n",
+ RES_ROWS(db_res)[i].values[0].type);
+ goto error;
+ }
+ switch(RES_ROWS(db_res)[i].values[1].type) {
+ case DB1_STR:
+ val.s = (RES_ROWS(db_res)[i].values[1].val.str_val.s);
+ if(val.s==NULL) {
+ LM_ERR("mqueue [%.*s] row [%d] has NULL value\n",
+ name->len, name->s, i);
+ goto error;
+ }
+ val.len = (RES_ROWS(db_res)[i].values[1].val.str_val.len);
+ break;
+ case DB1_BLOB:
+ val.s = (RES_ROWS(db_res)[i].values[1].val.blob_val.s);
+ if(val.s==NULL) {
+ LM_ERR("mqueue [%.*s] row [%d] has NULL value\n",
+ name->len, name->s, i);
+ goto error;
+ }
+ val.len = (RES_ROWS(db_res)[i].values[1].val.blob_val.len);
+ break;
+ case DB1_STRING:
+ val.s = (char*)(RES_ROWS(db_res)[i].values[1].val.string_val);
+ if(val.s==NULL) {
+ LM_ERR("mqueue [%.*s] row [%d] has NULL value\n",
+ name->len, name->s, i);
+ goto error;
+ }
+ val.len = strlen(val.s);
+ break;
+ default:
+ LM_ERR("key type must be string (type=%d)\n",
+ RES_ROWS(db_res)[i].values[1].type);
+ goto error;
+ }
+ cnt++;
+ LM_DBG("adding item[%d] key[%.*s] value[%.*s]\n", cnt, key.len, key.s, val.len, val.s);
+ mq_item_add(name, &key, &val);
+ }
+
+ if (DB_CAPABILITY(mq_dbf, DB_CAP_FETCH)) {
+ if(mq_dbf.fetch_result(mqueue_db_con, &db_res, mq_fetch_rows)<0) {
+ LM_ERR("Error while fetching!\n");
+ goto error;
+ }
+ } else {
+ break;
+ }
+ } while(RES_ROW_N(db_res)>0);
+
+ mq_dbf.free_result(mqueue_db_con, db_res);
+
+ if (mq_dbf.delete(mqueue_db_con, 0, 0, 0, 0) < 0) {
+ LM_ERR("failed to clear table\n");
+ goto error;
+ }
+
+ LM_DBG("loaded %d values in queue\n", cnt);
+ mqueue_db_close_con();
+ return 0;
+error:
+ mqueue_db_close_con();
+ return -1;
+}
+
+int mqueue_db_save_queue(str *name)
+{
+ int ncols=2;
+ db_key_t db_cols[2] = {&mq_db_key_column, &mq_db_val_column};
+ db_val_t db_vals[2];
+ int i;
+ int mqueue_sz = 0;
+ int ret = 0;
+
+ if (mqueue_db_open_con() != 0) {
+ LM_ERR("no db connection\n");
+ return -1;
+ }
+
+ if (mq_dbf.use_table(mqueue_db_con, name) < 0)
+ {
+ LM_ERR("failed to use_table\n");
+ goto error;
+ }
+
+ if (name->len <= 0 || name->s == NULL) {
+ LM_ERR("bad mqueue name\n");
+ goto error;
+ }
+
+ mqueue_sz = _mq_get_csize(name);
+
+ if (mqueue_sz < 0) {
+ LM_ERR("no such mqueue\n");
+ goto error;
+ }
+ for(i=0;ilen, name->s, key->len, key->s, val->len, val->s);
+ db_vals[0].type = DB1_STR;
+ db_vals[0].nul = 0;
+ db_vals[0].val.str_val.s = key->s;
+ db_vals[0].val.str_val.len = key->len;
+ db_vals[1].type = DB1_STR;
+ db_vals[1].nul = 0;
+ db_vals[1].val.str_val.s = val->s;
+ db_vals[1].val.str_val.len = val->len;
+ if(mq_dbf.insert(mqueue_db_con, db_cols, db_vals, ncols) < 0)
+ {
+ LM_ERR("failed to store key [%.*s] val [%.*s]\n",
+ key->len, key->s,
+ val->len, val->s);
+ }
+ }
+
+ LM_INFO("queue [%.*s] saved in db\n",
+ name->len, name->s);
+ mqueue_db_close_con();
+ return 0;
+error:
+ mqueue_db_close_con();
+ return -1;
+}
diff --git a/src/modules/mqueue/mqueue_db.h b/src/modules/mqueue/mqueue_db.h
new file mode 100644
index 00000000000..3fbbe5db27f
--- /dev/null
+++ b/src/modules/mqueue/mqueue_db.h
@@ -0,0 +1,33 @@
+/**
+ * Copyright (C) 2020 Julien Chavanton
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * This file is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ *
+ * This file is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ */
+
+#ifndef _MQUEUE_DB_H_
+#define _MQUEUE_DB_H_
+
+#include "../../lib/srdb1/db.h"
+#include "mqueue_api.h"
+
+extern str mqueue_db_url;
+
+int mqueue_db_load_queue(str *name);
+int mqueue_db_save_queue(str *name);
+#endif
diff --git a/src/modules/mqueue/mqueue_mod.c b/src/modules/mqueue/mqueue_mod.c
index 2ee1b3afed9..860014cca94 100644
--- a/src/modules/mqueue/mqueue_mod.c
+++ b/src/modules/mqueue/mqueue_mod.c
@@ -37,6 +37,7 @@
#include "../../core/kemi.h"
#include "mqueue_api.h"
+#include "mqueue_db.h"
#include "api.h"
MODULE_VERSION
@@ -54,6 +55,7 @@ static int bind_mq(mq_api_t* api);
static int mqueue_rpc_init(void);
+
static pv_export_t mod_pvs[] = {
{ {"mqk", sizeof("mqk")-1}, PVT_OTHER, pv_get_mqk, 0,
pv_parse_mq_name, 0, 0, 0 },
@@ -80,6 +82,7 @@ static cmd_export_t cmds[]={
};
static param_export_t params[]={
+ {"db_url", PARAM_STR, &mqueue_db_url},
{"mqueue", PARAM_STRING|USE_FUNC_PARAM, (void*)mq_param},
{0, 0, 0}
};
@@ -205,6 +208,7 @@ int mq_param(modparam_t type, void *val)
param_t *pit=NULL;
str qname = {0, 0};
int msize = 0;
+ int dbmode = 0;
if(val==NULL)
return -1;
@@ -229,6 +233,9 @@ int mq_param(modparam_t type, void *val)
} else if(pit->name.len==4
&& strncasecmp(pit->name.s, "size", 4)==0) {
str2sint(&pit->body, &msize);
+ } else if(pit->name.len==6
+ && strncasecmp(pit->name.s, "dbmode", 6)==0) {
+ str2sint(&pit->body, &dbmode);
} else {
LM_ERR("unknown param: %.*s\n", pit->name.len, pit->name.s);
free_params(params_list);
@@ -247,6 +254,16 @@ int mq_param(modparam_t type, void *val)
free_params(params_list);
return -1;
}
+ LM_INFO("mqueue param: [%.*s|%d]\n", qname.len, qname.s, dbmode);
+ if(dbmode == 1 || dbmode == 2) {
+ if(mqueue_db_load_queue(&qname)<0)
+ {
+ LM_ERR("error loading mqueue: %.*s from DB\n", qname.len, qname.s);
+ free_params(params_list);
+ return -1;
+ }
+ }
+ mq_set_dbmode(&qname, dbmode);
free_params(params_list);
return 0;
}