Skip to content

Commit

Permalink
Queue dimension sets dimension liveness and queues the exact payload …
Browse files Browse the repository at this point in the history
…to store in the database
  • Loading branch information
stelfrag committed May 16, 2022
1 parent b36f0e8 commit d42e947
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 47 deletions.
115 changes: 69 additions & 46 deletions database/sqlite/sqlite_aclk_chart.c
Expand Up @@ -172,7 +172,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat
size_t size;
char *payload = generate_chart_instance_updated(&size, &chart_payload);
if (likely(payload))
rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL);
rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL, 1);
freez(payload);
chart_instance_updated_destroy(&chart_payload);
}
Expand Down Expand Up @@ -210,7 +210,7 @@ static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *w
dim_payload.last_timestamp.tv_sec = last_time;
char *payload = generate_chart_dimension_updated(&size, &dim_payload);
if (likely(payload))
rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status);
rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status, 1);
freez(payload);
return rc;
}
Expand Down Expand Up @@ -284,39 +284,22 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str

int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
int rc = 0;
int rc = 1;
CHECK_SQLITE_CONNECTION(db_meta);

char *claim_id = is_agent_claimed();

RRDDIM *rd = cmd.data;

if (likely(claim_id)) {
time_t send_status = 0;
time_t now = now_realtime_sec();

time_t first_t = rd->state->query_ops.oldest_time(rd);
time_t last_t = rd->state->query_ops.latest_time(rd);
struct aclk_chart_dimension_data *aclk_cd_data = cmd.data;

int live = ((now - last_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every));

rc = aclk_upd_dimension_event(
wc,
claim_id,
&rd->state->metric_uuid,
rd->id,
rd->name,
rd->rrdset->id,
first_t,
live ? 0 : last_t,
&send_status);
char *claim_id = is_agent_claimed();
if (!claim_id)
goto cleanup;

if (!send_status)
rd->state->aclk_live_status = live;
rc = aclk_add_chart_payload(wc, &aclk_cd_data->uuid, claim_id, ACLK_PAYLOAD_DIMENSION,
(void *) aclk_cd_data->payload, aclk_cd_data->payload_size, NULL, 0);

freez(claim_id);
}
rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK);
freez(claim_id);
cleanup:
freez(aclk_cd_data->payload);
freez(aclk_cd_data);
return rc;
}

Expand Down Expand Up @@ -1103,16 +1086,63 @@ void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc)
return;
}

void queue_dimension_to_aclk(RRDDIM *rd)
void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated)
{
if (rrddim_flag_check(rd, RRDDIM_FLAG_ACLK))
int live = !last_updated;

if (likely(rd->state->aclk_live_status == live))
return;

rd->state->aclk_live_status = live;

struct aclk_database_worker_config *wc = rd->rrdset->rrdhost->dbsync_worker;
if (unlikely(!wc))
return;

char *claim_id = is_agent_claimed();
if (unlikely(!claim_id))
return;

struct chart_dimension_updated dim_payload;
memset(&dim_payload, 0, sizeof(dim_payload));
dim_payload.node_id = wc->node_id;
dim_payload.claim_id = claim_id;
dim_payload.name = rd->name;
dim_payload.id = rd->id;
dim_payload.chart_id = rd->rrdset->id;
dim_payload.created_at.tv_sec = rd->state->query_ops.oldest_time(rd);
dim_payload.last_timestamp.tv_sec = last_updated;

size_t size = 0;
char *payload = generate_chart_dimension_updated(&size, &dim_payload);

freez(claim_id);
if (unlikely(!payload))
return;

rrddim_flag_set(rd, RRDDIM_FLAG_ACLK);
int rc = sql_queue_chart_payload((struct aclk_database_worker_config *) rd->rrdset->rrdhost->dbsync_worker,
rd, ACLK_DATABASE_ADD_DIMENSION);
if (unlikely(rc))
rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK);
time_t date_submitted = payload_sent(wc->uuid_str, &rd->state->metric_uuid, payload, size);
if (date_submitted) {
freez(payload);
return;
}

struct aclk_chart_dimension_data *aclk_cd_data = mallocz(sizeof(*aclk_cd_data));
uuid_copy(aclk_cd_data->uuid, rd->state->metric_uuid);
aclk_cd_data->payload = payload;
aclk_cd_data->payload_size = size;

struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));

cmd.opcode = ACLK_DATABASE_ADD_DIMENSION;
cmd.data = aclk_cd_data;
int rc = aclk_database_enq_cmd_noblock(wc, &cmd);

if (unlikely(rc)) {
freez(aclk_cd_data->payload);
freez(aclk_cd_data);
rd->state->aclk_live_status = !live;
}
return;
}

Expand Down Expand Up @@ -1282,15 +1312,8 @@ void sql_check_chart_liveness(RRDSET *st) {

debug(D_ACLK_SYNC,"Check chart liveness [%s] scanning dimensions", st->name);
rrddim_foreach_read(rd, st) {
if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) {
int live = (mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every;
if (unlikely(live != rd->state->aclk_live_status)) {
debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live);
queue_dimension_to_aclk(rd);
}
else
debug(D_ACLK_SYNC,"Dimension check [%s] on [%s] liveness matches", rd->id, st->name);
}
if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN))
queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark));
}
rrdset_unlock(st);
}
Expand Down
12 changes: 11 additions & 1 deletion database/sqlite/sqlite_aclk_chart.h
Expand Up @@ -16,10 +16,20 @@ extern sqlite3 *db_meta;
#define RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER (3)
#endif

#ifndef RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER
#define RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER (30)
#endif

#ifndef ACLK_MAX_DIMENSION_CLEANUP
#define ACLK_MAX_DIMENSION_CLEANUP (500)
#endif

struct aclk_chart_dimension_data {
uuid_t uuid;
char *payload;
size_t payload_size;
};

struct aclk_chart_sync_stats {
int updates;
uint64_t batch_id;
Expand All @@ -37,7 +47,7 @@ struct aclk_chart_sync_stats {
};

extern int queue_chart_to_aclk(RRDSET *st);
extern void queue_dimension_to_aclk(RRDDIM *rd);
extern void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated);
extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
Expand Down

0 comments on commit d42e947

Please sign in to comment.