Skip to content

Commit

Permalink
Revert "Fix race condition in dbengine (netdata#7533)" (netdata#7560)
Browse files Browse the repository at this point in the history
We are removing this fix for further internal testing; it will return once we have ironed out
some bugs.

This reverts commit 53ab093.
  • Loading branch information
amoss authored and jackyhuang85 committed Jan 1, 2020
1 parent b684aa2 commit 263a5b4
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 116 deletions.
12 changes: 4 additions & 8 deletions daemon/main.c
Expand Up @@ -340,12 +340,10 @@ int help(int exitcode) {
" -W unittest Run internal unittests and exit.\n\n"
#ifdef ENABLE_DBENGINE
" -W createdataset=N Create a DB engine dataset of N seconds and exit.\n\n"
" -W stresstest=A,B,C,D,E,F\n"
" Run a DB engine stress test for A seconds,\n"
" -W stresstest=A,B,C,D,E Run a DB engine stress test for A seconds,\n"
" with B writers and C readers, with a ramp up\n"
" time of D seconds for writers, a page cache\n"
" size of E MiB, an optional disk space limit\n"
" of F MiB and exit.\n\n"
" size of E MiB, and exit.\n\n"
#endif
" -W set section option value\n"
" set netdata.conf option from the command line.\n\n"
Expand Down Expand Up @@ -958,7 +956,7 @@ int main(int argc, char **argv) {
else if(strncmp(optarg, stresstest_string, strlen(stresstest_string)) == 0) {
char *endptr;
unsigned test_duration_sec = 0, dset_charts = 0, query_threads = 0, ramp_up_seconds = 0,
page_cache_mb = 0, disk_space_mb = 0;
page_cache_mb = 0;

optarg += strlen(stresstest_string);
test_duration_sec = (unsigned)strtoul(optarg, &endptr, 0);
Expand All @@ -970,10 +968,8 @@ int main(int argc, char **argv) {
ramp_up_seconds = (unsigned)strtoul(endptr + 1, &endptr, 0);
if (',' == *endptr)
page_cache_mb = (unsigned)strtoul(endptr + 1, &endptr, 0);
if (',' == *endptr)
disk_space_mb = (unsigned)strtoul(endptr + 1, &endptr, 0);
dbengine_stress_test(test_duration_sec, dset_charts, query_threads, ramp_up_seconds,
page_cache_mb, disk_space_mb);
page_cache_mb);
return 0;
}
#endif
Expand Down
83 changes: 27 additions & 56 deletions daemon/unit_test.c
Expand Up @@ -1723,7 +1723,7 @@ int test_dbengine(void)

default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;

fprintf(stderr, "Initializing localhost with hostname 'unittest-dbengine'\n");
debug(D_RRDHOST, "Initializing localhost with hostname 'unittest-dbengine'");
host = dbengine_rrdhost_find_or_create("unittest-dbengine");
if (NULL == host)
return 1;
Expand Down Expand Up @@ -1915,9 +1915,6 @@ static void generate_dbengine_chart(void *arg)
rrdset_done(st);
thread_info->time_max = time_current;
}
for (j = 0; j < DSET_DIMS; ++j) {
rrdeng_store_metric_finalize(rd[j]);
}
}

void generate_dbengine_dataset(unsigned history_seconds)
Expand All @@ -1938,7 +1935,7 @@ void generate_dbengine_dataset(unsigned history_seconds)
default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;

error_log_limit_unlimited();
fprintf(stderr, "Initializing localhost with hostname 'dbengine-dataset'\n");
debug(D_RRDHOST, "Initializing localhost with hostname 'dbengine-dataset'");

host = dbengine_rrdhost_find_or_create("dbengine-dataset");
if (NULL == host)
Expand Down Expand Up @@ -1989,7 +1986,6 @@ struct dbengine_query_thread {
unsigned history_seconds; /* how far back in the past to go */
volatile long done; /* initialize to 0, set to 1 to stop thread */
unsigned long errors, queries_nr, queried_metrics_nr; /* statistics */
uint8_t delete_old_data; /* if non zero then data are deleted when disk space is exhausted */

struct dbengine_chart_thread *chart_threads[]; /* dset_charts elements */
};
Expand All @@ -1999,7 +1995,7 @@ static void query_dbengine_chart(void *arg)
struct dbengine_query_thread *thread_info = (struct dbengine_query_thread *)arg;
const int DSET_CHARTS = thread_info->dset_charts;
const int DSET_DIMS = thread_info->dset_dims;
time_t time_after, time_before, time_min, time_approx_min, time_max, duration;
time_t time_after, time_before, time_min, time_max, duration;
int i, j, update_every = 1;
RRDSET *st;
RRDDIM *rd;
Expand All @@ -2019,13 +2015,6 @@ static void query_dbengine_chart(void *arg)

time_min = thread_info->time_present - thread_info->history_seconds + 1;
time_max = thread_info->chart_threads[i]->time_max;

if (thread_info->delete_old_data) {
/* A time window of twice the disk space is sufficient for compression space savings of up to 50% */
time_approx_min = time_max - (default_rrdeng_disk_quota_mb * 2 * 1024 * 1024) /
(((uint64_t) DSET_DIMS * DSET_CHARTS) * sizeof(storage_number));
time_min = MAX(time_min, time_approx_min);
}
if (!time_max) {
time_before = time_after = time_min;
} else {
Expand All @@ -2041,64 +2030,52 @@ static void query_dbengine_chart(void *arg)
expected = unpack_storage_number(pack_storage_number((calculated_number) generatedv, SN_EXISTS));

if (unlikely(rd->state->query_ops.is_finished(&handle))) {
if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, expected);
++thread_info->errors;
}
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, expected);
++thread_info->errors;
break;
}
n = rd->state->query_ops.next_metric(&handle, &time_retrieved);
if (SN_EMPTY_SLOT == n) {
if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, expected);
++thread_info->errors;
}
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, expected);
++thread_info->errors;
break;
}
++thread_info->queried_metrics_nr;
value = unpack_storage_number(n);

