New journal disk based indexing for agent memory reduction (#13885)
* Add read-only option to netdata_mmap so files are accessed using PROT_READ
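
A minimal sketch of the idea using the raw mmap() API; the real netdata_mmap() wrapper has a different signature:

#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

// Map a file read-only or read-write; read-only mappings let the kernel
// share the pages and drop them cheaply under memory pressure.
static void *map_file(const char *path, size_t len, int read_only) {
    int fd = open(path, read_only ? O_RDONLY : O_RDWR);
    if (fd == -1) return NULL;
    int prot = read_only ? PROT_READ : (PROT_READ | PROT_WRITE);
    void *mem = mmap(NULL, len, prot, MAP_SHARED, fd, 0);
    close(fd); // the mapping remains valid after the descriptor is closed
    return (mem == MAP_FAILED) ? NULL : mem;
}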

* Initial functions to write the new journal file and switch to the new indexing

* Clean up code; add parameters to pg_cache_punch_hole to avoid updating the page's latest/oldest times
pg_cache insert gets a parameter indicating whether the page index needs locking
Page eviction functions will try to deallocate the descriptor as well (pg_cache_punch_hole without page_index time updates)
Clean up messages during startup

* Clean up messages during startup

* Disable extent caching for now; add placeholder for journal indexing and activation while the agent is running

* Add main function to populate descriptors by checking the new journal indexing

* prevent crash

* fix for binary search crash

* Avoid Time-of-check time-of-use filesystem race condition
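
The standard fix pattern, sketched with invented names: perform the check on the file descriptor that will actually be used, not on the path, so the file cannot be swapped between check and use:

#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

// Racy: stat(path) then open(path) - the file can change in between.
// Safe: open first, then fstat() the descriptor we will actually read.
static int open_regular_file(const char *path, off_t *size_out) {
    int fd = open(path, O_RDONLY);
    if (fd == -1) return -1;
    struct stat sb;
    if (fstat(fd, &sb) == -1 || !S_ISREG(sb.st_mode)) {
        close(fd);
        return -1;
    }
    *size_out = sb.st_size;
    return fd;
}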

* always add a page

* populate fixes - it is still incomplete

* pg_cache_insert returns the descriptor that ends up in the page_index

* Add populate next (Fix 1)

* Fix compilation warnings, reactivate extent caching

* Add populate next (Fix 2)

* Add populate next (Fix 3): switch to the next entry or journal file when asked to populate the descriptor with the next entry

* Fix resource leak and wrong sizeof

* Rework page population (part 1)

* Additional checksums added / journal validation
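
dbengine checksums are CRC32s computed with zlib; below is a hedged sketch of validating a block whose trailing four bytes store the checksum of everything before them (this layout is an illustration, not the actual journal v2 format):

#include <stdint.h>
#include <string.h>
#include <zlib.h>

static int block_crc_ok(const uint8_t *buf, size_t len) {
    if (len < sizeof(uint32_t)) return 0;
    uint32_t stored;
    memcpy(&stored, buf + len - sizeof(uint32_t), sizeof(uint32_t));
    uLong crc = crc32(0L, Z_NULL, 0);
    crc = crc32(crc, buf, (uInt)(len - sizeof(uint32_t)));
    return (uint32_t)crc == stored;
}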

* Cleanup (part 1)

* Locking added and Cleanup (part 2)

* Close journal file after new journal index activation

* Skip warning when compiling without NETDATA_INTERNAL_CHECKS

* Ignore empty index file (header and trailer but no metrics)
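
Since an index with no metrics is exactly one header plus one trailer, a size comparison is enough to detect it (struct sizes passed in; names assumed):

// Sketch: treat a v2 index as empty when nothing lies between
// its header and its trailer.
static int journal_v2_is_empty(uint64_t file_size,
                               size_t header_size, size_t trailer_size) {
    return file_size <= (uint64_t)(header_size + trailer_size);
}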

* Try to remove all evicted descriptors (may prevent slight memory increase)

* Evict pages also when we successfully do try_reserve

* Precache pages and cleanup

* Add a separate cleanup thread to release unused descriptors
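
A sketch of that thread's shape, with invented names; the interval shown is an assumption (later commits make it a compile-time default):

#include <pthread.h>
#include <stdbool.h>
#include <unistd.h>

static volatile bool shutting_down = false;

// Placeholder for "free descriptors that no query references anymore".
extern unsigned release_unused_descriptors(void);

static void *descriptor_cleanup_thread(void *arg) {
    (void)arg;
    while (!shutting_down) {
        sleep(60);  // assumed interval; see DESCRIPTOR_INTERVAL_CLEANUP below
        release_unused_descriptors();
    }
    return NULL;
}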

* Check existence of key correctly

* Fix total file size calculation

* Statistics for journal descriptors

* Track and release journal v2 descriptors

* Do not try to allocate pages for locality if under pressure

* Do not track v2 descriptors when populating the page_index

* Track page descriptors as they are inserted in the page index (per journal file)
Scan journal files for pending items to cleanup
Cleanup v2 descriptors only if they are not populated
Check before adding to page cache to avoid memory allocation/free

* Close journal file that has been processed and migrated to the new index
Check for valid file before trying to truncate / close. This file has been closed during startup

* Better calculation for the number of prefetched data pages based on the query end time
Code cleanup and comments
Add v2 populated descriptor expiration based on journal access time
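
A hedged sketch of the prefetch sizing, with all names invented: each page covers page_points * update_every seconds, so the remaining query window bounds how many pages are worth fetching ahead:

#include <time.h>

static unsigned pages_to_prefetch(time_t current_page_end, time_t query_end,
                                  unsigned page_points, unsigned update_every) {
    if (query_end <= current_page_end) return 0;
    time_t span = query_end - current_page_end;
    time_t page_secs = (time_t)page_points * update_every;
    return (unsigned)((span + page_secs - 1) / page_secs);  // ceiling division
}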

* Code cleanup

* Faster indexing
Better journal validation (more sanity checks)
Detect new datafile/journal creation and trigger index generation
Switch to the new index / mark descriptors in memory as needed
Update journal access time when a descriptor is returned
Code cleanup (part 1)

* Reactivate descriptor cleanup
Code cleanup

* Allow locality precaching

* Allow locality precaching for the same page alignment

* Descriptor cleanup interval changed

* Disable locality precaching

* Precache only if not under pressure / internal cleanup at 60 seconds

* Remove unused functions

* Migrate on startup always
Make sure the metric uuid is valid (we have a page_index)
Prevent crash if no datafile is available when logging an error
Remove unused functions

* New warn limit for precaching
Stress test v2 descriptor cleanup
   - Every 1s cleanup if it doesn't exist in cache
   - 60s cache eviction

* Arrayalloc internal checks on free activated with NETDATA_ARRAYALLOC_INTERNAL_CHECKS
Ability to set DESCRIPTOR_EXPIRATION_TIME and DESCRIPTOR_INTERVAL_CLEANUP at compile time
Defaults DESCRIPTOR_INTERVAL_CLEANUP = 60 and DESCRIPTOR_EXPIRATION_TIME = 600
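
These presumably follow the same #ifndef guard pattern this commit uses for MAX_DATAFILE_SIZE in datafile.h, so the defaults can be overridden from CFLAGS:

// Overridable with -DDESCRIPTOR_INTERVAL_CLEANUP=... and
// -DDESCRIPTOR_EXPIRATION_TIME=... at build time.
#ifndef DESCRIPTOR_INTERVAL_CLEANUP
#define DESCRIPTOR_INTERVAL_CLEANUP (60)
#endif

#ifndef DESCRIPTOR_EXPIRATION_TIME
#define DESCRIPTOR_EXPIRATION_TIME (600)
#endif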

* Lookup page index correctly

* Calculate index time once

* Detect a duplicate page when doing cache insert and during flushing of pages

* Better logging

* Descriptor validation (extent vs page index) when building an index file while the agent is running

* Mark invalid entries in the journal v2 file

* Schedule an index rebuild if a descriptor is found without an extent in the timerange we are processing
Release descriptor lock to prevent random shutdown locks

* Proper unlock

* Skip descriptor cleanup when journal file v2 migration is running

* Fix page cache statistics
Remove multiple entries of the page_index from the page cache
Cleanup

* Adjust preload pages on pg_cache_next. Handle invalid descriptor properly
Unlock properly

* Better handling of invalid pages
Journal indexing during runtime will scan all files to find potential ones to index

* Reactivate migration on startup
Evict descriptors to cause migration
Don't count the entries in page index (calculate when processing the extent list)
Check for valid extent since we may set the extent to NULL on startup if it is invalid
Better structure init
Address valgrind issues

* Add don't fork/dump option

* Add separate lock to protect accessing a datafile's extent list
Comment out some unused code (for now)
Abort descriptor cleanup if we are force flushing pages (page cache under pressure)

* Check for index and schedule when data flush completes
Configure max datafile size during compilation
Keep a separate JudyL array for descriptors
Skip quota test if we are deleting descriptors or explicitly flushing pages under pressure

* Fix

* set function when waiters are woken up

* add the line number to trace the deadlock

* add thread id

* add wait list

* init to zero

* disable thread cancelability inside dbengine rrdeng_load_page_next()

* make sure the owner is the thread

* disable thread cancelability for replication as a whole
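
Netdata wraps this in its own thread helpers, but the underlying pthread pattern looks like this sketch:

#include <pthread.h>

// Keep pthread_cancel() from killing the thread mid-critical-section,
// which would leave dbengine locks held forever.
static void run_uncancellable(void (*work)(void *), void *arg) {
    int old_state;
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
    work(arg);
    pthread_setcancelstate(old_state, NULL);
}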

* Check and queue indexing after first page flush

* Queue indexing after a small delay to allow some time for page flushing

* tracing of waiters only when compiled with internal checks

* Mark descr with extent_entry

* Return page timeout

* Check if a journalfile is ready to be indexed
Migrate the descriptors or evict if possible
Compilation warning fix

* Use page index if indexing during startup
Mark if journalfile should be checked depending on whether we can migrate or delete a page during indexing

* require 3x max message size as sender buffer

* fix the message about the adaptive buffer size

* fix the message about duplicate replication commands

* Disable descriptor deletion during migration

* Detect descriptor with same start page time

* sender sorts replication requests before fulfilling them; receiver does not send duplicate replication requests
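
A sketch of the sender-side ordering, with an invented request struct; sorting by start time lets the parent fill its gaps oldest-first:

#include <stdlib.h>
#include <time.h>

struct replication_request {  // invented for illustration
    time_t after;
    time_t before;
};

static int cmp_requests(const void *a, const void *b) {
    const struct replication_request *ra = a, *rb = b;
    return (ra->after > rb->after) - (ra->after < rb->after);
}

// Usage: qsort(requests, n_requests, sizeof(*requests), cmp_requests);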

* dbengine never allows past timestamps to be collected

* do not accept values same as last data point stored in dbengine
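
Both rules reduce to a single comparison against the last stored timestamp (a sketch; names assumed):

#include <time.h>

// Accept a point only if strictly newer than the last one stored:
// this rejects past timestamps and same-timestamp duplicates alike.
static int accept_point(time_t last_stored_t, time_t new_t) {
    return new_t > last_stored_t;
}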

* replicate non-overlapping ranges

* a better replication logic to avoid sending overlapping data to parents
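
A sketch of the non-overlap clamp (names assumed): advance each request's start past what has already been sent, and drop the request when nothing remains:

#include <time.h>

static int clamp_replication_window(time_t *after, time_t before,
                                    time_t last_sent) {
    if (*after < last_sent)
        *after = last_sent;   // never resend data the parent already has
    return *after < before;   // 0 means nothing is left to replicate
}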

* Do not start journal migration in parallel

* Always update page index times

* Fix page index first / last times on load

* internal log when replication responses do not match the requests or when replication commands are sent while there are others inflight

* do not log out-of-bounds RBEGIN if it is the last replication command we sent

* better checking of past data collection points

* better checking of past data collection points - optimized

* fix corruption during decompression of streamed data

* Add config to disable journal indexing
Add config parameter for detailed journal integrity check (Metric chain validation check during startup)
pg_cache insert: drop check for existing page
Fix CRC calculation for metric headers
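
Both switches land in the [db] section of netdata.conf, with defaults matching the config_get_boolean() calls added to daemon/main.c further down:

[db]
    dbengine enable journal indexing = yes
    dbengine enable journal integrity check = no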

* children disable compression globally only when the compression gives an error

* turn boolean member into RRDHOST OPTION

* Compilation warnings

* Remove unused code

* replication sender statistics

* replication sender statistics set to 100% when no replication requests are pending

* Fix casting warning

Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
stelfrag and ktsaou committed Nov 15, 2022
1 parent b4a0298 commit 224b051
Showing 30 changed files with 2,320 additions and 651 deletions.
32 changes: 30 additions & 2 deletions collectors/plugins.d/pluginsd_parser.c
@@ -293,9 +293,25 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
// rrdhost_hostname(host), rrdset_id(st),
// (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);

rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
bool ok = true;
if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {

#ifdef NETDATA_INTERNAL_CHECKS
st->replay.start_streaming = false;
st->replay.after = 0;
st->replay.before = 0;
#endif

rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);

ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child,
last_entry_child, 0, 0);
}
else {
internal_error(true, "RRDSET: not sending duplicate replication request for chart '%s'", rrdset_id(st));
}

bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0);
return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
}

@@ -875,6 +891,11 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
time_t start_time = strtol(start_time_str, NULL, 0);
time_t end_time = strtol(end_time_str, NULL, 0);

internal_error(
(!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
"REPLAY: received a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, which does not match our request (%ld to %ld).",
rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, st->replay.after, st->replay.before);

if(start_time && end_time) {
if (start_time > end_time) {
error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', but timings are invalid (%ld to %ld). Disabling it.",
@@ -1135,11 +1156,18 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
st->counter++;
st->counter_done++;

#ifdef NETDATA_INTERNAL_CHECKS
st->replay.start_streaming = false;
st->replay.after = 0;
st->replay.before = 0;
#endif

if (start_streaming) {
if (st->update_every != update_every_child)
rrdset_set_update_every(st, update_every_child);

rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
return PARSER_RC_OK;
}
3 changes: 3 additions & 0 deletions daemon/global_statistics.c
@@ -865,6 +865,7 @@ static void dbengine_statistics_charts(void) {

{
static RRDSET *st_long_term_pages = NULL;
static RRDDIM *rd_memory = NULL;
static RRDDIM *rd_total = NULL;
static RRDDIM *rd_insertions = NULL;
static RRDDIM *rd_deletions = NULL;
@@ -885,6 +886,7 @@ static void dbengine_statistics_charts(void) {
localhost->rrd_update_every,
RRDSET_TYPE_LINE);

rd_memory = rrddim_add(st_long_term_pages, "journal v2 descriptors", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_total = rrddim_add(st_long_term_pages, "total", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_insertions = rrddim_add(st_long_term_pages, "insertions", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_deletions = rrddim_add(st_long_term_pages, "deletions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
@@ -893,6 +895,7 @@
} else
rrdset_next(st_long_term_pages);

rrddim_set_by_pointer(st_long_term_pages, rd_memory, (collected_number)stats_array[37]);
rrddim_set_by_pointer(st_long_term_pages, rd_total, (collected_number)stats_array[2]);
rrddim_set_by_pointer(st_long_term_pages, rd_insertions, (collected_number)stats_array[5]);
rrddim_set_by_pointer(st_long_term_pages, rd_deletions, (collected_number)stats_array[6]);
3 changes: 3 additions & 0 deletions daemon/main.c
@@ -680,6 +680,9 @@ static void get_netdata_configured_variables() {

db_engine_use_malloc = config_get_boolean(CONFIG_SECTION_DB, "dbengine page cache with malloc", CONFIG_BOOLEAN_NO);
default_rrdeng_page_cache_mb = (int) config_get_number(CONFIG_SECTION_DB, "dbengine page cache size MB", default_rrdeng_page_cache_mb);
db_engine_journal_indexing = config_get_boolean(CONFIG_SECTION_DB, "dbengine enable journal indexing", CONFIG_BOOLEAN_YES);
db_engine_journal_check = config_get_boolean(CONFIG_SECTION_DB, "dbengine enable journal integrity check", CONFIG_BOOLEAN_NO);

if(default_rrdeng_page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) {
error("Invalid page cache size %d given. Defaulting to %d.", default_rrdeng_page_cache_mb, RRDENG_MIN_PAGE_CACHE_SIZE_MB);
default_rrdeng_page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
41 changes: 34 additions & 7 deletions database/engine/datafile.c
@@ -1,9 +1,29 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"

void df_extent_delete_all_unsafe(struct rrdengine_datafile *datafile)
{
struct extent_info *extent = datafile->extents.first, *next_extent;

char path[RRDENG_PATH_MAX];

generate_journalfilepath_v2(datafile, path, sizeof(path));
internal_error(true, "Deleting extents of file %s", path);
unsigned count = 0;
while (extent) {
next_extent = extent->next;
freez(extent);
count++;
extent = next_extent;
}
datafile->extents.first = NULL;
internal_error(true, "Deleted %u extents of file %s", count, path);
}

void df_extent_insert(struct extent_info *extent)
{
struct rrdengine_datafile *datafile = extent->datafile;
uv_rwlock_wrlock(&datafile->extent_rwlock);

if (likely(NULL != datafile->extents.last)) {
datafile->extents.last->next = extent;
@@ -12,23 +32,28 @@ void df_extent_insert(struct extent_info *extent)
datafile->extents.first = extent;
}
datafile->extents.last = extent;

uv_rwlock_wrunlock(&datafile->extent_rwlock);
}

void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
{
uv_rwlock_wrlock(&ctx->datafiles.rwlock);

if (likely(NULL != ctx->datafiles.last)) {
ctx->datafiles.last->next = datafile;
}
if (unlikely(NULL == ctx->datafiles.first)) {
ctx->datafiles.first = datafile;
}
ctx->datafiles.last = datafile;

uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
}

void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
{
struct rrdengine_datafile *next;

next = datafile->next;
fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile));
ctx->datafiles.first = next;
@@ -44,6 +69,7 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_
datafile->file = (uv_file)0;
datafile->pos = 0;
datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */
fatal_assert(0 == uv_rwlock_init(&datafile->extent_rwlock));
datafile->journalfile = NULL;
datafile->next = NULL;
datafile->ctx = ctx;
@@ -97,7 +123,7 @@ int unlink_data_file(struct rrdengine_datafile *datafile)
return ret;
}

int destroy_data_file(struct rrdengine_datafile *datafile)
int destroy_data_file_unsafe(struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
@@ -176,7 +202,7 @@ int create_data_file(struct rrdengine_datafile *datafile)
uv_fs_req_cleanup(&req);
posix_memfree(superblock);
if (ret < 0) {
destroy_data_file(datafile);
destroy_data_file_unsafe(datafile);
return ret;
}

@@ -304,10 +330,8 @@ static int scan_data_files(struct rrdengine_instance *ctx)

datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name);
ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
if (2 == ret) {
info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, no);
datafiles[matched_files++] = datafile;
@@ -337,6 +361,7 @@ static int scan_data_files(struct rrdengine_instance *ctx)
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
journalfile->file_index = i;
ret = load_journal_file(ctx, journalfile, datafile);
if (0 != ret) {
if (!must_delete_pair) /* If datafile is still open close it */
@@ -346,6 +371,7 @@ static int scan_data_files(struct rrdengine_instance *ctx)
if (must_delete_pair) {
char path[RRDENG_PATH_MAX];

// TODO: Also delete the version 2
error("Deleting invalid data and journal file pair.");
ret = unlink_journal_file(journalfile);
if (!ret) {
@@ -407,7 +433,7 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsi
return 0;

error_after_journalfile:
destroy_data_file(datafile);
destroy_data_file_unsafe(datafile);
freez(journalfile);
error_after_datafile:
freez(datafile);
@@ -421,6 +447,7 @@ int init_data_files(struct rrdengine_instance *ctx)
{
int ret;

fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock));
ret = scan_data_files(ctx);
if (ret < 0) {
error("Failed to scan path \"%s\".", ctx->dbfiles_path);
14 changes: 12 additions & 2 deletions database/engine/datafile.h
@@ -13,7 +13,13 @@ struct rrdengine_instance;
#define DATAFILE_PREFIX "datafile-"
#define DATAFILE_EXTENSION ".ndf"

#ifndef MAX_DATAFILE_SIZE
#define MAX_DATAFILE_SIZE (1073741824LU)
#endif

#define MIN_DATAFILE_SIZE (4194304LU)
#if MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE
#error MIN_DATAFILE_SIZE > MAX_DATAFILE_SIZE
#endif
#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */
#define TARGET_DATAFILES (20)
@@ -26,6 +32,7 @@ struct extent_info {
uint8_t number_of_pages;
struct rrdengine_datafile *datafile;
struct extent_info *next;
uint32_t index; // This is the extent index for version 2
struct rrdeng_page_descr *pages[];
};

@@ -41,27 +48,30 @@ struct rrdengine_datafile {
unsigned fileno;
uv_file file;
uint64_t pos;
uv_rwlock_t extent_rwlock;
struct rrdengine_instance *ctx;
struct rrdengine_df_extents extents;
struct rrdengine_journalfile *journalfile;
struct rrdengine_datafile *next;
};

struct rrdengine_datafile_list {
uv_rwlock_t rwlock;
struct rrdengine_datafile *first; /* oldest */
struct rrdengine_datafile *last; /* newest */
};

void df_extent_insert(struct extent_info *extent);
void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
int close_data_file(struct rrdengine_datafile *datafile);
int unlink_data_file(struct rrdengine_datafile *datafile);
int destroy_data_file(struct rrdengine_datafile *datafile);
int destroy_data_file_unsafe(struct rrdengine_datafile *datafile);
int create_data_file(struct rrdengine_datafile *datafile);
int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
int init_data_files(struct rrdengine_instance *ctx);
void finalize_data_files(struct rrdengine_instance *ctx);
void df_extent_delete_all_unsafe(struct rrdengine_datafile *datafile);

#endif /* NETDATA_DATAFILE_H */
