From d4157c0345f9bf2c8b67fdf7158f4988e2a628e9 Mon Sep 17 00:00:00 2001
From: Joel Takvorian
Date: Thu, 22 Sep 2022 21:33:51 +0200
Subject: [PATCH 1/2] NETOBSERV-579 unifying metrics

- Allow prefixing operational metrics (this introduces global settings for
  metrics, which will probably be extended later)
- Unify metrics usage across stages: e.g. channel size was tracked only in
  the netflow ingester; stages now share a common struct that generates
  metrics
- Add new gauges for tracking channel sizes
- Rename some metrics
- Update the metrics doc
---
 README.md                                     |   3 +-
 cmd/confgenerator/main.go                     |   1 +
 cmd/flowlogs-pipeline/main.go                 |   8 +-
 cmd/operationalmetricstodoc/main.go           |  10 +-
 docs/api.md                                   |   2 +-
 docs/operational-metrics.md                   |  80 ++++---
 pkg/confgen/confgen_test.go                   |   1 +
 pkg/confgen/config.go                         |   1 +
 pkg/config/config.go                          |  42 +++-
 pkg/operational/{health => }/health.go        |  15 +-
 pkg/operational/metrics.go                    | 208 ++++++++++++++++++
 pkg/operational/metrics/metrics.go            |  91 --------
 pkg/pipeline/aggregate_prom_test.go           |   3 +-
 pkg/pipeline/encode/encode_kafka.go           |  18 +-
 pkg/pipeline/encode/encode_kafka_test.go      |  12 +-
 pkg/pipeline/encode/encode_prom.go            |  87 ++++----
 pkg/pipeline/encode/encode_prom_test.go       |   4 +-
 pkg/pipeline/extract/conntrack/conntrack.go   |  22 +-
 .../extract/conntrack/conntrack_test.go       |  30 ++-
 pkg/pipeline/extract/conntrack/metrics.go     |  52 ++---
 pkg/pipeline/extract/conntrack/store.go       |  12 +-
 .../health => pipeline}/health_test.go        |  14 +-
 pkg/pipeline/ingest/ingest_collector.go       |  29 +--
 pkg/pipeline/ingest/ingest_collector_test.go  |  15 +-
 pkg/pipeline/ingest/ingest_grpc.go            |  14 +-
 pkg/pipeline/ingest/ingest_kafka.go           |  18 +-
 pkg/pipeline/ingest/ingest_kafka_test.go      |  13 +-
 pkg/pipeline/ingest/metrics.go                |  26 +++
 pkg/pipeline/pipeline.go                      |  28 +--
 pkg/pipeline/pipeline_builder.go              |  76 ++++---
 pkg/pipeline/pipeline_test.go                 |   5 +-
 pkg/pipeline/write/metrics.go                 |  18 ++
 pkg/pipeline/write/write_loki.go              |  16 +-
 pkg/pipeline/write/write_loki_test.go         |  15 +-
 pkg/test/utils.go                             |   8 +
 35 files changed, 636 insertions(+), 361 deletions(-)
 rename pkg/operational/{health => }/health.go (75%)
 create mode 100644 pkg/operational/metrics.go
 delete mode 100644 pkg/operational/metrics/metrics.go
 rename pkg/{operational/health => pipeline}/health_test.go (77%)
 create mode 100644 pkg/pipeline/ingest/metrics.go
 create mode 100644 pkg/pipeline/write/metrics.go

diff --git a/README.md b/README.md
index 487cbc327..a2546bd5b 100644
--- a/README.md
+++ b/README.md
@@ -51,8 +51,7 @@ Flags:
   -h, --help                 help for flowlogs-pipeline
       --log-level string     Log level: debug, info, warning, error (default "error")
       --parameters string    json of config file parameters field
-      --pipeline string      json of config file pipeline field
-      --profile.port int     Go pprof tool port (default: disabled)
+      --pipeline string      json of config file pipeline field
 ```
 
diff --git a/cmd/confgenerator/main.go b/cmd/confgenerator/main.go
index ca2f2c87c..264a5193b 100644
--- a/cmd/confgenerator/main.go
+++ b/cmd/confgenerator/main.go
@@ -135,6 +135,7 @@ func initFlags() {
 	rootCmd.PersistentFlags().StringVar(&opts.DestGrafanaJsonnetFolder, "destGrafanaJsonnetFolder", "/tmp/jsonnet", "destination grafana jsonnet folder")
 	rootCmd.PersistentFlags().StringSliceVar(&opts.SkipWithTags, "skipWithTags", nil, "Skip definitions with Tags")
 	rootCmd.PersistentFlags().StringSliceVar(&opts.GenerateStages, "generateStages", nil, "Produce only specified stages (ingest, transform_generic, transform_network, extract_aggregate, encode_prom, write_loki")
+	rootCmd.PersistentFlags().StringVar(&opts.GlobalMetricsPrefix, "globalMetricsPrefix", "", "Common prefix for all generated metrics, including operational ones")
 }
 
 func main() {
diff --git a/cmd/flowlogs-pipeline/main.go b/cmd/flowlogs-pipeline/main.go
index 9c6de32dc..d07e4c748 100644
--- a/cmd/flowlogs-pipeline/main.go
+++ b/cmd/flowlogs-pipeline/main.go
@@ -30,7 +30,7 @@ import (
 	jsoniter "github.com/json-iterator/go"
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
-	"github.com/netobserv/flowlogs-pipeline/pkg/operational/health"
+	"github.com/netobserv/flowlogs-pipeline/pkg/operational"
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
 	log "github.com/sirupsen/logrus"
@@ -44,7 +44,7 @@ var (
 	BuildDate          string
 	cfgFile            string
 	logLevel           string
-	envPrefix          = "FLOWLOGS-PIPILNE"
+	envPrefix          = "FLOWLOGS-PIPELINE"
 	defaultLogFileName = ".flowlogs-pipeline"
 	opts               config.Options
 )
@@ -141,6 +141,7 @@ func initFlags() {
 	rootCmd.PersistentFlags().IntVar(&opts.Profile.Port, "profile.port", 0, "Go pprof tool port (default: disabled)")
 	rootCmd.PersistentFlags().StringVar(&opts.PipeLine, "pipeline", "", "json of config file pipeline field")
 	rootCmd.PersistentFlags().StringVar(&opts.Parameters, "parameters", "", "json of config file parameters field")
+	rootCmd.PersistentFlags().StringVar(&opts.MetricsSettings, "metrics-settings", "", "json for global metrics settings")
 }
 
 func main() {
@@ -191,7 +192,7 @@ func run() {
 	}
 
 	// Start health report server
-	health.NewHealthServer(&opts, mainPipeline)
+	operational.NewHealthServer(&opts, mainPipeline.IsAlive, mainPipeline.IsReady)
 
 	// Starts the flows pipeline
 	mainPipeline.Run()
@@ -200,5 +201,4 @@ func run() {
 	time.Sleep(time.Second)
 	log.Debugf("exiting main run")
 	os.Exit(0)
-
 }
diff --git a/cmd/operationalmetricstodoc/main.go b/cmd/operationalmetricstodoc/main.go
index bb10d1d72..407c7f071 100644
--- a/cmd/operationalmetricstodoc/main.go
+++ b/cmd/operationalmetricstodoc/main.go
@@ -20,7 +20,7 @@ package main
 import (
 	"fmt"
 
-	operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics"
+	"github.com/netobserv/flowlogs-pipeline/pkg/operational"
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
 )
@@ -32,15 +32,13 @@ func main() {
 	header := `
 > Note: this file was automatically generated, to update execute "make docs"
-
+
 # flowlogs-pipeline Operational Metrics
-
+
 Each table below provides documentation for an exported flowlogs-pipeline operational metric.
-
-
 `
-	doc := operationalMetrics.GetDocumentation()
+	doc := operational.GetDocumentation()
 	data := fmt.Sprintf("%s\n%s\n", header, doc)
 	fmt.Printf("%s", data)
 }
diff --git a/docs/api.md b/docs/api.md
index bf0b92e9e..0701b0c07 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -73,7 +73,7 @@ Following is the supported API format for the kafka ingest:
             json: JSON decoder
             protobuf: Protobuf decoder
     batchMaxLen: the number of accumulated flows before being forwarded for processing
-    pullBatchLen: the capacity of the queue use to store pulled flows
+    pullQueueCapacity: the capacity of the queue used to store pulled flows
     pullMaxBytes: the maximum number of bytes being pulled from kafka
     commitInterval: the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously.
tls: TLS client configuration (optional) diff --git a/docs/operational-metrics.md b/docs/operational-metrics.md index 679d64893..161042ee9 100644 --- a/docs/operational-metrics.md +++ b/docs/operational-metrics.md @@ -1,19 +1,34 @@ > Note: this file was automatically generated, to update execute "make docs" - + # flowlogs-pipeline Operational Metrics - + Each table below provides documentation for an exported flowlogs-pipeline operational metric. - - -### encode_prom_metrics_processed -| **Name** | encode_prom_metrics_processed | +### conntrack_input_records +| **Name** | conntrack_input_records | |:---|:---| -| **Description** | Number of metrics processed | +| **Description** | The total number of input records per classification. | +| **Type** | counter | +| **Labels** | classification | + + +### conntrack_memory_connections +| **Name** | conntrack_memory_connections | +|:---|:---| +| **Description** | The total number of tracked connections in memory. | +| **Type** | gauge | +| **Labels** | | + + +### conntrack_output_records +| **Name** | conntrack_output_records | +|:---|:---| +| **Description** | The total number of output records. | | **Type** | counter | +| **Labels** | type | ### encode_prom_errors @@ -21,47 +36,54 @@ Each table below provides documentation for an exported flowlogs-pipeline operat |:---|:---| | **Description** | Total errors during metrics generation | | **Type** | counter | +| **Labels** | error, metric, key | -### conntrack_memory_connections -| **Name** | conntrack_memory_connections | +### ingest_flows_processed +| **Name** | ingest_flows_processed | |:---|:---| -| **Description** | The total number of tracked connections in memory. | -| **Type** | gauge | +| **Description** | Number of flow logs processed by the ingester | +| **Type** | counter | +| **Labels** | stage | -### conntrack_input_records -| **Name** | conntrack_input_records | +### metrics_processed +| **Name** | metrics_processed | |:---|:---| -| **Description** | The total number of input records per classification. | +| **Description** | Number of metrics processed | | **Type** | counter | +| **Labels** | stage | -### conntrack_output_records -| **Name** | conntrack_output_records | +### records_written +| **Name** | records_written | |:---|:---| -| **Description** | The total number of output records. 
| +| **Description** | Number of output records written | | **Type** | counter | +| **Labels** | stage | -### ingest_collector_queue_length -| **Name** | ingest_collector_queue_length | +### stage_duration_ms +| **Name** | stage_duration_ms | |:---|:---| -| **Description** | Queue length | -| **Type** | gauge | +| **Description** | Pipeline stage duration in milliseconds | +| **Type** | histogram | +| **Labels** | stage | -### ingest_collector_flow_logs_processed -| **Name** | ingest_collector_flow_logs_processed | +### stage_in_queue_size +| **Name** | stage_in_queue_size | |:---|:---| -| **Description** | Number of log lines (flow logs) processed | -| **Type** | counter | +| **Description** | Pipeline stage input queue size (number of elements in queue) | +| **Type** | gauge | +| **Labels** | stage | -### loki_records_written -| **Name** | loki_records_written | +### stage_out_queue_size +| **Name** | stage_out_queue_size | |:---|:---| -| **Description** | Number of records written to loki | -| **Type** | counter | +| **Description** | Pipeline stage output queue size (number of elements in queue) | +| **Type** | gauge | +| **Labels** | stage | diff --git a/pkg/confgen/confgen_test.go b/pkg/confgen/confgen_test.go index 0bae7c39e..534705623 100644 --- a/pkg/confgen/confgen_test.go +++ b/pkg/confgen/confgen_test.go @@ -83,6 +83,7 @@ func Test_RunShortConfGen(t *testing.T) { DestConfFile: configOut, DestDocFile: docOut, DestGrafanaJsonnetFolder: jsonnetOut, + GlobalMetricsPrefix: "flp_", }) err = os.WriteFile(filepath.Join(dirPath, configFileName), []byte(test.ConfgenShortConfig), 0644) require.NoError(t, err) diff --git a/pkg/confgen/config.go b/pkg/confgen/config.go index c0efb230f..f03420af3 100644 --- a/pkg/confgen/config.go +++ b/pkg/confgen/config.go @@ -34,6 +34,7 @@ type Options struct { SrcFolder string SkipWithTags []string GenerateStages []string + GlobalMetricsPrefix string } type ConfigVisualization struct { diff --git a/pkg/config/config.go b/pkg/config/config.go index 27e693768..0d0757b58 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -26,16 +26,18 @@ import ( ) type Options struct { - PipeLine string - Parameters string - Health Health - Profile Profile + PipeLine string + Parameters string + MetricsSettings string + Health Health + Profile Profile } type ConfigFileStruct struct { - LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"` - Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"` - Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"` + LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"` + Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"` + Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"` + MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"` } type Health struct { @@ -46,6 +48,18 @@ type Profile struct { Port int } +// MetricsSettings is similar to api.PromEncode, but is global to the application, ie. it also works with operational metrics. +// Also, currently FLP doesn't support defining more than one PromEncode stage. If this feature is added later, these global settings +// will help configuring common setting for all PromEncode stages - PromEncode settings would then act as overrides. +type MetricsSettings struct { + // TODO: manage global metrics server, ie. 
not coupled to PromEncode, cf https://github.com/netobserv/flowlogs-pipeline/issues/302 + // Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"port number to expose \"/metrics\" endpoint"` + // TLS *PromTLSConf `yaml:"tls,omitempty" json:"tls,omitempty" doc:"TLS configuration for the prometheus endpoint"` + // ExpiryTime int `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"seconds of no-flow to wait before deleting prometheus data item"` + Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty"` + NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"` +} + type Stage struct { Name string `yaml:"name" json:"name"` Follows string `yaml:"follows,omitempty" json:"follows,omitempty"` @@ -108,17 +122,27 @@ func ParseConfig(opts Options) (ConfigFileStruct, error) { logrus.Debugf("opts.PipeLine = %v ", opts.PipeLine) err := JsonUnmarshalStrict([]byte(opts.PipeLine), &out.Pipeline) if err != nil { - logrus.Errorf("error when reading config file: %v", err) + logrus.Errorf("error when parsing pipeline: %v", err) return out, err } logrus.Debugf("stages = %v ", out.Pipeline) err = JsonUnmarshalStrict([]byte(opts.Parameters), &out.Parameters) if err != nil { - logrus.Errorf("error when reading config file: %v", err) + logrus.Errorf("error when parsing pipeline parameters: %v", err) return out, err } logrus.Debugf("params = %v ", out.Parameters) + + if opts.MetricsSettings != "" { + err = JsonUnmarshalStrict([]byte(opts.MetricsSettings), &out.MetricsSettings) + if err != nil { + logrus.Errorf("error when parsing global metrics settings: %v", err) + return out, err + } + logrus.Debugf("metrics settings = %v ", out.MetricsSettings) + } + return out, nil } diff --git a/pkg/operational/health/health.go b/pkg/operational/health.go similarity index 75% rename from pkg/operational/health/health.go rename to pkg/operational/health.go index 9be24360f..18895b7fc 100644 --- a/pkg/operational/health/health.go +++ b/pkg/operational/health.go @@ -15,7 +15,7 @@ * */ -package health +package operational import ( "net" @@ -24,7 +24,6 @@ import ( "github.com/heptiolabs/healthcheck" "github.com/netobserv/flowlogs-pipeline/pkg/config" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline" log "github.com/sirupsen/logrus" ) @@ -32,28 +31,28 @@ const defaultServerHost = "0.0.0.0" type Server struct { handler healthcheck.Handler - address string + Address string } func (hs *Server) Serve() { for { - err := http.ListenAndServe(hs.address, hs.handler) + err := http.ListenAndServe(hs.Address, hs.handler) log.Errorf("http.ListenAndServe error %v", err) time.Sleep(60 * time.Second) } } -func NewHealthServer(opts *config.Options, pipeline *pipeline.Pipeline) *Server { +func NewHealthServer(opts *config.Options, isAlive healthcheck.Check, isReady healthcheck.Check) *Server { handler := healthcheck.NewHandler() address := net.JoinHostPort(defaultServerHost, opts.Health.Port) - handler.AddLivenessCheck("PipelineCheck", pipeline.IsAlive()) - handler.AddReadinessCheck("PipelineCheck", pipeline.IsReady()) + handler.AddLivenessCheck("PipelineCheck", isAlive) + handler.AddReadinessCheck("PipelineCheck", isReady) server := &Server{ handler: handler, - address: address, + Address: address, } go server.Serve() diff --git a/pkg/operational/metrics.go b/pkg/operational/metrics.go new file mode 100644 index 000000000..f3fe22f3f --- /dev/null +++ b/pkg/operational/metrics.go @@ -0,0 +1,208 @@ +/* + * Copyright (C) 2022 IBM, Inc. 
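The plumbing above ends in ParseConfig, which unmarshals the new --metrics-settings JSON into config.MetricsSettings. A minimal sketch of that flow, assuming only what this patch defines (Options.MetricsSettings, the Prefix and NoPanic fields, and the ParseConfig signature); the JSON value itself is invented for illustration:

    package main

    import (
        "fmt"

        "github.com/netobserv/flowlogs-pipeline/pkg/config"
    )

    func main() {
        opts := config.Options{
            PipeLine:        `[]`,
            Parameters:      `[]`,
            MetricsSettings: `{"prefix":"flp_op_","noPanic":true}`, // illustrative value
        }
        cfg, err := config.ParseConfig(opts)
        if err != nil {
            panic(err)
        }
        // Prefix is prepended to every operational metric name at registration
        // time; NoPanic downgrades registration conflicts from a panic to an
        // error log (see Metrics.register below).
        fmt.Println(cfg.MetricsSettings.Prefix, cfg.MetricsSettings.NoPanic) // flp_op_ true
    }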
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package operational + +import ( + "fmt" + "sort" + "strings" + + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" +) + +type MetricDefinition struct { + Name string + Help string + Type metricType + Labels []string +} + +type metricType string + +const TypeCounter = "counter" +const TypeGauge = "gauge" +const TypeHistogram = "histogram" + +var allMetrics = []MetricDefinition{} + +func DefineMetric(name, help string, t metricType, labels ...string) MetricDefinition { + def := MetricDefinition{ + Name: name, + Help: help, + Type: t, + Labels: labels, + } + allMetrics = append(allMetrics, def) + return def +} + +var ( + ingestFlowsProcessed = DefineMetric( + "ingest_flows_processed", + "Number of flow logs processed by the ingester", + TypeCounter, + "stage", + ) + recordsWritten = DefineMetric( + "records_written", + "Number of output records written", + TypeCounter, + "stage", + ) + stageInQueueSize = DefineMetric( + "stage_in_queue_size", + "Pipeline stage input queue size (number of elements in queue)", + TypeGauge, + "stage", + ) + stageOutQueueSize = DefineMetric( + "stage_out_queue_size", + "Pipeline stage output queue size (number of elements in queue)", + TypeGauge, + "stage", + ) +) + +func (def *MetricDefinition) mapLabels(labels []string) prometheus.Labels { + if len(labels) != len(def.Labels) { + logrus.Errorf("Could not map labels, length differ in def %s [%v / %v]", def.Name, def.Labels, labels) + } + labelsMap := prometheus.Labels{} + for i, label := range labels { + labelsMap[def.Labels[i]] = label + } + return labelsMap +} + +type Metrics struct { + settings *config.MetricsSettings +} + +func NewMetrics(settings *config.MetricsSettings) *Metrics { + return &Metrics{settings: settings} +} + +// register will register against the default registry. 
May panic or not depending on settings +func (o *Metrics) register(c prometheus.Collector, name string) { + err := prometheus.DefaultRegisterer.Register(c) + if err != nil { + if o.settings.NoPanic { + logrus.Errorf("metrics registration error [%s]: %v", name, err) + } else { + logrus.Panicf("metrics registration error [%s]: %v", name, err) + } + } +} + +func (o *Metrics) NewCounter(def *MetricDefinition, labels ...string) prometheus.Counter { + fullName := o.settings.Prefix + def.Name + c := prometheus.NewCounter(prometheus.CounterOpts{ + Name: fullName, + Help: def.Help, + ConstLabels: def.mapLabels(labels), + }) + o.register(c, fullName) + return c +} + +func (o *Metrics) NewCounterVec(def *MetricDefinition) *prometheus.CounterVec { + fullName := o.settings.Prefix + def.Name + c := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: fullName, + Help: def.Help, + }, def.Labels) + o.register(c, fullName) + return c +} + +func (o *Metrics) NewGauge(def *MetricDefinition, labels ...string) prometheus.Gauge { + fullName := o.settings.Prefix + def.Name + c := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: fullName, + Help: def.Help, + ConstLabels: def.mapLabels(labels), + }) + o.register(c, fullName) + return c +} + +func (o *Metrics) NewGaugeFunc(def *MetricDefinition, f func() float64, labels ...string) { + fullName := o.settings.Prefix + def.Name + c := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: fullName, + Help: def.Help, + ConstLabels: def.mapLabels(labels), + }, f) + o.register(c, fullName) +} + +func (o *Metrics) NewHistogramVec(def *MetricDefinition, buckets []float64) *prometheus.HistogramVec { + fullName := o.settings.Prefix + def.Name + c := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: fullName, + Help: def.Help, + Buckets: buckets, + }, def.Labels) + o.register(c, fullName) + return c +} + +func (o *Metrics) CreateFlowsProcessedCounter(stage string) prometheus.Counter { + return o.NewCounter(&ingestFlowsProcessed, stage) +} + +func (o *Metrics) CreateRecordsWrittenCounter(stage string) prometheus.Counter { + return o.NewCounter(&recordsWritten, stage) +} + +func (o *Metrics) CreateInQueueSizeGauge(stage string, f func() int) { + o.NewGaugeFunc(&stageInQueueSize, func() float64 { return float64(f()) }, stage) +} + +func (o *Metrics) CreateOutQueueSizeGauge(stage string, f func() int) { + o.NewGaugeFunc(&stageOutQueueSize, func() float64 { return float64(f()) }, stage) +} + +func GetDocumentation() string { + doc := "" + sort.Slice(allMetrics, func(i, j int) bool { + return allMetrics[i].Name < allMetrics[j].Name + }) + for _, opts := range allMetrics { + doc += fmt.Sprintf( + ` +### %s +| **Name** | %s | +|:---|:---| +| **Description** | %s | +| **Type** | %s | +| **Labels** | %s | + +`, + opts.Name, + opts.Name, + opts.Help, + opts.Type, + strings.Join(opts.Labels, ", "), + ) + } + + return doc +} diff --git a/pkg/operational/metrics/metrics.go b/pkg/operational/metrics/metrics.go deleted file mode 100644 index e3c230f80..000000000 --- a/pkg/operational/metrics/metrics.go +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (C) 2022 IBM, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
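Since the operational.Metrics factory above is the heart of this patch, a usage sketch may help review. Everything here follows the API as defined above, except the metric itself: the flows_dropped definition and the "demo-stage" name are hypothetical, not part of the patch.

    package main

    import (
        "github.com/netobserv/flowlogs-pipeline/pkg/config"
        "github.com/netobserv/flowlogs-pipeline/pkg/operational"
    )

    // Hypothetical stage-level metric, following the pattern the new stages use:
    // define once at package level (which also feeds GetDocumentation), then
    // instantiate per stage with a "stage" const label.
    var flowsDropped = operational.DefineMetric(
        "flows_dropped",
        "Number of flow logs dropped by a hypothetical stage",
        operational.TypeCounter,
        "stage",
    )

    func main() {
        opMetrics := operational.NewMetrics(&config.MetricsSettings{Prefix: "flp_", NoPanic: true})

        // Registered as "flp_flows_dropped" with const label stage="demo-stage".
        dropped := opMetrics.NewCounter(&flowsDropped, "demo-stage")
        dropped.Inc()

        // Queue-size gauges sample a closure at scrape time, so channel lengths
        // are observed without instrumenting every send and receive.
        in := make(chan []config.GenericMap, 100)
        opMetrics.CreateInQueueSizeGauge("demo-stage", func() int { return len(in) })
    }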
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package operationalMetrics - -import ( - "fmt" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -type metricDefinition struct { - Name string - Help string - Type string -} - -var metricsOpts []metricDefinition - -func NewCounter(opts prometheus.CounterOpts) prometheus.Counter { - metricsOpts = append(metricsOpts, metricDefinition{ - Name: opts.Name, - Help: opts.Help, - Type: "counter", - }) - return promauto.NewCounter(opts) -} - -func NewCounterVec(opts prometheus.CounterOpts, labels []string) *prometheus.CounterVec { - metricsOpts = append(metricsOpts, metricDefinition{ - Name: opts.Name, - Help: opts.Help, - Type: "counter", - }) - return promauto.NewCounterVec(opts, labels) -} - -func NewGauge(opts prometheus.GaugeOpts) prometheus.Gauge { - metricsOpts = append(metricsOpts, metricDefinition{ - Name: opts.Name, - Help: opts.Help, - Type: "gauge", - }) - return promauto.NewGauge(opts) -} - -func NewHistogramVec(opts prometheus.HistogramOpts, labels []string) *prometheus.HistogramVec { - metricsOpts = append(metricsOpts, metricDefinition{ - Name: opts.Name, - Help: opts.Help, - Type: "histogram", - }) - return promauto.NewHistogramVec(opts, labels) -} - -func GetDocumentation() string { - doc := "" - for _, opts := range metricsOpts { - doc += fmt.Sprintf( - ` -### %s -| **Name** | %s | -|:---|:---| -| **Description** | %s | -| **Type** | %s | - -`, - opts.Name, - opts.Name, - opts.Help, - opts.Type, - ) - } - - return doc -} diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index a1a45c4c5..e6bb45306 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" @@ -96,7 +97,7 @@ parameters: extractAggregate, err := extract.NewExtractAggregate(cfg.Parameters[0]) require.NoError(t, err) - promEncode, err := encode.NewEncodeProm(cfg.Parameters[1]) + promEncode, err := encode.NewEncodeProm(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[1]) require.Equal(t, err, nil) // Test cases diff --git a/pkg/pipeline/encode/encode_kafka.go b/pkg/pipeline/encode/encode_kafka.go index fd12d6a5f..1f1b7a6ab 100644 --- a/pkg/pipeline/encode/encode_kafka.go +++ b/pkg/pipeline/encode/encode_kafka.go @@ -23,6 +23,8 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/prometheus/client_golang/prometheus" "github.com/segmentio/kafka-go" kafkago "github.com/segmentio/kafka-go" log "github.com/sirupsen/logrus" @@ -39,9 +41,9 @@ type kafkaWriteMessage interface { } type encodeKafka struct { - kafkaParams api.EncodeKafka - kafkaWriter kafkaWriteMessage - prevRecords 
[]config.GenericMap + kafkaParams api.EncodeKafka + kafkaWriter kafkaWriteMessage + recordsWritten prometheus.Counter } // Encode writes entries to kafka topic @@ -60,12 +62,13 @@ func (r *encodeKafka) Encode(in []config.GenericMap) { err := r.kafkaWriter.WriteMessages(context.Background(), msgs...) if err != nil { log.Errorf("encodeKafka error: %v", err) + } else { + r.recordsWritten.Add(float64(len(msgs))) } - r.prevRecords = in } // NewEncodeKafka create a new writer to kafka -func NewEncodeKafka(params config.StageParam) (Encoder, error) { +func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) { log.Debugf("entering NewEncodeKafka") config := api.EncodeKafka{} if params.Encode != nil && params.Encode.Kafka != nil { @@ -124,7 +127,8 @@ func NewEncodeKafka(params config.StageParam) (Encoder, error) { } return &encodeKafka{ - kafkaParams: config, - kafkaWriter: &kafkaWriter, + kafkaParams: config, + kafkaWriter: &kafkaWriter, + recordsWritten: opMetrics.CreateRecordsWrittenCounter(params.Name), }, nil } diff --git a/pkg/pipeline/encode/encode_kafka_test.go b/pkg/pipeline/encode/encode_kafka_test.go index 215642ee2..abec8eb6e 100644 --- a/pkg/pipeline/encode/encode_kafka_test.go +++ b/pkg/pipeline/encode/encode_kafka_test.go @@ -23,6 +23,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/test" kafkago "github.com/segmentio/kafka-go" "github.com/stretchr/testify/mock" @@ -58,7 +59,7 @@ func initNewEncodeKafka(t *testing.T) Encoder { v, cfg := test.InitConfig(t, testKafkaConfig) require.NotNil(t, v) - newEncode, err := NewEncodeKafka(cfg.Parameters[0]) + newEncode, err := NewEncodeKafka(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0]) require.NoError(t, err) return newEncode } @@ -93,18 +94,20 @@ func Test_EncodeKafka(t *testing.T) { } func Test_TLSConfigEmpty(t *testing.T) { + test.ResetPromRegistry() pipeline := config.NewCollectorPipeline("ingest", api.IngestCollector{}) pipeline.EncodeKafka("encode-kafka", api.EncodeKafka{ Address: "any", Topic: "topic", }) - newEncode, err := NewEncodeKafka(pipeline.GetStageParams()[1]) + newEncode, err := NewEncodeKafka(operational.NewMetrics(&config.MetricsSettings{}), pipeline.GetStageParams()[1]) require.NoError(t, err) tlsConfig := newEncode.(*encodeKafka).kafkaWriter.(*kafkago.Writer).Transport.(*kafkago.Transport).TLS require.Nil(t, tlsConfig) } func Test_TLSConfigCA(t *testing.T) { + test.ResetPromRegistry() ca, cleanup := test.CreateCACert(t) defer cleanup() pipeline := config.NewCollectorPipeline("ingest", api.IngestCollector{}) @@ -115,7 +118,7 @@ func Test_TLSConfigCA(t *testing.T) { CACertPath: ca, }, }) - newEncode, err := NewEncodeKafka(pipeline.GetStageParams()[1]) + newEncode, err := NewEncodeKafka(operational.NewMetrics(&config.MetricsSettings{}), pipeline.GetStageParams()[1]) require.NoError(t, err) tlsConfig := newEncode.(*encodeKafka).kafkaWriter.(*kafkago.Writer).Transport.(*kafkago.Transport).TLS @@ -125,6 +128,7 @@ func Test_TLSConfigCA(t *testing.T) { } func Test_MutualTLSConfig(t *testing.T) { + test.ResetPromRegistry() ca, user, userKey, cleanup := test.CreateAllCerts(t) defer cleanup() pipeline := config.NewCollectorPipeline("ingest", api.IngestCollector{}) @@ -137,7 +141,7 @@ func Test_MutualTLSConfig(t *testing.T) { UserKeyPath: userKey, }, }) - newEncode, err := 
NewEncodeKafka(pipeline.GetStageParams()[1]) + newEncode, err := NewEncodeKafka(operational.NewMetrics(&config.MetricsSettings{}), pipeline.GetStageParams()[1]) require.NoError(t, err) tlsConfig := newEncode.(*encodeKafka).kafkaWriter.(*kafkago.Writer).Transport.(*kafkago.Transport).TLS diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 6e884a9c3..b88d2383a 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -28,7 +28,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" - operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -53,26 +53,33 @@ type histoInfo struct { } type EncodeProm struct { - gauges []gaugeInfo - counters []counterInfo - histos []histoInfo - aggHistos []histoInfo - expiryTime int64 - mCache *utils.TimedCache - exitChan <-chan struct{} - server *http.Server - tlsConfig *api.PromTLSConf + gauges []gaugeInfo + counters []counterInfo + histos []histoInfo + aggHistos []histoInfo + expiryTime int64 + mCache *utils.TimedCache + exitChan <-chan struct{} + server *http.Server + tlsConfig *api.PromTLSConf + metricsProcessed prometheus.Counter + errorsCounter *prometheus.CounterVec } -var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{ - Name: "encode_prom_metrics_processed", - Help: "Number of metrics processed", -}) - -var errorsCounter = operationalMetrics.NewCounterVec(prometheus.CounterOpts{ - Name: "encode_prom_errors", - Help: "Total errors during metrics generation", -}, []string{"error", "metric", "key"}) +var ( + metricsProcessed = operational.DefineMetric( + "metrics_processed", + "Number of metrics processed", + operational.TypeCounter, + "stage", + ) + encodePromErrors = operational.DefineMetric( + "encode_prom_errors", + "Total errors during metrics generation", + operational.TypeCounter, + "error", "metric", "key", + ) +) // Encode encodes a metric before being stored func (e *EncodeProm) Encode(metrics []config.GenericMap) { @@ -95,11 +102,11 @@ func (e *EncodeProm) EncodeMetric(metricRecord config.GenericMap) { m, err := mInfo.counter.GetMetricWith(labels) if err != nil { log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() continue } m.Add(value) - metricsProcessed.Inc() + e.metricsProcessed.Inc() } // Process gauges @@ -111,11 +118,11 @@ func (e *EncodeProm) EncodeMetric(metricRecord config.GenericMap) { m, err := mInfo.gauge.GetMetricWith(labels) if err != nil { log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() continue } m.Set(value) - metricsProcessed.Inc() + e.metricsProcessed.Inc() } // Process histograms @@ -127,11 +134,11 @@ func (e *EncodeProm) EncodeMetric(metricRecord config.GenericMap) { m, err := mInfo.histo.GetMetricWith(labels) if err != nil { log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - 
errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() continue } m.Observe(value) - metricsProcessed.Inc() + e.metricsProcessed.Inc() } // Process pre-aggregated histograms @@ -143,13 +150,13 @@ func (e *EncodeProm) EncodeMetric(metricRecord config.GenericMap) { m, err := mInfo.histo.GetMetricWith(labels) if err != nil { log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() continue } for _, v := range values { m.Observe(v) } - metricsProcessed.Inc() + e.metricsProcessed.Inc() } } @@ -160,7 +167,7 @@ func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *api.PromMetrics } floatVal, err := utils.ConvertToFloat64(val) if err != nil { - errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc() + e.errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc() return nil, 0 } @@ -177,7 +184,7 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetri } values, ok := val.([]float64) if !ok { - errorsCounter.WithLabelValues("HistoValueConversionError", info.Name, info.ValueKey).Inc() + e.errorsCounter.WithLabelValues("HistoValueConversionError", info.Name, info.ValueKey).Inc() return nil, nil } @@ -201,7 +208,7 @@ func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *api.PromM } val, found := flow[info.ValueKey] if !found { - errorsCounter.WithLabelValues("RecordKeyMissing", info.Name, info.ValueKey).Inc() + e.errorsCounter.WithLabelValues("RecordKeyMissing", info.Name, info.ValueKey).Inc() return nil } return val @@ -265,7 +272,7 @@ func (e *EncodeProm) closeServer(ctx context.Context) error { return e.server.Shutdown(ctx) } -func NewEncodeProm(params config.StageParam) (Encoder, error) { +func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) { cfg := api.PromEncode{} if params.Encode != nil && params.Encode.Prom != nil { cfg = *params.Encode.Prom @@ -356,14 +363,16 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) { MinVersion: tls.VersionTLS12, }, }, - tlsConfig: cfg.TLS, - counters: counters, - gauges: gauges, - histos: histos, - aggHistos: aggHistos, - expiryTime: expiryTime, - mCache: utils.NewTimedCache(), - exitChan: utils.ExitChannel(), + tlsConfig: cfg.TLS, + counters: counters, + gauges: gauges, + histos: histos, + aggHistos: aggHistos, + expiryTime: expiryTime, + mCache: utils.NewTimedCache(), + exitChan: utils.ExitChannel(), + metricsProcessed: opMetrics.NewCounter(&metricsProcessed, params.Name), + errorsCounter: opMetrics.NewCounterVec(&encodePromErrors), } go w.startServer() go w.cleanupExpiredEntriesLoop() diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index 5ba6fd842..1da6b978e 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -28,6 +28,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -80,7 +81,8 @@ func initPromWithServer(params *api.PromEncode) (*EncodeProm, 
func(), error) { prometheus.DefaultRegisterer = reg prometheus.DefaultGatherer = reg http.DefaultServeMux = http.NewServeMux() - enc, err := NewEncodeProm(config.StageParam{Encode: &config.Encode{Prom: params}}) + opMetrics := operational.NewMetrics(&config.MetricsSettings{}) + enc, err := NewEncodeProm(opMetrics, config.StageParam{Encode: &config.Encode{Prom: params}}) if err != nil { return nil, nil, err } diff --git a/pkg/pipeline/extract/conntrack/conntrack.go b/pkg/pipeline/extract/conntrack/conntrack.go index 09e5c660b..68875ce08 100644 --- a/pkg/pipeline/extract/conntrack/conntrack.go +++ b/pkg/pipeline/extract/conntrack/conntrack.go @@ -26,6 +26,7 @@ import ( "github.com/benbjohnson/clock" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract" log "github.com/sirupsen/logrus" ) @@ -50,6 +51,7 @@ type conntrackImpl struct { shouldOutputNewConnection bool shouldOutputEndConnection bool shouldOutputUpdateConnection bool + metrics *metricsType } func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericMap { @@ -60,7 +62,7 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM computedHash, err := ComputeHash(fl, ct.config.KeyDefinition, ct.hashProvider()) if err != nil { log.Warningf("skipping flow log %v: %v", fl, err) - metrics.inputRecords.WithLabelValues("rejected").Inc() + ct.metrics.inputRecords.WithLabelValues("rejected").Inc() continue } conn, exists := ct.connStore.getConnection(computedHash.hashTotal) @@ -74,17 +76,17 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM Build() ct.connStore.addConnection(computedHash.hashTotal, conn) ct.updateConnection(conn, fl, computedHash) - metrics.inputRecords.WithLabelValues("newConnection").Inc() + ct.metrics.inputRecords.WithLabelValues("newConnection").Inc() if ct.shouldOutputNewConnection { record := conn.toGenericMap() addHashField(record, computedHash.hashTotal) addTypeField(record, api.ConnTrackOutputRecordTypeName("NewConnection")) outputRecords = append(outputRecords, record) - metrics.outputRecords.WithLabelValues("newConnection").Inc() + ct.metrics.outputRecords.WithLabelValues("newConnection").Inc() } } else { ct.updateConnection(conn, fl, computedHash) - metrics.inputRecords.WithLabelValues("update").Inc() + ct.metrics.inputRecords.WithLabelValues("update").Inc() } if ct.shouldOutputFlowLogs { @@ -92,20 +94,20 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM addHashField(record, computedHash.hashTotal) addTypeField(record, api.ConnTrackOutputRecordTypeName("FlowLog")) outputRecords = append(outputRecords, record) - metrics.outputRecords.WithLabelValues("flowLog").Inc() + ct.metrics.outputRecords.WithLabelValues("flowLog").Inc() } } endConnectionRecords := ct.popEndConnections() if ct.shouldOutputEndConnection { outputRecords = append(outputRecords, endConnectionRecords...) - metrics.outputRecords.WithLabelValues("endConnection").Add(float64(len(endConnectionRecords))) + ct.metrics.outputRecords.WithLabelValues("endConnection").Add(float64(len(endConnectionRecords))) } if ct.shouldOutputUpdateConnection { updateConnectionRecords := ct.prepareUpdateConnectionRecords() outputRecords = append(outputRecords, updateConnectionRecords...) 
- metrics.outputRecords.WithLabelValues("updateConnection").Add(float64(len(updateConnectionRecords))) + ct.metrics.outputRecords.WithLabelValues("updateConnection").Add(float64(len(updateConnectionRecords))) } return outputRecords @@ -178,7 +180,7 @@ func (ct *conntrackImpl) getFlowLogDirection(conn connection, flowLogHash totalH } // NewConnectionTrack creates a new connection track instance -func NewConnectionTrack(params config.StageParam, clock clock.Clock) (extract.Extractor, error) { +func NewConnectionTrack(opMetrics *operational.Metrics, params config.StageParam, clock clock.Clock) (extract.Extractor, error) { cfg := params.Extract.ConnTrack if err := cfg.Validate(); err != nil { return nil, fmt.Errorf("ConnectionTrack config is invalid: %w", err) @@ -211,9 +213,10 @@ func NewConnectionTrack(params config.StageParam, clock clock.Clock) (extract.Ex } } + metrics := newMetrics(opMetrics) conntrack := &conntrackImpl{ clock: clock, - connStore: newConnectionStore(), + connStore: newConnectionStore(metrics), config: cfg, hashProvider: fnv.New64a, aggregators: aggregators, @@ -221,6 +224,7 @@ func NewConnectionTrack(params config.StageParam, clock clock.Clock) (extract.Ex shouldOutputNewConnection: shouldOutputNewConnection, shouldOutputEndConnection: shouldOutputEndConnection, shouldOutputUpdateConnection: shouldOutputUpdateConnection, + metrics: metrics, } return conntrack, nil } diff --git a/pkg/pipeline/extract/conntrack/conntrack_test.go b/pkg/pipeline/extract/conntrack/conntrack_test.go index 0d38e2e08..1003c74bc 100644 --- a/pkg/pipeline/extract/conntrack/conntrack_test.go +++ b/pkg/pipeline/extract/conntrack/conntrack_test.go @@ -26,9 +26,13 @@ import ( "github.com/benbjohnson/clock" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/stretchr/testify/require" ) +var opMetrics = operational.NewMetrics(&config.MetricsSettings{}) + func buildMockConnTrackConfig(isBidirectional bool, outputRecordType []string) *config.StageParam { splitAB := isBidirectional var hash api.ConnTrackHash @@ -149,12 +153,13 @@ func TestTrack(t *testing.T) { }, } - for _, test := range table { - t.Run(test.name, func(t *testing.T) { - ct, err := NewConnectionTrack(*test.conf, clock.NewMock()) + for _, testt := range table { + t.Run(testt.name, func(t *testing.T) { + test.ResetPromRegistry() + ct, err := NewConnectionTrack(opMetrics, *testt.conf, clock.NewMock()) require.NoError(t, err) - actual := ct.Extract(test.inputFlowLogs) - require.Equal(t, test.expected, actual) + actual := ct.Extract(testt.inputFlowLogs) + require.Equal(t, testt.expected, actual) }) } } @@ -164,9 +169,10 @@ func TestTrack(t *testing.T) { // The test simulates 2 flow logs from A to B and 2 from B to A in different timestamps. // Then the test verifies that an end connection record is outputted only after 30 seconds from the last flow log. func TestEndConn_Bidirectional(t *testing.T) { + test.ResetPromRegistry() clk := clock.NewMock() conf := buildMockConnTrackConfig(true, []string{"newConnection", "flowLog", "endConnection"}) - ct, err := NewConnectionTrack(*conf, clk) + ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) ipA := "10.0.0.1" @@ -246,9 +252,10 @@ func TestEndConn_Bidirectional(t *testing.T) { // The test simulates 2 flow logs from A to B and 2 from B to A in different timestamps. 
// Then the test verifies that an end connection record is outputted only after 30 seconds from the last flow log. func TestEndConn_Unidirectional(t *testing.T) { + test.ResetPromRegistry() clk := clock.NewMock() conf := buildMockConnTrackConfig(false, []string{"newConnection", "flowLog", "endConnection"}) - ct, err := NewConnectionTrack(*conf, clk) + ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) ipA := "10.0.0.1" @@ -345,9 +352,10 @@ func TestEndConn_Unidirectional(t *testing.T) { // Then the test verifies that an update connection record is outputted only after 10 seconds from the last update // connection report. func TestUpdateConn_Unidirectional(t *testing.T) { + test.ResetPromRegistry() clk := clock.NewMock() conf := buildMockConnTrackConfig(false, []string{"newConnection", "flowLog", "updateConnection", "endConnection"}) - ct, err := NewConnectionTrack(*conf, clk) + ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) ipA := "10.0.0.1" @@ -494,8 +502,9 @@ func assertConnDoesntExist(t *testing.T, store *connectionStore, hashId uint64) } func TestIterateFrontToBack(t *testing.T) { + test.ResetPromRegistry() // This test adds 2 connections to the store, deletes them and verifies deletion. - cs := newConnectionStore() + cs := newConnectionStore(newMetrics(operational.NewMetrics(&config.MetricsSettings{}))) conn1hash := totalHashType{0x10, 0x11, 0x12} conn1 := NewConnBuilder().Hash(conn1hash).Build() @@ -520,11 +529,12 @@ func TestPrepareUpdateConnectionRecords(t *testing.T) { // It sets the update report interval to 10 seconds and creates 3 records for the first interval and 3 records for the second interval (6 in total). // Then, it calls prepareUpdateConnectionRecords() a couple of times in different times. // It makes sure that only the right records are returned on each call. 
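These tests now call test.ResetPromRegistry() before constructing stages: conntrack metrics are registered against the Prometheus default registry, so without a reset, repeated registrations across test cases would collide. The helper's body is not shown in this patch (it is part of the +8 lines in pkg/test/utils.go); presumably it mirrors what initPromWithServer does in encode_prom_test.go, along these lines:

    package test

    import "github.com/prometheus/client_golang/prometheus"

    // ResetPromRegistry swaps in a fresh default registry so that collectors
    // registered by a previous test cannot trigger duplicate-registration errors.
    func ResetPromRegistry() {
        reg := prometheus.NewRegistry()
        prometheus.DefaultRegisterer = reg
        prometheus.DefaultGatherer = reg
    }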
+ test.ResetPromRegistry() clk := clock.NewMock() conf := buildMockConnTrackConfig(false, []string{"updateConnection"}) interval := 10 * time.Second conf.Extract.ConnTrack.UpdateConnectionInterval = api.Duration{Duration: interval} - extract, err := NewConnectionTrack(*conf, clk) + extract, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) ct := extract.(*conntrackImpl) startTime := clk.Now() diff --git a/pkg/pipeline/extract/conntrack/metrics.go b/pkg/pipeline/extract/conntrack/metrics.go index 41d175ca0..9510fb373 100644 --- a/pkg/pipeline/extract/conntrack/metrics.go +++ b/pkg/pipeline/extract/conntrack/metrics.go @@ -18,40 +18,42 @@ package conntrack import ( - operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/prometheus/client_golang/prometheus" ) -const ( - classificationLabel = "classification" - typeLabel = "type" +var ( + connStoreLengthDef = operational.DefineMetric( + "conntrack_memory_connections", + "The total number of tracked connections in memory.", + operational.TypeGauge, + ) + + inputRecordsDef = operational.DefineMetric( + "conntrack_input_records", + "The total number of input records per classification.", + operational.TypeCounter, + "classification", + ) + + outputRecordsDef = operational.DefineMetric( + "conntrack_output_records", + "The total number of output records.", + operational.TypeCounter, + "type", + ) ) -var metrics = newMetrics() - type metricsType struct { connStoreLength prometheus.Gauge inputRecords *prometheus.CounterVec outputRecords *prometheus.CounterVec } -func newMetrics() *metricsType { - var m metricsType - - m.connStoreLength = operationalMetrics.NewGauge(prometheus.GaugeOpts{ - Name: "conntrack_memory_connections", - Help: "The total number of tracked connections in memory.", - }) - - m.inputRecords = operationalMetrics.NewCounterVec(prometheus.CounterOpts{ - Name: "conntrack_input_records", - Help: "The total number of input records per classification.", - }, []string{classificationLabel}) - - m.outputRecords = operationalMetrics.NewCounterVec(prometheus.CounterOpts{ - Name: "conntrack_output_records", - Help: "The total number of output records.", - }, []string{typeLabel}) - - return &m +func newMetrics(opMetrics *operational.Metrics) *metricsType { + return &metricsType{ + connStoreLength: opMetrics.NewGauge(&connStoreLengthDef), + inputRecords: opMetrics.NewCounterVec(&inputRecordsDef), + outputRecords: opMetrics.NewCounterVec(&outputRecordsDef), + } } diff --git a/pkg/pipeline/extract/conntrack/store.go b/pkg/pipeline/extract/conntrack/store.go index f14a6dbad..48a8e92c5 100644 --- a/pkg/pipeline/extract/conntrack/store.go +++ b/pkg/pipeline/extract/conntrack/store.go @@ -32,7 +32,8 @@ const ( // connectionStore provides both retrieving a connection by its hash and iterating connections sorted by their last // update time. type connectionStore struct { - mom *utils.MultiOrderedMap + mom *utils.MultiOrderedMap + metrics *metricsType } type processConnF func(connection) (shouldDelete, shouldStop bool) @@ -42,7 +43,7 @@ func (cs *connectionStore) addConnection(hashId uint64, conn connection) { if err != nil { log.Errorf("BUG. connection with hash %x already exists in store. 
%v", hashId, conn) } - metrics.connStoreLength.Set(float64(cs.mom.Len())) + cs.metrics.connStoreLength.Set(float64(cs.mom.Len())) } func (cs *connectionStore) getConnection(hashId uint64) (connection, bool) { @@ -89,11 +90,12 @@ func (cs *connectionStore) iterateFrontToBack(orderID utils.OrderID, f processCo shouldDelete, shouldStop = f(r.(connection)) return }) - metrics.connStoreLength.Set(float64(cs.mom.Len())) + cs.metrics.connStoreLength.Set(float64(cs.mom.Len())) } -func newConnectionStore() *connectionStore { +func newConnectionStore(metrics *metricsType) *connectionStore { return &connectionStore{ - mom: utils.NewMultiOrderedMap(expiryOrder, nextUpdateReportTimeOrder), + mom: utils.NewMultiOrderedMap(expiryOrder, nextUpdateReportTimeOrder), + metrics: metrics, } } diff --git a/pkg/operational/health/health_test.go b/pkg/pipeline/health_test.go similarity index 77% rename from pkg/operational/health/health_test.go rename to pkg/pipeline/health_test.go index 3aa08bf48..69a1da6f5 100644 --- a/pkg/operational/health/health_test.go +++ b/pkg/pipeline/health_test.go @@ -15,7 +15,7 @@ * */ -package health +package pipeline import ( "fmt" @@ -25,7 +25,7 @@ import ( "time" "github.com/netobserv/flowlogs-pipeline/pkg/config" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/stretchr/testify/require" ) @@ -34,7 +34,7 @@ func TestNewHealthServer(t *testing.T) { livePath := "/live" type args struct { - pipeline pipeline.Pipeline + pipeline Pipeline port string } type want struct { @@ -46,8 +46,8 @@ func TestNewHealthServer(t *testing.T) { args args want want }{ - {name: "pipeline running", args: args{pipeline: pipeline.Pipeline{IsRunning: true}, port: "7000"}, want: want{statusCode: 200}}, - {name: "pipeline not running", args: args{pipeline: pipeline.Pipeline{IsRunning: false}, port: "7001"}, want: want{statusCode: 503}}, + {name: "pipeline running", args: args{pipeline: Pipeline{IsRunning: true}, port: "7000"}, want: want{statusCode: 200}}, + {name: "pipeline not running", args: args{pipeline: Pipeline{IsRunning: false}, port: "7001"}, want: want{statusCode: 503}}, } for _, tt := range tests { @@ -55,9 +55,9 @@ func TestNewHealthServer(t *testing.T) { opts := config.Options{Health: config.Health{Port: tt.args.port}} expectedAddr := fmt.Sprintf("0.0.0.0:%s", opts.Health.Port) - server := NewHealthServer(&opts, &tt.args.pipeline) + server := operational.NewHealthServer(&opts, tt.args.pipeline.IsAlive, tt.args.pipeline.IsReady) require.NotNil(t, server) - require.Equal(t, expectedAddr, server.address) + require.Equal(t, expectedAddr, server.Address) client := &http.Client{} diff --git a/pkg/pipeline/ingest/ingest_collector.go b/pkg/pipeline/ingest/ingest_collector.go index 663df37f1..2ab529017 100644 --- a/pkg/pipeline/ingest/ingest_collector.go +++ b/pkg/pipeline/ingest/ingest_collector.go @@ -27,14 +27,13 @@ import ( ms "github.com/mitchellh/mapstructure" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" - operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" goflowFormat "github.com/netsampler/goflow2/format" goflowCommonFormat "github.com/netsampler/goflow2/format/common" _ "github.com/netsampler/goflow2/format/protobuf" goflowpb "github.com/netsampler/goflow2/pb" "github.com/netsampler/goflow2/utils" - 
"github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" ) @@ -53,6 +52,7 @@ type ingestCollector struct { batchFlushTime time.Duration batchMaxLength int exitChan <-chan struct{} + metrics *metrics } // TransportWrapper is an implementation of the goflow2 transport interface @@ -60,16 +60,6 @@ type TransportWrapper struct { c chan map[string]interface{} } -var queueLength = operationalMetrics.NewGauge(prometheus.GaugeOpts{ - Name: "ingest_collector_queue_length", - Help: "Queue length", -}) - -var linesProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{ - Name: "ingest_collector_flow_logs_processed", - Help: "Number of log lines (flow logs) processed", -}) - func NewWrapper(c chan map[string]interface{}) *TransportWrapper { tw := TransportWrapper{c: c} return &tw @@ -114,7 +104,7 @@ func (w *TransportWrapper) Send(_, data []byte) error { // Ingest ingests entries from a network collector using goflow2 library (https://github.com/netsampler/goflow2) func (ingestC *ingestCollector) Ingest(out chan<- []config.GenericMap) { ctx := context.Background() - ingestC.in = make(chan map[string]interface{}, channelSize) + ingestC.metrics.createOutQueueLen(out) // initialize background listeners (a.k.a.netflow+legacy collector) ingestC.initCollectorListener(ctx) @@ -172,8 +162,7 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []config.GenericMap) records = append(records, record) if len(records) >= ingestC.batchMaxLength { log.Debugf("ingestCollector sending %d entries, %d entries waiting", len(records), len(ingestC.in)) - linesProcessed.Add(float64(len(records))) - queueLength.Set(float64(len(out))) + ingestC.metrics.flowsProcessed.Add(float64(len(records))) log.Debugf("ingestCollector records = %v", records) out <- records records = []config.GenericMap{} @@ -188,8 +177,7 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []config.GenericMap) } } log.Debugf("ingestCollector sending %d entries, %d entries waiting", len(records), len(ingestC.in)) - linesProcessed.Add(float64(len(records))) - queueLength.Set(float64(len(out))) + ingestC.metrics.flowsProcessed.Add(float64(len(records))) log.Debugf("ingestCollector records = %v", records) out <- records records = []config.GenericMap{} @@ -199,7 +187,7 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []config.GenericMap) } // NewIngestCollector create a new ingester -func NewIngestCollector(params config.StageParam) (Ingester, error) { +func NewIngestCollector(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) { jsonIngestCollector := api.IngestCollector{} if params.Ingest != nil && params.Ingest.Collector != nil { jsonIngestCollector = *params.Ingest.Collector @@ -220,6 +208,9 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) { bml = jsonIngestCollector.BatchMaxLen } + in := make(chan map[string]interface{}, channelSize) + metrics := newMetrics(opMetrics, params.Name, func() int { return len(in) }) + return &ingestCollector{ hostname: jsonIngestCollector.HostName, port: jsonIngestCollector.Port, @@ -227,5 +218,7 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) { exitChan: pUtils.ExitChannel(), batchFlushTime: defaultBatchFlushTime, batchMaxLength: bml, + in: in, + metrics: metrics, }, nil } diff --git a/pkg/pipeline/ingest/ingest_collector_test.go b/pkg/pipeline/ingest/ingest_collector_test.go index 3e5c84c3b..7b1e94363 100644 --- a/pkg/pipeline/ingest/ingest_collector_test.go 
+++ b/pkg/pipeline/ingest/ingest_collector_test.go @@ -21,7 +21,9 @@ import ( "testing" "time" + "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -32,14 +34,13 @@ const timeout = 5 * time.Second func TestIngest(t *testing.T) { collectorPort, err := test.UDPPort() require.NoError(t, err) - ic := &ingestCollector{ - hostname: "0.0.0.0", - port: collectorPort, - batchFlushTime: 10 * time.Millisecond, - exitChan: make(chan struct{}), - } + stage := config.NewCollectorPipeline("ingest-ipfix", api.IngestCollector{ + HostName: "0.0.0.0", + Port: collectorPort, + }) + ic, err := NewIngestCollector(operational.NewMetrics(&config.MetricsSettings{}), stage.GetStageParams()[0]) + require.NoError(t, err) forwarded := make(chan []config.GenericMap) - //defer close(forwarded) // GIVEN an IPFIX collector Ingester go ic.Ingest(forwarded) diff --git a/pkg/pipeline/ingest/ingest_grpc.go b/pkg/pipeline/ingest/ingest_grpc.go index db135bb7c..1a9aaa6e7 100644 --- a/pkg/pipeline/ingest/ingest_grpc.go +++ b/pkg/pipeline/ingest/ingest_grpc.go @@ -9,6 +9,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc" @@ -41,9 +42,10 @@ var ( type GRPCProtobuf struct { collector *grpc.CollectorServer flowPackets chan *pbflow.Records + metrics *metrics } -func NewGRPCProtobuf(params config.StageParam) (*GRPCProtobuf, error) { +func NewGRPCProtobuf(opMetrics *operational.Metrics, params config.StageParam) (*GRPCProtobuf, error) { netObserv := api.IngestGRPCProto{} if params.Ingest != nil && params.Ingest.GRPC != nil { netObserv = *params.Ingest.GRPC @@ -56,18 +58,22 @@ func NewGRPCProtobuf(params config.StageParam) (*GRPCProtobuf, error) { bufLen = defaultBufferLen } flowPackets := make(chan *pbflow.Records, bufLen) + metrics := newMetrics(opMetrics, params.Name, func() int { return len(flowPackets) }) + counter := func(inc int) { metrics.flowsProcessed.Add(float64(inc)) } collector, err := grpc.StartCollector(netObserv.Port, flowPackets, - grpc.WithGRPCServerOptions(grpc2.UnaryInterceptor(instrumentGRPC(netObserv.Port)))) + grpc.WithGRPCServerOptions(grpc2.UnaryInterceptor(instrumentGRPC(netObserv.Port, counter)))) if err != nil { return nil, err } return &GRPCProtobuf{ collector: collector, flowPackets: flowPackets, + metrics: metrics, }, nil } func (no *GRPCProtobuf) Ingest(out chan<- []config.GenericMap) { + no.metrics.createOutQueueLen(out) go func() { <-utils.ExitChannel() close(no.flowPackets) @@ -86,7 +92,7 @@ func (no *GRPCProtobuf) Close() error { return err } -func instrumentGRPC(port int) grpc2.UnaryServerInterceptor { +func instrumentGRPC(port int, counter func(int)) grpc2.UnaryServerInterceptor { localPort := strconv.Itoa(port) return func( ctx context.Context, @@ -111,7 +117,7 @@ func instrumentGRPC(port int) grpc2.UnaryServerInterceptor { prometheus.Labels{"worker": "", "name": decoderName}).Inc() // instruments number of processed individual flows - linesProcessed.Add(float64(len(flowRecords.Entries))) + counter(len(flowRecords.Entries)) // extract sender IP address remoteIP := "unknown" 
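		// Note that instrumentGRPC now receives the flows-processed counter as a
		// plain func(int) built in NewGRPCProtobuf, instead of reaching for a
		// package-level collector: the interceptor stays decoupled from Prometheus
		// and the metric remains owned by the ingest stage that created it.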
diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index 870e56205..fa4e6f125 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -23,6 +23,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" @@ -43,8 +44,8 @@ type ingestKafka struct { decoder decode.Decoder in chan []byte exitChan <-chan struct{} - prevRecords []config.GenericMap // copy of most recently sent records; for testing and debugging batchMaxLength int + metrics *metrics canLogMessages bool } @@ -55,6 +56,7 @@ const defaultKafkaCommitInterval = 500 // Ingest ingests entries from kafka topic func (ingestK *ingestKafka) Ingest(out chan<- []config.GenericMap) { log.Debugf("entering ingestKafka.Ingest") + ingestK.metrics.createOutQueueLen(out) // initialize background listener ingestK.kafkaListener() @@ -119,16 +121,13 @@ func (ingestK *ingestKafka) processBatch(out chan<- []config.GenericMap, records // Update metrics flowDecoderCount.With( prometheus.Labels{"worker": "", "name": ingestK.kafkaParams.Decoder.Type}).Inc() - linesProcessed.Add(float64(len(records))) - queueLength.Set(float64(len(out))) - ingestK.prevRecords = decoded + ingestK.metrics.flowsProcessed.Add(float64(len(records))) for _, record := range decoded { processRecordDelay(record) } // Send batch - log.Debugf("prevRecords = %v", ingestK.prevRecords) out <- decoded } @@ -165,7 +164,7 @@ func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) { } // NewIngestKafka create a new ingester -func NewIngestKafka(params config.StageParam) (Ingester, error) { +func NewIngestKafka(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) { log.Debugf("entering NewIngestKafka") jsonIngestKafka := api.IngestKafka{} if params.Ingest != nil && params.Ingest.Kafka != nil { @@ -259,14 +258,17 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) { bml = jsonIngestKafka.BatchMaxLen } + in := make(chan []byte, 2*bml) + metrics := newMetrics(opMetrics, params.Name, func() int { return len(in) }) + return &ingestKafka{ kafkaParams: jsonIngestKafka, kafkaReader: kafkaReader, decoder: decoder, exitChan: utils.ExitChannel(), - in: make(chan []byte, 2*bml), - prevRecords: make([]config.GenericMap, 0), + in: in, batchMaxLength: bml, + metrics: metrics, canLogMessages: jsonIngestKafka.Decoder.Type == api.DecoderName("JSON"), }, nil } diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go index ab63aec2a..c66c2c116 100644 --- a/pkg/pipeline/ingest/ingest_kafka_test.go +++ b/pkg/pipeline/ingest/ingest_kafka_test.go @@ -24,6 +24,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/test" kafkago "github.com/segmentio/kafka-go" "github.com/stretchr/testify/mock" @@ -70,10 +71,11 @@ parameters: ` func initNewIngestKafka(t *testing.T, configTemplate string) Ingester { + test.ResetPromRegistry() v, cfg := test.InitConfig(t, configTemplate) require.NotNil(t, v) - newIngest, err := NewIngestKafka(cfg.Parameters[0]) + newIngest, err := NewIngestKafka(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0]) 
require.NoError(t, err) return newIngest } @@ -255,18 +257,20 @@ func Test_BatchTimeout(t *testing.T) { } func Test_TLSConfigEmpty(t *testing.T) { + test.ResetPromRegistry() stage := config.NewKafkaPipeline("ingest-kafka", api.IngestKafka{ Brokers: []string{"any"}, Topic: "topic", Decoder: api.Decoder{Type: "json"}, }) - newIngest, err := NewIngestKafka(stage.GetStageParams()[0]) + newIngest, err := NewIngestKafka(operational.NewMetrics(&config.MetricsSettings{}), stage.GetStageParams()[0]) require.NoError(t, err) tlsConfig := newIngest.(*ingestKafka).kafkaReader.Config().Dialer.TLS require.Nil(t, tlsConfig) } func Test_TLSConfigCA(t *testing.T) { + test.ResetPromRegistry() ca, cleanup := test.CreateCACert(t) defer cleanup() stage := config.NewKafkaPipeline("ingest-kafka", api.IngestKafka{ @@ -277,7 +281,7 @@ func Test_TLSConfigCA(t *testing.T) { CACertPath: ca, }, }) - newIngest, err := NewIngestKafka(stage.GetStageParams()[0]) + newIngest, err := NewIngestKafka(operational.NewMetrics(&config.MetricsSettings{}), stage.GetStageParams()[0]) require.NoError(t, err) tlsConfig := newIngest.(*ingestKafka).kafkaReader.Config().Dialer.TLS @@ -288,6 +292,7 @@ func Test_TLSConfigCA(t *testing.T) { } func Test_MutualTLSConfig(t *testing.T) { + test.ResetPromRegistry() ca, user, userKey, cleanup := test.CreateAllCerts(t) defer cleanup() stage := config.NewKafkaPipeline("ingest-kafka", api.IngestKafka{ @@ -300,7 +305,7 @@ func Test_MutualTLSConfig(t *testing.T) { UserKeyPath: userKey, }, }) - newIngest, err := NewIngestKafka(stage.GetStageParams()[0]) + newIngest, err := NewIngestKafka(operational.NewMetrics(&config.MetricsSettings{}), stage.GetStageParams()[0]) require.NoError(t, err) tlsConfig := newIngest.(*ingestKafka).kafkaReader.Config().Dialer.TLS diff --git a/pkg/pipeline/ingest/metrics.go b/pkg/pipeline/ingest/metrics.go new file mode 100644 index 000000000..bc322bf33 --- /dev/null +++ b/pkg/pipeline/ingest/metrics.go @@ -0,0 +1,26 @@ +package ingest + +import ( + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/prometheus/client_golang/prometheus" +) + +type metrics struct { + *operational.Metrics + stage string + flowsProcessed prometheus.Counter +} + +func newMetrics(opMetrics *operational.Metrics, stage string, inGaugeFunc func() int) *metrics { + opMetrics.CreateInQueueSizeGauge(stage, inGaugeFunc) + return &metrics{ + Metrics: opMetrics, + stage: stage, + flowsProcessed: opMetrics.CreateFlowsProcessedCounter(stage), + } +} + +func (m *metrics) createOutQueueLen(out chan<- []config.GenericMap) { + m.CreateOutQueueSizeGauge(m.stage, func() int { return len(out) }) +} diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index c26dbfb85..228b30e1a 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -20,8 +20,8 @@ package pipeline import ( "fmt" - "github.com/heptiolabs/healthcheck" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/gopipes/pkg/node" log "github.com/sirupsen/logrus" ) @@ -43,6 +43,7 @@ type Pipeline struct { // TODO: this field is only used for test verification. 
We should rewrite the build process // to be able to remove it from here pipelineStages []*pipelineEntry + Metrics *operational.Metrics } // NewPipeline defines the pipeline elements @@ -54,7 +55,10 @@ func NewPipeline(cfg *config.ConfigFileStruct) (*Pipeline, error) { configParams := cfg.Parameters log.Debugf("configParams = %v ", configParams) - build := newBuilder(configParams, stages) + // Get global metrics settings + om := operational.NewMetrics(&cfg.MetricsSettings) + + build := newBuilder(configParams, stages, om) if err := build.readStages(); err != nil { return nil, err } @@ -75,20 +79,16 @@ func (p *Pipeline) Run() { p.IsRunning = false } -func (p *Pipeline) IsReady() healthcheck.Check { - return func() error { - if !p.IsRunning { - return fmt.Errorf("pipeline is not running") - } - return nil +func (p *Pipeline) IsReady() error { + if !p.IsRunning { + return fmt.Errorf("pipeline is not running") } + return nil } -func (p *Pipeline) IsAlive() healthcheck.Check { - return func() error { - if !p.IsRunning { - return fmt.Errorf("pipeline is not running") - } - return nil +func (p *Pipeline) IsAlive() error { + if !p.IsRunning { + return fmt.Errorf("pipeline is not running") } + return nil } diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index b766e988b..33e2ac9c6 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -8,7 +8,7 @@ import ( "github.com/benbjohnson/clock" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" - operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/conntrack" @@ -43,6 +43,8 @@ type builder struct { createdStages map[string]interface{} startNodes []*node.Init terminalNodes []*node.Terminal + opMetrics *operational.Metrics + stageDuration *prometheus.HistogramVec } type pipelineEntry struct { @@ -55,18 +57,25 @@ type pipelineEntry struct { Writer write.Writer } -var stageDuration = operationalMetrics.NewHistogramVec(prometheus.HistogramOpts{ - Name: "stage_duration_ms", - Help: "Pipeline stage duration in milliseconds", - Buckets: []float64{.001, .01, .1, 1, 10, 100, 1000, 10000}, -}, []string{"name"}) +var ( + stageDurationDef = operational.DefineMetric( + "stage_duration_ms", + "Pipeline stage duration in milliseconds", + operational.TypeHistogram, + "stage", + ) +) + +func newBuilder(params []config.StageParam, stages []config.Stage, opMetrics *operational.Metrics) *builder { + stageDuration := opMetrics.NewHistogramVec(&stageDurationDef, []float64{.001, .01, .1, 1, 10, 100, 1000, 10000}) -func newBuilder(params []config.StageParam, stages []config.Stage) *builder { return &builder{ pipelineEntryMap: map[string]*pipelineEntry{}, createdStages: map[string]interface{}{}, configStages: stages, configParams: params, + opMetrics: opMetrics, + stageDuration: stageDuration, } } @@ -81,15 +90,15 @@ func (b *builder) readStages() error { var err error switch pEntry.stageType { case StageIngest: - pEntry.Ingester, err = getIngester(param) + pEntry.Ingester, err = getIngester(b.opMetrics, param) case StageTransform: - pEntry.Transformer, err = getTransformer(param) + pEntry.Transformer, err = getTransformer(b.opMetrics, param) case StageExtract: - pEntry.Extractor, err = 
getExtractor(param) + pEntry.Extractor, err = getExtractor(b.opMetrics, param) case StageEncode: - pEntry.Encoder, err = getEncoder(param) + pEntry.Encoder, err = getEncoder(b.opMetrics, param) case StageWrite: - pEntry.Writer, err = getWriter(param) + pEntry.Writer, err = getWriter(b.opMetrics, param) default: err = fmt.Errorf("invalid stage type: %v, stage name: %v", pEntry.stageType, pEntry.stageName) } @@ -180,6 +189,7 @@ func (b *builder) build() (*Pipeline, error) { startNodes: b.startNodes, terminalNodes: b.terminalNodes, pipelineStages: b.pipelineStages, + Metrics: b.opMetrics, }, nil } @@ -217,11 +227,11 @@ func isSender(p *pipelineEntry) bool { return p.stageType != StageWrite && p.stageType != StageEncode } -func runMeasured(name string, f func()) { +func (b *builder) runMeasured(name string, f func()) { start := time.Now() f() duration := time.Since(start) - stageDuration.WithLabelValues(name).Observe(float64(duration.Milliseconds())) + b.stageDuration.WithLabelValues(name).Observe(float64(duration.Milliseconds())) } func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, error) { @@ -238,8 +248,9 @@ func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, stage = init case StageWrite: term := node.AsTerminal(func(in <-chan []config.GenericMap) { + b.opMetrics.CreateInQueueSizeGauge(stageID, func() int { return len(in) }) for i := range in { - runMeasured(stageID, func() { + b.runMeasured(stageID, func() { pe.Writer.Write(i) }) } @@ -248,8 +259,9 @@ func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, stage = term case StageEncode: encode := node.AsTerminal(func(in <-chan []config.GenericMap) { + b.opMetrics.CreateInQueueSizeGauge(stageID, func() int { return len(in) }) for i := range in { - runMeasured(stageID, func() { + b.runMeasured(stageID, func() { pe.Encoder.Encode(i) }) } @@ -258,16 +270,20 @@ func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, stage = encode case StageTransform: stage = node.AsMiddle(func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) { + b.opMetrics.CreateInQueueSizeGauge(stageID, func() int { return len(in) }) + b.opMetrics.CreateOutQueueSizeGauge(stageID, func() int { return len(out) }) for i := range in { - runMeasured(stageID, func() { + b.runMeasured(stageID, func() { out <- pe.Transformer.Transform(i) }) } }) case StageExtract: stage = node.AsMiddle(func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) { + b.opMetrics.CreateInQueueSizeGauge(stageID, func() int { return len(in) }) + b.opMetrics.CreateOutQueueSizeGauge(stageID, func() int { return len(out) }) for i := range in { - runMeasured(stageID, func() { + b.runMeasured(stageID, func() { out <- pe.Extractor.Extract(i) }) } @@ -282,18 +298,18 @@ func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, return stage, nil } -func getIngester(params config.StageParam) (ingest.Ingester, error) { +func getIngester(opMetrics *operational.Metrics, params config.StageParam) (ingest.Ingester, error) { var ingester ingest.Ingester var err error switch params.Ingest.Type { case api.FileType, api.FileLoopType, api.FileChunksType: ingester, err = ingest.NewIngestFile(params) case api.CollectorType: - ingester, err = ingest.NewIngestCollector(params) + ingester, err = ingest.NewIngestCollector(opMetrics, params) case api.KafkaType: - ingester, err = ingest.NewIngestKafka(params) + ingester, err = ingest.NewIngestKafka(opMetrics, params) case api.GRPCType: 
- ingester, err = ingest.NewGRPCProtobuf(params) + ingester, err = ingest.NewGRPCProtobuf(opMetrics, params) case api.FakeType: ingester, err = ingest.NewIngestFake(params) default: @@ -302,7 +318,7 @@ func getIngester(params config.StageParam) (ingest.Ingester, error) { return ingester, err } -func getWriter(params config.StageParam) (write.Writer, error) { +func getWriter(opMetrics *operational.Metrics, params config.StageParam) (write.Writer, error) { var writer write.Writer var err error switch params.Write.Type { @@ -311,7 +327,7 @@ func getWriter(params config.StageParam) (write.Writer, error) { case api.NoneType: writer, err = write.NewWriteNone() case api.LokiType: - writer, err = write.NewWriteLoki(params) + writer, err = write.NewWriteLoki(opMetrics, params) case api.FakeType: writer, err = write.NewWriteFake(params) default: @@ -320,7 +336,7 @@ func getWriter(params config.StageParam) (write.Writer, error) { return writer, err } -func getTransformer(params config.StageParam) (transform.Transformer, error) { +func getTransformer(opMetrics *operational.Metrics, params config.StageParam) (transform.Transformer, error) { var transformer transform.Transformer var err error switch params.Transform.Type { @@ -338,7 +354,7 @@ func getTransformer(params config.StageParam) (transform.Transformer, error) { return transformer, err } -func getExtractor(params config.StageParam) (extract.Extractor, error) { +func getExtractor(opMetrics *operational.Metrics, params config.StageParam) (extract.Extractor, error) { var extractor extract.Extractor var err error switch params.Extract.Type { @@ -347,7 +363,7 @@ func getExtractor(params config.StageParam) (extract.Extractor, error) { case api.AggregateType: extractor, err = extract.NewExtractAggregate(params) case api.ConnTrackType: - extractor, err = conntrack.NewConnectionTrack(params, clock.New()) + extractor, err = conntrack.NewConnectionTrack(opMetrics, params, clock.New()) case api.TimebasedType: extractor, err = extract.NewExtractTimebased(params) default: @@ -356,14 +372,14 @@ func getExtractor(params config.StageParam) (extract.Extractor, error) { return extractor, err } -func getEncoder(params config.StageParam) (encode.Encoder, error) { +func getEncoder(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) { var encoder encode.Encoder var err error switch params.Encode.Type { case api.PromType: - encoder, err = encode.NewEncodeProm(params) + encoder, err = encode.NewEncodeProm(opMetrics, params) case api.KafkaType: - encoder, err = encode.NewEncodeKafka(params) + encoder, err = encode.NewEncodeKafka(opMetrics, params) case api.NoneType: encoder, _ = encode.NewEncodeNone() default: diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go index 7ab3cfdf2..34ee5d247 100644 --- a/pkg/pipeline/pipeline_test.go +++ b/pkg/pipeline/pipeline_test.go @@ -29,6 +29,7 @@ import ( test2 "github.com/mariomac/guara/pkg/test" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write" @@ -61,7 +62,7 @@ func Test_transformToLoki(t *testing.T) { v, cfg := test.InitConfig(t, yamlConfigNoParams) require.NotNil(t, v) - loki, err := write.NewWriteLoki(cfg.Parameters[0]) + loki, err := write.NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0]) 
require.NoError(t, err) loki.Write(transformed) } @@ -209,7 +210,7 @@ parameters: require.True(t, scanner.Scan()) capturedRecord := map[string]interface{}{} bytes := scanner.Bytes() - require.NoError(t, json.Unmarshal(bytes, &capturedRecord)) + require.NoError(t, json.Unmarshal(bytes, &capturedRecord), string(bytes)) assert.NotZero(t, capturedRecord["TimeReceived"]) delete(capturedRecord, "TimeReceived") diff --git a/pkg/pipeline/write/metrics.go b/pkg/pipeline/write/metrics.go new file mode 100644 index 000000000..16e2511b5 --- /dev/null +++ b/pkg/pipeline/write/metrics.go @@ -0,0 +1,18 @@ +package write + +import ( + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/prometheus/client_golang/prometheus" +) + +type metrics struct { + *operational.Metrics + recordsWritten prometheus.Counter +} + +func newMetrics(opMetrics *operational.Metrics, stage string) *metrics { + return &metrics{ + Metrics: opMetrics, + recordsWritten: opMetrics.CreateRecordsWrittenCounter(stage), + } +} diff --git a/pkg/pipeline/write/write_loki.go b/pkg/pipeline/write/write_loki.go index e9bd9f9e7..8a7633fff 100644 --- a/pkg/pipeline/write/write_loki.go +++ b/pkg/pipeline/write/write_loki.go @@ -25,9 +25,8 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" - operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" - "github.com/prometheus/client_golang/prometheus" logAdapter "github.com/go-kit/kit/log/logrus" jsonIter "github.com/json-iterator/go" @@ -58,13 +57,9 @@ type Loki struct { timeNow func() time.Time in chan config.GenericMap exitChan <-chan struct{} + metrics *metrics } -var recordsWritten = operationalMetrics.NewCounter(prometheus.CounterOpts{ - Name: "loki_records_written", - Help: "Number of records written to loki", -}) - func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) { batchWait, err := time.ParseDuration(c.BatchWait) if err != nil { @@ -134,7 +129,7 @@ func (l *Loki) ProcessRecord(in config.GenericMap) error { timestamp := l.extractTimestamp(out) err = l.client.Handle(labels, timestamp, string(js)) if err == nil { - recordsWritten.Inc() + l.metrics.recordsWritten.Inc() } return err } @@ -232,7 +227,7 @@ func (l *Loki) processRecords() { } // NewWriteLoki creates a Loki writer from configuration -func NewWriteLoki(params config.StageParam) (*Loki, error) { +func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Loki, error) { log.Debugf("entering NewWriteLoki") lokiConfigIn := api.WriteLoki{} if params.Write != nil && params.Write.Loki != nil { @@ -271,7 +266,9 @@ func NewWriteLoki(params config.StageParam) (*Loki, error) { } } + // TODO / FIXME / FIGURE OUT: seems like we have 2 input channels for Loki?
(this one, and see also pipeline_builder.go / getStageNode / StageWrite) in := make(chan config.GenericMap, channelSize) + opMetrics.CreateInQueueSizeGauge(params.Name+"-2", func() int { return len(in) }) l := &Loki{ lokiConfig: lokiConfig, @@ -282,6 +279,7 @@ func NewWriteLoki(params config.StageParam) (*Loki, error) { timeNow: time.Now, exitChan: pUtils.ExitChannel(), in: in, + metrics: newMetrics(opMetrics, params.Name), } go l.processRecords() diff --git a/pkg/pipeline/write/write_loki_test.go b/pkg/pipeline/write/write_loki_test.go index a98d75a5b..7dc4504d7 100644 --- a/pkg/pipeline/write/write_loki_test.go +++ b/pkg/pipeline/write/write_loki_test.go @@ -28,6 +28,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" @@ -72,7 +73,7 @@ parameters: v, cfg := test.InitConfig(t, yamlConfig) require.NotNil(t, v) - loki, err := NewWriteLoki(cfg.Parameters[0]) + loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0]) require.NoError(t, err) assert.Equal(t, "https://foo:8888/loki/api/v1/push", loki.lokiConfig.URL.String()) @@ -109,7 +110,7 @@ parameters: v, cfg := test.InitConfig(t, yamlConfig) require.NotNil(t, v) - loki, err := NewWriteLoki(cfg.Parameters[0]) + loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0]) require.NoError(t, err) fe := fakeEmitter{} @@ -163,7 +164,7 @@ parameters: v, cfg := test.InitConfig(t, yamlConf) require.NotNil(t, v) - loki, err := NewWriteLoki(cfg.Parameters[0]) + loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0]) require.NoError(t, err) fe := fakeEmitter{} @@ -204,7 +205,7 @@ func TestTimestampExtraction_LocalTime(t *testing.T) { v, cfg := test.InitConfig(t, yamlConfigNoParams) require.NotNil(t, v) - loki, err := NewWriteLoki(cfg.Parameters[0]) + loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0]) require.NoError(t, err) loki.apiConfig.TimestampLabel = testCase.tsLabel @@ -246,7 +247,7 @@ parameters: v, cfg := test.InitConfig(t, yamlConfig) require.NotNil(t, v) - loki, err := NewWriteLoki(cfg.Parameters[0]) + loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0]) require.NoError(t, err) fe := fakeEmitter{} @@ -279,7 +280,7 @@ parameters: url: %s `, fakeLoki.URL) _, cfg := test.InitConfig(t, yamlConfig) - loki, err := NewWriteLoki(cfg.Parameters[0]) + loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0]) require.NoError(t, err) require.NoError(t, loki.ProcessRecord(map[string]interface{}{"foo": "bar", "baz": "bae"})) @@ -329,7 +330,7 @@ func BenchmarkWriteLoki(b *testing.B) { TimestampScale: "1ms", } - loki, err := NewWriteLoki(config.StageParam{Write: &config.Write{Loki: ¶ms}}) + loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), config.StageParam{Write: &config.Write{Loki: ¶ms}}) require.NoError(b, err) for i := 0; i < b.N; i++ { diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 213e23222..174b204e9 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -32,6 +32,7 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + 
"github.com/prometheus/client_golang/prometheus" "github.com/spf13/viper" "github.com/stretchr/testify/require" ) @@ -58,6 +59,7 @@ func GetIngestMockEntry(missingKey bool) config.GenericMap { } func InitConfig(t *testing.T, conf string) (*viper.Viper, *config.ConfigFileStruct) { + ResetPromRegistry() var json = jsoniter.ConfigCompatibleWithStandardLibrary yamlConfig := []byte(conf) v := viper.New() @@ -190,3 +192,9 @@ func GetExtractMockEntries2() []config.GenericMap { } return entries } + +func ResetPromRegistry() { + reg := prometheus.NewRegistry() + prometheus.DefaultRegisterer = reg + prometheus.DefaultGatherer = reg +} From 82dd1799c81b7439bb05bfc09c19925bad357d4c Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Tue, 27 Sep 2022 07:44:57 +0200 Subject: [PATCH 2/2] Allow filtering metrics on numeric values --- go.mod | 2 +- go.sum | 2 ++ pkg/pipeline/encode/encode_prom.go | 12 ++++--- pkg/pipeline/encode/encode_prom_test.go | 15 +++++++++ .../netobserv/loki-client-go/loki/client.go | 31 ++++++++++--------- vendor/modules.txt | 3 +- 6 files changed, 43 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index 6e0fdf5fa..7bcc904ce 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f github.com/mitchellh/mapstructure v1.4.3 github.com/netobserv/gopipes v0.1.1 - github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9 + github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500 github.com/netobserv/netobserv-ebpf-agent v0.1.1-0.20220608092850-3fd4695b7cc2 github.com/netsampler/goflow2 v1.1.1-0.20220509155230-5300494e4785 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index dcac0080a..1ac919937 100644 --- a/go.sum +++ b/go.sum @@ -701,6 +701,8 @@ github.com/netobserv/gopipes v0.1.1 h1:f8zJsvnMgRFRa2B+1siwRtW0Y4dqeBROmkcI/HgT1 github.com/netobserv/gopipes v0.1.1/go.mod h1:eGoHZW1ON8Dx/zmDXUhsbVNqatPjtpdO0UZBmGZGmVI= github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9 h1:c2swm3EamzgjBq9idNbEs5bNz20FJo/HK6uxyigXekQ= github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9/go.mod h1:LHXpc5tjKvsfZn0pwLKrvlgEhZcCaw3Di9mUEZGAI4E= +github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500 h1:RmnoJe/ci5q+QdM7upFdxiU+D8F3L3qTd5wXCwwHefw= +github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500/go.mod h1:LHXpc5tjKvsfZn0pwLKrvlgEhZcCaw3Di9mUEZGAI4E= github.com/netobserv/netobserv-ebpf-agent v0.1.1-0.20220608092850-3fd4695b7cc2 h1:K7SjoqEfzpMfIjHV85Lg8UDMvZu8rPfrsgKRoo7W30o= github.com/netobserv/netobserv-ebpf-agent v0.1.1-0.20220608092850-3fd4695b7cc2/go.mod h1:996FEHp8Xj+AKCkiN4eH3dl/yF2DzuYM0kchWZOrapM= github.com/netobserv/prometheus-common v0.31.2-0.20220720134304-43e74fd22881 h1:hx5bi6xBovRjmwUoVJBzhJ3EDo4K4ZUsqqKrJuQ2vMI= diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index b88d2383a..097278794 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -196,10 +196,14 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetri func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *api.PromMetricsItem) interface{} { if info.Filter.Key != "" { - val, found := flow[info.Filter.Key] - shouldKeepRecord := found && val == info.Filter.Value - if !shouldKeepRecord { - return nil + if val, found := flow[info.Filter.Key]; found { + sVal, ok := val.(string) + if !ok { + sVal = fmt.Sprint(val) + } + if sVal != 
info.Filter.Value { + return nil + } } } if info.ValueKey == "" { diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index 1da6b978e..666986599 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -136,6 +136,7 @@ func Test_CustomMetric(t *testing.T) { "srcIP": "20.0.0.2", "dstIP": "10.0.0.1", "flags": "SYN", + "dir": float64(0), "bytes": 7, "packets": 1, "latency": 0.1, @@ -143,6 +144,7 @@ func Test_CustomMetric(t *testing.T) { "srcIP": "20.0.0.2", "dstIP": "10.0.0.1", "flags": "RST", + "dir": int(0), "bytes": 1, "packets": 1, "latency": 0.05, @@ -150,6 +152,7 @@ func Test_CustomMetric(t *testing.T) { "srcIP": "10.0.0.1", "dstIP": "30.0.0.3", "flags": "SYN", + "dir": 1, "bytes": 12, "packets": 2, "latency": 0.2, @@ -180,6 +183,16 @@ func Test_CustomMetric(t *testing.T) { Type: "counter", ValueKey: "", // empty valuekey means it's a records counter Labels: []string{"srcIP", "dstIP"}, + }, { + Name: "flows_incoming", + Type: "counter", + Filter: api.PromMetricsFilter{Key: "dir", Value: "0"}, + ValueKey: "", // empty valuekey means it's a records counter + }, { + Name: "flows_outgoing", + Type: "counter", + Filter: api.PromMetricsFilter{Key: "dir", Value: "1"}, + ValueKey: "", // empty valuekey means it's a records counter }}, } @@ -209,6 +222,8 @@ func Test_CustomMetric(t *testing.T) { require.Contains(t, exposed, `test_latency_seconds_count{dstIP="30.0.0.3",srcIP="10.0.0.1"} 1`) require.Contains(t, exposed, `test_flows_total{dstIP="10.0.0.1",srcIP="20.0.0.2"} 2`) require.Contains(t, exposed, `test_flows_total{dstIP="30.0.0.3",srcIP="10.0.0.1"} 1`) + require.Contains(t, exposed, `test_flows_incoming 2`) + require.Contains(t, exposed, `test_flows_outgoing 1`) } func Test_MetricTTL(t *testing.T) { diff --git a/vendor/github.com/netobserv/loki-client-go/loki/client.go b/vendor/github.com/netobserv/loki-client-go/loki/client.go index e2b669787..bf3093b5e 100644 --- a/vendor/github.com/netobserv/loki-client-go/loki/client.go +++ b/vendor/github.com/netobserv/loki-client-go/loki/client.go @@ -41,42 +41,43 @@ const ( LatencyLabel = "filename" HostLabel = "host" + MetricPrefix = "netobserv" ) var ( encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "promtail", - Name: "encoded_bytes_total", + Namespace: MetricPrefix, + Name: "loki_encoded_bytes_total", Help: "Number of bytes encoded and ready to send.", }, []string{HostLabel}) sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "promtail", - Name: "sent_bytes_total", + Namespace: MetricPrefix, + Name: "loki_sent_bytes_total", Help: "Number of bytes sent.", }, []string{HostLabel}) droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "promtail", - Name: "dropped_bytes_total", + Namespace: MetricPrefix, + Name: "loki_dropped_bytes_total", Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.", }, []string{HostLabel}) sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "promtail", - Name: "sent_entries_total", + Namespace: MetricPrefix, + Name: "loki_sent_entries_total", Help: "Number of log entries sent to the ingester.", }, []string{HostLabel}) droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "promtail", - Name: "dropped_entries_total", + Namespace: MetricPrefix, + Name: "loki_dropped_entries_total", Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.", 
}, []string{HostLabel}) requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "promtail", - Name: "request_duration_seconds", + Namespace: MetricPrefix, + Name: "loki_request_duration_seconds", Help: "Duration of send requests.", }, []string{"status_code", HostLabel}) batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "promtail", - Name: "batch_retries_total", + Namespace: MetricPrefix, + Name: "loki_batch_retries_total", Help: "Number of times batches has had to be retried.", }, []string{HostLabel}) streamLag *metric.Gauges @@ -97,7 +98,7 @@ func init() { prometheus.MustRegister(requestDuration) prometheus.MustRegister(batchRetries) var err error - streamLag, err = metric.NewGauges("promtail_stream_lag_seconds", + streamLag, err = metric.NewGauges(MetricPrefix+"_loki_stream_lag_seconds", "Difference between current time and last batch timestamp for successful sends", metric.GaugeConfig{Action: "set"}, int64(1*time.Minute.Seconds()), // This strips out files which update slowly and reduces noise in this metric. diff --git a/vendor/modules.txt b/vendor/modules.txt index f74aac981..d4c28e287 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -165,7 +165,7 @@ github.com/mwitkow/go-conntrack github.com/netobserv/gopipes/pkg/internal/connect github.com/netobserv/gopipes/pkg/internal/refl github.com/netobserv/gopipes/pkg/node -# github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9 +# github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500 ## explicit; go 1.15 github.com/netobserv/loki-client-go/loki github.com/netobserv/loki-client-go/pkg/backoff @@ -209,7 +209,6 @@ github.com/pmezard/go-difflib/difflib ## explicit; go 1.13 github.com/prometheus/client_golang/prometheus github.com/prometheus/client_golang/prometheus/internal -github.com/prometheus/client_golang/prometheus/promauto github.com/prometheus/client_golang/prometheus/promhttp # github.com/prometheus/client_model v0.2.0 ## explicit; go 1.9
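
Note for reviewers: the extractGenericValue rewrite in patch 2 makes filter matching type-tolerant by normalizing non-string flow values with fmt.Sprint before comparing them to the configured (always string) filter value; this is what lets api.PromMetricsFilter{Key: "dir", Value: "0"} match float64(0), int(0) and "0" alike in the test above. It also appears to change the missing-key behavior: previously a record lacking the filter key was dropped, whereas the new code lets it through the filter. A standalone sketch of just that comparison; matchesFilter is an illustrative name, not a function in this patch:

    package main

    import "fmt"

    // matchesFilter mirrors the comparison now used in extractGenericValue:
    // non-string values are stringified with fmt.Sprint, then compared to the
    // filter value, which is always a string in the API config.
    func matchesFilter(val interface{}, filterValue string) bool {
    	sVal, ok := val.(string)
    	if !ok {
    		sVal = fmt.Sprint(val)
    	}
    	return sVal == filterValue
    }

    func main() {
    	fmt.Println(matchesFilter(float64(0), "0")) // true: fmt.Sprint(float64(0)) == "0"
    	fmt.Println(matchesFilter(0, "0"))          // true: ints print the same way
    	fmt.Println(matchesFilter("0", "0"))        // true: strings compared directly
    	fmt.Println(matchesFilter(0.5, "0"))        // false
    }

Stringifying on the flow side rather than parsing the filter value avoids guessing the numeric type carried by each record, at the cost of one fmt.Sprint per filtered flow.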