From fc75b0975781d4eb4b99e48590cf7dcc4d704639 Mon Sep 17 00:00:00 2001 From: Julien Chavanton Date: Sat, 18 Mar 2017 13:16:15 -0700 Subject: [PATCH 1/2] db_postgress: insert_update() with DO UPDATE --- src/modules/db_postgres/db_postgres.c | 1 + src/modules/db_postgres/db_postgres.h | 2 + src/modules/db_postgres/km_dbase.c | 216 ++++++++++++++++++++++++++ src/modules/db_postgres/km_dbase.h | 15 ++ src/modules/db_postgres/km_pg_con.c | 4 + src/modules/db_postgres/km_res.c | 1 + src/modules/db_postgres/pg_mod.c | 4 + 7 files changed, 243 insertions(+) diff --git a/src/modules/db_postgres/db_postgres.c b/src/modules/db_postgres/db_postgres.c index e6a09663102..b42bcd3a2ee 100644 --- a/src/modules/db_postgres/db_postgres.c +++ b/src/modules/db_postgres/db_postgres.c @@ -90,6 +90,7 @@ int db_postgres_bind_api(db_func_t *dbb) dbb->raw_query = db_postgres_raw_query; dbb->free_result = db_postgres_free_result; dbb->insert = db_postgres_insert; + dbb->insert_update = db_postgres_insert_update; dbb->delete = db_postgres_delete; dbb->update = db_postgres_update; dbb->replace = db_postgres_replace; diff --git a/src/modules/db_postgres/db_postgres.h b/src/modules/db_postgres/db_postgres.h index 9d2bfc2c0a3..633b596e0e9 100644 --- a/src/modules/db_postgres/db_postgres.h +++ b/src/modules/db_postgres/db_postgres.h @@ -38,4 +38,6 @@ int pg_init_lock_set(int sz); void pg_destroy_lock_set(void); +int pg_alloc_buffer(void); + #endif /* _KM_DB_POSTGRES_H */ diff --git a/src/modules/db_postgres/km_dbase.c b/src/modules/db_postgres/km_dbase.c index 43bdab759f3..17d8dab37ff 100644 --- a/src/modules/db_postgres/km_dbase.c +++ b/src/modules/db_postgres/km_dbase.c @@ -2,6 +2,7 @@ * Copyright (C) 2003 August.Net Services, LLC * Copyright (C) 2006 Norman Brandinger * Copyright (C) 2008 1&1 Internet AG + * Copyright (C) 2017 Julien Chavanton, Flowroute * * This file is part of Kamailio, a free SIP server. * @@ -40,6 +41,7 @@ #include "../../lib/srdb1/db_query.h" #include "../../core/locking.h" #include "../../core/hashes.h" +#include "../../core/clist.h" #include "km_dbase.h" #include "km_pg_con.h" #include "km_val.h" @@ -49,6 +51,9 @@ static gen_lock_set_t *_pg_lock_set = NULL; static unsigned int _pg_lock_size = 0; +extern unsigned int sql_buffer_size; +static char *postgres_sql_buf = NULL; + /*! * \brief init lock set used to implement SQL REPLACE via UPDATE/INSERT * \param sz power of two to compute the lock set size @@ -82,6 +87,21 @@ void pg_destroy_lock_set(void) } } +int pg_alloc_buffer(void) +{ + if (postgres_sql_buf != NULL) { + LM_DBG("postgres_sql_buf not NULL on init\n"); + return 0; + } + LM_DBG("About to allocate postgres_sql_buf size = %d\n", sql_buffer_size); + postgres_sql_buf = pkg_malloc(sql_buffer_size); + if (postgres_sql_buf == NULL) { + LM_ERR("failed to allocate postgres_sql_buf\n"); + return -1; + } + return 1; +} + static void db_postgres_free_query(const db1_con_t* _con); @@ -626,6 +646,202 @@ int db_postgres_delete(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _ return ret; } +static pg_constraint_t *pg_constraint = NULL; + +/*! + * \brief add/save a detected constraint to the list in memory + * \param pg_constraint_t constraint + */ +static void db_postgres_constraint_add(pg_constraint_t *c) { + if (!pg_constraint) { + pg_constraint = c; + LM_DBG("adding init constraint [%s][%s][%s]\n", c->database.s, c->table.s, c->unique.s); + clist_init(pg_constraint, next, prev); + } else { + LM_DBG("adding append constraint [%s][%s][%s]\n", c->database.s, c->table.s, c->unique.s); + clist_append(pg_constraint, c, next, prev); + } +} + +static void db_postgres_constraint_destroy(pg_constraint_t *c) { + if (!c) + return; + if (c->database.s) + pkg_free(c->database.s); + if (c->table.s) + pkg_free(c->table.s); + if (c->unique.s) + pkg_free(c->unique.s); + pkg_free(c); + c = NULL; +} + +static pg_constraint_t *db_postgres_constraint_new(const char *db, const str *table, const char *unique) { + pg_constraint_t *c = pkg_malloc(sizeof(pg_constraint_t)); + if (!c) + return NULL; + memset(c, 0, sizeof(pg_constraint_t)); + + c->database.len = strlen(db); + c->database.s = pkg_malloc(c->database.len+1); + if (!c->database.s) goto error; + strcpy(c->database.s, db); + + c->table.len = table->len; + c->table.s = pkg_malloc(c->table.len+1); + if (!c->table.s) goto error; + strcpy(c->table.s, table->s); + + c->unique.len = strlen(unique); + c->unique.s = pkg_malloc(c->unique.len+1); + if (!c->unique.s) goto error; + strcpy(c->unique.s, unique); + + db_postgres_constraint_add(c); + return c; +error: + db_postgres_constraint_destroy(c); + return NULL; +} + +static pg_constraint_t *db_postgres_constraint_search(char *db, char *table) { + pg_constraint_t *c; + if (!pg_constraint) + return NULL; + clist_foreach(pg_constraint, c, next){ + LM_DBG("searching[%s][%s][%s]\n", c->database.s, c->table.s, c->unique.s); + if (strcmp(db, c->database.s) == 0 && strcmp(table, c->table.s) == 0) { + return c; + } + } + return NULL; +} + +static str sql_str = {0,0}; + +/*! + * \brief search for saved contraint or query pg_constraint to get the unique constraint + * \param _h structure representing database connection + */ +static char * db_postgres_constraint_get(const db1_con_t* _h) { + pg_constraint_t *constraint = db_postgres_constraint_search(PQdb(CON_CONNECTION(_h)), CON_TABLE(_h)->s); + if (constraint) { + return constraint->unique.s; + } + db1_res_t *res = NULL; + int ret; + ret = snprintf(postgres_sql_buf, sql_buffer_size, + "select conname, contype from pg_constraint where conrelid = " + "(select oid from pg_class where relname like '%s%.*s%s')", + CON_TQUOTESZ(_h), CON_TABLE(_h)->len, CON_TABLE(_h)->s, CON_TQUOTESZ(_h)); + + if (ret < 0 || ret >= sql_buffer_size) { + LM_ERR("error creating pg_constraint query, invalid size[%d]\n", ret); + return NULL; + } + + sql_str.len = ret; + sql_str.s = postgres_sql_buf; + + if (db_postgres_raw_query(_h, &sql_str, &res) < 0) { + LM_ERR("error executing pg_constraint query !\n"); + return NULL; + } + + struct db_row* rows = RES_ROWS(res); + const char *val = NULL; + const char *type = NULL; + int x; + for (x=0;xunique.s; +} + +/*! + * Insert a row into a specified table, update on duplicate key. + * \param _h structure representing database connection + * \param _k key names + * \param _v values of the keys + * \param _n number of key=value pairs + * + * As explained in the following article the design of "UPSERT" in PostgreSQL is requiring to be explicit about the constraint on which we accept to do update + * modification to Kamailio database framework/API would be required to expose which specific key constraint should be handled as an update + * http://pgeoghegan.blogspot.com/2015/10/avoid-naming-constraint-directly-when.html + */ + +int db_postgres_insert_update(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, + const int _n) +{ + int off, ret; + + if ((!_h) || (!_k) || (!_v) || (!_n)) { + LM_ERR("invalid parameter value\n"); + return -1; + } + char * constraint = db_postgres_constraint_get(_h); + + ret = snprintf(postgres_sql_buf, sql_buffer_size, "insert into %s%.*s%s (", + CON_TQUOTESZ(_h), CON_TABLE(_h)->len, CON_TABLE(_h)->s, CON_TQUOTESZ(_h)); + if (ret < 0 || ret >= sql_buffer_size) goto error; + off = ret; + + ret = db_print_columns(postgres_sql_buf + off, sql_buffer_size - off, _k, _n, CON_TQUOTESZ(_h)); + if (ret < 0) return -1; + off += ret; + + ret = snprintf(postgres_sql_buf + off, sql_buffer_size - off, ") values ("); + if (ret < 0 || ret >= (sql_buffer_size - off)) goto error; + off += ret; + ret = db_print_values(_h, postgres_sql_buf + off, sql_buffer_size - off, _v, _n, db_postgres_val2str); + if (ret < 0) return -1; + off += ret; + + *(postgres_sql_buf + off++) = ')'; + + if (constraint) { + ret = snprintf(postgres_sql_buf + off, sql_buffer_size - off, + " on conflict on constraint %s do update set ", constraint); + if (ret < 0 || ret >= (sql_buffer_size - off)) goto error; + off += ret; + + ret = db_print_set(_h, postgres_sql_buf + off, sql_buffer_size - off, _k, _v, _n, db_postgres_val2str); + if (ret < 0) { + LM_ERR("error building query\n"); + return -1; + } + off += ret; + if (off + 1 > sql_buffer_size) goto error; + postgres_sql_buf[off] = '\0'; + } else { + ret = snprintf(postgres_sql_buf + off, sql_buffer_size - off, " on conflict do nothing "); + if (ret < 0 || ret >= (sql_buffer_size - off)) goto error; + off += ret; + } + + sql_str.s = postgres_sql_buf; + sql_str.len = off; + LM_DBG("query : %s\n", sql_str.s); + if (db_postgres_submit_query(_h, &sql_str) < 0) { + LM_ERR("error while submitting query\n"); + return -2; + } + return 0; + +error: + LM_ERR("error while preparing insert_update operation\n"); + return -1; +} /*! * Update some rows in the specified table diff --git a/src/modules/db_postgres/km_dbase.h b/src/modules/db_postgres/km_dbase.h index 9ab6e7039c9..e1caf9dba79 100644 --- a/src/modules/db_postgres/km_dbase.h +++ b/src/modules/db_postgres/km_dbase.h @@ -35,6 +35,16 @@ #include "../../lib/srdb1/db_op.h" #include "../../lib/srdb1/db_val.h" +/* + * Storage for all the unique constraints found by insert_update + */ +typedef struct db_pg_constraint_list { + struct db_pg_constraint_list* next; + struct db_pg_constraint_list* prev; + str database; + str table; + str unique; +} pg_constraint_t; /* * Initialize database connection @@ -91,6 +101,11 @@ int db_postgres_raw_query(const db1_con_t* _h, const str* _s, db1_res_t** _r); int db_postgres_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, const int _n); +/* + * Insert and update ON CONFLICT + */ +int db_postgres_insert_update(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, + const int _n); /* * Delete a row from table diff --git a/src/modules/db_postgres/km_pg_con.c b/src/modules/db_postgres/km_pg_con.c index 086eb38737b..456e7e15708 100644 --- a/src/modules/db_postgres/km_pg_con.c +++ b/src/modules/db_postgres/km_pg_con.c @@ -116,6 +116,10 @@ struct pg_con* db_postgres_new_connection(struct db_id* id) goto err; } + if (PQserverVersion(ptr->con) < 90500) { + LM_WARN("server version < 9.5 does not support insert_update\n"); + } + ptr->connected = 1; ptr->timestamp = time(0); ptr->id = id; diff --git a/src/modules/db_postgres/km_res.c b/src/modules/db_postgres/km_res.c index cd541d600bf..8e9b6288ff3 100644 --- a/src/modules/db_postgres/km_res.c +++ b/src/modules/db_postgres/km_res.c @@ -149,6 +149,7 @@ int db_postgres_get_columns(const db1_con_t* _h, db1_res_t* _r) case BOOLOID: case CHAROID: case VARCHAROID: + case NAMEOID: case BPCHAROID: LM_DBG("use DB1_STRING result type\n"); RES_TYPES(_r)[col] = DB1_STRING; diff --git a/src/modules/db_postgres/pg_mod.c b/src/modules/db_postgres/pg_mod.c index 8eca332ea29..1e88f9de5e7 100644 --- a/src/modules/db_postgres/pg_mod.c +++ b/src/modules/db_postgres/pg_mod.c @@ -534,6 +534,10 @@ int pg_test(void) int mod_register(char *path, int *dlflags, void *p1, void *p2) { + if(!pg_alloc_buffer()) { + LM_ERR("failed too allocate buffer"); + return -1; + } if(db_api_init()<0) return -1; return 0; From de0af95da9b253b88b983b853201fbce2df5b463 Mon Sep 17 00:00:00 2001 From: Julien Chavanton Date: Sat, 18 Mar 2017 13:16:15 -0700 Subject: [PATCH 2/2] usrloc: adding params db_load: enable/disable loading from the database on mod_init db_insert_update: insert into table, update on duplicate key --- src/modules/usrloc/doc/usrloc_admin.xml | 42 +++++++++++++++++++++++++ src/modules/usrloc/ucontact.c | 16 +++++++--- src/modules/usrloc/usrloc_mod.c | 7 ++++- src/modules/usrloc/usrloc_mod.h | 1 + 4 files changed, 61 insertions(+), 5 deletions(-) diff --git a/src/modules/usrloc/doc/usrloc_admin.xml b/src/modules/usrloc/doc/usrloc_admin.xml index 34aebee5b9a..bc4ab56f59c 100644 --- a/src/modules/usrloc/doc/usrloc_admin.xml +++ b/src/modules/usrloc/doc/usrloc_admin.xml @@ -687,6 +687,48 @@ modparam("usrloc", "db_mode", 2) +
+ <varname>db_load</varname> (integer) + + Determine if the usrloc module should load contacts from the database storage during module initialization + A value of 0 disable the loading from the database + + + + Default value is 1. + + + + Set <varname>db_load</varname> parameter + +... +modparam("usrloc", "db_load", "0") +... + + +
+ +
+ <varname>db_insert_update</varname> (integer) + + Determine if the usrloc module should do an update when a duplicate key is found while inserting + A value of 1 will activate update on duplicate key + + + + Default value is 0. + + + + Set <varname>db_insert_update</varname> parameter + +... +modparam("usrloc", "db_insert_update", "1") +... + + +
+
<varname>matching_mode</varname> (integer) diff --git a/src/modules/usrloc/ucontact.c b/src/modules/usrloc/ucontact.c index 630a4bd8c57..54cac0e4e01 100644 --- a/src/modules/usrloc/ucontact.c +++ b/src/modules/usrloc/ucontact.c @@ -679,10 +679,18 @@ int db_insert_ucontact(ucontact_t* _c) return -1; } - if (ul_dbf.insert(ul_dbh, keys, vals, nr_cols) < 0) { - LM_ERR("inserting contact in db failed %.*s (%.*s)\n", - _c->aor->len, ZSW(_c->aor->s), _c->ruid.len, ZSW(_c->ruid.s)); - return -1; + if (db_insert_update && ul_dbf.insert_update) { + if (ul_dbf.insert_update(ul_dbh, keys, vals, nr_cols) < 0) { + LM_ERR("inserting with update contact in db failed %.*s (%.*s)\n", + _c->aor->len, ZSW(_c->aor->s), _c->ruid.len, ZSW(_c->ruid.s)); + return -1; + } + } else { + if (ul_dbf.insert(ul_dbh, keys, vals, nr_cols) < 0) { + LM_ERR("inserting contact in db failed %.*s (%.*s)\n", + _c->aor->len, ZSW(_c->aor->s), _c->ruid.len, ZSW(_c->ruid.s)); + return -1; + } } if (ul_xavp_contact_name.s) { diff --git a/src/modules/usrloc/usrloc_mod.c b/src/modules/usrloc/usrloc_mod.c index 81346de631f..d43b714fd08 100644 --- a/src/modules/usrloc/usrloc_mod.c +++ b/src/modules/usrloc/usrloc_mod.c @@ -104,6 +104,7 @@ static int ul_preload_param(modparam_t type, void* val); extern int bind_usrloc(usrloc_api_t* api); int ul_db_update_as_insert = 0; + int ul_timer_procs = 0; int ul_db_check_update = 0; int ul_keepalive_timeout = 0; @@ -155,6 +156,8 @@ str ulattrs_last_mod_col = str_init(ULATTRS_LAST_MOD_COL); /*!< Name of column c str db_url = str_init(DEFAULT_DB_URL); /*!< Database URL */ int timer_interval = 60; /*!< Timer interval in seconds */ int db_mode = 0; /*!< Database sync scheme: 0-no db, 1-write through, 2-write back, 3-only db */ +int db_load = 1; /*!< Database load after restart: 1- true, 0- false (only the db_mode allows it) */ +int db_insert_update = 0; /*!< Database : update on duplicate key instead of error */ int use_domain = 0; /*!< Whether usrloc should use domain part of aor */ int desc_time_order = 0; /*!< By default do not enable timestamp ordering */ int handle_lost_tcp = 0; /*!< By default do not remove contacts before expiration time */ @@ -202,6 +205,8 @@ static param_export_t params[] = { {"db_url", PARAM_STR, &db_url }, {"timer_interval", INT_PARAM, &timer_interval }, {"db_mode", INT_PARAM, &db_mode }, + {"db_load", INT_PARAM, &db_load }, + {"db_insert_update", INT_PARAM, &db_insert_update }, {"use_domain", INT_PARAM, &use_domain }, {"desc_time_order", INT_PARAM, &desc_time_order }, {"user_agent_column", PARAM_STR, &user_agent_col}, @@ -416,7 +421,7 @@ static int child_init(int _rank) return -1; } /* _rank==PROC_SIPINIT is used even when fork is disabled */ - if (_rank==PROC_SIPINIT && db_mode!=DB_ONLY) { + if (_rank==PROC_SIPINIT && db_mode!=DB_ONLY && db_load) { /* if cache is used, populate domains from DB */ for( ptr=root ; ptr ; ptr=ptr->next) { if (preload_udomain(ul_dbh, ptr->d) < 0) { diff --git a/src/modules/usrloc/usrloc_mod.h b/src/modules/usrloc/usrloc_mod.h index 72a5a10b210..ccd9540ecfd 100644 --- a/src/modules/usrloc/usrloc_mod.h +++ b/src/modules/usrloc/usrloc_mod.h @@ -76,6 +76,7 @@ extern str ulattrs_last_mod_col; extern str db_url; extern int timer_interval; extern int db_mode; +extern int db_insert_update; extern int use_domain; extern int desc_time_order; extern int cseq_delay;