From 0ce4310d8cfae43c194b7f48561dcd85fc7fdb99 Mon Sep 17 00:00:00 2001 From: Daniel-Constantin Mierla Date: Tue, 10 Mar 2020 18:02:08 +0100 Subject: [PATCH] db_cluster: support for insert_async db api --- src/modules/db_cluster/db_cluster_mod.c | 1 + src/modules/db_cluster/dbcl_api.c | 153 ++++++++++++++++++++++++ src/modules/db_cluster/dbcl_api.h | 7 ++ 3 files changed, 161 insertions(+) diff --git a/src/modules/db_cluster/db_cluster_mod.c b/src/modules/db_cluster/db_cluster_mod.c index 2163f9da7fb..e3f91f44695 100644 --- a/src/modules/db_cluster/db_cluster_mod.c +++ b/src/modules/db_cluster/db_cluster_mod.c @@ -95,6 +95,7 @@ int db_cluster_bind_api(db_func_t *dbb) dbb->update = db_cluster_update; dbb->replace = db_cluster_replace; dbb->last_inserted_id = db_cluster_last_inserted_id; + dbb->insert_async = db_cluster_insert_async; dbb->insert_update = db_cluster_insert_update; dbb->insert_delayed = db_cluster_insert_delayed; dbb->affected_rows = db_cluster_affected_rows; diff --git a/src/modules/db_cluster/dbcl_api.c b/src/modules/db_cluster/dbcl_api.c index ebe7a9059a3..838f2a24c2d 100644 --- a/src/modules/db_cluster/dbcl_api.c +++ b/src/modules/db_cluster/dbcl_api.c @@ -254,6 +254,148 @@ extern int dbcl_max_query_length; } while(0) +#define DBCL_WRITEX(qfunc, command, qfuncx, commandx) \ + do {\ + int ret;\ + int rc;\ + int rok;\ + int i;\ + int j;\ + int k;\ + unsigned int sec = 0;\ + db1_con_t *dbh=NULL;\ + dbcl_cls_t *cls=NULL;\ + cls = (dbcl_cls_t*)_h->tail;\ + ret = -1;\ + rok = 0;\ + rc = 0;\ + for(i=DBCL_PRIO_SIZE-1; i>0; i--)\ + {\ + if(cls->wlist[i].clen<=0) continue; \ + switch(cls->wlist[i].mode) {\ + case 's':\ + case 'S':\ + for(j=0; jwlist[i].clen; j++)\ + {\ + if(dbcl_valid_con(cls->wlist[i].clist[j])==0)\ + {\ + LM_DBG("serial operation - cluster [%.*s] (%d/%d)\n",\ + cls->name.len, cls->name.s, i, j);\ + sec = get_ticks();\ + dbh = cls->wlist[i].clist[j]->dbh;\ + if(cls->wlist[i].clist[j]->dbf.qfuncx==NULL) {\ + if(cls->wlist[i].clist[j]->dbf.qfunc==NULL) {\ + LM_ERR("unsupported command by db connector\n");\ + return -1;\ + } else {\ + ret = cls->wlist[i].clist[j]->dbf.command;\ + }\ + } else {\ + ret = cls->wlist[i].clist[j]->dbf.commandx;\ + }\ + if (ret==0) {\ + cls->usedcon = cls->wlist[i].clist[j];\ + return 0;\ + } else {\ + LM_DBG("serial operation - failure on cluster"\ + " [%.*s] (%d/%d)\n",\ + cls->name.len, cls->name.s, i, j);\ + sec = get_ticks() - sec;\ + if(sec >= dbcl_max_query_length){\ + dbcl_inactive_con(cls->wlist[i].clist[j]);\ + }\ + }\ + }\ + }\ + break;\ + case 'r':\ + case 'R':\ + for(k=0; kwlist[i].clen; k++)\ + {\ + j = (process_no + k + cls->wlist[i].crt) % cls->wlist[i].clen;\ + if(dbcl_valid_con(cls->wlist[i].clist[j])==0)\ + {\ + LM_DBG("round robin operation - cluster [%.*s] (%d/%d)\n",\ + cls->name.len, cls->name.s, i, j);\ + sec = get_ticks();\ + dbh = cls->wlist[i].clist[j]->dbh;\ + if(cls->wlist[i].clist[j]->dbf.qfuncx==NULL) {\ + if(cls->wlist[i].clist[j]->dbf.qfunc==NULL) {\ + LM_ERR("unsupported command by db connector\n");\ + return -1;\ + } else {\ + ret = cls->wlist[i].clist[j]->dbf.command;\ + }\ + } else {\ + ret = cls->wlist[i].clist[j]->dbf.commandx;\ + }\ + if (ret==0)\ + {\ + cls->usedcon = cls->wlist[i].clist[j];\ + cls->wlist[i].crt = (j+1) % cls->wlist[i].clen;\ + return 0;\ + } else {\ + LM_DBG("round robin operation - failure on cluster"\ + " [%.*s] (%d/%d)\n",\ + cls->name.len, cls->name.s, i, j);\ + sec = get_ticks() - sec;\ + if(sec >= dbcl_max_query_length){\ + dbcl_inactive_con(cls->wlist[i].clist[j]);\ + }\ + }\ + }\ + }\ + break;\ + case 'p':\ + case 'P':\ + for(j=0; jwlist[i].clen; j++)\ + {\ + if(dbcl_valid_con(cls->wlist[i].clist[j])==0)\ + {\ + LM_DBG("parallel operation - cluster [%.*s] (%d/%d)\n",\ + cls->name.len, cls->name.s, i, j);\ + sec = get_ticks();\ + dbh = cls->wlist[i].clist[j]->dbh;\ + if(cls->wlist[i].clist[j]->dbf.qfuncx==NULL) {\ + if(cls->wlist[i].clist[j]->dbf.qfunc==NULL) {\ + LM_ERR("unsupported command by db connector\n");\ + return -1;\ + } else {\ + ret = cls->wlist[i].clist[j]->dbf.command;\ + }\ + } else {\ + ret = cls->wlist[i].clist[j]->dbf.commandx;\ + }\ + if(rc==0) {\ + cls->usedcon = cls->wlist[i].clist[j];\ + rok = 1;\ + } else {\ + LM_DBG("parallel operation - failure on cluster"\ + " [%.*s] (%d/%d)\n",\ + cls->name.len, cls->name.s, i, j);\ + sec = get_ticks() - sec;\ + if(sec >= dbcl_max_query_length){\ + dbcl_inactive_con(cls->wlist[i].clist[j]);\ + }\ + }\ + ret |= rc;\ + }\ + }\ + if (rok==1 && cls->wlist[i].clen>0)\ + return 0;\ + break;\ + default:\ + LM_ERR("invalid mode %c (%d)\n", cls->rlist[i].mode,\ + cls->rlist[i].mode);\ + return -1;\ + }\ + }\ + LM_DBG("no successful write on cluster [%.*s]\n",\ + cls->name.len, cls->name.s);\ + return ret;\ + } while(0) + + /*! \brief * Initialize database connection @@ -380,6 +522,17 @@ int db_cluster_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _ } +/*! \brief + * Async insert a row into table + */ +int db_cluster_insert_async(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, const int _n) +{ + LM_DBG("executing db cluster insert async command\n"); + DBCL_WRITEX(insert, insert(dbh, _k, _v, _n), insert_async, + insert_async(dbh, _k, _v, _n)); +} + + /*! \brief * Delete a row from table */ diff --git a/src/modules/db_cluster/dbcl_api.h b/src/modules/db_cluster/dbcl_api.h index ae125acb800..6260d466b71 100644 --- a/src/modules/db_cluster/dbcl_api.h +++ b/src/modules/db_cluster/dbcl_api.h @@ -116,6 +116,13 @@ int db_cluster_last_inserted_id(const db1_con_t* _h); int db_cluster_affected_rows(const db1_con_t* _h); +/*! \brief + * Async insert a row into table + */ +int db_cluster_insert_async(const db1_con_t* _h, const db_key_t* _k, + const db_val_t* _v, const int _n); + + /*! \brief * Insert a row into table, update on duplicate key */