Skip to content

Commit

Permalink
dispatcher: new paraemter ds_db_extra_attrs
Browse files Browse the repository at this point in the history
- allow specifying database table columns to be loaded in the attrs
field
  • Loading branch information
miconda committed Jun 18, 2018
1 parent 927ddcf commit 433577d
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 11 deletions.
69 changes: 59 additions & 10 deletions src/modules/dispatcher/dispatch.c
Expand Up @@ -90,6 +90,7 @@ extern str ds_event_callback;
extern int ds_ping_latency_stats;
extern float ds_latency_estimator_alpha;
extern int ds_attrs_none;
extern param_t *ds_db_extra_attrs_list;

static db_func_t ds_dbf;
static db1_con_t *ds_db_handle = NULL;
Expand Down Expand Up @@ -913,17 +914,35 @@ int ds_load_db(void)
db1_res_t *res;
db_val_t *values;
db_row_t *rows;

db_key_t query_cols[5] = {&ds_set_id_col, &ds_dest_uri_col,
&ds_dest_flags_col, &ds_dest_priority_col, &ds_dest_attrs_col};
#define DS_DB_MAX_COLS 32
db_key_t query_cols[DS_DB_MAX_COLS];
param_t *pit=NULL;
int nc;
int plen;
#define DS_ATTRS_MAXSIZE 1024
char ds_attrs_buf[DS_ATTRS_MAXSIZE];

query_cols[0] = &ds_set_id_col;
query_cols[1] = &ds_dest_uri_col;
query_cols[2] = &ds_dest_flags_col;
query_cols[3] = &ds_dest_priority_col;
query_cols[4] = &ds_dest_attrs_col;

nrcols = 2;
if(_ds_table_version == DS_TABLE_VERSION2)
if(_ds_table_version == DS_TABLE_VERSION2) {
nrcols = 3;
else if(_ds_table_version == DS_TABLE_VERSION3)
} else if(_ds_table_version == DS_TABLE_VERSION3) {
nrcols = 4;
else if(_ds_table_version == DS_TABLE_VERSION4)
} else if(_ds_table_version == DS_TABLE_VERSION4) {
nrcols = 5;
for(pit = ds_db_extra_attrs_list; pit!=NULL; pit=pit->next) {
if(nrcols>=DS_DB_MAX_COLS) {
LM_ERR("too many db columns: %d\n", nrcols);
return -1;
}
query_cols[nrcols++] = &pit->body;
}
}

if((*crt_idx) != (*next_idx)) {
LM_WARN("load command already generated, aborting reload...\n");
Expand All @@ -940,6 +959,8 @@ int ds_load_db(void)
return -1;
}

LM_DBG("loading dispatcher db records - nrcols: %d\n", nrcols);

/*select the whole table and all the columns*/
if(ds_dbf.query(ds_db_handle, 0, 0, 0, query_cols, 0, nrcols, 0, &res)
< 0) {
Expand All @@ -949,8 +970,9 @@ int ds_load_db(void)

nr_rows = RES_ROW_N(res);
rows = RES_ROWS(res);
if(nr_rows == 0)
if(nr_rows == 0) {
LM_WARN("no dispatching data in the db -- empty destination set\n");
}

setn = 0;
*next_idx = (*crt_idx + 1) % 2;
Expand All @@ -971,10 +993,37 @@ int ds_load_db(void)

attrs.s = 0;
attrs.len = 0;
if(nrcols >= 5 && !VAL_NULL(values + 4)) {
attrs.s = VAL_STR(values + 4).s;
if(attrs.s) attrs.len = strlen(attrs.s);
if(nrcols >= 5) {
if(!VAL_NULL(values + 4)) {
attrs.s = VAL_STR(values + 4).s;
if(attrs.s) attrs.len = strlen(attrs.s);
}
if(ds_db_extra_attrs_list!=NULL && nrcols > 5) {
if(attrs.len>0) {
memcpy(ds_attrs_buf, attrs.s, attrs.len);
if(ds_attrs_buf[attrs.len-1]!=';') {
ds_attrs_buf[attrs.len++] = ';';
}
}
attrs.s = ds_attrs_buf;
pit = ds_db_extra_attrs_list;
for(nc = 5; nc<nrcols && pit!=NULL; nc++) {
if(!VAL_NULL(values + nc) && strlen(VAL_STRING(values + nc))>0) {
plen = snprintf(attrs.s + attrs.len,
DS_ATTRS_MAXSIZE - attrs.len - 1,
"%.*s=%s;", pit->name.len, pit->name.s,
VAL_STRING(values + nc));
if(plen<=0 || plen>=DS_ATTRS_MAXSIZE - attrs.len - 1) {
LM_ERR("cannot build attrs buffer\n");
goto err2;
}
attrs.len+=plen;
}
pit = pit->next;
}
}
}
LM_DBG("attributes string: [%.*s]\n", attrs.len, (attrs.s)?attrs.s:"");
if(add_dest2list(id, uri, flags, priority, &attrs, *next_idx, &setn)
!= 0) {
dest_errs++;
Expand Down
23 changes: 22 additions & 1 deletion src/modules/dispatcher/dispatcher.c
Expand Up @@ -137,6 +137,8 @@ str ds_attrs_pvname = STR_NULL;
pv_spec_t ds_attrs_pv;

str ds_event_callback = STR_NULL;
str ds_db_extra_attrs = STR_NULL;
param_t *ds_db_extra_attrs_list = NULL;

/** module functions */
static int mod_init(void);
Expand Down Expand Up @@ -255,11 +257,12 @@ static param_export_t params[]={
{"ds_hash_expire", INT_PARAM, &ds_hash_expire},
{"ds_hash_initexpire", INT_PARAM, &ds_hash_initexpire},
{"ds_hash_check_interval", INT_PARAM, &ds_hash_check_interval},
{"outbound_proxy", PARAM_STR, &ds_outbound_proxy},
{"outbound_proxy", PARAM_STR, &ds_outbound_proxy},
{"ds_default_socket", PARAM_STR, &ds_default_socket},
{"ds_timer_mode", PARAM_INT, &ds_timer_mode},
{"event_callback", PARAM_STR, &ds_event_callback},
{"ds_attrs_none", PARAM_INT, &ds_attrs_none},
{"ds_db_extra_attrs", PARAM_STR, &ds_db_extra_attrs},
{0,0,0}
};

Expand Down Expand Up @@ -288,6 +291,8 @@ static int mod_init(void)
{
str host;
int port, proto;
param_hooks_t phooks;
param_t *pit = NULL;

if(ds_ping_active_init() < 0) {
return -1;
Expand Down Expand Up @@ -344,6 +349,22 @@ static int mod_init(void)
return -1;

if(ds_db_url.s) {
if(ds_db_extra_attrs.s!=NULL && ds_db_extra_attrs.len>2) {
if(ds_db_extra_attrs.s[ds_db_extra_attrs.len-1]==';') {
ds_db_extra_attrs.len--;
}
if (parse_params(&ds_db_extra_attrs, CLASS_ANY, &phooks,
&ds_db_extra_attrs_list)<0) {
LM_ERR("failed to parse extra attrs parameter\n");
return -1;
}
for(pit = ds_db_extra_attrs_list; pit!=NULL; pit=pit->next) {
if(pit->body.s==NULL || pit->body.len<=0) {
LM_ERR("invalid db extra attrs parameter\n");
return -1;
}
}
}
if(init_ds_db() != 0) {
LM_ERR("could not initiate a connect to the database\n");
return -1;
Expand Down

0 comments on commit 433577d

Please sign in to comment.