Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

logqueue: add queue level metrics #4392

Merged
merged 3 commits into from Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 6 additions & 5 deletions lib/driver.c
Expand Up @@ -244,7 +244,7 @@ log_src_driver_free(LogPipe *s)

static LogQueue *
_create_memory_queue(LogDestDriver *self, const gchar *persist_name, gint stats_level,
const StatsClusterKeyBuilder *driver_sck_builder)
const StatsClusterKeyBuilder *driver_sck_builder, StatsClusterKeyBuilder *queue_sck_builder)
{
GlobalConfig *cfg = log_pipe_get_config(&self->super.super);

Expand All @@ -258,16 +258,17 @@ _create_memory_queue(LogDestDriver *self, const gchar *persist_name, gint stats_
"flags(flow-control) option set.) To enable the new behaviour, update the @version string in "
"your configuration and consider lowering the value of log-fifo-size().");

return log_queue_fifo_legacy_new(log_fifo_size, persist_name, stats_level, driver_sck_builder);
return log_queue_fifo_legacy_new(log_fifo_size, persist_name, stats_level, driver_sck_builder, queue_sck_builder);
}

return log_queue_fifo_new(log_fifo_size, persist_name, stats_level, driver_sck_builder);
return log_queue_fifo_new(log_fifo_size, persist_name, stats_level, driver_sck_builder, queue_sck_builder);
}

/* returns a reference */
static LogQueue *
log_dest_driver_acquire_memory_queue(LogDestDriver *self, const gchar *persist_name, gint stats_level,
const StatsClusterKeyBuilder *driver_sck_builder)
const StatsClusterKeyBuilder *driver_sck_builder,
StatsClusterKeyBuilder *queue_sck_builder)
{
GlobalConfig *cfg = log_pipe_get_config(&self->super.super);
LogQueue *queue = NULL;
Expand All @@ -283,7 +284,7 @@ log_dest_driver_acquire_memory_queue(LogDestDriver *self, const gchar *persist_n

if (!queue)
{
queue = _create_memory_queue(self, persist_name, stats_level, driver_sck_builder);
queue = _create_memory_queue(self, persist_name, stats_level, driver_sck_builder, queue_sck_builder);
log_queue_set_throttle(queue, self->throttle);
}
return queue;
Expand Down
8 changes: 5 additions & 3 deletions lib/driver.h
Expand Up @@ -159,7 +159,8 @@ struct _LogDestDriver
LogDriver super;

LogQueue *(*acquire_queue)(LogDestDriver *s, const gchar *persist_name, gint stats_level,
const StatsClusterKeyBuilder *driver_sck_builder);
const StatsClusterKeyBuilder *driver_sck_builder,
StatsClusterKeyBuilder *queue_sck_builder);
void (*release_queue)(LogDestDriver *s, LogQueue *q);

/* queues managed by this LogDestDriver, all constructed queues come
Expand All @@ -174,11 +175,12 @@ struct _LogDestDriver
/* returns a reference */
static inline LogQueue *
log_dest_driver_acquire_queue(LogDestDriver *self, const gchar *persist_name, gint stats_level,
const StatsClusterKeyBuilder *driver_sck_builder)
const StatsClusterKeyBuilder *driver_sck_builder,
StatsClusterKeyBuilder *queue_sck_builder)
{
LogQueue *q;

q = self->acquire_queue(self, persist_name, stats_level, driver_sck_builder);
q = self->acquire_queue(self, persist_name, stats_level, driver_sck_builder, queue_sck_builder);
if (q)
{
self->queues = g_list_prepend(self->queues, q);
Expand Down
11 changes: 7 additions & 4 deletions lib/logqueue-fifo.c
Expand Up @@ -656,14 +656,17 @@ log_queue_fifo_free(LogQueue *s)

LogQueue *
log_queue_fifo_new(gint log_fifo_size, const gchar *persist_name, gint stats_level,
const StatsClusterKeyBuilder *driver_sck_builder)
const StatsClusterKeyBuilder *driver_sck_builder, StatsClusterKeyBuilder *queue_sck_builder)
{
LogQueueFifo *self;

gint max_threads = main_loop_worker_get_max_number_of_threads();
self = g_malloc0(sizeof(LogQueueFifo) + max_threads * sizeof(self->input_queues[0]));

log_queue_init_instance(&self->super, persist_name, stats_level, driver_sck_builder);
if (queue_sck_builder)
stats_cluster_key_builder_set_name_prefix(queue_sck_builder, "memory_queue_");

log_queue_init_instance(&self->super, persist_name, stats_level, driver_sck_builder, queue_sck_builder);
self->super.type = log_queue_fifo_type;
self->super.use_backlog = FALSE;
self->super.get_length = log_queue_fifo_get_length;
Expand Down Expand Up @@ -696,10 +699,10 @@ log_queue_fifo_new(gint log_fifo_size, const gchar *persist_name, gint stats_lev

LogQueue *
log_queue_fifo_legacy_new(gint log_fifo_size, const gchar *persist_name, gint stats_level,
const StatsClusterKeyBuilder *driver_sck_builder)
const StatsClusterKeyBuilder *driver_sck_builder, StatsClusterKeyBuilder *queue_sck_builder)
{
LogQueueFifo *self = (LogQueueFifo *) log_queue_fifo_new(log_fifo_size, persist_name, stats_level,
driver_sck_builder);
driver_sck_builder, queue_sck_builder);
self->use_legacy_fifo_size = TRUE;
return &self->super;
}
Expand Down
6 changes: 4 additions & 2 deletions lib/logqueue-fifo.h
Expand Up @@ -28,9 +28,11 @@
#include "logqueue.h"

LogQueue *log_queue_fifo_new(gint log_fifo_size, const gchar *persist_name, gint stats_level,
const StatsClusterKeyBuilder *driver_sck_builder);
const StatsClusterKeyBuilder *driver_sck_builder,
StatsClusterKeyBuilder *queue_sck_builder);
LogQueue *log_queue_fifo_legacy_new(gint log_fifo_size, const gchar *persist_name, gint stats_level,
const StatsClusterKeyBuilder *driver_sck_builder);
const StatsClusterKeyBuilder *driver_sck_builder,
StatsClusterKeyBuilder *queue_sck_builder);

