Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ Flags:
-h, --help help for flowlogs-pipeline
--log-level string Log level: debug, info, warning, error (default "error")
--parameters string json of config file parameters field
--pipeline string json of config file pipeline field
--profile.port int Go pprof tool port (default: disabled)
--pipeline string json of config file pipeline field
```
<!---END-AUTO-flowlogs-pipeline_help--->

Expand Down
1 change: 1 addition & 0 deletions cmd/confgenerator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func initFlags() {
rootCmd.PersistentFlags().StringVar(&opts.DestGrafanaJsonnetFolder, "destGrafanaJsonnetFolder", "/tmp/jsonnet", "destination grafana jsonnet folder")
rootCmd.PersistentFlags().StringSliceVar(&opts.SkipWithTags, "skipWithTags", nil, "Skip definitions with Tags")
rootCmd.PersistentFlags().StringSliceVar(&opts.GenerateStages, "generateStages", nil, "Produce only specified stages (ingest, transform_generic, transform_network, extract_aggregate, encode_prom, write_loki")
rootCmd.PersistentFlags().StringVar(&opts.GlobalMetricsPrefix, "globalMetricsPrefix", "", "Common prefix for all generated metrics, including operational ones")
}

func main() {
Expand Down
8 changes: 4 additions & 4 deletions cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

jsoniter "github.com/json-iterator/go"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational/health"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
log "github.com/sirupsen/logrus"
Expand All @@ -44,7 +44,7 @@ var (
BuildDate string
cfgFile string
logLevel string
envPrefix = "FLOWLOGS-PIPILNE"
envPrefix = "FLOWLOGS-PIPELINE"
defaultLogFileName = ".flowlogs-pipeline"
opts config.Options
)
Expand Down Expand Up @@ -141,6 +141,7 @@ func initFlags() {
rootCmd.PersistentFlags().IntVar(&opts.Profile.Port, "profile.port", 0, "Go pprof tool port (default: disabled)")
rootCmd.PersistentFlags().StringVar(&opts.PipeLine, "pipeline", "", "json of config file pipeline field")
rootCmd.PersistentFlags().StringVar(&opts.Parameters, "parameters", "", "json of config file parameters field")
rootCmd.PersistentFlags().StringVar(&opts.MetricsSettings, "metrics-settings", "", "json for global metrics settings")
}

func main() {
Expand Down Expand Up @@ -191,7 +192,7 @@ func run() {
}

// Start health report server
health.NewHealthServer(&opts, mainPipeline)
operational.NewHealthServer(&opts, mainPipeline.IsAlive, mainPipeline.IsReady)

// Starts the flows pipeline
mainPipeline.Run()
Expand All @@ -200,5 +201,4 @@ func run() {
time.Sleep(time.Second)
log.Debugf("exiting main run")
os.Exit(0)

}
10 changes: 4 additions & 6 deletions cmd/operationalmetricstodoc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package main
import (
"fmt"

operationalMetrics "github.com/netobserv/flowlogs-pipeline/pkg/operational/metrics"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
)

Expand All @@ -32,15 +32,13 @@ func main() {

header := `
> Note: this file was automatically generated, to update execute "make docs"
# flowlogs-pipeline Operational Metrics
Each table below provides documentation for an exported flowlogs-pipeline operational metric.



