diff --git a/src/modules/db_redis/redis_connection.c b/src/modules/db_redis/redis_connection.c index 6743a97cf16..ad2921f1d57 100644 --- a/src/modules/db_redis/redis_connection.c +++ b/src/modules/db_redis/redis_connection.c @@ -23,6 +23,7 @@ #include "db_redis_mod.h" #include "redis_connection.h" #include "redis_table.h" +#include "redis_dbase.h" extern int db_redis_verbosity; @@ -170,6 +171,31 @@ int db_redis_connect(km_redis_con_t *con) { freeReplyObject(reply); reply = NULL; LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s); + reply = redisCommand(con->con, "SCRIPT LOAD %s", SREM_KEY_LUA); + if (!reply) { + LM_ERR("failed to load LUA script to server %.*s: %s\n", + con->id->url.len, con->id->url.s, con->con->errstr); + goto err; + } + if (reply->type == REDIS_REPLY_ERROR) { + LM_ERR("failed to load LUA script to server %.*s: %s\n", + con->id->url.len, con->id->url.s, reply->str); + goto err; + } + if (reply->type != REDIS_REPLY_STRING) { + LM_ERR("failed to load LUA script to server %.*s: %i\n", + con->id->url.len, con->id->url.s, reply->type); + goto err; + } + if (reply->len >= sizeof(con->srem_key_lua)) { + LM_ERR("failed to load LUA script to server %.*s: %i >= %i\n", + con->id->url.len, con->id->url.s, (int) reply->len, (int) sizeof(con->srem_key_lua)); + goto err; + } + strcpy(con->srem_key_lua, reply->str); + freeReplyObject(reply); reply = NULL; + LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s); + return 0; err: diff --git a/src/modules/db_redis/redis_connection.h b/src/modules/db_redis/redis_connection.h index b137d80e6b3..22392838c56 100644 --- a/src/modules/db_redis/redis_connection.h +++ b/src/modules/db_redis/redis_connection.h @@ -66,6 +66,7 @@ typedef struct km_redis_con { redis_command_t *command_queue; unsigned int append_counter; struct str_hash_table tables; + char srem_key_lua[41]; // sha-1 hex string } km_redis_con_t; diff --git a/src/modules/db_redis/redis_dbase.c b/src/modules/db_redis/redis_dbase.c index 97fd19ee010..919cde71c37 100644 --- a/src/modules/db_redis/redis_dbase.c +++ b/src/modules/db_redis/redis_dbase.c @@ -27,6 +27,8 @@ #include "redis_dbase.h" #include "redis_table.h" +#define TIMESTAMP_STR_LENGTH 19 + static void db_redis_dump_reply(redisReply *reply) { int i; if (reply->type == REDIS_REPLY_STRING) { @@ -107,14 +109,14 @@ static int db_redis_val2str(const db_val_t *v, str *_str) { LM_DBG("converting bigint value %lld to str\n", VAL_BIGINT(v)); _str->s = (char*)pkg_malloc(_str->len); if (!_str->s) goto memerr; - snprintf(_str->s, _str->len, "%lld", VAL_BIGINT(v)); + snprintf(_str->s, _str->len, "%010lld", VAL_BIGINT(v)); _str->len = strlen(_str->s); break; case DB1_UBIGINT: LM_DBG("converting ubigint value %llu to str\n", VAL_UBIGINT(v)); _str->s = (char*)pkg_malloc(_str->len); if (!_str->s) goto memerr; - snprintf(_str->s, _str->len, "%llu", VAL_UBIGINT(v)); + snprintf(_str->s, _str->len, "%010llu", VAL_UBIGINT(v)); _str->len = strlen(_str->s); break; case DB1_STRING: @@ -279,8 +281,9 @@ static int db_redis_build_entry_manual_keys(redis_table_t *table, const db_key_t } static int db_redis_find_query_key(redis_key_t *key, const str *table_name, - str *type_name, const db_key_t *_k, const db_val_t *_v, const int _n, - str *key_name, int *key_found) { + redis_table_t *table, + str *type_name, const db_key_t *_k, const db_val_t *_v, const db_op_t *_op, const int _n, + str *key_name, int *key_found, uint64_t *ts_scan_start) { unsigned int len; str val = {NULL, 0}; @@ -297,11 +300,19 @@ static int db_redis_find_query_key(redis_key_t *key, const str *table_name, for (i = 0; i < _n; ++i) { const db_key_t k = _k[i]; const db_val_t v = _v[i]; + const db_op_t op = _op ? _op[i] : NULL; if (VAL_NULL(&v)) { LM_DBG("Skipping null value for given key '%.*s'\n", k->len, k->s); break; + } else if (op && strcmp(op, OP_EQ) + && !((VAL_TYPE(&v) == DB1_DATETIME || VAL_TYPE(&v) == DB1_BIGINT + || VAL_TYPE(&v) == DB1_UBIGINT) + && (!strcmp(op, OP_LT) || !strcmp(op, OP_GT)))) { + LM_DBG("Skipping non-EQ op (%s) for given key '%.*s'\n", + op, k->len, k->s); + break; } else if (!str_strcmp(&key->key, (str*)k)) { LM_DBG("found key in entry key\n"); if (db_redis_val2str(&v, &val) != 0) goto err; @@ -311,14 +322,15 @@ static int db_redis_find_query_key(redis_key_t *key, const str *table_name, break; } if (!key_name->len) { - // ::: - len = table_name->len + 1 + type_name->len + 2 + val.len + 1; //snprintf writes term 0 char + // :::: + len = table->version_code.len + table_name->len + 1 + type_name->len + 2 + val.len + 1; //snprintf writes term 0 char key_name->s = (char*)pkg_malloc(len); if (!key_name->s) { LM_ERR("Failed to allocate key memory\n"); goto err; } - snprintf(key_name->s, len, "%.*s:%.*s::%.*s", + snprintf(key_name->s, len, "%.*s%.*s:%.*s::%.*s", + table->version_code.len, table->version_code.s, table_name->len, table_name->s, type_name->len, type_name->s, val.len, val.s); @@ -335,6 +347,39 @@ static int db_redis_find_query_key(redis_key_t *key, const str *table_name, val.len, val.s); key_name->len += (1 + val.len); } + if (op && (VAL_TYPE(&v) == DB1_DATETIME || VAL_TYPE(&v) == DB1_BIGINT + || VAL_TYPE(&v) == DB1_UBIGINT) + && (!strcmp(op, OP_LT) || !strcmp(op, OP_GT))) { + // Special case: we support matching < or > against timestamps and ints using a special + // key scanning method. We do this only for a single timestamp/int occurance, and we + // still do a table scan, just not a full table scan. + if (!ts_scan_start) { + LM_DBG("key '%.*s' for type '%.*s' found as timestamp or int, but table scans " + "not supported, unable to use this type\n", + key->key.len, key->key.s, type_name->len, type_name->s); + break; + } + // ts_scan_start is: 31 bits of current full key length, 31 bits of this value length, + // one bit of directionality, one bit of length variable indicator + if (VAL_TYPE(&v) == DB1_DATETIME && *ts_scan_start == 0 && val.len == TIMESTAMP_STR_LENGTH) { + *ts_scan_start = key_name->len | ((uint64_t) TIMESTAMP_STR_LENGTH << 31); + if (!strcmp(op, OP_LT)) + *ts_scan_start |= 0x8000000000000000ULL; + LM_DBG("preparing for timestamp range scan at key offset %llx\n", + (unsigned long long) *ts_scan_start); + *key_found = 0; // this forces a table scan using the new match key + } + else if ((VAL_TYPE(&v) == DB1_BIGINT + || VAL_TYPE(&v) == DB1_UBIGINT) && *ts_scan_start == 0) { + *ts_scan_start = key_name->len | ((uint64_t) val.len << 31); + *ts_scan_start |= 0x4000000000000000ULL; // length is variable + if (!strcmp(op, OP_LT)) + *ts_scan_start |= 0x8000000000000000ULL; + LM_DBG("preparing for int range scan at key offset %llx\n", + (unsigned long long) *ts_scan_start); + *key_found = 0; // this forces a table scan using the new match key + } + } LM_DBG("entry key so far is '%.*s'\n", key_name->len, key_name->s); subkey_found = 1; pkg_free(val.s); @@ -355,6 +400,22 @@ static int db_redis_find_query_key(redis_key_t *key, const str *table_name, } } + // for value-less master keys + if (!key_name->len) { + // :: + len = table->version_code.len + table_name->len + 1 + type_name->len + 1; + key_name->s = (char*)pkg_malloc(len); + if (!key_name->s) { + LM_ERR("Failed to allocate key memory\n"); + goto err; + } + snprintf(key_name->s, len, "%.*s%.*s:%.*s", + table->version_code.len, table->version_code.s, + table_name->len, table_name->s, + type_name->len, type_name->s); + key_name->len = len-1; + } + return 0; err: @@ -389,7 +450,7 @@ static int db_redis_build_entry_keys(km_redis_con_t *con, const str *table_name, } table = (redis_table_t*)table_e->u.p; key = table->entry_keys; - if (db_redis_find_query_key(key, table_name, &type_name, _k, _v, _n, &keyname, &key_found) != 0) { + if (db_redis_find_query_key(key, table_name, table, &type_name, _k, _v, NULL, _n, &keyname, &key_found, NULL) != 0) { goto err; } if (key_found) { @@ -400,11 +461,12 @@ static int db_redis_build_entry_keys(km_redis_con_t *con, const str *table_name, LM_DBG("found suitable entry key '%.*s' for query\n", (*keys)->key.len, (*keys)->key.s); *keys_count = 1; - pkg_free(keyname.s); } else { LM_ERR("Failed to create direct entry key, no matching key definition\n"); goto err; } + if (keyname.s) + pkg_free(keyname.s); return 0; @@ -453,7 +515,7 @@ static int db_redis_get_keys_for_all_types(km_redis_con_t *con, const str *table static int db_redis_build_type_keys(km_redis_con_t *con, const str *table_name, const db_key_t *_k, const db_val_t *_v, const int _n, - redis_key_t **keys, int *keys_count) { + redis_key_t **keys, redis_key_t **set_keys, int *keys_count) { struct str_hash_entry *table_e; redis_table_t *table; @@ -479,7 +541,7 @@ static int db_redis_build_type_keys(km_redis_con_t *con, const str *table_name, str keyname = {NULL, 0}; key = type->keys; - if (db_redis_find_query_key(key, table_name, &type->type, _k, _v, _n, &keyname, &key_found) != 0) { + if (db_redis_find_query_key(key, table_name, table, &type->type, _k, _v, NULL, _n, &keyname, &key_found, NULL) != 0) { goto err; } if (key_found) { @@ -491,8 +553,29 @@ static int db_redis_build_type_keys(km_redis_con_t *con, const str *table_name, LM_DBG("found key '%.*s' for type '%.*s'\n", keyname.len, keyname.s, type_name->len, type_name->s); - pkg_free(keyname.s); + + if (set_keys) { + // add key for parent set + // :::index:: + pkg_free(keyname.s); + keyname.len = table->version_code.len + table_name->len + 9 + type->type.len; + keyname.s = pkg_malloc(keyname.len + 1); + if (!keyname.s) { + LM_ERR("Failed to allocate memory for parent set key\n"); + goto err; + } + sprintf(keyname.s, "%.*s%.*s::index::%.*s", + table->version_code.len, table->version_code.s, + table_name->len, table_name->s, + type->type.len, type->type.s); + if (db_redis_key_add_str(set_keys, &keyname) != 0) { + LM_ERR("Failed to add query key to set key list\n"); + goto err; + } + } } + if (keyname.s) + pkg_free(keyname.s); } return 0; @@ -506,7 +589,7 @@ static int db_redis_build_type_keys(km_redis_con_t *con, const str *table_name, static int db_redis_build_query_keys(km_redis_con_t *con, const str *table_name, const db_key_t *_k, const db_val_t *_v, const db_op_t *_op, const int _n, redis_key_t **query_keys, int *query_keys_count, int **manual_keys, int *manual_keys_count, - int *do_table_scan) { + int *do_table_scan, uint64_t *ts_scan_start, str *ts_scan_key) { struct str_hash_entry *table_e; redis_table_t *table; @@ -536,7 +619,7 @@ static int db_redis_build_query_keys(km_redis_con_t *con, const str *table_name, keyname.len = 0; key = table->entry_keys; - if (db_redis_find_query_key(key, table_name, &typename, _k, _v, _n, &keyname, &key_found) != 0) { + if (db_redis_find_query_key(key, table_name, table, &typename, _k, _v, _op, _n, &keyname, &key_found, NULL) != 0) { goto err; } if (key_found) { @@ -550,11 +633,15 @@ static int db_redis_build_query_keys(km_redis_con_t *con, const str *table_name, pkg_free(keyname.s); keyname.s = NULL; } else { + if (keyname.s) + pkg_free(keyname.s); + keyname.s = NULL; LM_DBG("no direct entry key found, checking type keys\n"); for (type = table->types; type; type = type->next) { key = type->keys; LM_DBG("checking type '%.*s'\n", type->type.len, type->type.s); - if (db_redis_find_query_key(key, table_name, &type->type, _k, _v, _n, &keyname, &key_found) != 0) { + if (db_redis_find_query_key(key, table_name, table, &type->type, _k, _v, _op, _n, &keyname, + &key_found, ts_scan_start) != 0) { goto err; } if (key_found) { @@ -592,7 +679,7 @@ static int db_redis_build_query_keys(km_redis_con_t *con, const str *table_name, redisReply *subreply = reply->element[i]; if (subreply->type == REDIS_REPLY_STRING) { LM_DBG("adding resulting entry key '%s' from type query\n", subreply->str); - if (db_redis_key_add_string(query_keys, subreply->str, strlen(subreply->str)) != 0) { + if (db_redis_key_prepend_string(query_keys, subreply->str, strlen(subreply->str)) != 0) { LM_ERR("Failed to add query key\n"); goto err; } @@ -610,6 +697,16 @@ static int db_redis_build_query_keys(km_redis_con_t *con, const str *table_name, db_redis_free_reply(&reply); break; } + else if (keyname.s && *ts_scan_start) { + LM_DBG("will use key '%.*s' at offset %llx for timestamp/int range scan\n", + keyname.len, keyname.s, (unsigned long long) *ts_scan_start); + *ts_scan_key = keyname; + keyname.s = NULL; + } + else if (keyname.s) { + pkg_free(keyname.s); + keyname.s = NULL; + } } } @@ -639,36 +736,30 @@ static int db_redis_build_query_keys(km_redis_con_t *con, const str *table_name, return -1; } -static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name, - const db_key_t *_k, const int _n, +static int db_redis_scan_query_keys_pattern(km_redis_con_t *con, const str *match_pattern, + const int _n, redis_key_t **query_keys, int *query_keys_count, - int **manual_keys, int *manual_keys_count) { + int **manual_keys, int *manual_keys_count, unsigned int match_count_start_val) { size_t i = 0; redis_key_t *query_v = NULL; - char cursor_str[32] = ""; redisReply *reply = NULL; - unsigned long cursor = 0; - char *match = NULL; + redisReply *keys_list = NULL; size_t j; int l; - str match_pattern = {":entry::*", strlen(":entry::*")}; - *query_keys = NULL; - *query_keys_count = 0; - *manual_keys = NULL; - *manual_keys_count = 0; +#undef USE_SCAN + +#ifdef USE_SCAN + + char cursor_str[32] = ""; + unsigned long cursor = 0; + unsigned int match_count = match_count_start_val; + char match_count_str[16]; do { snprintf(cursor_str, sizeof(cursor_str), "%lu", cursor); - match = (char*)pkg_malloc(table_name->len + match_pattern.len + 1); - if (!match) { - LM_ERR("Failed to allocate memory for match pattern\n"); - goto err; - } - snprintf(match, table_name->len + match_pattern.len + 1, "%s%s\n", - table_name->s, match_pattern.s); if (db_redis_key_add_string(&query_v, "SCAN", 4) != 0) { LM_ERR("Failed to add scan command to scan query\n"); @@ -682,7 +773,7 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name, LM_ERR("Failed to add match command to scan query\n"); goto err; } - if (db_redis_key_add_string(&query_v, match, strlen(match)) != 0) { + if (db_redis_key_add_string(&query_v, match_pattern->s, match_pattern->len) != 0) { LM_ERR("Failed to add match pattern to scan query\n"); goto err; } @@ -690,23 +781,27 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name, LM_ERR("Failed to add count command to scan query\n"); goto err; } - if (db_redis_key_add_string(&query_v, "1000", 5) != 0) { + l = snprintf(match_count_str, sizeof(match_count_str), "%u", match_count); + if (l <= 0) { + LM_ERR("Failed to print integer for scan query\n"); + goto err; + } + if (db_redis_key_add_string(&query_v, match_count_str, l) != 0) { LM_ERR("Failed to add count value to scan query\n"); goto err; } - pkg_free(match); match = NULL; reply = db_redis_command_argv(con, query_v); db_redis_key_free(&query_v); db_redis_check_reply(con, reply, err); if (reply->type != REDIS_REPLY_ARRAY) { LM_ERR("Invalid reply type for scan on table '%.*s', expected array\n", - table_name->len, table_name->s); + match_pattern->len, match_pattern->s); goto err; } if (reply->elements != 2) { LM_ERR("Invalid number of reply elements for scan on table '%.*s', expected 2, got %lu\n", - table_name->len, table_name->s, reply->elements); + match_pattern->len, match_pattern->s, reply->elements); goto err; } @@ -717,34 +812,50 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name, cursor = reply->element[0]->integer; } else { LM_ERR("Invalid cursor type for scan on table '%.*s', expected string or integer\n", - table_name->len, table_name->s); + match_pattern->len, match_pattern->s); goto err; } LM_DBG("cursor is %lu\n", cursor); - if (reply->element[1]->type != REDIS_REPLY_ARRAY) { + keys_list = reply->element[1]; + +#else // use KEYS + + if (db_redis_key_add_string(&query_v, "KEYS", 4) != 0) { + LM_ERR("Failed to add scan command to scan query\n"); + goto err; + } + if (db_redis_key_add_string(&query_v, match_pattern->s, match_pattern->len) != 0) { + LM_ERR("Failed to add match pattern to scan query\n"); + goto err; + } + + reply = db_redis_command_argv(con, query_v); + db_redis_key_free(&query_v); + db_redis_check_reply(con, reply, err); + + keys_list = reply; + +#endif + + if (keys_list->type != REDIS_REPLY_ARRAY) { LM_ERR("Invalid content type for scan on table '%.*s', expected array\n", - table_name->len, table_name->s); + match_pattern->len, match_pattern->s); goto err; } - if (reply->element[1]->elements == 0) { - LM_DBG("no matching entries found for scan on table '%.*s'\n", - table_name->len, table_name->s); - return 0; - } - *query_keys_count += reply->element[1]->elements; + *query_keys_count += keys_list->elements; - for (j = 0; j < reply->element[1]->elements; ++i, ++j) { - redisReply *key = reply->element[1]->element[j]; + for (j = 0; j < keys_list->elements; ++i, ++j) { + redisReply *key = keys_list->element[j]; if (!key) { LM_ERR("Invalid null key at cursor result index %lu while scanning table '%.*s'\n", - j, table_name->len, table_name->s); + j, match_pattern->len, match_pattern->s); goto err; } if (key->type != REDIS_REPLY_STRING) { LM_ERR("Invalid key type at cursor result index %lu while scanning table '%.*s', expected string\n", - j, table_name->len, table_name->s); + j, match_pattern->len, match_pattern->s); goto err; } if (db_redis_key_prepend_string(query_keys, key->str, strlen(key->str)) != 0) { @@ -752,19 +863,34 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name, goto err; } } + +#ifdef USE_SCAN + // exponential increase and falloff, hovering around 1000 results + if (keys_list->elements > 1300 && match_count > 500) + match_count /= 2; + else if (keys_list->elements < 700 && match_count < 500000) + match_count *= 2; +#endif + db_redis_free_reply(&reply); + +#ifdef USE_SCAN } while (cursor > 0); +#endif // for full table scans, we have to manually match all given keys - *manual_keys_count = _n; - *manual_keys = (int*)pkg_malloc(*manual_keys_count * sizeof(int)); - if (! *manual_keys) { - LM_ERR("Failed to allocate memory for manual keys\n"); - goto err; - } - memset(*manual_keys, 0, *manual_keys_count * sizeof(int)); - for (l = 0; l < _n; ++l) { - (*manual_keys)[l] = l; + // but only do this once for repeated invocations + if (!*manual_keys) { + *manual_keys_count = _n; + *manual_keys = (int*)pkg_malloc(*manual_keys_count * sizeof(int)); + if (! *manual_keys) { + LM_ERR("Failed to allocate memory for manual keys\n"); + goto err; + } + memset(*manual_keys, 0, *manual_keys_count * sizeof(int)); + for (l = 0; l < _n; ++l) { + (*manual_keys)[l] = l; + } } if (reply) { @@ -775,8 +901,6 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name, return 0; err: - if (match) - pkg_free(match); if (reply) db_redis_free_reply(&reply); db_redis_key_free(&query_v); @@ -789,6 +913,235 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name, return -1; } +static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name, + const int _n, + redis_key_t **query_keys, int *query_keys_count, + int **manual_keys, int *manual_keys_count, uint64_t ts_scan_start, const str *ts_scan_key) { + + struct str_hash_entry *table_e; + redis_table_t *table; + char *match = NULL; + int ret; + redisReply *reply = NULL; + + *query_keys = NULL; + *query_keys_count = 0; + *manual_keys = NULL; + *manual_keys_count = 0; + redis_key_t *set_keys = NULL; + int set_keys_count = 0; + + table_e = str_hash_get(&con->tables, table_name->s, table_name->len); + if (!table_e) { + LM_ERR("query to undefined table '%.*s', define it in schema file!\n", + table_name->len, table_name->s); + return -1; + } + table = (redis_table_t*)table_e->u.p; + + if (!ts_scan_start) { + // full table scan + match = (char*)pkg_malloc(table->version_code.len + + table_name->len + 10); // length of ':entry::*' plus \0 + if (!match) { + LM_ERR("Failed to allocate memory for match pattern\n"); + return -1; + } + int len = sprintf(match, "%.*s%.*s:entry::*", + table->version_code.len, table->version_code.s, + table_name->len, table_name->s); + str match_pattern = {match, len}; + ret = db_redis_scan_query_keys_pattern(con, &match_pattern, _n, query_keys, query_keys_count, + manual_keys, manual_keys_count, 1000); + pkg_free(match); + return ret; + } + + // timestamp range scan + // ex: 2019-07-17 17:33:16 + // if >, we match: [3-9]???-??-?? ??:??:??, 2[1-9]??-??-?? ??:??:??, 20[2-9]?-??-?? ??:??:??, etc + // if <, we match: [0-1]???-??-?? ??:??:??, 200..., 201[0-8]..., etc + // the maximum match string length is ts_scan_key->len with one character replaced by 5 ('[a-b]') + // + // int range scan + // ex: 12345 + // if >, we match: 2????, 1[3-9]???, ..., plus ?????* + // if <. we match: ?, ??, ???, ????, 1[0-1]???, 12[0-2]??, etc + // ... however we expect a minimum length of 10 digits as per BIGINT printf format + + match = pkg_malloc(ts_scan_key->len + 6); + if (!match) { + LM_ERR("Failed to allocate memory for match pattern\n"); + return -1; + } + + int scan_lt = (ts_scan_start & 0x8000000000000000ULL) ? 1 : 0; + int scan_len_variable = (ts_scan_start & 0x4000000000000000ULL) ? 1 : 0; + unsigned int scan_offset = ts_scan_start & 0x7fffffffULL; + unsigned int scan_length = (ts_scan_start >> 31) & 0x7fffffffULL; + scan_offset -= scan_length; + const char *suffix = ts_scan_key->s + scan_offset + scan_length; + + LM_DBG("running timestamp/int range matching: lt %i, lv %i, off %u, len %u\n", + scan_lt, scan_len_variable, scan_offset, scan_length); + + if (scan_lt && scan_len_variable) { + // match shorter strings + + // copy unchanged prefix + memcpy(match, ts_scan_key->s, scan_offset); + + // append a number of ?. minimum string length is 10 digits + for (int i = 0; i < scan_length - 1; i++) { + int len = scan_offset + i; + char match_char = ts_scan_key->s[len]; + // skip non-numbers + if (match_char < '0' || match_char > '9') { + match[len] = match_char; + continue; + } + // append a single ? + match[len] = '?'; + // append unchanged suffix + strcpy(match + len + 1, suffix); + len = strlen(match); + + // minimum bigint printf string length + if (i < 10) + continue; + + str match_pattern = {match, len}; + LM_DBG("running timestamp/int range matching using pattern '%.*s'\n", len, match); + + ret = db_redis_scan_query_keys_pattern(con, &match_pattern, _n, &set_keys, &set_keys_count, + manual_keys, manual_keys_count, 5000); + if (ret) + goto out; + } + } + + for (int i = 0; i < scan_length; i++) { + int len = scan_offset + i; + char match_char = ts_scan_key->s[len]; + // skip non-numbers + if (match_char < '0' || match_char > '9') + continue; + // skip numbers that are at the edge of their match range + if (match_char == '0' && scan_lt) + continue; + if (match_char == '1' && scan_lt && i == 0) // no leading 0 + continue; + if (match_char == '9' && !scan_lt) + continue; + + // copy unchanged prefix + memcpy(match, ts_scan_key->s, len); + // append range matcher + if (scan_lt) + len += sprintf(match + len, "[0-%c]", match_char - 1); + else + len += sprintf(match + len, "[%c-9]", match_char + 1); + // finish with trailing ?s + for (int j = i + 1; j < scan_length; j++) { + match_char = ts_scan_key->s[scan_offset + j]; + // skip non-numbers + if (match_char < '0' || match_char > '9') { + match[len++] = match_char; + continue; + } + match[len++] = '?'; + } + // append unchanged suffix + strcpy(match + len, suffix); + len = strlen(match); + + str match_pattern = {match, len}; + LM_DBG("running timestamp/int range matching using pattern '%.*s'\n", len, match); + + ret = db_redis_scan_query_keys_pattern(con, &match_pattern, _n, &set_keys, &set_keys_count, + manual_keys, manual_keys_count, 5000); + if (ret) + goto out; + } + + if (!scan_lt && scan_len_variable) { + // match longer strings + int len = sprintf(match, "%.*s*%s", scan_offset + scan_length, ts_scan_key->s, suffix); + + str match_pattern = {match, len}; + LM_DBG("running timestamp/int range matching using pattern '%.*s'\n", len, match); + + ret = db_redis_scan_query_keys_pattern(con, &match_pattern, _n, &set_keys, &set_keys_count, + manual_keys, manual_keys_count, 5000); + if (ret) + goto out; + } + + // we not have a list of matching type keys in set_keys. now we have to iterate through them + // and retrieve the set members, and finally build our actual key list + + ret = -1; + + for (redis_key_t *set_key = set_keys; set_key; set_key = set_key->next) { + LM_DBG("pulling set members from key '%.*s'\n", set_key->key.len, set_key->key.s); + + redis_key_t *query_v = NULL; + if (db_redis_key_add_string(&query_v, "SMEMBERS", 8) != 0) { + LM_ERR("Failed to add smembers command to query\n"); + db_redis_key_free(&query_v); + goto out; + } + if (db_redis_key_add_str(&query_v, &set_key->key) != 0) { + LM_ERR("Failed to add key name to smembers query\n"); + db_redis_key_free(&query_v); + goto out; + } + + reply = db_redis_command_argv(con, query_v); + db_redis_key_free(&query_v); + db_redis_check_reply(con, reply, out); + + if (reply->type != REDIS_REPLY_ARRAY) { + LM_ERR("Unexpected reply for type query, expecting an array\n"); + goto out; + } + + LM_DBG("adding %i keys returned from set", (int) reply->elements); + + for (int i = 0; i < reply->elements; i++) { + if (reply->element[i]->type != REDIS_REPLY_STRING) { + LM_ERR("Unexpected entry key type in type query, expecting a string\n"); + goto out; + } + if (db_redis_key_prepend_string(query_keys, reply->element[i]->str, strlen(reply->element[i]->str)) + != 0) { + LM_ERR("Failed to prepend redis key\n"); + goto out; + } + LM_DBG("adding key '%s'\n", reply->element[i]->str); + } + *query_keys_count += reply->elements; + + db_redis_free_reply(&reply); + } + + ret = 0; + +out: + pkg_free(match); + db_redis_key_free(&set_keys); + db_redis_free_reply(&reply); + if (ret) { + db_redis_key_free(query_keys); + *query_keys_count = 0; + if (*manual_keys) { + pkg_free(*manual_keys); + *manual_keys = NULL; + } + } + return ret; +} + static int db_redis_compare_column(db_key_t k, db_val_t *v, db_op_t op, redisReply *reply) { int i_value; long long ll_value; @@ -1073,7 +1426,8 @@ static int db_redis_perform_query(const db1_con_t* _h, km_redis_con_t *con, cons const db_val_t* _v, const db_op_t *_op, const db_key_t* _c, const int _n, const int _nc, db1_res_t** _r, redis_key_t **keys, int *keys_count, - int **manual_keys, int *manual_keys_count, int do_table_scan) { + int **manual_keys, int *manual_keys_count, int do_table_scan, uint64_t ts_scan_start, + const str *ts_scan_key) { redisReply *reply = NULL; redis_key_t *query_v = NULL; @@ -1101,9 +1455,9 @@ static int db_redis_perform_query(const db1_con_t* _h, km_redis_con_t *con, cons LM_WARN(" scan key %d is '%.*s'\n", i, _k[i]->len, _k[i]->s); } - if (db_redis_scan_query_keys(con, CON_TABLE(_h), _k, _n, + if (db_redis_scan_query_keys(con, CON_TABLE(_h), _n, keys, keys_count, - manual_keys, manual_keys_count) != 0) { + manual_keys, manual_keys_count, ts_scan_start, ts_scan_key) != 0) { LM_ERR("failed to scan query keys\n"); goto error; } @@ -1253,7 +1607,8 @@ static int db_redis_perform_query(const db1_con_t* _h, km_redis_con_t *con, cons static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, const db_key_t* _k, const db_val_t* _v, const db_op_t *_op, const int _n, redis_key_t **keys, int *keys_count, - int **manual_keys, int *manual_keys_count, int do_table_scan) { + int **manual_keys, int *manual_keys_count, int do_table_scan, uint64_t ts_scan_start, + const str *ts_scan_key) { int i = 0, j = 0; redis_key_t *k = NULL; @@ -1264,21 +1619,29 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con redisReply *reply = NULL; redis_key_t *query_v = NULL; redis_key_t *type_keys = NULL; + redis_key_t *set_keys = NULL; redis_key_t *all_type_keys = NULL; db_val_t *db_vals = NULL; db_key_t *db_keys = NULL; redis_key_t *type_key; + redis_key_t *set_key; if (!*keys_count && do_table_scan) { - LM_WARN("performing full table scan on table '%.*s' while performing delete\n", - CON_TABLE(_h)->len, CON_TABLE(_h)->s); + if (!ts_scan_start) + LM_WARN("performing full table scan on table '%.*s' while performing delete\n", + CON_TABLE(_h)->len, CON_TABLE(_h)->s); + else + LM_WARN("performing table scan on table '%.*s' while performing delete using match key " + "'%.*s' at offset %llx\n", + CON_TABLE(_h)->len, CON_TABLE(_h)->s, + ts_scan_key->len, ts_scan_key->s, (unsigned long long) ts_scan_start); for(i = 0; i < _n; ++i) { LM_WARN(" scan key %d is '%.*s'\n", i, _k[i]->len, _k[i]->s); } - if (db_redis_scan_query_keys(con, CON_TABLE(_h), _k, _n, + if (db_redis_scan_query_keys(con, CON_TABLE(_h), _n, keys, keys_count, - manual_keys, manual_keys_count) != 0) { + manual_keys, manual_keys_count, ts_scan_start, ts_scan_key) != 0) { LM_ERR("failed to scan query keys\n"); goto error; } @@ -1412,7 +1775,7 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con } } if (db_redis_build_type_keys(con, CON_TABLE(_h), db_keys, db_vals, all_type_keys_count, - &type_keys, &type_keys_count) != 0) { + &type_keys, &set_keys, &type_keys_count) != 0) { LM_ERR("failed to build type keys\n"); goto error; } @@ -1437,8 +1800,18 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con db_redis_check_reply(con, reply, error); db_redis_free_reply(&reply); - for (type_key = type_keys; type_key; type_key = type_key->next) { - if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) { + for (type_key = type_keys, set_key = set_keys; type_key; + type_key = type_key->next, set_key = set_key->next) { + + if (db_redis_key_add_string(&query_v, "EVALSHA", 7) != 0) { + LM_ERR("Failed to add srem command to post-delete query\n"); + goto error; + } + if (db_redis_key_add_string(&query_v, con->srem_key_lua, strlen(con->srem_key_lua)) != 0) { + LM_ERR("Failed to add srem command to post-delete query\n"); + goto error; + } + if (db_redis_key_add_string(&query_v, "3", 1) != 0) { LM_ERR("Failed to add srem command to post-delete query\n"); goto error; } @@ -1446,6 +1819,10 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con LM_ERR("Failed to add key to delete query\n"); goto error; } + if (db_redis_key_add_str(&query_v, &set_key->key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } if (db_redis_key_add_str(&query_v, key) != 0) { LM_ERR("Failed to add key to delete query\n"); goto error; @@ -1457,6 +1834,7 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con } LM_DBG("done with loop '%.*s'\n", k->key.len, k->key.s); db_redis_key_free(&type_keys); + db_redis_key_free(&set_keys); } db_redis_key_free(&all_type_keys); db_redis_key_free(&query_v); @@ -1473,6 +1851,7 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con pkg_free(db_vals); db_redis_key_free(&query_v); db_redis_key_free(&type_keys); + db_redis_key_free(&set_keys); db_redis_key_free(&all_type_keys); return -1; } @@ -1481,7 +1860,8 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con const db_val_t* _v, const db_op_t *_op, const db_key_t* _uk, const db_val_t *_uv, const int _n, const int _nu, redis_key_t **keys, int *keys_count, - int **manual_keys, int *manual_keys_count, int do_table_scan) { + int **manual_keys, int *manual_keys_count, int do_table_scan, uint64_t ts_scan_start, + const str *ts_scan_key) { redisReply *reply = NULL; redis_key_t *query_v = NULL; @@ -1490,6 +1870,16 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con int i; int j; size_t col; + redis_key_t *all_type_keys = NULL; + int all_type_keys_count = 0; + db_val_t *db_vals = NULL; + db_key_t *db_keys = NULL; + redis_key_t *type_keys = NULL; + redis_key_t *set_keys = NULL; + int type_keys_count = 0; + redis_key_t *new_type_keys = NULL; + int new_type_keys_count = 0; + redis_key_t *all_type_key; if (!(*keys_count) && do_table_scan) { LM_WARN("performing full table scan on table '%.*s' while performing update\n", @@ -1498,18 +1888,35 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con LM_WARN(" scan key %d is '%.*s'\n", i, _k[i]->len, _k[i]->s); } - if (db_redis_scan_query_keys(con, CON_TABLE(_h), _k, _n, + if (db_redis_scan_query_keys(con, CON_TABLE(_h), _n, keys, keys_count, - manual_keys, manual_keys_count) != 0) { + manual_keys, manual_keys_count, ts_scan_start, ts_scan_key) != 0) { LM_ERR("failed to scan query keys\n"); goto error; } } + // TODO: this should be moved to redis_connection structure + // and be parsed at startup: + // + // fetch list of keys in all types + if (db_redis_get_keys_for_all_types(con, CON_TABLE(_h), + &all_type_keys, &all_type_keys_count) != 0) { + LM_ERR("failed to get full list of type keys\n"); + goto error; + } + + if (db_redis_build_type_keys(con, CON_TABLE(_h), _uk, _uv, _nu, + &new_type_keys, NULL, &new_type_keys_count) != 0) { + LM_ERR("failed to build type keys\n"); + goto error; + } + LM_DBG("%i new type keys\n", new_type_keys_count); + for (key = *keys; key; key = key->next) { str *keyname = &key->key; - LM_DBG("fetching row for '%.*s' from redis\n", keyname->len, keyname->s); + LM_DBG("fetching row for '%.*s' from redis for update\n", keyname->len, keyname->s); if (db_redis_key_add_string(&query_v, "EXISTS", 6) != 0) { @@ -1527,19 +1934,9 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con db_redis_key_free(&query_v); // construct HMGET query - if ((*manual_keys_count) == 0) { - if (db_redis_key_add_string(&query_v, "HGETALL", 7) != 0) { - LM_ERR("Failed to set hgetall command to pre-update hget query\n"); - goto error; - } - // TODO: actually we wouldn't have to fetch it at all, but then we'd - // have to mark this key telling to not fetch reply of HMGET after - // EXISTS returns false! - } else { - if (db_redis_key_add_string(&query_v, "HMGET", 5) != 0) { - LM_ERR("Failed to set hgetall command to pre-update hget query\n"); - goto error; - } + if (db_redis_key_add_string(&query_v, "HMGET", 5) != 0) { + LM_ERR("Failed to set hgetall command to pre-update hget query\n"); + goto error; } if (db_redis_key_add_str(&query_v, keyname) != 0) { LM_ERR("Failed to add key name to pre-update exists query\n"); @@ -1554,6 +1951,13 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con goto error; } } + // add all type keys to query + for (all_type_key = all_type_keys; all_type_key; all_type_key = all_type_key->next) { + if (db_redis_key_add_str(&query_v, &all_type_key->key) != 0) { + LM_ERR("Failed to add type key to pre-update query\n"); + goto error; + } + } if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) { LM_ERR("Failed to append redis command\n"); @@ -1580,9 +1984,13 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con for (key = *keys; key; key = key->next) { + redis_key_t *tmp = NULL; + redis_key_t *type_key; + redis_key_t *set_key; + redis_key_t *new_type_key; int row_match; - LM_DBG("fetching replies for '%.*s' from redis\n", key->key.len, key->key.s); + LM_DBG("fetching replies for '%.*s' from redis for update\n", key->key.len, key->key.s); // get reply for EXISTS query if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) { @@ -1639,13 +2047,50 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con } } } - db_redis_free_reply(&reply); if (!row_match) { continue; } else { LM_DBG("row matches manual filtering, proceed with update\n"); } + db_keys = (db_key_t*) pkg_malloc(all_type_keys_count * sizeof(db_key_t)); + if (!db_keys) { + LM_ERR("Failed to allocate memory for db type keys\n"); + goto error; + } + for (j = 0, tmp = all_type_keys; tmp; ++j, tmp = tmp->next) { + db_keys[j] = &tmp->key; + } + + db_vals = (db_val_t*) pkg_malloc(all_type_keys_count * sizeof(db_val_t)); + if (!db_vals) { + LM_ERR("Failed to allocate memory for manual db vals\n"); + goto error; + } + + for (j = 0, all_type_key = all_type_keys; all_type_key; ++j, all_type_key = all_type_key->next) { + db_val_t *v = &(db_vals[j]); + str *key = &all_type_key->key; + char *value = reply->element[*manual_keys_count + j]->str; + int coltype = db_redis_schema_get_column_type(con, CON_TABLE(_h), key); + if (value == NULL) { + VAL_NULL(v) = 1; + } else if (db_str2val(coltype, v, value, strlen(value), 0) != 0) { + LM_ERR("Failed to convert redis reply column to db value\n"); + goto error; + } + } + if (db_redis_build_type_keys(con, CON_TABLE(_h), db_keys, db_vals, all_type_keys_count, + &type_keys, &set_keys, &type_keys_count) != 0) { + LM_ERR("failed to build type keys\n"); + goto error; + } + pkg_free(db_keys); + db_keys = NULL; + pkg_free(db_vals); + db_vals = NULL; + db_redis_free_reply(&reply); + if (db_redis_key_add_string(&query_v, "HMSET", 5) != 0) { LM_ERR("Failed to add hmset command to update query\n"); goto error; @@ -1681,6 +2126,108 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con } db_redis_key_free(&query_v); + + for (type_key = type_keys, set_key = set_keys; type_key; + type_key = type_key->next, set_key = set_key->next) { + + LM_DBG("checking for update of type key '%.*s'\n", + type_key->key.len, type_key->key.s); + char *prefix = ser_memmem(type_key->key.s, "::", type_key->key.len, 2); + if (!prefix || prefix == type_key->key.s) { + LM_DBG("Invalid key without :: '%.*s'\n", + type_key->key.len, type_key->key.s); + continue; + } + for (new_type_key = new_type_keys; new_type_key; new_type_key = new_type_key->next) { + // compare prefix to see if this is the same key + if (memcmp(new_type_key->key.s, type_key->key.s, prefix - type_key->key.s)) + continue; + LM_DBG("checking for update of type key against '%.*s'\n", + new_type_key->key.len, new_type_key->key.s); + if (!str_strcmp(&new_type_key->key, &type_key->key)) + continue; + + // add to new set key and delete from old + + if (db_redis_key_add_string(&query_v, "SADD", 4) != 0) { + LM_ERR("Failed to set sadd command to post-update query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &new_type_key->key) != 0) { + LM_ERR("Failed to add map key to post-update query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &key->key) != 0) { + LM_ERR("Failed to set entry key to post-update query\n"); + goto error; + } + + update_queries++; + if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) { + LM_ERR("Failed to append redis command\n"); + goto error; + } + + db_redis_key_free(&query_v); + + if (db_redis_key_add_string(&query_v, "SADD", 4) != 0) { + LM_ERR("Failed to set sadd command to post-update query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &set_key->key) != 0) { + LM_ERR("Failed to add map key to post-update query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &new_type_key->key) != 0) { + LM_ERR("Failed to set entry key to post-update query\n"); + goto error; + } + + update_queries++; + if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) { + LM_ERR("Failed to append redis command\n"); + goto error; + } + + db_redis_key_free(&query_v); + + if (db_redis_key_add_string(&query_v, "EVAL", 4) != 0) { + LM_ERR("Failed to add srem command to post-delete query\n"); + goto error; + } + if (db_redis_key_add_string(&query_v, SREM_KEY_LUA, strlen(SREM_KEY_LUA)) != 0) { + LM_ERR("Failed to add srem command to post-delete query\n"); + goto error; + } + if (db_redis_key_add_string(&query_v, "3", 1) != 0) { + LM_ERR("Failed to add srem command to post-delete query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &type_key->key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &set_key->key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &key->key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } + + update_queries++; + if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) { + LM_ERR("Failed to append redis command\n"); + goto error; + } + + db_redis_key_free(&query_v); + } + } + + db_redis_key_free(&type_keys); + db_redis_key_free(&set_keys); } LM_DBG("getting replies for %d queries\n", update_queries); @@ -1697,6 +2244,8 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con LM_DBG("done performing update\n"); + db_redis_key_free(&all_type_keys); + db_redis_key_free(&new_type_keys); return 0; error: @@ -1704,6 +2253,10 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con if (reply) db_redis_free_reply(&reply); db_redis_key_free(&query_v); + db_redis_key_free(&all_type_keys); + db_redis_key_free(&type_keys); + db_redis_key_free(&set_keys); + db_redis_key_free(&new_type_keys); return -1; } @@ -1726,6 +2279,8 @@ int db_redis_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op, km_redis_con_t *con = NULL; int free_op = 0; int do_table_scan = 0; + uint64_t ts_scan_start = 0; + str ts_scan_key = {0,}; redis_key_t *keys = NULL; int keys_count = 0; @@ -1796,7 +2351,8 @@ int db_redis_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op, if (_n > 0) { if (db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n, - &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan) != 0) { + &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan, &ts_scan_start, + &ts_scan_key) != 0) { LM_ERR("failed to build query keys\n"); goto error; } @@ -1814,7 +2370,7 @@ int db_redis_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op, } if (db_redis_perform_query(_h, con, _k, _v, query_ops, _c, _n, _nc, _r, - &keys, &keys_count, &manual_keys, &manual_keys_count, do_table_scan) != 0) { + &keys, &keys_count, &manual_keys, &manual_keys_count, do_table_scan, ts_scan_start, &ts_scan_key) != 0) { goto error; } @@ -1828,6 +2384,8 @@ int db_redis_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op, if (manual_keys) { pkg_free(manual_keys); } + if (ts_scan_key.s) + pkg_free(ts_scan_key.s); db_redis_consume_replies(con); return 0; @@ -1841,6 +2399,8 @@ int db_redis_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op, if (manual_keys) { pkg_free(manual_keys); } + if (ts_scan_key.s) + pkg_free(ts_scan_key.s); db_redis_consume_replies(con); @@ -1869,11 +2429,13 @@ int db_redis_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, redis_key_t *key = NULL; int keys_count = 0; redis_key_t *type_keys = NULL; + redis_key_t *set_keys = NULL; int type_keys_count = 0; redis_key_t *query_v = NULL; redisReply *reply = NULL; int i; redis_key_t *k; + redis_key_t *set_key; con = REDIS_CON(_h); if (con && con->con == NULL) { @@ -1900,7 +2462,7 @@ int db_redis_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, goto error; } if (db_redis_build_type_keys(con, CON_TABLE(_h), _k, _v, _n, - &type_keys, &type_keys_count) != 0) { + &type_keys, &set_keys, &type_keys_count) != 0) { LM_ERR("failed to build type keys\n"); goto error; } @@ -1939,7 +2501,7 @@ int db_redis_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, db_redis_check_reply(con, reply, error); db_redis_free_reply(&reply); - for (k = type_keys; k; k = k->next) { + for (k = type_keys, set_key = set_keys; k; k = k->next, set_key = set_key->next) { str *type_key = &k->key; LM_DBG("inserting entry key '%.*s' to type map '%.*s'\n", @@ -1962,10 +2524,29 @@ int db_redis_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, db_redis_key_free(&query_v); db_redis_check_reply(con, reply, error); db_redis_free_reply(&reply); + + if (db_redis_key_add_string(&query_v, "SADD", 4) != 0) { + LM_ERR("Failed to set sadd command to post-insert query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &set_key->key) != 0) { + LM_ERR("Failed to add map key to post-insert query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, type_key) != 0) { + LM_ERR("Failed to set entry key to post-insert query\n"); + goto error; + } + + reply = db_redis_command_argv(con, query_v); + db_redis_key_free(&query_v); + db_redis_check_reply(con, reply, error); + db_redis_free_reply(&reply); } db_redis_key_free(&key); db_redis_key_free(&type_keys); + db_redis_key_free(&set_keys); db_redis_consume_replies(con); return 0; @@ -1973,6 +2554,7 @@ int db_redis_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, error: db_redis_key_free(&key); db_redis_key_free(&type_keys); + db_redis_key_free(&set_keys); db_redis_key_free(&query_v); if (reply) @@ -2002,6 +2584,8 @@ int db_redis_delete(const db1_con_t* _h, const db_key_t* _k, int manual_keys_count = 0; int free_op = 0; int do_table_scan = 0; + uint64_t ts_scan_start = 0; + str ts_scan_key = {0,}; db_op_t *query_ops = NULL; int i; @@ -2046,7 +2630,8 @@ int db_redis_delete(const db1_con_t* _h, const db_key_t* _k, if (_n > 0) { if (db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n, - &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan) != 0) { + &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan, &ts_scan_start, + &ts_scan_key) != 0) { LM_ERR("failed to build query keys\n"); goto error; } @@ -2063,7 +2648,7 @@ int db_redis_delete(const db1_con_t* _h, const db_key_t* _k, } if (db_redis_perform_delete(_h, con, _k, _v, query_ops, _n, - &keys, &keys_count, &manual_keys, &manual_keys_count, do_table_scan) != 0) { + &keys, &keys_count, &manual_keys, &manual_keys_count, do_table_scan, ts_scan_start, &ts_scan_key) != 0) { goto error; } @@ -2075,6 +2660,8 @@ int db_redis_delete(const db1_con_t* _h, const db_key_t* _k, db_redis_key_free(&keys); if (manual_keys) pkg_free(manual_keys); + if (ts_scan_key.s) + pkg_free(ts_scan_key.s); db_redis_consume_replies(con); return 0; @@ -2087,6 +2674,8 @@ int db_redis_delete(const db1_con_t* _h, const db_key_t* _k, db_redis_key_free(&keys); if (manual_keys) pkg_free(manual_keys); + if (ts_scan_key.s) + pkg_free(ts_scan_key.s); db_redis_consume_replies(con); return -1; } @@ -2109,6 +2698,8 @@ int db_redis_update(const db1_con_t* _h, const db_key_t* _k, km_redis_con_t *con = NULL; int free_op = 0; int do_table_scan = 0; + uint64_t ts_scan_start = 0; + str ts_scan_key = {0,}; redis_key_t *keys = NULL; int keys_count = 0; @@ -2158,7 +2749,8 @@ int db_redis_update(const db1_con_t* _h, const db_key_t* _k, if (_n > 0) { if (db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n, - &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan) != 0) { + &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan, &ts_scan_start, + &ts_scan_key) != 0) { LM_ERR("failed to build query keys\n"); goto error; } @@ -2175,7 +2767,7 @@ int db_redis_update(const db1_con_t* _h, const db_key_t* _k, } if (db_redis_perform_update(_h, con, _k, _v, query_ops, _uk, _uv, _n, _nu, - &keys, &keys_count, &manual_keys, &manual_keys_count, do_table_scan) != 0) { + &keys, &keys_count, &manual_keys, &manual_keys_count, do_table_scan, ts_scan_start, &ts_scan_key) != 0) { goto error; } @@ -2189,6 +2781,8 @@ int db_redis_update(const db1_con_t* _h, const db_key_t* _k, if (manual_keys) { pkg_free(manual_keys); } + if (ts_scan_key.s) + pkg_free(ts_scan_key.s); db_redis_consume_replies(con); return 0; @@ -2201,6 +2795,8 @@ int db_redis_update(const db1_con_t* _h, const db_key_t* _k, if (manual_keys) { pkg_free(manual_keys); } + if (ts_scan_key.s) + pkg_free(ts_scan_key.s); db_redis_consume_replies(con); return -1; } diff --git a/src/modules/db_redis/redis_dbase.h b/src/modules/db_redis/redis_dbase.h index f41be1a2794..ae40db6968b 100644 --- a/src/modules/db_redis/redis_dbase.h +++ b/src/modules/db_redis/redis_dbase.h @@ -25,6 +25,9 @@ #include "db_redis_mod.h" +#define SREM_KEY_LUA "redis.call('SREM', KEYS[1], KEYS[3]); if redis.call('SCARD', KEYS[1]) == 0 then redis.call('SREM', KEYS[2], KEYS[1]) end" + + /* * Initialize database connection */ @@ -85,4 +88,4 @@ int db_redis_replace(const db1_con_t* handle, const db_key_t* keys, const db_val */ int db_redis_use_table(db1_con_t* _h, const str* _t); -#endif /* _REDIS_BASE_H_ */ \ No newline at end of file +#endif /* _REDIS_BASE_H_ */ diff --git a/src/modules/db_redis/redis_table.c b/src/modules/db_redis/redis_table.c index fe634901742..4bed4e1c6dc 100644 --- a/src/modules/db_redis/redis_table.c +++ b/src/modules/db_redis/redis_table.c @@ -487,13 +487,14 @@ static struct str_hash_entry* db_redis_create_column(str *col, str *type) { } int db_redis_parse_keys(km_redis_con_t *con) { - char *p; + char *p, *q; char *start; char *end; str table_name; str type_name; str column_name; + str version_code; struct str_hash_entry *table_entry; redis_table_t *table; @@ -533,6 +534,16 @@ int db_redis_parse_keys(km_redis_con_t *con) { } table_name.s = start; table_name.len = p - start; + + version_code = (str){"",0}; + q = memchr(table_name.s, ':', table_name.len); + if (q) { + version_code = table_name; + version_code.len = q - table_name.s + 1; + table_name.s = q + 1; + table_name.len -= version_code.len; + } + state = DBREDIS_KEYS_TYPE_ST; start = ++p; LM_DBG("found table name '%.*s'\n", table_name.len, table_name.s); @@ -544,6 +555,7 @@ int db_redis_parse_keys(km_redis_con_t *con) { goto err; } table = table_entry->u.p; + table->version_code = version_code; break; case DBREDIS_KEYS_TYPE_ST: while(p != end && *p != ':') @@ -593,6 +605,10 @@ int db_redis_parse_keys(km_redis_con_t *con) { column_name.s = start; column_name.len = p - start; start = ++p; + + if (!column_name.len) + break; + /* LM_DBG("found column name '%.*s' in type '%.*s' for table '%.*s'\n", column_name.len, column_name.s, diff --git a/src/modules/db_redis/redis_table.h b/src/modules/db_redis/redis_table.h index 9e71d2a2ba2..a866b166dc3 100644 --- a/src/modules/db_redis/redis_table.h +++ b/src/modules/db_redis/redis_table.h @@ -42,6 +42,7 @@ struct redis_type { typedef struct redis_table redis_table_t; struct redis_table { int version; + str version_code; redis_key_t *entry_keys; redis_type_t *types; struct str_hash_table columns;