same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0;
if (!same) {
if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT
", ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, expected, value);
++thread_info->errors;
}
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, expected, value);
++thread_info->errors;
}
if (time_retrieved != time_now) {
if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
fprintf(stderr,
" DB-engine stresstest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, (unsigned long) time_retrieved);
++thread_info->errors;
}
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, (unsigned long) time_retrieved);
++thread_info->errors;
}
}
rd->state->query_ops.finalize(&handle);
} while(!thread_info->done);
}

void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsigned QUERY_THREADS,
unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB, unsigned DISK_SPACE_MB)
unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB)
{
const unsigned DSET_DIMS = 128;
const uint64_t EXPECTED_COMPRESSION_RATIO = 20;
const unsigned HISTORY_SECONDS = 3600 * 24 * 365 * 50; /* 50 years of history */
const unsigned HISTORY_SECONDS = 3600 * 24 * 365; /* 1 year of history */
RRDHOST *host = NULL;
struct dbengine_chart_thread **chart_threads;
struct dbengine_query_thread **query_threads;
unsigned i, j;
time_t time_start, time_end;

error_log_limit_unlimited();

if (!TEST_DURATION_SEC)
TEST_DURATION_SEC = 10;
if (!DSET_CHARTS)
Expand All @@ -2110,18 +2087,13 @@ void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsi

default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
default_rrdeng_page_cache_mb = PAGE_CACHE_MB;
if (DISK_SPACE_MB) {
fprintf(stderr, "By setting disk space limit data are allowed to be deleted. "
"Data validation is turned off for this run.\n");
default_rrdeng_disk_quota_mb = DISK_SPACE_MB;
} else {
// Worst case for uncompressible data
default_rrdeng_disk_quota_mb =
(((uint64_t) DSET_DIMS * DSET_CHARTS) * sizeof(storage_number) * HISTORY_SECONDS) / (1024 * 1024);
default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;
}
// Worst case for uncompressible data
default_rrdeng_disk_quota_mb = (((uint64_t)DSET_DIMS * DSET_CHARTS) * sizeof(storage_number) * HISTORY_SECONDS) /
(1024 * 1024);
default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;

fprintf(stderr, "Initializing localhost with hostname 'dbengine-stress-test'\n");
error_log_limit_unlimited();
debug(D_RRDHOST, "Initializing localhost with hostname 'dbengine-stress-test'");

host = dbengine_rrdhost_find_or_create("dbengine-stress-test");
if (NULL == host)
Expand All @@ -2140,7 +2112,7 @@ void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsi
"%u MiB of page cache.\n",
RAMP_UP_SECONDS, TEST_DURATION_SEC, DSET_CHARTS, QUERY_THREADS, PAGE_CACHE_MB);