`
doc := operationalMetrics.GetDocumentation()
doc := operational.GetDocumentation()
data := fmt.Sprintf("%s\n%s\n", header, doc)
fmt.Printf("%s", data)
}
2 changes: 1 addition & 1 deletion docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Following is the supported API format for the kafka ingest:
json: JSON decoder
protobuf: Protobuf decoder
batchMaxLen: the number of accumulated flows before being forwarded for processing
pullBatchLen: the capacity of the queue use to store pulled flows
pullQueueCapacity: the capacity of the queue used to store pulled flows
pullMaxBytes: the maximum number of bytes being pulled from kafka
commitInterval: the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously.
tls: TLS client configuration (optional)
Expand Down
80 changes: 51 additions & 29 deletions docs/operational-metrics.md
Original file line number Diff line number Diff line change
@@ -1,67 +1,89 @@

> Note: this file was automatically generated, to update execute "make docs"
# flowlogs-pipeline Operational Metrics
Each table below provides documentation for an exported flowlogs-pipeline operational metric.





### encode_prom_metrics_processed
| **Name** | encode_prom_metrics_processed |
### conntrack_input_records
| **Name** | conntrack_input_records |
|:---|:---|
| **Description** | Number of metrics processed |
| **Description** | The total number of input records per classification. |
| **Type** | counter |
| **Labels** | classification |


### conntrack_memory_connections
| **Name** | conntrack_memory_connections |
|:---|:---|
| **Description** | The total number of tracked connections in memory. |
| **Type** | gauge |
| **Labels** | |


### conntrack_output_records
| **Name** | conntrack_output_records |
|:---|:---|
| **Description** | The total number of output records. |
| **Type** | counter |
| **Labels** | type |


### encode_prom_errors
| **Name** | encode_prom_errors |
|:---|:---|
| **Description** | Total errors during metrics generation |
| **Type** | counter |
| **Labels** | error, metric, key |


### conntrack_memory_connections
| **Name** | conntrack_memory_connections |
### ingest_flows_processed
| **Name** | ingest_flows_processed |
|:---|:---|
| **Description** | The total number of tracked connections in memory. |
| **Type** | gauge |
| **Description** | Number of flow logs processed by the ingester |
| **Type** | counter |
| **Labels** | stage |


### conntrack_input_records
| **Name** | conntrack_input_records |
### metrics_processed
| **Name** | metrics_processed |
|:---|:---|
| **Description** | The total number of input records per classification. |
| **Description** | Number of metrics processed |
| **Type** | counter |
| **Labels** | stage |


### conntrack_output_records
| **Name** | conntrack_output_records |
### records_written
| **Name** | records_written |
|:---|:---|
| **Description** | The total number of output records. |
| **Description** | Number of output records written |
| **Type** | counter |
| **Labels** | stage |


### ingest_collector_queue_length
| **Name** | ingest_collector_queue_length |
### stage_duration_ms
| **Name** | stage_duration_ms |
|:---|:---|
| **Description** | Queue length |
| **Type** | gauge |
| **Description** | Pipeline stage duration in milliseconds |
| **Type** | histogram |
| **Labels** | stage |


### ingest_collector_flow_logs_processed
| **Name** | ingest_collector_flow_logs_processed |
### stage_in_queue_size
| **Name** | stage_in_queue_size |
|:---|:---|
| **Description** | Number of log lines (flow logs) processed |
| **Type** | counter |
| **Description** | Pipeline stage input queue size (number of elements in queue) |
| **Type** | gauge |
| **Labels** | stage |


### loki_records_written
| **Name** | loki_records_written |
### stage_out_queue_size
| **Name** | stage_out_queue_size |
|:---|:---|
| **Description** | Number of records written to loki |
| **Type** | counter |
| **Description** | Pipeline stage output queue size (number of elements in queue) |
| **Type** | gauge |
| **Labels** | stage |


2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f
github.com/mitchellh/mapstructure v1.4.3
github.com/netobserv/gopipes v0.1.1
github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9
github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500
github.com/netobserv/netobserv-ebpf-agent v0.1.1-0.20220608092850-3fd4695b7cc2
github.com/netsampler/goflow2 v1.1.1-0.20220509155230-5300494e4785
github.com/pkg/errors v0.9.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,8 @@ github.com/netobserv/gopipes v0.1.1 h1:f8zJsvnMgRFRa2B+1siwRtW0Y4dqeBROmkcI/HgT1
github.com/netobserv/gopipes v0.1.1/go.mod h1:eGoHZW1ON8Dx/zmDXUhsbVNqatPjtpdO0UZBmGZGmVI=
github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9 h1:c2swm3EamzgjBq9idNbEs5bNz20FJo/HK6uxyigXekQ=
github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9/go.mod h1:LHXpc5tjKvsfZn0pwLKrvlgEhZcCaw3Di9mUEZGAI4E=
github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500 h1:RmnoJe/ci5q+QdM7upFdxiU+D8F3L3qTd5wXCwwHefw=
github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500/go.mod h1:LHXpc5tjKvsfZn0pwLKrvlgEhZcCaw3Di9mUEZGAI4E=
github.com/netobserv/netobserv-ebpf-agent v0.1.1-0.20220608092850-3fd4695b7cc2 h1:K7SjoqEfzpMfIjHV85Lg8UDMvZu8rPfrsgKRoo7W30o=
github.com/netobserv/netobserv-ebpf-agent v0.1.1-0.20220608092850-3fd4695b7cc2/go.mod h1:996FEHp8Xj+AKCkiN4eH3dl/yF2DzuYM0kchWZOrapM=
github.com/netobserv/prometheus-common v0.31.2-0.20220720134304-43e74fd22881 h1:hx5bi6xBovRjmwUoVJBzhJ3EDo4K4ZUsqqKrJuQ2vMI=
Expand Down
1 change: 1 addition & 0 deletions pkg/confgen/confgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func Test_RunShortConfGen(t *testing.T) {
DestConfFile: configOut,
DestDocFile: docOut,
DestGrafanaJsonnetFolder: jsonnetOut,
GlobalMetricsPrefix: "flp_",
})
err = os.WriteFile(filepath.Join(dirPath, configFileName), []byte(test.ConfgenShortConfig), 0644)
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions pkg/confgen/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Options struct {
SrcFolder string
SkipWithTags []string
GenerateStages []string
GlobalMetricsPrefix string
}

type ConfigVisualization struct {
Expand Down
42 changes: 33 additions & 9 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ import (
)

type Options struct {
PipeLine string
Parameters string
Health Health
Profile Profile
PipeLine string
Parameters string
MetricsSettings string
Health Health
Profile Profile
}

type ConfigFileStruct struct {
LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"`
Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"`
Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
}