QueueType log_queue_fifo_get_type(void);

Expand Down
91 changes: 77 additions & 14 deletions lib/logqueue.c
Expand Up @@ -32,53 +32,53 @@ void
log_queue_memory_usage_add(LogQueue *self, gsize value)
{
stats_counter_add(self->metrics.shared.memory_usage, value);
atomic_gssize_add(&self->metrics.owned.memory_usage, value);
stats_counter_add(self->metrics.owned.memory_usage, value);
}

void
log_queue_memory_usage_sub(LogQueue *self, gsize value)
{
stats_counter_sub(self->metrics.shared.memory_usage, value);
atomic_gssize_sub(&self->metrics.owned.memory_usage, value);
stats_counter_sub(self->metrics.owned.memory_usage, value);
}

void
log_queue_queued_messages_add(LogQueue *self, gsize value)
{
stats_counter_add(self->metrics.shared.queued_messages, value);
atomic_gssize_add(&self->metrics.owned.queued_messages, value);
stats_counter_add(self->metrics.owned.queued_messages, value);
}

void
log_queue_queued_messages_sub(LogQueue *self, gsize value)
{
stats_counter_sub(self->metrics.shared.queued_messages, value);
atomic_gssize_sub(&self->metrics.owned.queued_messages, value);
stats_counter_sub(self->metrics.owned.queued_messages, value);
}

void
log_queue_queued_messages_inc(LogQueue *self)
{
stats_counter_inc(self->metrics.shared.queued_messages);
atomic_gssize_inc(&self->metrics.owned.queued_messages);
stats_counter_inc(self->metrics.owned.queued_messages);
}

void
log_queue_queued_messages_dec(LogQueue *self)
{
stats_counter_dec(self->metrics.shared.queued_messages);
atomic_gssize_dec(&self->metrics.owned.queued_messages);
stats_counter_dec(self->metrics.owned.queued_messages);
}

void
log_queue_queued_messages_reset(LogQueue *self)
{
stats_counter_sub(self->metrics.shared.queued_messages,
atomic_gssize_get_unsigned(&self->metrics.owned.queued_messages));
stats_counter_get(self->metrics.owned.queued_messages));

atomic_gssize_set_and_get(&self->metrics.owned.queued_messages, log_queue_get_length(self));
stats_counter_set(self->metrics.owned.queued_messages, log_queue_get_length(self));
stats_counter_add(self->metrics.shared.queued_messages,
atomic_gssize_get_unsigned(&self->metrics.owned.queued_messages));
stats_counter_get(self->metrics.owned.queued_messages));
}