time_start = now_realtime_sec() + HISTORY_SECONDS; /* move history to the future */
time_start = now_realtime_sec();
for (i = 0 ; i < DSET_CHARTS ; ++i) {
chart_threads[i]->host = host;
chart_threads[i]->chartname = "random";
Expand Down Expand Up @@ -2174,7 +2146,6 @@ void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsi
for (j = 0 ; j < DSET_CHARTS ; ++j) {
query_threads[i]->chart_threads[j] = chart_threads[j];
}
query_threads[i]->delete_old_data = DISK_SPACE_MB ? 1 : 0;
assert(0 == uv_thread_create(&query_threads[i]->thread, query_dbengine_chart, query_threads[i]));
}
sleep(TEST_DURATION_SEC);
Expand Down
2 changes: 1 addition & 1 deletion daemon/unit_test.h
Expand Up @@ -12,7 +12,7 @@ extern int unit_test_buffer(void);
extern int test_dbengine(void);
extern void generate_dbengine_dataset(unsigned history_seconds);
extern void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsigned QUERY_THREADS,
unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB, unsigned DISK_SPACE_MB);
unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB);

#endif

Expand Down
34 changes: 13 additions & 21 deletions database/engine/pagecache.c
Expand Up @@ -101,13 +101,6 @@ void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
uv_cond_broadcast(&pg_cache_descr->cond);
}

void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_wake_up_waiters_unsafe(descr);
rrdeng_page_descr_mutex_unlock(ctx, descr);
}

/*
* The caller must hold page descriptor lock.
* The lock will be released and re-acquired. The descriptor is not guaranteed
Expand Down Expand Up @@ -142,34 +135,36 @@ unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_

/*
* The caller must hold page descriptor lock.
* Gets a reference to the page descriptor.
* Returns 1 on success and 0 on failure.
*/
int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
(exclusive_access && pg_cache_descr->refcnt)) {
return 0;
}
if (exclusive_access)
pg_cache_descr->flags |= RRD_PAGE_LOCKED;
++pg_cache_descr->refcnt;

return 1;
}

/*
* The caller must hold page descriptor lock.
* Gets a reference to the page descriptor.
* Returns 1 on success and 0 on failure.
* Returns the same values as pg_cache_try_get_unsafe(), but takes no reference and changes no flags.
*/
int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

if (!pg_cache_can_get_unsafe(descr, exclusive_access))
if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
(exclusive_access && pg_cache_descr->refcnt)) {
return 0;

if (exclusive_access)
pg_cache_descr->flags |= RRD_PAGE_LOCKED;
++pg_cache_descr->refcnt;
}

return 1;
}
Expand Down Expand Up @@ -434,11 +429,8 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
pg_cache_put(ctx, descr);
rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */
(void)sleep_usec(1000); /* 1 msec */
}
if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED)
rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
destroy:
freez(descr);
pg_cache_update_metric_times(page_index);
Expand Down
1 change: 0 additions & 1 deletion database/engine/pagecache.h
Expand Up @@ -148,7 +148,6 @@ struct page_cache { /* TODO: add statistics */
};

extern void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr);
extern void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr);
extern unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
Expand Down
10 changes: 5 additions & 5 deletions database/engine/rrdengine.c
Expand Up @@ -121,19 +121,19 @@ void read_extent_cb(uv_fs_t* req)
} else {
(void) memcpy(page, uncompressed_buf + page_offset, descr->page_length);
}
pg_cache_replaceQ_insert(ctx, descr);
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
pg_cache_descr->page = page;
pg_cache_descr->flags |= RRD_PAGE_POPULATED;
pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING;
rrdeng_page_descr_mutex_unlock(ctx, descr);
pg_cache_replaceQ_insert(ctx, descr);
debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
if (xt_io_descr->release_descr) {
pg_cache_put(ctx, descr);
pg_cache_put_unsafe(descr);
} else {
debug(D_RRDENGINE, "%s: Waking up waiters.", __func__);
pg_cache_wake_up_waiters(ctx, descr);
pg_cache_wake_up_waiters_unsafe(descr);
}
rrdeng_page_descr_mutex_unlock(ctx, descr);
}
if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) {
freez(uncompressed_buf);
Expand Down
10 changes: 5 additions & 5 deletions database/engine/rrdengineapi.c
Expand Up @@ -473,16 +473,16 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
/* We need to get a new page */
if (descr) {
/* Drop old page's reference */
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
pg_cache_put(ctx, descr);
handle->descr = NULL;
handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
goto no_more_metrics;
}
next_page_time = handle->next_page_time * USEC_PER_SEC;
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
pg_cache_put(ctx, descr);
handle->descr = NULL;
}
descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
Expand Down

0 comments on commit 263a5b4

Please sign in to comment.