From d42e94732aa5f420a5dd6afe281c30f85bdb052d Mon Sep 17 00:00:00 2001 From: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Date: Mon, 16 May 2022 18:35:56 +0300 Subject: [PATCH] Queue dimension sets dimension liveness and queues the exact payload to store in the database --- database/sqlite/sqlite_aclk_chart.c | 115 +++++++++++++++++----------- database/sqlite/sqlite_aclk_chart.h | 12 ++- 2 files changed, 80 insertions(+), 47 deletions(-) diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c index 2aa6e9deae3a1c..c69d04e6575447 100644 --- a/database/sqlite/sqlite_aclk_chart.c +++ b/database/sqlite/sqlite_aclk_chart.c @@ -172,7 +172,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat size_t size; char *payload = generate_chart_instance_updated(&size, &chart_payload); if (likely(payload)) - rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL); + rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL, 1); freez(payload); chart_instance_updated_destroy(&chart_payload); } @@ -210,7 +210,7 @@ static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *w dim_payload.last_timestamp.tv_sec = last_time; char *payload = generate_chart_dimension_updated(&size, &dim_payload); if (likely(payload)) - rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status); + rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status, 1); freez(payload); return rc; } @@ -284,39 +284,22 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) { - int rc = 0; + int rc = 1; CHECK_SQLITE_CONNECTION(db_meta); - char *claim_id = is_agent_claimed(); - - RRDDIM *rd = cmd.data; - - if (likely(claim_id)) { - time_t send_status = 0; - time_t now = now_realtime_sec(); - - time_t first_t = rd->state->query_ops.oldest_time(rd); - time_t last_t = rd->state->query_ops.latest_time(rd); + struct aclk_chart_dimension_data *aclk_cd_data = cmd.data; - int live = ((now - last_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every)); - - rc = aclk_upd_dimension_event( - wc, - claim_id, - &rd->state->metric_uuid, - rd->id, - rd->name, - rd->rrdset->id, - first_t, - live ? 0 : last_t, - &send_status); + char *claim_id = is_agent_claimed(); + if (!claim_id) + goto cleanup; - if (!send_status) - rd->state->aclk_live_status = live; + rc = aclk_add_chart_payload(wc, &aclk_cd_data->uuid, claim_id, ACLK_PAYLOAD_DIMENSION, + (void *) aclk_cd_data->payload, aclk_cd_data->payload_size, NULL, 0); - freez(claim_id); - } - rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK); + freez(claim_id); +cleanup: + freez(aclk_cd_data->payload); + freez(aclk_cd_data); return rc; } @@ -1103,16 +1086,63 @@ void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc) return; } -void queue_dimension_to_aclk(RRDDIM *rd) +void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated) { - if (rrddim_flag_check(rd, RRDDIM_FLAG_ACLK)) + int live = !last_updated; + + if (likely(rd->state->aclk_live_status == live)) + return; + + rd->state->aclk_live_status = live; + + struct aclk_database_worker_config *wc = rd->rrdset->rrdhost->dbsync_worker; + if (unlikely(!wc)) + return; + + char *claim_id = is_agent_claimed(); + if (unlikely(!claim_id)) + return; + + struct chart_dimension_updated dim_payload; + memset(&dim_payload, 0, sizeof(dim_payload)); + dim_payload.node_id = wc->node_id; + dim_payload.claim_id = claim_id; + dim_payload.name = rd->name; + dim_payload.id = rd->id; + dim_payload.chart_id = rd->rrdset->id; + dim_payload.created_at.tv_sec = rd->state->query_ops.oldest_time(rd); + dim_payload.last_timestamp.tv_sec = last_updated; + + size_t size = 0; + char *payload = generate_chart_dimension_updated(&size, &dim_payload); + + freez(claim_id); + if (unlikely(!payload)) return; - rrddim_flag_set(rd, RRDDIM_FLAG_ACLK); - int rc = sql_queue_chart_payload((struct aclk_database_worker_config *) rd->rrdset->rrdhost->dbsync_worker, - rd, ACLK_DATABASE_ADD_DIMENSION); - if (unlikely(rc)) - rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK); + time_t date_submitted = payload_sent(wc->uuid_str, &rd->state->metric_uuid, payload, size); + if (date_submitted) { + freez(payload); + return; + } + + struct aclk_chart_dimension_data *aclk_cd_data = mallocz(sizeof(*aclk_cd_data)); + uuid_copy(aclk_cd_data->uuid, rd->state->metric_uuid); + aclk_cd_data->payload = payload; + aclk_cd_data->payload_size = size; + + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + + cmd.opcode = ACLK_DATABASE_ADD_DIMENSION; + cmd.data = aclk_cd_data; + int rc = aclk_database_enq_cmd_noblock(wc, &cmd); + + if (unlikely(rc)) { + freez(aclk_cd_data->payload); + freez(aclk_cd_data); + rd->state->aclk_live_status = !live; + } return; } @@ -1282,15 +1312,8 @@ void sql_check_chart_liveness(RRDSET *st) { debug(D_ACLK_SYNC,"Check chart liveness [%s] scanning dimensions", st->name); rrddim_foreach_read(rd, st) { - if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) { - int live = (mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every; - if (unlikely(live != rd->state->aclk_live_status)) { - debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live); - queue_dimension_to_aclk(rd); - } - else - debug(D_ACLK_SYNC,"Dimension check [%s] on [%s] liveness matches", rd->id, st->name); - } + if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) + queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark)); } rrdset_unlock(st); } diff --git a/database/sqlite/sqlite_aclk_chart.h b/database/sqlite/sqlite_aclk_chart.h index 75aff3af30a08b..f98cf55c5c62e9 100644 --- a/database/sqlite/sqlite_aclk_chart.h +++ b/database/sqlite/sqlite_aclk_chart.h @@ -16,10 +16,20 @@ extern sqlite3 *db_meta; #define RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER (3) #endif +#ifndef RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER +#define RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER (30) +#endif + #ifndef ACLK_MAX_DIMENSION_CLEANUP #define ACLK_MAX_DIMENSION_CLEANUP (500) #endif +struct aclk_chart_dimension_data { + uuid_t uuid; + char *payload; + size_t payload_size; +}; + struct aclk_chart_sync_stats { int updates; uint64_t batch_id; @@ -37,7 +47,7 @@ struct aclk_chart_sync_stats { }; extern int queue_chart_to_aclk(RRDSET *st); -extern void queue_dimension_to_aclk(RRDDIM *rd); +extern void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated); extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);