void
Expand Down Expand Up @@ -260,14 +260,46 @@ _register_shared_counters(LogQueue *self, gint stats_level, const StatsClusterKe
stats_cluster_key_builder_free(local_builder);
}

static void
_register_owned_counters(LogQueue *self, StatsClusterKeyBuilder *builder)
{
if (!builder)
return;

stats_cluster_key_builder_set_name(builder, "events");
self->metrics.owned.events_sc_key = stats_cluster_key_builder_build_single(builder);

stats_cluster_key_builder_set_name(builder, "memory_usage_bytes");
self->metrics.owned.memory_usage_sc_key = stats_cluster_key_builder_build_single(builder);

stats_lock();
{
stats_register_counter(STATS_LEVEL1, self->metrics.owned.events_sc_key, SC_TYPE_SINGLE_VALUE,
&self->metrics.owned.queued_messages);
stats_register_counter(STATS_LEVEL1, self->metrics.owned.memory_usage_sc_key, SC_TYPE_SINGLE_VALUE,
&self->metrics.owned.memory_usage);
}
stats_unlock();
}

static void
_register_counters(LogQueue *self, gint stats_level, const StatsClusterKeyBuilder *driver_sck_builder,
StatsClusterKeyBuilder *queue_sck_builder)
{
g_assert(!driver_sck_builder || queue_sck_builder);

_register_shared_counters(self, stats_level, driver_sck_builder);
_register_owned_counters(self, queue_sck_builder);
}

