diff --git a/Makefile b/Makefile
index e586f7ae4..7b0a1306f 100644
--- a/Makefile
+++ b/Makefile
@@ -89,6 +89,7 @@ dashboards: $(JB) $(JSONNET) ## Build grafana dashboards
docs: FORCE ## Update flowlogs-pipeline documentation
@./hack/update-docs.sh
@go run cmd/apitodoc/main.go > docs/api.md
+ @go run cmd/operationalmetricstodoc/main.go > docs/operational-metrics.md
.PHONY: clean
clean: ## Clean
diff --git a/README.md b/README.md
index 06a9e32c2..99ae97124 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,10 @@ FLP pipe-line module is built on top of [gopipes](https://github.com/netobserv/g
In addition, along with Prometheus and its ecosystem tools such as Thanos, Cortex etc.,
FLP provides an efficient scalable multi-cloud solution for comprehensive network analytics that can rely **solely on metrics data-source**.
-Default metrics are documented here [docs/metrics.md](docs/metrics.md).
+Default network metrics are documented here [docs/metrics.md](docs/metrics.md).
+Operational metrics are documented here [docs/operational-metrics.md](docs/operational-metrics.md).
+
+> note: operational metrics are exported only using prometheus
diff --git a/cmd/flowlogs-pipeline/main.go b/cmd/flowlogs-pipeline/main.go
index 2d924cedc..f3c5588bb 100644
--- a/cmd/flowlogs-pipeline/main.go
+++ b/cmd/flowlogs-pipeline/main.go
@@ -27,7 +27,7 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
- "github.com/netobserv/flowlogs-pipeline/pkg/health"
+ "github.com/netobserv/flowlogs-pipeline/pkg/operational/health"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
log "github.com/sirupsen/logrus"
diff --git a/cmd/operationalmetricstodoc/main.go b/cmd/operationalmetricstodoc/main.go
new file mode 100644
index 000000000..bb10d1d72
--- /dev/null
+++ b/cmd/operationalmetricstodoc/main.go
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2022 IBM, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package main
+
+import (
+ "fmt"
+
+ operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics"
+ "github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
+)
+
+func main() {
+ // Do not remove this unnamed variable ---> This is needed for goland compiler/linker to init the
+ // variables and functions for all the modules including the operational metrics variables which
+ // fills up `metricsOpts` with information
+ var _ *pipeline.Pipeline
+
+ header := `
+> Note: this file was automatically generated, to update execute "make docs"
+
+# flowlogs-pipeline Operational Metrics
+
+Each table below provides documentation for an exported flowlogs-pipeline operational metric.
+
+
+
+ `
+ doc := operationalMetrics.GetDocumentation()
+ data := fmt.Sprintf("%s\n%s\n", header, doc)
+ fmt.Printf("%s", data)
+}
diff --git a/docs/operational-metrics.md b/docs/operational-metrics.md
new file mode 100644
index 000000000..103cf6008
--- /dev/null
+++ b/docs/operational-metrics.md
@@ -0,0 +1,39 @@
+
+> Note: this file was automatically generated, to update execute "make docs"
+
+# flowlogs-pipeline Operational Metrics
+
+Each table below provides documentation for an exported flowlogs-pipeline operational metric.
+
+
+
+
+
+### encode_prom_metrics_processed
+| **Name** | encode_prom_metrics_processed |
+|:---|:---|
+| **Description** | Number of metrics processed |
+| **Type** | counter |
+
+
+### ingest_collector_queue_length
+| **Name** | ingest_collector_queue_length |
+|:---|:---|
+| **Description** | Queue length |
+| **Type** | gauge |
+
+
+### ingest_collector_flow_logs_processed
+| **Name** | ingest_collector_flow_logs_processed |
+|:---|:---|
+| **Description** | Number of log lines (flow logs) processed |
+| **Type** | counter |
+
+
+### loki_records_written
+| **Name** | loki_records_written |
+|:---|:---|
+| **Description** | Number of records written to loki |
+| **Type** | counter |
+
+
diff --git a/pkg/health/health.go b/pkg/operational/health/health.go
similarity index 100%
rename from pkg/health/health.go
rename to pkg/operational/health/health.go
diff --git a/pkg/health/health_test.go b/pkg/operational/health/health_test.go
similarity index 100%
rename from pkg/health/health_test.go
rename to pkg/operational/health/health_test.go
diff --git a/pkg/operational/metrics/metrics.go b/pkg/operational/metrics/metrics.go
new file mode 100644
index 000000000..8abc680c8
--- /dev/null
+++ b/pkg/operational/metrics/metrics.go
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2022 IBM, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package operationalMetrics
+
+import (
+ "fmt"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+type metricDefinition struct {
+ Name string
+ Help string
+ Type string
+}
+
+var metricsOpts []metricDefinition
+
+func NewCounter(opts prometheus.CounterOpts) prometheus.Counter {
+ metricsOpts = append(metricsOpts, metricDefinition{
+ Name: opts.Name,
+ Help: opts.Help,
+ Type: "counter",
+ })
+ return promauto.NewCounter(opts)
+}
+
+func NewGauge(opts prometheus.GaugeOpts) prometheus.Gauge {
+ metricsOpts = append(metricsOpts, metricDefinition{
+ Name: opts.Name,
+ Help: opts.Help,
+ Type: "gauge",
+ })
+ return promauto.NewGauge(opts)
+}
+
+func GetDocumentation() string {
+ doc := ""
+ for _, opts := range metricsOpts {
+ doc += fmt.Sprintf(
+ `
+### %s
+| **Name** | %s |
+|:---|:---|
+| **Description** | %s |
+| **Type** | %s |
+
+`,
+ opts.Name,
+ opts.Name,
+ opts.Help,
+ opts.Type,
+ )
+ }
+
+ return doc
+}
diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go
index 47aff531a..aba9434b5 100644
--- a/pkg/pipeline/encode/encode_prom.go
+++ b/pkg/pipeline/encode/encode_prom.go
@@ -27,6 +27,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
+ operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -84,6 +85,11 @@ type encodeProm struct {
exitChan chan bool
}
+var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{
+ Name: "encode_prom_metrics_processed",
+ Help: "Number of metrics processed",
+})
+
// Encode encodes a metric before being stored
func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap {
log.Debugf("entering encodeProm Encode")
@@ -92,6 +98,7 @@ func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap {
out := make([]config.GenericMap, 0)
for _, metric := range metrics {
// TODO: We may need different handling for histograms
+ metricsProcessed.Inc()
metricOut := e.EncodeMetric(metric)
out = append(out, metricOut...)
}
diff --git a/pkg/pipeline/ingest/ingest_collector.go b/pkg/pipeline/ingest/ingest_collector.go
index 2dbdac458..43a5ab78d 100644
--- a/pkg/pipeline/ingest/ingest_collector.go
+++ b/pkg/pipeline/ingest/ingest_collector.go
@@ -27,12 +27,14 @@ import (
ms "github.com/mitchellh/mapstructure"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
+ operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics"
pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
goflowFormat "github.com/netsampler/goflow2/format"
goflowCommonFormat "github.com/netsampler/goflow2/format/common"
_ "github.com/netsampler/goflow2/format/protobuf"
goflowpb "github.com/netsampler/goflow2/pb"
"github.com/netsampler/goflow2/utils"
+ "github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
@@ -57,6 +59,16 @@ type TransportWrapper struct {
c chan map[string]interface{}
}
+var queueLength = operationalMetrics.NewGauge(prometheus.GaugeOpts{
+ Name: "ingest_collector_queue_length",
+ Help: "Queue length",
+})
+
+var linesProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{
+ Name: "ingest_collector_flow_logs_processed",
+ Help: "Number of log lines (flow logs) processed",
+})
+
func NewWrapper(c chan map[string]interface{}) *TransportWrapper {
tw := TransportWrapper{c: c}
return &tw
@@ -156,6 +168,8 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []interface{}) {
records = append(records, string(recordAsBytes))
if len(records) >= ingestC.batchMaxLength {
log.Debugf("ingestCollector sending %d entries", len(records))
+ linesProcessed.Add(float64(len(records)))
+ queueLength.Set(float64(len(out)))
out <- records
records = []interface{}{}
}
@@ -163,6 +177,8 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []interface{}) {
// Process batch of records (if not empty)
if len(records) > 0 {
log.Debugf("ingestCollector sending %d entries", len(records))
+ linesProcessed.Add(float64(len(records)))
+ queueLength.Set(float64(len(out)))
out <- records
records = []interface{}{}
}
diff --git a/pkg/pipeline/write/write_loki.go b/pkg/pipeline/write/write_loki.go
index 62be19d6f..6b71505ee 100644
--- a/pkg/pipeline/write/write_loki.go
+++ b/pkg/pipeline/write/write_loki.go
@@ -25,9 +25,10 @@ import (
"time"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
- pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
-
"github.com/netobserv/flowlogs-pipeline/pkg/config"
+ operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics"
+ pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
+ "github.com/prometheus/client_golang/prometheus"
logAdapter "github.com/go-kit/kit/log/logrus"
jsonIter "github.com/json-iterator/go"
@@ -58,6 +59,11 @@ type Loki struct {
exitChan chan bool
}
+var recordsWritten = operationalMetrics.NewCounter(prometheus.CounterOpts{
+ Name: "loki_records_written",
+ Help: "Number of records written to loki",
+})
+
func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
batchWait, err := time.ParseDuration(c.BatchWait)
if err != nil {
@@ -123,7 +129,12 @@ func (l *Loki) ProcessRecord(record config.GenericMap) error {
if err != nil {
return err
}
- return l.client.Handle(labels, timestamp, string(js))
+
+ err = l.client.Handle(labels, timestamp, string(js))
+ if err != nil {
+ recordsWritten.Inc()
+ }
+ return err
}
func (l *Loki) extractTimestamp(record map[string]interface{}) time.Time {