type Health struct {
Expand All @@ -46,6 +48,18 @@ type Profile struct {
Port int
}

// MetricsSettings is similar to api.PromEncode, but is global to the application, ie. it also works with operational metrics.
// Also, currently FLP doesn't support defining more than one PromEncode stage. If this feature is added later, these global settings
// will help configuring common setting for all PromEncode stages - PromEncode settings would then act as overrides.
type MetricsSettings struct {
// TODO: manage global metrics server, ie. not coupled to PromEncode, cf https://github.com/netobserv/flowlogs-pipeline/issues/302
// Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"port number to expose \"/metrics\" endpoint"`
// TLS *PromTLSConf `yaml:"tls,omitempty" json:"tls,omitempty" doc:"TLS configuration for the prometheus endpoint"`
// ExpiryTime int `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"seconds of no-flow to wait before deleting prometheus data item"`
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty"`
NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"`
}

type Stage struct {
Name string `yaml:"name" json:"name"`
Follows string `yaml:"follows,omitempty" json:"follows,omitempty"`
Expand Down Expand Up @@ -108,17 +122,27 @@ func ParseConfig(opts Options) (ConfigFileStruct, error) {
logrus.Debugf("opts.PipeLine = %v ", opts.PipeLine)
err := JsonUnmarshalStrict([]byte(opts.PipeLine), &out.Pipeline)
if err != nil {
logrus.Errorf("error when reading config file: %v", err)
logrus.Errorf("error when parsing pipeline: %v", err)
return out, err
}
logrus.Debugf("stages = %v ", out.Pipeline)

err = JsonUnmarshalStrict([]byte(opts.Parameters), &out.Parameters)
if err != nil {
logrus.Errorf("error when reading config file: %v", err)
logrus.Errorf("error when parsing pipeline parameters: %v", err)
return out, err
}
logrus.Debugf("params = %v ", out.Parameters)

if opts.MetricsSettings != "" {
err = JsonUnmarshalStrict([]byte(opts.MetricsSettings), &out.MetricsSettings)
if err != nil {
logrus.Errorf("error when parsing global metrics settings: %v", err)
return out, err
}
logrus.Debugf("metrics settings = %v ", out.MetricsSettings)
}

return out, nil
}

Expand Down
15 changes: 7 additions & 8 deletions pkg/operational/health/health.go → pkg/operational/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
*/

package health
package operational

import (
"net"
Expand All @@ -24,36 +24,35 @@ import (

"github.com/heptiolabs/healthcheck"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
log "github.com/sirupsen/logrus"
)

const defaultServerHost = "0.0.0.0"

type Server struct {
handler healthcheck.Handler
address string
Address string
}

func (hs *Server) Serve() {
for {
err := http.ListenAndServe(hs.address, hs.handler)
err := http.ListenAndServe(hs.Address, hs.handler)
log.Errorf("http.ListenAndServe error %v", err)
time.Sleep(60 * time.Second)
}
}

func NewHealthServer(opts *config.Options, pipeline *pipeline.Pipeline) *Server {
func NewHealthServer(opts *config.Options, isAlive healthcheck.Check, isReady healthcheck.Check) *Server {

handler := healthcheck.NewHandler()
address := net.JoinHostPort(defaultServerHost, opts.Health.Port)

handler.AddLivenessCheck("PipelineCheck", pipeline.IsAlive())
handler.AddReadinessCheck("PipelineCheck", pipeline.IsReady())
handler.AddLivenessCheck("PipelineCheck", isAlive)
handler.AddReadinessCheck("PipelineCheck", isReady)

server := &Server{
handler: handler,
address: address,
Address: address,
}

go server.Serve()
Expand Down
Loading