static void
_unregister_shared_counters(LogQueue *self)
{
stats_lock();
{
if (self->metrics.shared.output_events_sc_key)
{
log_queue_queued_messages_sub(self, atomic_gssize_get_unsigned(&self->metrics.owned.queued_messages));
log_queue_queued_messages_sub(self, stats_counter_get(self->metrics.owned.queued_messages));
stats_unregister_counter(self->metrics.shared.output_events_sc_key, SC_TYPE_QUEUED,
&self->metrics.shared.queued_messages);
stats_unregister_counter(self->metrics.shared.output_events_sc_key, SC_TYPE_DROPPED,
Expand All @@ -278,7 +310,7 @@ _unregister_shared_counters(LogQueue *self)

if (self->metrics.shared.memory_usage_sc_key)
{
log_queue_memory_usage_sub(self, atomic_gssize_get_unsigned(&self->metrics.owned.memory_usage));
log_queue_memory_usage_sub(self, stats_counter_get(self->metrics.owned.memory_usage));
stats_unregister_counter(self->metrics.shared.memory_usage_sc_key, SC_TYPE_SINGLE_VALUE,
&self->metrics.shared.memory_usage);

Expand All @@ -288,23 +320,54 @@ _unregister_shared_counters(LogQueue *self)
stats_unlock();
}

static void
_unregister_owned_counters(LogQueue *self)
{
stats_lock();
{
if (self->metrics.owned.events_sc_key)
{
stats_unregister_counter(self->metrics.owned.events_sc_key, SC_TYPE_SINGLE_VALUE,
&self->metrics.owned.queued_messages);

stats_cluster_key_free(self->metrics.owned.events_sc_key);
}

if (self->metrics.owned.memory_usage_sc_key)
{
stats_unregister_counter(self->metrics.owned.memory_usage_sc_key, SC_TYPE_SINGLE_VALUE,
&self->metrics.owned.memory_usage);

stats_cluster_key_free(self->metrics.owned.memory_usage_sc_key);
}
}
stats_unlock();
}

static void
_unregister_counters(LogQueue *self)
{
_unregister_shared_counters(self);
_unregister_owned_counters(self);
}

void
log_queue_init_instance(LogQueue *self, const gchar *persist_name, gint stats_level,
const StatsClusterKeyBuilder *driver_sck_builder)
const StatsClusterKeyBuilder *driver_sck_builder, StatsClusterKeyBuilder *queue_sck_builder)
{
g_atomic_counter_set(&self->ref_cnt, 1);
self->free_fn = log_queue_free_method;

self->persist_name = persist_name ? g_strdup(persist_name) : NULL;
g_mutex_init(&self->lock);

_register_shared_counters(self, stats_level, driver_sck_builder);
_register_counters(self, stats_level, driver_sck_builder, queue_sck_builder);
}

void
log_queue_free_method(LogQueue *self)
{
_unregister_shared_counters(self);
_unregister_counters(self);
g_mutex_clear(&self->lock);
g_free(self->persist_name);
g_free(self);
Expand Down
10 changes: 7 additions & 3 deletions lib/logqueue.h
Expand Up @@ -49,8 +49,11 @@ typedef struct _LogQueueMetrics

struct
{
atomic_gssize memory_usage;
atomic_gssize queued_messages;
StatsClusterKey *events_sc_key;
StatsClusterKey *memory_usage_sc_key;

StatsCounterItem *memory_usage;
StatsCounterItem *queued_messages;
} owned;
} LogQueueMetrics;

Expand Down Expand Up @@ -233,7 +236,8 @@ void log_queue_set_parallel_push(LogQueue *self, LogQueuePushNotifyFunc parallel
gboolean log_queue_check_items(LogQueue *self, gint *timeout, LogQueuePushNotifyFunc parallel_push_notify,
gpointer user_data, GDestroyNotify user_data_destroy);
void log_queue_init_instance(LogQueue *self, const gchar *persist_name, gint stats_level,
const StatsClusterKeyBuilder *driver_sck_builder);
const StatsClusterKeyBuilder *driver_sck_builder,
StatsClusterKeyBuilder *queue_sck_builder);

void log_queue_free_method(LogQueue *self);

Expand Down
20 changes: 19 additions & 1 deletion lib/logthrdest/logthrdestdrv.c
Expand Up @@ -743,11 +743,29 @@ log_threaded_dest_worker_start(LogThreadedDestWorker *self)
return main_loop_threaded_worker_start(&self->thread);
}

static void
_init_queue_sck_builder(LogThreadedDestWorker *self, StatsClusterKeyBuilder *builder)
{
stats_cluster_key_builder_add_label(builder, stats_cluster_label("id", self->owner->super.super.id ? : ""));
stats_cluster_key_builder_add_label(builder, stats_cluster_label("driver_instance",
self->owner->format_stats_instance(self->owner)));

gchar worker_index_str[8];
g_snprintf(worker_index_str, sizeof(worker_index_str), "%d", self->worker_index);
stats_cluster_key_builder_add_label(builder, stats_cluster_label("worker", worker_index_str));
}

static gboolean
_acquire_worker_queue(LogThreadedDestWorker *self, gint stats_level, const StatsClusterKeyBuilder *driver_sck_builder)
{
gchar *persist_name = _format_queue_persist_name(self);
self->queue = log_dest_driver_acquire_queue(&self->owner->super, persist_name, stats_level, driver_sck_builder);
StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new();
_init_queue_sck_builder(self, queue_sck_builder);

self->queue = log_dest_driver_acquire_queue(&self->owner->super, persist_name, stats_level, driver_sck_builder,
queue_sck_builder);

stats_cluster_key_builder_free(queue_sck_builder);
g_free(persist_name);

if (!self->queue)
Expand Down
7 changes: 7 additions & 0 deletions lib/logwriter.c
Expand Up @@ -1756,6 +1756,13 @@ log_writer_init_driver_sck_builder(LogWriter *self, StatsClusterKeyBuilder *buil
self->stats_instance);
}

void
log_writer_init_queue_sck_builder(LogWriter *self, StatsClusterKeyBuilder *builder)
{
stats_cluster_key_builder_add_label(builder, stats_cluster_label("id", self->stats_id));
stats_cluster_key_builder_add_label(builder, stats_cluster_label("driver_instance", self->stats_instance));
}

void
log_writer_set_options(LogWriter *self, LogPipe *control, LogWriterOptions *options,
const gchar *stats_id, const gchar *stats_instance)
Expand Down
1 change: 1 addition & 0 deletions lib/logwriter.h
Expand Up @@ -89,6 +89,7 @@ LogWriter *log_writer_new(guint32 flags, GlobalConfig *cfg);
void log_writer_msg_rewind(LogWriter *self);

void log_writer_init_driver_sck_builder(LogWriter *self, StatsClusterKeyBuilder *builder);
void log_writer_init_queue_sck_builder(LogWriter *self, StatsClusterKeyBuilder *builder);

void log_writer_options_set_template_escape(LogWriterOptions *options, gboolean enable);
void log_writer_options_defaults(LogWriterOptions *options);
Expand Down