diff --git a/lib/driver.c b/lib/driver.c index 5e0350a65a..e040af3a71 100644 --- a/lib/driver.c +++ b/lib/driver.c @@ -244,7 +244,7 @@ log_src_driver_free(LogPipe *s) static LogQueue * _create_memory_queue(LogDestDriver *self, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder) + const StatsClusterKeyBuilder *driver_sck_builder, StatsClusterKeyBuilder *queue_sck_builder) { GlobalConfig *cfg = log_pipe_get_config(&self->super.super); @@ -258,16 +258,17 @@ _create_memory_queue(LogDestDriver *self, const gchar *persist_name, gint stats_ "flags(flow-control) option set.) To enable the new behaviour, update the @version string in " "your configuration and consider lowering the value of log-fifo-size()."); - return log_queue_fifo_legacy_new(log_fifo_size, persist_name, stats_level, driver_sck_builder); + return log_queue_fifo_legacy_new(log_fifo_size, persist_name, stats_level, driver_sck_builder, queue_sck_builder); } - return log_queue_fifo_new(log_fifo_size, persist_name, stats_level, driver_sck_builder); + return log_queue_fifo_new(log_fifo_size, persist_name, stats_level, driver_sck_builder, queue_sck_builder); } /* returns a reference */ static LogQueue * log_dest_driver_acquire_memory_queue(LogDestDriver *self, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder) + const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder) { GlobalConfig *cfg = log_pipe_get_config(&self->super.super); LogQueue *queue = NULL; @@ -283,7 +284,7 @@ log_dest_driver_acquire_memory_queue(LogDestDriver *self, const gchar *persist_n if (!queue) { - queue = _create_memory_queue(self, persist_name, stats_level, driver_sck_builder); + queue = _create_memory_queue(self, persist_name, stats_level, driver_sck_builder, queue_sck_builder); log_queue_set_throttle(queue, self->throttle); } return queue; diff --git a/lib/driver.h b/lib/driver.h index f806759e08..222223e913 100644 --- a/lib/driver.h +++ b/lib/driver.h @@ -159,7 +159,8 @@ struct _LogDestDriver LogDriver super; LogQueue *(*acquire_queue)(LogDestDriver *s, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder); + const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder); void (*release_queue)(LogDestDriver *s, LogQueue *q); /* queues managed by this LogDestDriver, all constructed queues come @@ -174,11 +175,12 @@ struct _LogDestDriver /* returns a reference */ static inline LogQueue * log_dest_driver_acquire_queue(LogDestDriver *self, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder) + const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder) { LogQueue *q; - q = self->acquire_queue(self, persist_name, stats_level, driver_sck_builder); + q = self->acquire_queue(self, persist_name, stats_level, driver_sck_builder, queue_sck_builder); if (q) { self->queues = g_list_prepend(self->queues, q); diff --git a/lib/logqueue-fifo.c b/lib/logqueue-fifo.c index ae189c4a01..245d378600 100644 --- a/lib/logqueue-fifo.c +++ b/lib/logqueue-fifo.c @@ -656,14 +656,17 @@ log_queue_fifo_free(LogQueue *s) LogQueue * log_queue_fifo_new(gint log_fifo_size, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder) + const StatsClusterKeyBuilder *driver_sck_builder, StatsClusterKeyBuilder *queue_sck_builder) { LogQueueFifo *self; gint max_threads = main_loop_worker_get_max_number_of_threads(); self = g_malloc0(sizeof(LogQueueFifo) + max_threads * sizeof(self->input_queues[0])); - log_queue_init_instance(&self->super, persist_name, stats_level, driver_sck_builder); + if (queue_sck_builder) + stats_cluster_key_builder_set_name_prefix(queue_sck_builder, "memory_queue_"); + + log_queue_init_instance(&self->super, persist_name, stats_level, driver_sck_builder, queue_sck_builder); self->super.type = log_queue_fifo_type; self->super.use_backlog = FALSE; self->super.get_length = log_queue_fifo_get_length; @@ -696,10 +699,10 @@ log_queue_fifo_new(gint log_fifo_size, const gchar *persist_name, gint stats_lev LogQueue * log_queue_fifo_legacy_new(gint log_fifo_size, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder) + const StatsClusterKeyBuilder *driver_sck_builder, StatsClusterKeyBuilder *queue_sck_builder) { LogQueueFifo *self = (LogQueueFifo *) log_queue_fifo_new(log_fifo_size, persist_name, stats_level, - driver_sck_builder); + driver_sck_builder, queue_sck_builder); self->use_legacy_fifo_size = TRUE; return &self->super; } diff --git a/lib/logqueue-fifo.h b/lib/logqueue-fifo.h index b4170b5bdf..08348ff9cf 100644 --- a/lib/logqueue-fifo.h +++ b/lib/logqueue-fifo.h @@ -28,9 +28,11 @@ #include "logqueue.h" LogQueue *log_queue_fifo_new(gint log_fifo_size, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder); + const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder); LogQueue *log_queue_fifo_legacy_new(gint log_fifo_size, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder); + const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder); QueueType log_queue_fifo_get_type(void); diff --git a/lib/logqueue.c b/lib/logqueue.c index e228bac8c7..b951fd9036 100644 --- a/lib/logqueue.c +++ b/lib/logqueue.c @@ -32,53 +32,53 @@ void log_queue_memory_usage_add(LogQueue *self, gsize value) { stats_counter_add(self->metrics.shared.memory_usage, value); - atomic_gssize_add(&self->metrics.owned.memory_usage, value); + stats_counter_add(self->metrics.owned.memory_usage, value); } void log_queue_memory_usage_sub(LogQueue *self, gsize value) { stats_counter_sub(self->metrics.shared.memory_usage, value); - atomic_gssize_sub(&self->metrics.owned.memory_usage, value); + stats_counter_sub(self->metrics.owned.memory_usage, value); } void log_queue_queued_messages_add(LogQueue *self, gsize value) { stats_counter_add(self->metrics.shared.queued_messages, value); - atomic_gssize_add(&self->metrics.owned.queued_messages, value); + stats_counter_add(self->metrics.owned.queued_messages, value); } void log_queue_queued_messages_sub(LogQueue *self, gsize value) { stats_counter_sub(self->metrics.shared.queued_messages, value); - atomic_gssize_sub(&self->metrics.owned.queued_messages, value); + stats_counter_sub(self->metrics.owned.queued_messages, value); } void log_queue_queued_messages_inc(LogQueue *self) { stats_counter_inc(self->metrics.shared.queued_messages); - atomic_gssize_inc(&self->metrics.owned.queued_messages); + stats_counter_inc(self->metrics.owned.queued_messages); } void log_queue_queued_messages_dec(LogQueue *self) { stats_counter_dec(self->metrics.shared.queued_messages); - atomic_gssize_dec(&self->metrics.owned.queued_messages); + stats_counter_dec(self->metrics.owned.queued_messages); } void log_queue_queued_messages_reset(LogQueue *self) { stats_counter_sub(self->metrics.shared.queued_messages, - atomic_gssize_get_unsigned(&self->metrics.owned.queued_messages)); + stats_counter_get(self->metrics.owned.queued_messages)); - atomic_gssize_set_and_get(&self->metrics.owned.queued_messages, log_queue_get_length(self)); + stats_counter_set(self->metrics.owned.queued_messages, log_queue_get_length(self)); stats_counter_add(self->metrics.shared.queued_messages, - atomic_gssize_get_unsigned(&self->metrics.owned.queued_messages)); + stats_counter_get(self->metrics.owned.queued_messages)); } void @@ -260,6 +260,38 @@ _register_shared_counters(LogQueue *self, gint stats_level, const StatsClusterKe stats_cluster_key_builder_free(local_builder); } +static void +_register_owned_counters(LogQueue *self, StatsClusterKeyBuilder *builder) +{ + if (!builder) + return; + + stats_cluster_key_builder_set_name(builder, "events"); + self->metrics.owned.events_sc_key = stats_cluster_key_builder_build_single(builder); + + stats_cluster_key_builder_set_name(builder, "memory_usage_bytes"); + self->metrics.owned.memory_usage_sc_key = stats_cluster_key_builder_build_single(builder); + + stats_lock(); + { + stats_register_counter(STATS_LEVEL1, self->metrics.owned.events_sc_key, SC_TYPE_SINGLE_VALUE, + &self->metrics.owned.queued_messages); + stats_register_counter(STATS_LEVEL1, self->metrics.owned.memory_usage_sc_key, SC_TYPE_SINGLE_VALUE, + &self->metrics.owned.memory_usage); + } + stats_unlock(); +} + +static void +_register_counters(LogQueue *self, gint stats_level, const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder) +{ + g_assert(!driver_sck_builder || queue_sck_builder); + + _register_shared_counters(self, stats_level, driver_sck_builder); + _register_owned_counters(self, queue_sck_builder); +} + static void _unregister_shared_counters(LogQueue *self) { @@ -267,7 +299,7 @@ _unregister_shared_counters(LogQueue *self) { if (self->metrics.shared.output_events_sc_key) { - log_queue_queued_messages_sub(self, atomic_gssize_get_unsigned(&self->metrics.owned.queued_messages)); + log_queue_queued_messages_sub(self, stats_counter_get(self->metrics.owned.queued_messages)); stats_unregister_counter(self->metrics.shared.output_events_sc_key, SC_TYPE_QUEUED, &self->metrics.shared.queued_messages); stats_unregister_counter(self->metrics.shared.output_events_sc_key, SC_TYPE_DROPPED, @@ -278,7 +310,7 @@ _unregister_shared_counters(LogQueue *self) if (self->metrics.shared.memory_usage_sc_key) { - log_queue_memory_usage_sub(self, atomic_gssize_get_unsigned(&self->metrics.owned.memory_usage)); + log_queue_memory_usage_sub(self, stats_counter_get(self->metrics.owned.memory_usage)); stats_unregister_counter(self->metrics.shared.memory_usage_sc_key, SC_TYPE_SINGLE_VALUE, &self->metrics.shared.memory_usage); @@ -288,9 +320,40 @@ _unregister_shared_counters(LogQueue *self) stats_unlock(); } +static void +_unregister_owned_counters(LogQueue *self) +{ + stats_lock(); + { + if (self->metrics.owned.events_sc_key) + { + stats_unregister_counter(self->metrics.owned.events_sc_key, SC_TYPE_SINGLE_VALUE, + &self->metrics.owned.queued_messages); + + stats_cluster_key_free(self->metrics.owned.events_sc_key); + } + + if (self->metrics.owned.memory_usage_sc_key) + { + stats_unregister_counter(self->metrics.owned.memory_usage_sc_key, SC_TYPE_SINGLE_VALUE, + &self->metrics.owned.memory_usage); + + stats_cluster_key_free(self->metrics.owned.memory_usage_sc_key); + } + } + stats_unlock(); +} + +static void +_unregister_counters(LogQueue *self) +{ + _unregister_shared_counters(self); + _unregister_owned_counters(self); +} + void log_queue_init_instance(LogQueue *self, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder) + const StatsClusterKeyBuilder *driver_sck_builder, StatsClusterKeyBuilder *queue_sck_builder) { g_atomic_counter_set(&self->ref_cnt, 1); self->free_fn = log_queue_free_method; @@ -298,13 +361,13 @@ log_queue_init_instance(LogQueue *self, const gchar *persist_name, gint stats_le self->persist_name = persist_name ? g_strdup(persist_name) : NULL; g_mutex_init(&self->lock); - _register_shared_counters(self, stats_level, driver_sck_builder); + _register_counters(self, stats_level, driver_sck_builder, queue_sck_builder); } void log_queue_free_method(LogQueue *self) { - _unregister_shared_counters(self); + _unregister_counters(self); g_mutex_clear(&self->lock); g_free(self->persist_name); g_free(self); diff --git a/lib/logqueue.h b/lib/logqueue.h index 6b6df47ddd..d8cbb9b046 100644 --- a/lib/logqueue.h +++ b/lib/logqueue.h @@ -49,8 +49,11 @@ typedef struct _LogQueueMetrics struct { - atomic_gssize memory_usage; - atomic_gssize queued_messages; + StatsClusterKey *events_sc_key; + StatsClusterKey *memory_usage_sc_key; + + StatsCounterItem *memory_usage; + StatsCounterItem *queued_messages; } owned; } LogQueueMetrics; @@ -233,7 +236,8 @@ void log_queue_set_parallel_push(LogQueue *self, LogQueuePushNotifyFunc parallel gboolean log_queue_check_items(LogQueue *self, gint *timeout, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data, GDestroyNotify user_data_destroy); void log_queue_init_instance(LogQueue *self, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder); + const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder); void log_queue_free_method(LogQueue *self); diff --git a/lib/logthrdest/logthrdestdrv.c b/lib/logthrdest/logthrdestdrv.c index 59cbae9a76..736a6dad74 100644 --- a/lib/logthrdest/logthrdestdrv.c +++ b/lib/logthrdest/logthrdestdrv.c @@ -743,11 +743,29 @@ log_threaded_dest_worker_start(LogThreadedDestWorker *self) return main_loop_threaded_worker_start(&self->thread); } +static void +_init_queue_sck_builder(LogThreadedDestWorker *self, StatsClusterKeyBuilder *builder) +{ + stats_cluster_key_builder_add_label(builder, stats_cluster_label("id", self->owner->super.super.id ? : "")); + stats_cluster_key_builder_add_label(builder, stats_cluster_label("driver_instance", + self->owner->format_stats_instance(self->owner))); + + gchar worker_index_str[8]; + g_snprintf(worker_index_str, sizeof(worker_index_str), "%d", self->worker_index); + stats_cluster_key_builder_add_label(builder, stats_cluster_label("worker", worker_index_str)); +} + static gboolean _acquire_worker_queue(LogThreadedDestWorker *self, gint stats_level, const StatsClusterKeyBuilder *driver_sck_builder) { gchar *persist_name = _format_queue_persist_name(self); - self->queue = log_dest_driver_acquire_queue(&self->owner->super, persist_name, stats_level, driver_sck_builder); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); + _init_queue_sck_builder(self, queue_sck_builder); + + self->queue = log_dest_driver_acquire_queue(&self->owner->super, persist_name, stats_level, driver_sck_builder, + queue_sck_builder); + + stats_cluster_key_builder_free(queue_sck_builder); g_free(persist_name); if (!self->queue) diff --git a/lib/logwriter.c b/lib/logwriter.c index cab9f65882..b84296edd6 100644 --- a/lib/logwriter.c +++ b/lib/logwriter.c @@ -1756,6 +1756,13 @@ log_writer_init_driver_sck_builder(LogWriter *self, StatsClusterKeyBuilder *buil self->stats_instance); } +void +log_writer_init_queue_sck_builder(LogWriter *self, StatsClusterKeyBuilder *builder) +{ + stats_cluster_key_builder_add_label(builder, stats_cluster_label("id", self->stats_id)); + stats_cluster_key_builder_add_label(builder, stats_cluster_label("driver_instance", self->stats_instance)); +} + void log_writer_set_options(LogWriter *self, LogPipe *control, LogWriterOptions *options, const gchar *stats_id, const gchar *stats_instance) diff --git a/lib/logwriter.h b/lib/logwriter.h index ad2ed0de01..8743de04e4 100644 --- a/lib/logwriter.h +++ b/lib/logwriter.h @@ -89,6 +89,7 @@ LogWriter *log_writer_new(guint32 flags, GlobalConfig *cfg); void log_writer_msg_rewind(LogWriter *self); void log_writer_init_driver_sck_builder(LogWriter *self, StatsClusterKeyBuilder *builder); +void log_writer_init_queue_sck_builder(LogWriter *self, StatsClusterKeyBuilder *builder); void log_writer_options_set_template_escape(LogWriterOptions *options, gboolean enable); void log_writer_options_defaults(LogWriterOptions *options); diff --git a/lib/stats/stats-cluster-key-builder.c b/lib/stats/stats-cluster-key-builder.c index 5b938fb15a..8d9b09bbb9 100644 --- a/lib/stats/stats-cluster-key-builder.c +++ b/lib/stats/stats-cluster-key-builder.c @@ -28,6 +28,7 @@ struct _StatsClusterKeyBuilder { gchar *name; + gchar *name_prefix; gchar *name_suffix; GArray *labels; StatsClusterUnit unit; @@ -66,6 +67,7 @@ stats_cluster_key_builder_clone(const StatsClusterKeyBuilder *self) StatsClusterKeyBuilder *cloned = stats_cluster_key_builder_new(); stats_cluster_key_builder_set_name(cloned, self->name); + stats_cluster_key_builder_set_name_prefix(cloned, self->name_prefix); stats_cluster_key_builder_set_name_suffix(cloned, self->name_suffix); for (gint i = 0; i < self->labels->len; i++) { @@ -95,6 +97,13 @@ stats_cluster_key_builder_set_name(StatsClusterKeyBuilder *self, const gchar *na self->name = g_strdup(name); } +void +stats_cluster_key_builder_set_name_prefix(StatsClusterKeyBuilder *self, const gchar *name_prefix) +{ + g_free(self->name_prefix); + self->name_prefix = g_strdup(name_prefix); +} + void stats_cluster_key_builder_set_name_suffix(StatsClusterKeyBuilder *self, const gchar *name_suffix) { @@ -142,6 +151,7 @@ void stats_cluster_key_builder_reset(StatsClusterKeyBuilder *self) { stats_cluster_key_builder_set_name(self, NULL); + stats_cluster_key_builder_set_name_prefix(self, NULL); stats_cluster_key_builder_set_name_suffix(self, NULL); g_array_remove_range(self->labels, 0, self->labels->len); @@ -160,10 +170,7 @@ _labels_sort(const StatsClusterLabel *a, const StatsClusterLabel *b) static gchar * _format_name(const StatsClusterKeyBuilder *self) { - if (self->name_suffix) - return g_strdup_printf("%s%s", self->name, self->name_suffix); - - return g_strdup(self->name); + return g_strdup_printf("%s%s%s", self->name_prefix ? : "", self->name, self->name_suffix ? : ""); } static gboolean diff --git a/lib/stats/stats-cluster-key-builder.h b/lib/stats/stats-cluster-key-builder.h index f9d76778fa..402bc7e18d 100644 --- a/lib/stats/stats-cluster-key-builder.h +++ b/lib/stats/stats-cluster-key-builder.h @@ -34,6 +34,7 @@ StatsClusterKeyBuilder *stats_cluster_key_builder_clone(const StatsClusterKeyBui void stats_cluster_key_builder_free(StatsClusterKeyBuilder *self); void stats_cluster_key_builder_set_name(StatsClusterKeyBuilder *self, const gchar *name); +void stats_cluster_key_builder_set_name_prefix(StatsClusterKeyBuilder *self, const gchar *name_prefix); void stats_cluster_key_builder_set_name_suffix(StatsClusterKeyBuilder *self, const gchar *name_suffix); void stats_cluster_key_builder_add_label(StatsClusterKeyBuilder *self, const StatsClusterLabel label); void stats_cluster_key_builder_set_unit(StatsClusterKeyBuilder *self, StatsClusterUnit unit); diff --git a/lib/stats/tests/test_stats_cluster_key_builder.c b/lib/stats/tests/test_stats_cluster_key_builder.c index 40a8dfa126..65805e38d9 100644 --- a/lib/stats/tests/test_stats_cluster_key_builder.c +++ b/lib/stats/tests/test_stats_cluster_key_builder.c @@ -156,8 +156,10 @@ _test_builder(KeyType type) const gchar *dummy_name = "dummy_name"; const gchar *dummy_name_2 = "dummy_name_2"; + const gchar *dummy_name_prefix = "dummy_name_prefix_"; + const gchar *dummy_name_2_with_prefix = "dummy_name_prefix_dummy_name_2"; const gchar *dummy_name_suffix = "_dummy_name_suffix"; - const gchar *dummy_name_2_with_suffix = "dummy_name_2_dummy_name_suffix"; + const gchar *dummy_name_2_with_prefix_and_suffix = "dummy_name_prefix_dummy_name_2_dummy_name_suffix"; guint16 dummy_legacy_component = 42; const gchar *dummy_legacy_id = "dummy_legacy_id"; @@ -190,9 +192,13 @@ _test_builder(KeyType type) stats_cluster_key_builder_set_name(builder, dummy_name_2); _assert_built_sc_key_equals(builder, type, dummy_name_2, two_labels, G_N_ELEMENTS(two_labels)); + /* Name prefix */ + stats_cluster_key_builder_set_name_prefix(builder, dummy_name_prefix); + _assert_built_sc_key_equals(builder, type, dummy_name_2_with_prefix, two_labels, G_N_ELEMENTS(two_labels)); + /* Name suffix */ stats_cluster_key_builder_set_name_suffix(builder, dummy_name_suffix); - _assert_built_sc_key_equals(builder, type, dummy_name_2_with_suffix, two_labels, G_N_ELEMENTS(two_labels)); + _assert_built_sc_key_equals(builder, type, dummy_name_2_with_prefix_and_suffix, two_labels, G_N_ELEMENTS(two_labels)); /* Unit */ stats_cluster_key_builder_set_unit(builder, SCU_NANOSECONDS); @@ -200,7 +206,7 @@ _test_builder(KeyType type) /* Legacy alias */ stats_cluster_key_builder_set_legacy_alias(builder, dummy_legacy_component, dummy_legacy_id, dummy_legacy_instance); - _assert_built_sc_key_equals_with_legacy(builder, type, dummy_name_2_with_suffix, two_labels, + _assert_built_sc_key_equals_with_legacy(builder, type, dummy_name_2_with_prefix_and_suffix, two_labels, G_N_ELEMENTS(two_labels), dummy_legacy_component, dummy_legacy_id, dummy_legacy_instance, NULL); @@ -209,7 +215,7 @@ _test_builder(KeyType type) { /* LOGPIPE does not support setting the legacy name */ stats_cluster_key_builder_set_legacy_alias_name(builder, dummy_legacy_name); - _assert_built_sc_key_equals_with_legacy(builder, type, dummy_name_2_with_suffix, two_labels, + _assert_built_sc_key_equals_with_legacy(builder, type, dummy_name_2_with_prefix_and_suffix, two_labels, G_N_ELEMENTS(two_labels), dummy_legacy_component, dummy_legacy_id, dummy_legacy_instance, dummy_legacy_name); } diff --git a/lib/tests/test_logqueue.c b/lib/tests/test_logqueue.c index 2a08e1fb7d..0c289fe94c 100644 --- a/lib/tests/test_logqueue.c +++ b/lib/tests/test_logqueue.c @@ -179,8 +179,10 @@ Test(logqueue, test_zero_diskbuf_and_normal_acks) gint i; StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); - q = log_queue_fifo_new(OVERFLOW_SIZE, NULL, STATS_LEVEL0, driver_sck_builder); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); + q = log_queue_fifo_new(OVERFLOW_SIZE, NULL, STATS_LEVEL0, driver_sck_builder, queue_sck_builder); stats_cluster_key_builder_free(driver_sck_builder); + stats_cluster_key_builder_free(queue_sck_builder); log_queue_set_use_backlog(q, TRUE); cr_assert_eq(atomic_gssize_racy_get(&q->metrics.shared.queued_messages->value), 0); @@ -213,7 +215,7 @@ Test(logqueue, test_zero_diskbuf_alternating_send_acks) LogQueue *q; gint i; - q = log_queue_fifo_new(OVERFLOW_SIZE, NULL, STATS_LEVEL0, NULL); + q = log_queue_fifo_new(OVERFLOW_SIZE, NULL, STATS_LEVEL0, NULL, NULL); log_queue_set_use_backlog(q, TRUE); fed_messages = 0; @@ -244,7 +246,7 @@ Test(logqueue, test_with_threads) for (i = 0; i < TEST_RUNS; i++) { fprintf(stderr, "starting testrun: %d\n", i); - q = log_queue_fifo_new(MESSAGES_SUM, NULL, STATS_LEVEL0, NULL); + q = log_queue_fifo_new(MESSAGES_SUM, NULL, STATS_LEVEL0, NULL, NULL); log_queue_set_use_backlog(q, TRUE); for (j = 0; j < FEEDERS; j++) @@ -272,8 +274,10 @@ Test(logqueue, test_with_threads) Test(logqueue, log_queue_fifo_rewind_all_and_memory_usage) { StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); - LogQueue *q = log_queue_fifo_new(OVERFLOW_SIZE, NULL, STATS_LEVEL0, driver_sck_builder); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); + LogQueue *q = log_queue_fifo_new(OVERFLOW_SIZE, NULL, STATS_LEVEL0, driver_sck_builder, queue_sck_builder); stats_cluster_key_builder_free(driver_sck_builder); + stats_cluster_key_builder_free(queue_sck_builder); log_queue_set_use_backlog(q, TRUE); feed_some_messages(q, 1); @@ -301,8 +305,10 @@ Test(logqueue, log_queue_fifo_should_drop_only_non_flow_controlled_messages, gint fifo_size = 5; StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); - LogQueue *q = log_queue_fifo_new(fifo_size, NULL, STATS_LEVEL0, driver_sck_builder); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); + LogQueue *q = log_queue_fifo_new(fifo_size, NULL, STATS_LEVEL0, driver_sck_builder, queue_sck_builder); stats_cluster_key_builder_free(driver_sck_builder); + stats_cluster_key_builder_free(queue_sck_builder); log_queue_set_use_backlog(q, TRUE); fed_messages = 0; @@ -365,8 +371,10 @@ Test(logqueue, log_queue_fifo_should_drop_only_non_flow_controlled_messages_thre main_loop_worker_finalize_thread_space(); StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); - LogQueue *q = log_queue_fifo_new(fifo_size, NULL, STATS_LEVEL0, driver_sck_builder); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); + LogQueue *q = log_queue_fifo_new(fifo_size, NULL, STATS_LEVEL0, driver_sck_builder, queue_sck_builder); stats_cluster_key_builder_free(driver_sck_builder); + stats_cluster_key_builder_free(queue_sck_builder); log_queue_set_use_backlog(q, TRUE); GThread *thread = g_thread_new(NULL, _flow_control_feed_thread, q); @@ -391,40 +399,52 @@ Test(logqueue, log_queue_fifo_multiple_queues) LogPathOptions options = LOG_PATH_OPTIONS_INIT; StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); - LogQueue *queue_1 = log_queue_fifo_new(fifo_size, NULL, STATS_LEVEL0, driver_sck_builder); - LogQueue *queue_2 = log_queue_fifo_new(fifo_size, NULL, STATS_LEVEL0, driver_sck_builder); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); + stats_cluster_key_builder_add_label(queue_sck_builder, stats_cluster_label("log_queue_fifo_multiple_queues", "1")); + LogQueue *queue_1 = log_queue_fifo_new(fifo_size, NULL, STATS_LEVEL0, driver_sck_builder, queue_sck_builder); + stats_cluster_key_builder_reset(queue_sck_builder); + stats_cluster_key_builder_add_label(queue_sck_builder, stats_cluster_label("log_queue_fifo_multiple_queues", "2")); + LogQueue *queue_2 = log_queue_fifo_new(fifo_size, NULL, STATS_LEVEL0, driver_sck_builder, queue_sck_builder); log_queue_set_use_backlog(queue_1, TRUE); log_queue_set_use_backlog(queue_2, TRUE); + cr_assert_eq(stats_counter_get(queue_1->metrics.shared.queued_messages), 0); + cr_assert_eq(stats_counter_get(queue_1->metrics.owned.queued_messages), 0); + cr_assert_eq(stats_counter_get(queue_2->metrics.shared.queued_messages), 0); + cr_assert_eq(stats_counter_get(queue_2->metrics.owned.queued_messages), 0); + + log_queue_push_tail(queue_1, log_msg_new_empty(), &options); cr_assert_eq(stats_counter_get(queue_1->metrics.shared.queued_messages), 1); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_1->metrics.owned.queued_messages), 1); + cr_assert_eq(stats_counter_get(queue_1->metrics.owned.queued_messages), 1); cr_assert_eq(stats_counter_get(queue_2->metrics.shared.queued_messages), 1); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_2->metrics.owned.queued_messages), 0); + cr_assert_eq(stats_counter_get(queue_2->metrics.owned.queued_messages), 0); log_queue_push_tail(queue_2, log_msg_new_empty(), &options); cr_assert_eq(stats_counter_get(queue_1->metrics.shared.queued_messages), 2); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_1->metrics.owned.queued_messages), 1); + cr_assert_eq(stats_counter_get(queue_1->metrics.owned.queued_messages), 1); cr_assert_eq(stats_counter_get(queue_2->metrics.shared.queued_messages), 2); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_2->metrics.owned.queued_messages), 1); + cr_assert_eq(stats_counter_get(queue_2->metrics.owned.queued_messages), 1); log_queue_unref(queue_1); cr_assert_eq(stats_counter_get(queue_2->metrics.shared.queued_messages), 1); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_2->metrics.owned.queued_messages), 1); + cr_assert_eq(stats_counter_get(queue_2->metrics.owned.queued_messages), 1); - queue_1 = log_queue_fifo_new(fifo_size, NULL, STATS_LEVEL0, driver_sck_builder); + stats_cluster_key_builder_reset(queue_sck_builder); + stats_cluster_key_builder_add_label(queue_sck_builder, stats_cluster_label("queue", "1")); + queue_1 = log_queue_fifo_new(fifo_size, NULL, STATS_LEVEL0, driver_sck_builder, queue_sck_builder); log_queue_set_use_backlog(queue_1, TRUE); cr_assert_eq(stats_counter_get(queue_1->metrics.shared.queued_messages), 1); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_1->metrics.owned.queued_messages), 0); + cr_assert_eq(stats_counter_get(queue_1->metrics.owned.queued_messages), 0); cr_assert_eq(stats_counter_get(queue_2->metrics.shared.queued_messages), 1); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_2->metrics.owned.queued_messages), 1); + cr_assert_eq(stats_counter_get(queue_2->metrics.owned.queued_messages), 1); log_queue_unref(queue_2); cr_assert_eq(stats_counter_get(queue_1->metrics.shared.queued_messages), 0); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_1->metrics.owned.queued_messages), 0); + cr_assert_eq(stats_counter_get(queue_1->metrics.owned.queued_messages), 0); log_queue_unref(queue_1); diff --git a/lib/tests/test_logwriter.c b/lib/tests/test_logwriter.c index 226b9a9d3b..0b5a69bcf3 100644 --- a/lib/tests/test_logwriter.c +++ b/lib/tests/test_logwriter.c @@ -173,7 +173,7 @@ _assert_logwriter_output(LogWriterTestCase c) opt.template = templ; } msg = init_msg(c.msg, c.is_rfc5424); - queue = log_queue_fifo_new(1000, NULL, STATS_LEVEL0, NULL); + queue = log_queue_fifo_new(1000, NULL, STATS_LEVEL0, NULL, NULL); writer = log_writer_new(c.writer_flags, configuration); log_writer_set_options(writer, NULL, &opt, NULL, NULL); diff --git a/modules/affile/affile-dest.c b/modules/affile/affile-dest.c index f3d6da889f..1d8064dfe9 100644 --- a/modules/affile/affile-dest.c +++ b/modules/affile/affile-dest.c @@ -199,13 +199,17 @@ affile_dw_init(LogPipe *s) self->filename); StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); log_writer_init_driver_sck_builder(self->writer, driver_sck_builder); + log_writer_init_queue_sck_builder(self->writer, queue_sck_builder); LogQueue *queue = log_dest_driver_acquire_queue(&self->owner->super, affile_dw_format_persist_name(self), - self->owner->writer_options.stats_level, driver_sck_builder); + self->owner->writer_options.stats_level, driver_sck_builder, + queue_sck_builder); log_writer_set_queue(self->writer, queue); stats_cluster_key_builder_free(driver_sck_builder); + stats_cluster_key_builder_free(queue_sck_builder); if (!log_pipe_init((LogPipe *) self->writer)) { diff --git a/modules/afprog/afprog.c b/modules/afprog/afprog.c index 1609726795..6a5efe9f61 100644 --- a/modules/afprog/afprog.c +++ b/modules/afprog/afprog.c @@ -536,12 +536,16 @@ afprogram_dd_init(LogPipe *s) self->process_info.cmdline->str); StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); log_writer_init_driver_sck_builder(self->writer, driver_sck_builder); + log_writer_init_queue_sck_builder(self->writer, queue_sck_builder); LogQueue *queue = log_dest_driver_acquire_queue(&self->super, afprogram_dd_format_queue_persist_name(self), - self->writer_options.stats_level, driver_sck_builder); + self->writer_options.stats_level, driver_sck_builder, + queue_sck_builder); log_writer_set_queue(self->writer, queue); + stats_cluster_key_builder_free(queue_sck_builder); stats_cluster_key_builder_free(driver_sck_builder); if (!log_pipe_init((LogPipe *) self->writer)) diff --git a/modules/afsocket/afsocket-dest.c b/modules/afsocket/afsocket-dest.c index efccd787a3..ac6141c28d 100644 --- a/modules/afsocket/afsocket-dest.c +++ b/modules/afsocket/afsocket-dest.c @@ -507,12 +507,16 @@ afsocket_dd_setup_writer(AFSocketDestDriver *self) afsocket_dd_stats_instance(self)); StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); log_writer_init_driver_sck_builder(self->writer, driver_sck_builder); + log_writer_init_queue_sck_builder(self->writer, queue_sck_builder); LogQueue *queue = log_dest_driver_acquire_queue(&self->super, afsocket_dd_format_qfile_name(self), - self->writer_options.stats_level, driver_sck_builder); + self->writer_options.stats_level, driver_sck_builder, + queue_sck_builder); log_writer_set_queue(self->writer, queue); + stats_cluster_key_builder_free(queue_sck_builder); stats_cluster_key_builder_free(driver_sck_builder); if (!log_pipe_init((LogPipe *) self->writer)) diff --git a/modules/diskq/diskq.c b/modules/diskq/diskq.c index 854cb4d398..e932a28ee9 100644 --- a/modules/diskq/diskq.c +++ b/modules/diskq/diskq.c @@ -53,12 +53,14 @@ log_queue_disk_is_file_in_directory(const gchar *file, const gchar *directory) static LogQueue * _create_disk_queue(DiskQDestPlugin *self, const gchar *filename, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder) + const StatsClusterKeyBuilder *driver_sck_builder, StatsClusterKeyBuilder *queue_sck_builder) { if (self->options.reliable) - return log_queue_disk_reliable_new(&self->options, filename, persist_name, stats_level, driver_sck_builder); + return log_queue_disk_reliable_new(&self->options, filename, persist_name, stats_level, driver_sck_builder, + queue_sck_builder); - return log_queue_disk_non_reliable_new(&self->options, filename, persist_name, stats_level, driver_sck_builder); + return log_queue_disk_non_reliable_new(&self->options, filename, persist_name, stats_level, driver_sck_builder, + queue_sck_builder); } static void @@ -75,14 +77,16 @@ _warn_if_dir_changed(const gchar *qfile_name, const gchar *dir) static LogQueue * _create_and_start_disk_queue_with_filename_from_persist(DiskQDestPlugin *self, const gchar *persist_qfile_name, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder) + const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder) { if (!persist_qfile_name) return FALSE; _warn_if_dir_changed(persist_qfile_name, self->options.dir); - LogQueue *queue = _create_disk_queue(self, persist_qfile_name, persist_name, stats_level, driver_sck_builder); + LogQueue *queue = _create_disk_queue(self, persist_qfile_name, persist_name, stats_level, driver_sck_builder, + queue_sck_builder); if (log_queue_disk_start(queue)) return queue; @@ -92,7 +96,7 @@ _create_and_start_disk_queue_with_filename_from_persist(DiskQDestPlugin *self, c if (!new_qfile_name) return NULL; - queue = _create_disk_queue(self, persist_qfile_name, persist_name, stats_level, driver_sck_builder); + queue = _create_disk_queue(self, persist_qfile_name, persist_name, stats_level, driver_sck_builder, queue_sck_builder); if (log_queue_disk_start(queue)) { msg_error("Error opening disk-queue file, a new one started", @@ -112,12 +116,14 @@ _create_and_start_disk_queue_with_filename_from_persist(DiskQDestPlugin *self, c static LogQueue * _create_and_start_disk_queue_with_new_filename(DiskQDestPlugin *self, const gchar *new_qfile_name, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder) + const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder) { if (!new_qfile_name) return NULL; - LogQueue *queue = _create_disk_queue(self, new_qfile_name, persist_name, stats_level, driver_sck_builder); + LogQueue *queue = _create_disk_queue(self, new_qfile_name, persist_name, stats_level, driver_sck_builder, + queue_sck_builder); if (log_queue_disk_start(queue)) return queue; @@ -129,7 +135,7 @@ _create_and_start_disk_queue_with_new_filename(DiskQDestPlugin *self, const gcha static LogQueue * _acquire_queue(LogDestDriver *dd, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder) + const StatsClusterKeyBuilder *driver_sck_builder, StatsClusterKeyBuilder *queue_sck_builder) { DiskQDestPlugin *self = log_driver_get_plugin(&dd->super, DiskQDestPlugin, DISKQ_PLUGIN_NAME); GlobalConfig *cfg = log_pipe_get_config(&dd->super.super); @@ -141,13 +147,13 @@ _acquire_queue(LogDestDriver *dd, const gchar *persist_name, gint stats_level, persist_qfile_name = persist_state_lookup_string(cfg->state, persist_name, NULL, NULL); queue = _create_and_start_disk_queue_with_filename_from_persist( - self, persist_qfile_name, persist_name, stats_level, driver_sck_builder); + self, persist_qfile_name, persist_name, stats_level, driver_sck_builder, queue_sck_builder); if (queue) goto exit; new_qfile_name = qdisk_get_next_filename(self->options.dir, self->options.reliable); queue = _create_and_start_disk_queue_with_new_filename(self, new_qfile_name, persist_name, stats_level, - driver_sck_builder); + driver_sck_builder, queue_sck_builder); exit: if (queue) diff --git a/modules/diskq/dqtool.c b/modules/diskq/dqtool.c index c764b82998..59aeb3b7a8 100644 --- a/modules/diskq/dqtool.c +++ b/modules/diskq/dqtool.c @@ -124,13 +124,13 @@ open_queue(char *filename, LogQueue **lq, DiskQueueOptions *options) if (options->reliable) { options->mem_buf_size = 1024 * 1024; - *lq = log_queue_disk_reliable_new(options, filename, NULL, STATS_LEVEL0, NULL); + *lq = log_queue_disk_reliable_new(options, filename, NULL, STATS_LEVEL0, NULL, NULL); } else { options->mem_buf_size = 128; options->qout_size = 1000; - *lq = log_queue_disk_non_reliable_new(options, filename, NULL, STATS_LEVEL0, NULL); + *lq = log_queue_disk_non_reliable_new(options, filename, NULL, STATS_LEVEL0, NULL, NULL); } if (!log_queue_disk_start(*lq)) diff --git a/modules/diskq/logqueue-disk-non-reliable.c b/modules/diskq/logqueue-disk-non-reliable.c index f4fb6754a6..b42961c1dd 100644 --- a/modules/diskq/logqueue-disk-non-reliable.c +++ b/modules/diskq/logqueue-disk-non-reliable.c @@ -564,12 +564,13 @@ _set_virtual_functions(LogQueueDiskNonReliable *self) LogQueue * log_queue_disk_non_reliable_new(DiskQueueOptions *options, const gchar *filename, const gchar *persist_name, - gint stats_level, const StatsClusterKeyBuilder *driver_sck_builder) + gint stats_level, const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder) { g_assert(options->reliable == FALSE); LogQueueDiskNonReliable *self = g_new0(LogQueueDiskNonReliable, 1); log_queue_disk_init_instance(&self->super, options, "SLQF", filename, persist_name, stats_level, - driver_sck_builder); + driver_sck_builder, queue_sck_builder); self->qbacklog = g_queue_new(); self->qout = g_queue_new(); self->qoverflow = g_queue_new(); diff --git a/modules/diskq/logqueue-disk-non-reliable.h b/modules/diskq/logqueue-disk-non-reliable.h index f4287cd13a..bde7944df4 100644 --- a/modules/diskq/logqueue-disk-non-reliable.h +++ b/modules/diskq/logqueue-disk-non-reliable.h @@ -37,6 +37,7 @@ typedef struct _LogQueueDiskNonReliable } LogQueueDiskNonReliable; LogQueue *log_queue_disk_non_reliable_new(DiskQueueOptions *options, const gchar *filename, const gchar *persist_name, - gint stats_level, const StatsClusterKeyBuilder *driver_sck_builder); + gint stats_level, const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder); #endif /* LOG_QUEUE_DISK_NON_RELIABLE_H_ */ diff --git a/modules/diskq/logqueue-disk-reliable.c b/modules/diskq/logqueue-disk-reliable.c index 41b3d660fd..350ddc3d56 100644 --- a/modules/diskq/logqueue-disk-reliable.c +++ b/modules/diskq/logqueue-disk-reliable.c @@ -470,12 +470,13 @@ _set_virtual_functions(LogQueueDiskReliable *self) LogQueue * log_queue_disk_reliable_new(DiskQueueOptions *options, const gchar *filename, const gchar *persist_name, - gint stats_level, const StatsClusterKeyBuilder *driver_sck_builder) + gint stats_level, const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder) { g_assert(options->reliable == TRUE); LogQueueDiskReliable *self = g_new0(LogQueueDiskReliable, 1); log_queue_disk_init_instance(&self->super, options, "SLRQ", filename, persist_name, stats_level, - driver_sck_builder); + driver_sck_builder, queue_sck_builder); if (options->mem_buf_size < 0) { options->mem_buf_size = PESSIMISTIC_MEM_BUF_SIZE; diff --git a/modules/diskq/logqueue-disk-reliable.h b/modules/diskq/logqueue-disk-reliable.h index b521bc459d..82494fa172 100644 --- a/modules/diskq/logqueue-disk-reliable.h +++ b/modules/diskq/logqueue-disk-reliable.h @@ -36,6 +36,7 @@ typedef struct _LogQueueDiskReliable } LogQueueDiskReliable; LogQueue *log_queue_disk_reliable_new(DiskQueueOptions *options, const gchar *filename, const gchar *persist_name, - gint stats_level, const StatsClusterKeyBuilder *driver_sck_builder); + gint stats_level, const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder); #endif /* LOGQUEUE_DISK_RELIABLE_H_ */ diff --git a/modules/diskq/logqueue-disk.c b/modules/diskq/logqueue-disk.c index cfa0c41ba2..ea67737600 100644 --- a/modules/diskq/logqueue-disk.c +++ b/modules/diskq/logqueue-disk.c @@ -232,9 +232,18 @@ log_queue_disk_restart_corrupted(LogQueueDisk *self) void log_queue_disk_init_instance(LogQueueDisk *self, DiskQueueOptions *options, const gchar *qdisk_file_id, const gchar *filename, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder) + const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder) { - log_queue_init_instance(&self->super, persist_name, stats_level, driver_sck_builder); + if (queue_sck_builder) + { + stats_cluster_key_builder_set_name_prefix(queue_sck_builder, "disk_queue_"); + stats_cluster_key_builder_add_label(queue_sck_builder, stats_cluster_label("path", filename)); + stats_cluster_key_builder_add_label(queue_sck_builder, + stats_cluster_label("reliable", options->reliable ? "true" : "false")); + } + + log_queue_init_instance(&self->super, persist_name, stats_level, driver_sck_builder, queue_sck_builder); self->super.type = log_queue_disk_type; self->compaction = options->compaction; diff --git a/modules/diskq/logqueue-disk.h b/modules/diskq/logqueue-disk.h index 18ff99451a..6c4c080665 100644 --- a/modules/diskq/logqueue-disk.h +++ b/modules/diskq/logqueue-disk.h @@ -52,7 +52,8 @@ gboolean log_queue_disk_stop(LogQueue *self, gboolean *persistent); gboolean log_queue_disk_start(LogQueue *self); void log_queue_disk_init_instance(LogQueueDisk *self, DiskQueueOptions *options, const gchar *qdisk_file_id, const gchar *filename, const gchar *persist_name, gint stats_level, - const StatsClusterKeyBuilder *driver_sck_builder); + const StatsClusterKeyBuilder *driver_sck_builder, + StatsClusterKeyBuilder *queue_sck_builder); void log_queue_disk_restart_corrupted(LogQueueDisk *self); void log_queue_disk_free_method(LogQueueDisk *self); diff --git a/modules/diskq/tests/test_diskq.c b/modules/diskq/tests/test_diskq.c index e097a18f8a..7010c3712f 100644 --- a/modules/diskq/tests/test_diskq.c +++ b/modules/diskq/tests/test_diskq.c @@ -65,7 +65,7 @@ Test(diskq, testcase_zero_diskbuf_and_normal_acks) filename = g_string_sized_new(32); g_string_printf(filename, "test-normal_acks.qf"); unlink(filename->str); - q = log_queue_disk_reliable_new(&options, filename->str, NULL, STATS_LEVEL0, NULL); + q = log_queue_disk_reliable_new(&options, filename->str, NULL, STATS_LEVEL0, NULL, NULL); log_queue_set_use_backlog(q, TRUE); log_queue_disk_start(q); @@ -100,7 +100,7 @@ Test(diskq, testcase_zero_diskbuf_alternating_send_acks) filename = g_string_sized_new(32); g_string_printf(filename, "test-send_acks.qf"); unlink(filename->str); - q = log_queue_disk_reliable_new(&options, filename->str, NULL, STATS_LEVEL0, NULL); + q = log_queue_disk_reliable_new(&options, filename->str, NULL, STATS_LEVEL0, NULL, NULL); log_queue_set_use_backlog(q, TRUE); log_queue_disk_start(q); @@ -138,8 +138,10 @@ Test(diskq, testcase_ack_and_rewind_messages) g_string_printf(filename, "test-rewind_and_acks.qf"); unlink(filename->str); StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); - q = log_queue_disk_reliable_new(&options, filename->str, NULL, STATS_LEVEL0, driver_sck_builder); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); + q = log_queue_disk_reliable_new(&options, filename->str, NULL, STATS_LEVEL0, driver_sck_builder, queue_sck_builder); stats_cluster_key_builder_free(driver_sck_builder); + stats_cluster_key_builder_free(queue_sck_builder); log_queue_set_use_backlog(q, TRUE); cr_assert_eq(stats_counter_get(q->metrics.shared.queued_messages), 0, "queued messages: %d", __LINE__); @@ -286,7 +288,7 @@ Test(diskq, testcase_with_threads) filename = g_string_sized_new(32); g_string_printf(filename, "test-%04d.qf", i); unlink(filename->str); - q = log_queue_disk_reliable_new(&options, filename->str, NULL, STATS_LEVEL0, NULL); + q = log_queue_disk_reliable_new(&options, filename->str, NULL, STATS_LEVEL0, NULL, NULL); log_queue_disk_start(q); for (j = 0; j < FEEDERS; j++) @@ -323,9 +325,9 @@ static LogQueue * queue_new(gboolean reliable, DiskQueueOptions *options, const gchar *filename, const gchar *persist_name) { if (reliable) - return log_queue_disk_reliable_new(options, filename, persist_name, STATS_LEVEL0, NULL); + return log_queue_disk_reliable_new(options, filename, persist_name, STATS_LEVEL0, NULL, NULL); - return log_queue_disk_non_reliable_new(options, filename, persist_name, STATS_LEVEL0, NULL); + return log_queue_disk_non_reliable_new(options, filename, persist_name, STATS_LEVEL0, NULL, NULL); } ParameterizedTestParameters(diskq, testcase_diskbuffer_restart_corrupted) @@ -442,11 +444,15 @@ testcase_diskq_prepare(DiskQueueOptions *options, diskq_tester_parameters_t *par options->qout_size = parameters->qout_size; StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); if (parameters->reliable) - q = log_queue_disk_reliable_new(options, parameters->filename, NULL, STATS_LEVEL0, driver_sck_builder); + q = log_queue_disk_reliable_new(options, parameters->filename, NULL, STATS_LEVEL0, driver_sck_builder, + queue_sck_builder); else - q = log_queue_disk_non_reliable_new(options, parameters->filename, NULL, STATS_LEVEL0, driver_sck_builder); + q = log_queue_disk_non_reliable_new(options, parameters->filename, NULL, STATS_LEVEL0, driver_sck_builder, + queue_sck_builder); stats_cluster_key_builder_free(driver_sck_builder); + stats_cluster_key_builder_free(queue_sck_builder); cr_assert_eq(stats_counter_get(q->metrics.shared.queued_messages), 0, "queued messages: line: %d", __LINE__); cr_assert_eq(stats_counter_get(q->metrics.shared.memory_usage), 0, "memory_usage: line: %d", __LINE__); @@ -548,7 +554,7 @@ Test(diskq, test_no_next_filename_in_acquire) cr_assert(log_driver_add_plugin(&driver->super, (LogDriverPlugin *) plugin)); cr_assert(log_pipe_init(&driver->super.super)); - cr_assert_eq(log_dest_driver_acquire_queue(driver, queue_persist_name, STATS_LEVEL0, NULL), NULL); + cr_assert_eq(log_dest_driver_acquire_queue(driver, queue_persist_name, STATS_LEVEL0, NULL, NULL), NULL); cr_assert(log_pipe_deinit(&driver->super.super)); cr_assert(log_pipe_unref(&driver->super.super)); diff --git a/modules/diskq/tests/test_diskq_full.c b/modules/diskq/tests/test_diskq_full.c index 6dbca34dd0..1843daf391 100644 --- a/modules/diskq/tests/test_diskq_full.c +++ b/modules/diskq/tests/test_diskq_full.c @@ -56,18 +56,22 @@ test_diskq_become_full(gboolean reliable, const gchar *filename) const gchar *persist_name = "test_diskq"; StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); options.reliable = reliable; if (reliable) { _construct_options(&options, 1000, 1000, reliable); - q = log_queue_disk_reliable_new(&options, filename, persist_name, STATS_LEVEL0, driver_sck_builder); + q = log_queue_disk_reliable_new(&options, filename, persist_name, STATS_LEVEL0, driver_sck_builder, + queue_sck_builder); } else { _construct_options(&options, 1000, 0, reliable); - q = log_queue_disk_non_reliable_new(&options, filename, persist_name, STATS_LEVEL0, driver_sck_builder); + q = log_queue_disk_non_reliable_new(&options, filename, persist_name, STATS_LEVEL0, driver_sck_builder, + queue_sck_builder); } stats_cluster_key_builder_free(driver_sck_builder); + stats_cluster_key_builder_free(queue_sck_builder); log_queue_set_use_backlog(q, TRUE); diff --git a/modules/diskq/tests/test_diskq_truncate.c b/modules/diskq/tests/test_diskq_truncate.c index 4fd101d5de..9387f37b9d 100644 --- a/modules/diskq/tests/test_diskq_truncate.c +++ b/modules/diskq/tests/test_diskq_truncate.c @@ -43,7 +43,7 @@ static LogQueue * _get_non_reliable_diskqueue(gchar *filename, DiskQueueOptions *options) { - LogQueue *q = log_queue_disk_non_reliable_new(options, filename, NULL, STATS_LEVEL0, NULL); + LogQueue *q = log_queue_disk_non_reliable_new(options, filename, NULL, STATS_LEVEL0, NULL, NULL); log_queue_set_use_backlog(q, FALSE); log_queue_disk_start(q); return q; @@ -242,7 +242,7 @@ _create_reliable_diskqueue(gchar *filename, DiskQueueOptions *options, gboolean truncate_size_ratio = disk_queue_config_get_truncate_size_ratio(configuration); options->truncate_size_ratio = truncate_size_ratio; - q = log_queue_disk_reliable_new(options, filename, "persist-name", STATS_LEVEL0, NULL); + q = log_queue_disk_reliable_new(options, filename, "persist-name", STATS_LEVEL0, NULL, NULL); log_queue_set_use_backlog(q, use_backlog); log_queue_disk_start(q); return q; diff --git a/modules/diskq/tests/test_logqueue_disk.c b/modules/diskq/tests/test_logqueue_disk.c index 57a586f72f..b131094ced 100644 --- a/modules/diskq/tests/test_logqueue_disk.c +++ b/modules/diskq/tests/test_logqueue_disk.c @@ -79,8 +79,8 @@ _assert_log_queue_disk_reliable_is_empty(LogQueue *q) cr_assert_eq(stats_counter_get(q->metrics.shared.memory_usage), 0); cr_assert_eq(stats_counter_get(q->metrics.shared.queued_messages), 0); - cr_assert_eq(atomic_gssize_get_unsigned(&q->metrics.owned.memory_usage), 0); - cr_assert_eq(atomic_gssize_get_unsigned(&q->metrics.owned.queued_messages), 0); + cr_assert_eq(stats_counter_get(q->metrics.owned.memory_usage), 0); + cr_assert_eq(stats_counter_get(q->metrics.owned.queued_messages), 0); } Test(logqueue_disk, restart_corrupted_reliable) @@ -100,9 +100,11 @@ Test(logqueue_disk, restart_corrupted_reliable) disk_queue_options_qout_size_set(&options, 2); StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); LogQueue *queue = log_queue_disk_reliable_new(&options, filename, "restart_corrupted_reliable", STATS_LEVEL0, - driver_sck_builder); + driver_sck_builder, queue_sck_builder); stats_cluster_key_builder_free(driver_sck_builder); + stats_cluster_key_builder_free(queue_sck_builder); cr_assert(log_queue_disk_start(queue)); cr_assert_str_eq(log_queue_disk_get_filename(queue), filename); @@ -166,8 +168,8 @@ _assert_log_queue_disk_non_reliable_is_empty(LogQueue *q) cr_assert_eq(stats_counter_get(q->metrics.shared.memory_usage), 0); cr_assert_eq(stats_counter_get(q->metrics.shared.queued_messages), 0); - cr_assert_eq(atomic_gssize_get_unsigned(&q->metrics.owned.memory_usage), 0); - cr_assert_eq(atomic_gssize_get_unsigned(&q->metrics.owned.queued_messages), 0); + cr_assert_eq(stats_counter_get(q->metrics.owned.memory_usage), 0); + cr_assert_eq(stats_counter_get(q->metrics.owned.queued_messages), 0); } Test(logqueue_disk, restart_corrupted_non_reliable) @@ -187,9 +189,11 @@ Test(logqueue_disk, restart_corrupted_non_reliable) disk_queue_options_qout_size_set(&options, 0); StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); LogQueue *queue = log_queue_disk_non_reliable_new(&options, filename, "restart_corrupted_non_reliable", STATS_LEVEL0, - driver_sck_builder); + driver_sck_builder, queue_sck_builder); stats_cluster_key_builder_free(driver_sck_builder); + stats_cluster_key_builder_free(queue_sck_builder); cr_assert(log_queue_disk_start(queue)); cr_assert_str_eq(log_queue_disk_get_filename(queue), filename); @@ -258,8 +262,8 @@ _assert_log_queue_disk_non_reliable_has_messages_in_qout(LogQueue *q, guint num_ cr_assert_eq(stats_counter_get(q->metrics.shared.memory_usage), num_of_messages * log_msg_size); cr_assert_eq(stats_counter_get(q->metrics.shared.queued_messages), num_of_messages); - cr_assert_eq(atomic_gssize_get_unsigned(&q->metrics.owned.memory_usage), num_of_messages * log_msg_size); - cr_assert_eq(atomic_gssize_get_unsigned(&q->metrics.owned.queued_messages), num_of_messages); + cr_assert_eq(stats_counter_get(q->metrics.owned.memory_usage), num_of_messages * log_msg_size); + cr_assert_eq(stats_counter_get(q->metrics.owned.queued_messages), num_of_messages); } Test(logqueue_disk, restart_corrupted_non_reliable_with_qout) @@ -277,8 +281,9 @@ Test(logqueue_disk, restart_corrupted_non_reliable_with_qout) disk_queue_options_qout_size_set(&options, 1); StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); LogQueue *queue = log_queue_disk_non_reliable_new(&options, filename, "restart_corrupted_non_reliable_with_qout", - STATS_LEVEL0, driver_sck_builder); + STATS_LEVEL0, driver_sck_builder, queue_sck_builder); LogQueueDiskNonReliable *queue_disk_non_reliable = (LogQueueDiskNonReliable *) queue; cr_assert(log_queue_disk_start(queue)); @@ -303,13 +308,15 @@ Test(logqueue_disk, restart_corrupted_non_reliable_with_qout) gboolean persistent; log_queue_disk_stop(queue, &persistent); log_queue_unref(queue); + stats_cluster_key_builder_reset(queue_sck_builder); queue = log_queue_disk_non_reliable_new(&options, filename, "restart_corrupted_non_reliable_with_qout", - STATS_LEVEL0, driver_sck_builder); + STATS_LEVEL0, driver_sck_builder, queue_sck_builder); cr_assert(log_queue_disk_start(queue)); _assert_log_queue_disk_non_reliable_has_messages_in_qout(queue, 1); stats_cluster_key_builder_free(driver_sck_builder); + stats_cluster_key_builder_free(queue_sck_builder); log_queue_disk_stop(queue, &persistent); log_queue_unref(queue); disk_queue_options_destroy(&options); @@ -335,10 +342,12 @@ Test(logqueue_disk, restart_corrupted_with_multiple_queues) disk_queue_options_mem_buf_size_set(&options, 4096); StatsClusterKeyBuilder *driver_sck_builder = stats_cluster_key_builder_new(); + StatsClusterKeyBuilder *queue_sck_builder = stats_cluster_key_builder_new(); LogQueue *queue_1 = log_queue_disk_reliable_new(&options, filename_1, "restart_corrupted_with_multiple_queues_1", - STATS_LEVEL0, driver_sck_builder); + STATS_LEVEL0, driver_sck_builder, queue_sck_builder); + stats_cluster_key_builder_reset(queue_sck_builder); LogQueue *queue_2 = log_queue_disk_reliable_new(&options, filename_2, "restart_corrupted_with_multiple_queues_2", - STATS_LEVEL0, driver_sck_builder); + STATS_LEVEL0, driver_sck_builder, queue_sck_builder); cr_assert(log_queue_disk_start(queue_1)); cr_assert(log_queue_disk_start(queue_2)); @@ -347,46 +356,48 @@ Test(logqueue_disk, restart_corrupted_with_multiple_queues) log_queue_push_tail(queue_1, log_msg_new_empty(), &path_options); cr_assert_eq(stats_counter_get(queue_1->metrics.shared.queued_messages), 1); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_1->metrics.owned.queued_messages), 1); + cr_assert_eq(stats_counter_get(queue_1->metrics.owned.queued_messages), 1); cr_assert_eq(stats_counter_get(queue_2->metrics.shared.queued_messages), 1); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_2->metrics.owned.queued_messages), 0); + cr_assert_eq(stats_counter_get(queue_2->metrics.owned.queued_messages), 0); log_queue_push_tail(queue_2, log_msg_new_empty(), &path_options); cr_assert_eq(stats_counter_get(queue_1->metrics.shared.queued_messages), 2); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_1->metrics.owned.queued_messages), 1); + cr_assert_eq(stats_counter_get(queue_1->metrics.owned.queued_messages), 1); cr_assert_eq(stats_counter_get(queue_2->metrics.shared.queued_messages), 2); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_2->metrics.owned.queued_messages), 1); + cr_assert_eq(stats_counter_get(queue_2->metrics.owned.queued_messages), 1); gboolean persistent; log_queue_disk_stop(queue_1, &persistent); log_queue_unref(queue_1); cr_assert_eq(stats_counter_get(queue_2->metrics.shared.queued_messages), 1); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_2->metrics.owned.queued_messages), 1); + cr_assert_eq(stats_counter_get(queue_2->metrics.owned.queued_messages), 1); + stats_cluster_key_builder_reset(queue_sck_builder); queue_1 = log_queue_disk_reliable_new(&options, filename_1, "restart_corrupted_with_multiple_queues_1", - STATS_LEVEL0, driver_sck_builder); + STATS_LEVEL0, driver_sck_builder, queue_sck_builder); cr_assert(log_queue_disk_start(queue_1)); cr_assert_eq(stats_counter_get(queue_1->metrics.shared.queued_messages), 2); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_1->metrics.owned.queued_messages), 1); + cr_assert_eq(stats_counter_get(queue_1->metrics.owned.queued_messages), 1); cr_assert_eq(stats_counter_get(queue_2->metrics.shared.queued_messages), 2); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_2->metrics.owned.queued_messages), 1); + cr_assert_eq(stats_counter_get(queue_2->metrics.owned.queued_messages), 1); log_queue_disk_restart_corrupted((LogQueueDisk *) queue_1); cr_assert_eq(stats_counter_get(queue_1->metrics.shared.queued_messages), 1); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_1->metrics.owned.queued_messages), 0); + cr_assert_eq(stats_counter_get(queue_1->metrics.owned.queued_messages), 0); cr_assert_eq(stats_counter_get(queue_2->metrics.shared.queued_messages), 1); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_2->metrics.owned.queued_messages), 1); + cr_assert_eq(stats_counter_get(queue_2->metrics.owned.queued_messages), 1); log_queue_disk_restart_corrupted((LogQueueDisk *) queue_2); cr_assert_eq(stats_counter_get(queue_1->metrics.shared.queued_messages), 0); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_1->metrics.owned.queued_messages), 0); + cr_assert_eq(stats_counter_get(queue_1->metrics.owned.queued_messages), 0); cr_assert_eq(stats_counter_get(queue_2->metrics.shared.queued_messages), 0); - cr_assert_eq(atomic_gssize_get_unsigned(&queue_2->metrics.owned.queued_messages), 0); + cr_assert_eq(stats_counter_get(queue_2->metrics.owned.queued_messages), 0); stats_cluster_key_builder_free(driver_sck_builder); + stats_cluster_key_builder_free(queue_sck_builder); log_queue_disk_stop(queue_1, &persistent); log_queue_disk_stop(queue_2, &persistent); log_queue_unref(queue_1); diff --git a/modules/diskq/tests/test_reliable_backlog.c b/modules/diskq/tests/test_reliable_backlog.c index bc275f77cb..63b2d5d75b 100644 --- a/modules/diskq/tests/test_reliable_backlog.c +++ b/modules/diskq/tests/test_reliable_backlog.c @@ -58,7 +58,7 @@ _init_diskq_for_test(const gchar *filename, gint64 size, gint64 membuf_size) LogQueueDiskReliable *dq; _construct_options(&options, size, membuf_size, TRUE); - LogQueue *q = log_queue_disk_reliable_new(&options, filename, NULL, STATS_LEVEL0, NULL); + LogQueue *q = log_queue_disk_reliable_new(&options, filename, NULL, STATS_LEVEL0, NULL, NULL); struct stat st; num_of_ack = 0; unlink(filename); diff --git a/modules/examples/sources/threaded-diskq-source/threaded-diskq-source.c b/modules/examples/sources/threaded-diskq-source/threaded-diskq-source.c index e206fb90ad..c1cd5112d2 100644 --- a/modules/examples/sources/threaded-diskq-source/threaded-diskq-source.c +++ b/modules/examples/sources/threaded-diskq-source/threaded-diskq-source.c @@ -42,6 +42,7 @@ struct ThreadedDiskqSourceDriver struct stat diskq_file_stat; gboolean waiting_for_file_change; + StatsClusterKeyBuilder *queue_sck_builder; gchar *filename; }; @@ -75,13 +76,15 @@ _load_queue(ThreadedDiskqSourceDriver *self) if (self->diskq_options.reliable) { self->diskq_options.mem_buf_size = 1024 * 1024; - self->queue = log_queue_disk_reliable_new(&self->diskq_options, self->filename, NULL, STATS_LEVEL0, NULL); + self->queue = log_queue_disk_reliable_new(&self->diskq_options, self->filename, NULL, STATS_LEVEL0, NULL, + self->queue_sck_builder); } else { self->diskq_options.mem_buf_size = 128; self->diskq_options.qout_size = 1000; - self->queue = log_queue_disk_non_reliable_new(&self->diskq_options, self->filename, NULL, STATS_LEVEL0, NULL); + self->queue = log_queue_disk_non_reliable_new(&self->diskq_options, self->filename, NULL, STATS_LEVEL0, NULL, + self->queue_sck_builder); } if (!log_queue_disk_start(self->queue)) @@ -168,6 +171,20 @@ _fetch(LogThreadedFetcherDriver *s) return result; } +static const gchar * +_format_stats_instance(LogThreadedSourceDriver *s) +{ + ThreadedDiskqSourceDriver *self = (ThreadedDiskqSourceDriver *) s; + static gchar persist_name[1024]; + + if (s->super.super.super.persist_name) + g_snprintf(persist_name, sizeof(persist_name), "diskq-source,%s", s->super.super.super.persist_name); + else + g_snprintf(persist_name, sizeof(persist_name), "diskq-source,%s", self->filename); + + return persist_name; +} + static gboolean _init(LogPipe *s) { @@ -179,6 +196,12 @@ _init(LogPipe *s) return FALSE; } + stats_cluster_key_builder_reset(self->queue_sck_builder); + stats_cluster_key_builder_add_label(self->queue_sck_builder, + stats_cluster_label("id", self->super.super.super.super.id ? : "")); + stats_cluster_key_builder_add_label(self->queue_sck_builder, stats_cluster_label("driver_instance", + _format_stats_instance(&self->super.super))); + return log_threaded_fetcher_driver_init_method(s); } @@ -187,25 +210,12 @@ _free(LogPipe *s) { ThreadedDiskqSourceDriver *self = (ThreadedDiskqSourceDriver *) s; + stats_cluster_key_builder_free(self->queue_sck_builder); g_free(self->filename); log_threaded_fetcher_driver_free_method(s); } -static const gchar * -_format_stats_instance(LogThreadedSourceDriver *s) -{ - ThreadedDiskqSourceDriver *self = (ThreadedDiskqSourceDriver *) s; - static gchar persist_name[1024]; - - if (s->super.super.super.persist_name) - g_snprintf(persist_name, sizeof(persist_name), "diskq-source,%s", s->super.super.super.persist_name); - else - g_snprintf(persist_name, sizeof(persist_name), "diskq-source,%s", self->filename); - - return persist_name; -} - void threaded_diskq_sd_set_file(LogDriver *s, const gchar *filename) { @@ -223,6 +233,8 @@ threaded_diskq_sd_new(GlobalConfig *cfg) disk_queue_options_set_default_options(&self->diskq_options); + self->queue_sck_builder = stats_cluster_key_builder_new(); + self->super.connect = _open_diskq; self->super.disconnect = _close_diskq; self->super.fetch = _fetch; diff --git a/news/feature-4392.md b/news/feature-4392.md new file mode 100644 index 0000000000..494ae56a37 --- /dev/null +++ b/news/feature-4392.md @@ -0,0 +1,29 @@ +destination: Introduced queue metrics. + + * The corresponding driver is identified with the "id" and "driver_instance" labels. + * Available counters are "memory_usage_bytes" and "events". + * Memory queue metrics are available with "syslogng_memory_queue_" prefix, + `disk-buffer` metrics are available with "syslogng_disk_queue_" prefix. + * `disk-buffer` metrics have an additional "path" label, pointing to the location of the disk-buffer file + and a "reliable" label, which can be either "true" or "false". + * Threaded destinations, like `http`, `python`, etc have an additional "worker" label. + +Example metrics +``` +syslogng_disk_queue_events{driver_instance="http,http://localhost:1239",id="d_http_disk_buffer#0",path="/var/syslog-ng/syslog-ng-00000.rqf",reliable="true",worker="0"} 80 +syslogng_disk_queue_events{driver_instance="http,http://localhost:1239",id="d_http_disk_buffer#0",path="/var/syslog-ng/syslog-ng-00001.rqf",reliable="true",worker="1"} 7 +syslogng_disk_queue_events{driver_instance="http,http://localhost:1239",id="d_http_disk_buffer#0",path="/var/syslog-ng/syslog-ng-00002.rqf",reliable="true",worker="2"} 7 +syslogng_disk_queue_events{driver_instance="http,http://localhost:1239",id="d_http_disk_buffer#0",path="/var/syslog-ng/syslog-ng-00003.rqf",reliable="true",worker="3"} 7 +syslogng_disk_queue_events{driver_instance="tcp,localhost:1235",id="d_network_disk_buffer#0",path="/var/syslog-ng/syslog-ng-00000.qf",reliable="false"} 101 +syslogng_disk_queue_memory_usage_bytes{driver_instance="http,http://localhost:1239",id="d_http_disk_buffer#0",path="/var/syslog-ng/syslog-ng-00000.rqf",reliable="true",worker="0"} 3136 +syslogng_disk_queue_memory_usage_bytes{driver_instance="http,http://localhost:1239",id="d_http_disk_buffer#0",path="/var/syslog-ng/syslog-ng-00001.rqf",reliable="true",worker="1"} 2776 +syslogng_disk_queue_memory_usage_bytes{driver_instance="http,http://localhost:1239",id="d_http_disk_buffer#0",path="/var/syslog-ng/syslog-ng-00002.rqf",reliable="true",worker="2"} 2760 +syslogng_disk_queue_memory_usage_bytes{driver_instance="http,http://localhost:1239",id="d_http_disk_buffer#0",path="/var/syslog-ng/syslog-ng-00003.rqf",reliable="true",worker="3"} 2776 +syslogng_disk_queue_memory_usage_bytes{driver_instance="tcp,localhost:1235",id="d_network_disk_buffer#0",path="/var/syslog-ng/syslog-ng-00000.qf",reliable="false"} 39888 +syslogng_memory_queue_events{driver_instance="http,http://localhost:1236",id="d_http#0",worker="0"} 15 +syslogng_memory_queue_events{driver_instance="http,http://localhost:1236",id="d_http#0",worker="1"} 14 +syslogng_memory_queue_events{driver_instance="tcp,localhost:1234",id="d_network#0"} 29 +syslogng_memory_queue_memory_usage_bytes{driver_instance="http,http://localhost:1236",id="d_http#0",worker="0"} 5896 +syslogng_memory_queue_memory_usage_bytes{driver_instance="http,http://localhost:1236",id="d_http#0",worker="1"} 5552 +syslogng_memory_queue_memory_usage_bytes{driver_instance="tcp,localhost:1234",id="d_network#0"} 11448 +```