Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ dashboards: $(JB) $(JSONNET) ## Build grafana dashboards
docs: FORCE ## Update flowlogs-pipeline documentation
@./hack/update-docs.sh
@go run cmd/apitodoc/main.go > docs/api.md
@go run cmd/operationalmetricstodoc/main.go > docs/operational-metrics.md

.PHONY: clean
clean: ## Clean
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ FLP pipe-line module is built on top of [gopipes](https://github.com/netobserv/g
In addition, along with Prometheus and its ecosystem tools such as Thanos, Cortex etc.,
FLP provides an efficient scalable multi-cloud solution for comprehensive network analytics that can rely **solely on metrics data-source**.

Default metrics are documented here [docs/metrics.md](docs/metrics.md).
Default network metrics are documented here [docs/metrics.md](docs/metrics.md).
Operational metrics are documented here [docs/operational-metrics.md](docs/operational-metrics.md).

> note: operational metrics are exported only using prometheus
<br>
<br>

Expand Down
2 changes: 1 addition & 1 deletion cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

jsoniter "github.com/json-iterator/go"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/health"
"github.com/netobserv/flowlogs-pipeline/pkg/operational/health"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
log "github.com/sirupsen/logrus"
Expand Down
46 changes: 46 additions & 0 deletions cmd/operationalmetricstodoc/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package main

import (
"fmt"

operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
)

func main() {
// Do not remove this unnamed variable ---> This is needed for goland compiler/linker to init the
// variables and functions for all the modules including the operational metrics variables which
// fills up `metricsOpts` with information
var _ *pipeline.Pipeline

header := `
> Note: this file was automatically generated, to update execute "make docs"

# flowlogs-pipeline Operational Metrics

Each table below provides documentation for an exported flowlogs-pipeline operational metric.



`
doc := operationalMetrics.GetDocumentation()
data := fmt.Sprintf("%s\n%s\n", header, doc)
fmt.Printf("%s", data)
}
39 changes: 39 additions & 0 deletions docs/operational-metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@

> Note: this file was automatically generated, to update execute "make docs"

# flowlogs-pipeline Operational Metrics

Each table below provides documentation for an exported flowlogs-pipeline operational metric.





### encode_prom_metrics_processed
| **Name** | encode_prom_metrics_processed |
|:---|:---|
| **Description** | Number of metrics processed |
| **Type** | counter |


### ingest_collector_queue_length
| **Name** | ingest_collector_queue_length |
|:---|:---|
| **Description** | Queue length |
| **Type** | gauge |


### ingest_collector_flow_logs_processed
| **Name** | ingest_collector_flow_logs_processed |
|:---|:---|
| **Description** | Number of log lines (flow logs) processed |
| **Type** | counter |


### loki_records_written
| **Name** | loki_records_written |
|:---|:---|
| **Description** | Number of records written to loki |
| **Type** | counter |


File renamed without changes.
File renamed without changes.
73 changes: 73 additions & 0 deletions pkg/operational/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package operationalMetrics

import (
"fmt"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type metricDefinition struct {
Name string
Help string
Type string
}

var metricsOpts []metricDefinition

func NewCounter(opts prometheus.CounterOpts) prometheus.Counter {
metricsOpts = append(metricsOpts, metricDefinition{
Name: opts.Name,
Help: opts.Help,
Type: "counter",
})
return promauto.NewCounter(opts)
}

func NewGauge(opts prometheus.GaugeOpts) prometheus.Gauge {
metricsOpts = append(metricsOpts, metricDefinition{
Name: opts.Name,
Help: opts.Help,
Type: "gauge",
})
return promauto.NewGauge(opts)
}

func GetDocumentation() string {
doc := ""
for _, opts := range metricsOpts {
doc += fmt.Sprintf(
`
### %s
| **Name** | %s |
|:---|:---|
| **Description** | %s |
| **Type** | %s |

`,
opts.Name,
opts.Name,
opts.Help,
opts.Type,
)
}

return doc
}
7 changes: 7 additions & 0 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -84,6 +85,11 @@ type encodeProm struct {
exitChan chan bool
}

var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{
Name: "encode_prom_metrics_processed",
Help: "Number of metrics processed",
})

// Encode encodes a metric before being stored
func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap {
log.Debugf("entering encodeProm Encode")
Expand All @@ -92,6 +98,7 @@ func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap {
out := make([]config.GenericMap, 0)
for _, metric := range metrics {
// TODO: We may need different handling for histograms
metricsProcessed.Inc()
metricOut := e.EncodeMetric(metric)
out = append(out, metricOut...)
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/pipeline/ingest/ingest_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import (

ms "github.com/mitchellh/mapstructure"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics"
pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
goflowFormat "github.com/netsampler/goflow2/format"
goflowCommonFormat "github.com/netsampler/goflow2/format/common"
_ "github.com/netsampler/goflow2/format/protobuf"
goflowpb "github.com/netsampler/goflow2/pb"
"github.com/netsampler/goflow2/utils"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
Expand All @@ -57,6 +59,16 @@ type TransportWrapper struct {
c chan map[string]interface{}
}

var queueLength = operationalMetrics.NewGauge(prometheus.GaugeOpts{
Name: "ingest_collector_queue_length",
Help: "Queue length",
})

var linesProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{
Name: "ingest_collector_flow_logs_processed",
Help: "Number of log lines (flow logs) processed",
})

func NewWrapper(c chan map[string]interface{}) *TransportWrapper {
tw := TransportWrapper{c: c}
return &tw
Expand Down Expand Up @@ -156,13 +168,17 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []interface{}) {
records = append(records, string(recordAsBytes))
if len(records) >= ingestC.batchMaxLength {
log.Debugf("ingestCollector sending %d entries", len(records))
linesProcessed.Add(float64(len(records)))
queueLength.Set(float64(len(out)))
out <- records
records = []interface{}{}
}
case <-flushRecords.C:
// Process batch of records (if not empty)
if len(records) > 0 {
log.Debugf("ingestCollector sending %d entries", len(records))
linesProcessed.Add(float64(len(records)))
queueLength.Set(float64(len(out)))
out <- records
records = []interface{}{}
}
Expand Down
17 changes: 14 additions & 3 deletions pkg/pipeline/write/write_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics"
pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/prometheus/client_golang/prometheus"

logAdapter "github.com/go-kit/kit/log/logrus"
jsonIter "github.com/json-iterator/go"
Expand Down Expand Up @@ -58,6 +59,11 @@ type Loki struct {
exitChan chan bool
}

var recordsWritten = operationalMetrics.NewCounter(prometheus.CounterOpts{
Name: "loki_records_written",
Help: "Number of records written to loki",
})

func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
batchWait, err := time.ParseDuration(c.BatchWait)
if err != nil {
Expand Down Expand Up @@ -123,7 +129,12 @@ func (l *Loki) ProcessRecord(record config.GenericMap) error {
if err != nil {
return err
}
return l.client.Handle(labels, timestamp, string(js))

err = l.client.Handle(labels, timestamp, string(js))
if err != nil {
recordsWritten.Inc()
}
return err
}

func (l *Loki) extractTimestamp(record map[string]interface{}) time.Time {
Expand Down