Skip to content

Commit

Permalink
Improvements for labels handling (netdata#16172)
Browse files Browse the repository at this point in the history
* Add additional checks
rrdlabels_find_label_with_key_unsafe finds a label with specified key (not only if it exists but with difefrent value)
Quick check if label already exists (avoids JudyLIns)

* Add migration unit test
Add additional unit test to verify that adding a label with the same key will replace its value
  • Loading branch information
stelfrag committed Oct 18, 2023
1 parent a155cca commit 0542661
Showing 1 changed file with 127 additions and 24 deletions.
151 changes: 127 additions & 24 deletions database/rrdlabels.c
Expand Up @@ -671,6 +671,7 @@ void rrdlabels_destroy(RRDLABELS *labels)
freez(labels);
}

// Check in labels to see if we have the key specified in label
static RRDLABEL *rrdlabels_find_label_with_key_unsafe(RRDLABELS *labels, RRDLABEL *label)
{
if (unlikely(!labels))
Expand All @@ -682,7 +683,7 @@ static RRDLABEL *rrdlabels_find_label_with_key_unsafe(RRDLABELS *labels, RRDLABE
RRDLABEL *found = NULL;
while ((PValue = JudyLFirstThenNext(labels->JudyL, &Index, &first_then_next))) {
RRDLABEL *lb = (RRDLABEL *)Index;
if (lb->index.key == label->index.key && lb->index.value != label->index.value) {
if (lb->index.key == label->index.key) {
found = (RRDLABEL *)Index;
break;
}
Expand All @@ -695,39 +696,42 @@ static RRDLABEL *rrdlabels_find_label_with_key_unsafe(RRDLABELS *labels, RRDLABE

static void labels_add_already_sanitized(RRDLABELS *labels, const char *key, const char *value, RRDLABEL_SRC ls)
{
RRDLABEL *label = add_label_name_value(key, value);
RRDLABEL *new_label = add_label_name_value(key, value);

spinlock_lock(&labels->spinlock);

RRDLABEL *old_key = rrdlabels_find_label_with_key_unsafe(labels, label);
RRDLABEL *old_label_with_key = rrdlabels_find_label_with_key_unsafe(labels, new_label);

if (old_label_with_key == new_label) {
spinlock_unlock(&labels->spinlock);
delete_label(new_label);
return;
}

size_t mem_before_judyl = JudyLMemUsed(labels->JudyL);

Pvoid_t *PValue = JudyLIns(&labels->JudyL, (Word_t) label, PJE0);
if(unlikely(!PValue || PValue == PJERR))
Pvoid_t *PValue = JudyLIns(&labels->JudyL, (Word_t)new_label, PJE0);
if (!PValue || PValue == PJERR)
fatal("RRDLABELS: corrupted labels JudyL array");

if (!*PValue) {
RRDLABEL_SRC new_ls;
if (old_key)
new_ls = ((ls & ~(RRDLABEL_FLAG_NEW | RRDLABEL_FLAG_OLD)) | RRDLABEL_FLAG_OLD);
else
new_ls = ((ls & ~(RRDLABEL_FLAG_NEW | RRDLABEL_FLAG_OLD)) | RRDLABEL_FLAG_NEW);
*((RRDLABEL_SRC *)PValue) = new_ls;
RRDLABEL_SRC new_ls = (ls & ~(RRDLABEL_FLAG_NEW | RRDLABEL_FLAG_OLD));
labels->version++;

labels->version++;
if (old_label_with_key) {
(void)JudyLDel(&labels->JudyL, (Word_t)old_label_with_key, PJE0);
new_ls |= RRDLABEL_FLAG_OLD;
} else
new_ls |= RRDLABEL_FLAG_NEW;

if (old_key) {
(void)JudyLDel(&labels->JudyL, (Word_t) old_key, PJE0);
delete_label((RRDLABEL *)old_key);
}
size_t mem_after_judyl = JudyLMemUsed(labels->JudyL);
STATS_PLUS_MEMORY(&dictionary_stats_category_rrdlabels, 0, mem_after_judyl - mem_before_judyl, 0);
}
else
delete_label(label);
*((RRDLABEL_SRC *)PValue) = new_ls;

size_t mem_after_judyl = JudyLMemUsed(labels->JudyL);
STATS_PLUS_MEMORY(&dictionary_stats_category_rrdlabels, 0, mem_after_judyl - mem_before_judyl, 0);

spinlock_unlock(&labels->spinlock);

if (old_label_with_key)
delete_label((RRDLABEL *)old_label_with_key);
}

void rrdlabels_add(RRDLABELS *labels, const char *name, const char *value, RRDLABEL_SRC ls)
Expand Down Expand Up @@ -984,7 +988,7 @@ int rrdlabels_walkthrough_read(RRDLABELS *labels, int (*callback)(const char *na
// migrate an existing label list to a new list

void rrdlabels_migrate_to_these(RRDLABELS *dst, RRDLABELS *src) {
if (!dst || !src)
if (!dst || !src || (dst == src))
return;

spinlock_lock(&dst->spinlock);
Expand Down Expand Up @@ -1025,7 +1029,7 @@ void rrdlabels_migrate_to_these(RRDLABELS *dst, RRDLABELS *src) {

void rrdlabels_copy(RRDLABELS *dst, RRDLABELS *src)
{
if (!dst || !src)
if (!dst || !src || (dst == src))
return;

RRDLABEL *label;
Expand Down Expand Up @@ -1265,6 +1269,9 @@ void rrdlabels_to_buffer_json_members(RRDLABELS *labels, BUFFER *wb)

size_t rrdlabels_entries(RRDLABELS *labels __maybe_unused)
{
if (unlikely(!labels))
return 0;

size_t count;
spinlock_lock(&labels->spinlock);
count = JudyLCount(labels->JudyL, 0, -1, PJE0);
Expand All @@ -1274,6 +1281,9 @@ size_t rrdlabels_entries(RRDLABELS *labels __maybe_unused)

size_t rrdlabels_version(RRDLABELS *labels __maybe_unused)
{
if (unlikely(!labels))
return 0;

return (size_t) labels->version;
}

Expand Down Expand Up @@ -1405,6 +1415,97 @@ int rrdlabels_unittest_add_pairs() {
return errors;
}

int rrdlabels_unittest_double_check() {
fprintf(stderr, "\n%s() tests\n", __FUNCTION__);

int errors = 1;
int ret = 0;
RRDLABELS *labels = rrdlabels_create();

const char *pair = "key1=value1";

struct rrdlabels_unittest_add_a_pair tmp = {
.pair = pair,
.expected_name = "key1",
.expected_value = NULL,
.errors = 0
};

fprintf(stderr, "rrdlabels_add_pair(labels, %s) ...\n ", pair);

rrdlabels_add_pair(labels, pair, RRDLABEL_SRC_CONFIG);
size_t count = rrdlabels_entries(labels);
fprintf(stderr, "Added one key with \"value1\", entries found %zu\n", count);
tmp.expected_value = "value1";
ret = rrdlabels_walkthrough_read(labels, rrdlabels_unittest_add_a_pair_callback, &tmp);

fprintf(stderr, "Adding key with same value \"value1\" (collision check)\n");
rrdlabels_add_pair(labels, pair, RRDLABEL_SRC_CONFIG);
count = rrdlabels_entries(labels);
fprintf(stderr, "Added same key again \"value1\", entries found %zu\n", count);

ret = rrdlabels_walkthrough_read(labels, rrdlabels_unittest_add_a_pair_callback, &tmp);

// Add same key with different value
pair = "key1=value2";
rrdlabels_add_pair(labels, pair, RRDLABEL_SRC_CONFIG);
count = rrdlabels_entries(labels);
fprintf(stderr, "Added same key again with \"value2\", entries found %zu\n", count);

tmp.expected_value = "value2";
ret = rrdlabels_walkthrough_read(labels, rrdlabels_unittest_add_a_pair_callback, &tmp);

fprintf(stderr, "Adding key with same value \"value2\" (collision check)\n");
rrdlabels_add_pair(labels, pair, RRDLABEL_SRC_CONFIG);
count = rrdlabels_entries(labels);
fprintf(stderr, "Added same key again with \"value2\", entries found %zu\n", count);

ret = rrdlabels_walkthrough_read(labels, rrdlabels_unittest_add_a_pair_callback, &tmp);
errors = tmp.errors;
if(ret != 1) {
fprintf(stderr, "failed to get \"%s\" label", "key1");
errors++;
}

if(!errors)
fprintf(stderr, " OK, name='%s' and value='%s'\n", tmp.name, tmp.value?tmp.value:"(null)");
else
fprintf(stderr, " FAILED\n");

rrdlabels_destroy(labels);

return errors;
}

int rrdlabels_unittest_migrate_check() {
fprintf(stderr, "\n%s() tests\n", __FUNCTION__);

RRDLABELS *labels1 = NULL;
RRDLABELS *labels2 = NULL;

labels1 = rrdlabels_create();
labels2 = rrdlabels_create();

rrdlabels_add(labels1, "key1", "value1", RRDLABEL_SRC_CONFIG);
rrdlabels_add(labels1, "key1", "value2", RRDLABEL_SRC_CONFIG);

rrdlabels_add(labels2, "new_key1", "value2", RRDLABEL_SRC_CONFIG);
rrdlabels_add(labels2, "new_key2", "value2", RRDLABEL_SRC_CONFIG);
rrdlabels_add(labels2, "key1", "value2", RRDLABEL_SRC_CONFIG);

fprintf(stderr, "Labels1 entries found %zu (should be 1)\n", rrdlabels_entries(labels1));
fprintf(stderr, "Labels2 entries found %zu (should be 3)\n", rrdlabels_entries(labels2));

rrdlabels_migrate_to_these(labels1, labels2);
fprintf(stderr, "labels1 (migrated) entries found %zu (should be 3)\n", rrdlabels_entries(labels1));
size_t entries = rrdlabels_entries(labels1);

rrdlabels_destroy(labels1);
rrdlabels_destroy(labels2);

return entries != 3;
}

int rrdlabels_unittest_check_simple_pattern(RRDLABELS *labels, const char *pattern, bool expected) {
fprintf(stderr, "rrdlabels_match_simple_pattern(labels, \"%s\") ... ", pattern);

Expand Down Expand Up @@ -1497,6 +1598,8 @@ int rrdlabels_unittest(void) {
errors += rrdlabels_unittest_sanitization();
errors += rrdlabels_unittest_add_pairs();
errors += rrdlabels_unittest_simple_pattern();
errors += rrdlabels_unittest_double_check();
errors += rrdlabels_unittest_migrate_check();

fprintf(stderr, "%d errors found\n", errors);
return errors;
Expand Down

0 comments on commit 0542661

Please sign in to comment.