diff --git a/Makefile b/Makefile index 314f7fdba..b877e4988 100644 --- a/Makefile +++ b/Makefile @@ -73,6 +73,7 @@ dashboards: $(JB) $(JSONNET) ## Build grafana dashboards docs: FORCE ## Update flowlogs-pipeline documentation @./hack/update-docs.sh @go run cmd/apitodoc/main.go > docs/api.md + .PHONY: clean clean: ## Clean rm -f "${FLP_BIN_FILE}" diff --git a/README.md b/README.md index 2dc94a456..3f3306cc4 100644 --- a/README.md +++ b/README.md @@ -32,24 +32,12 @@ Usage: flowlogs-pipeline [flags] Flags: - --config string config file (default is $HOME/.flowlogs-pipeline) - --health.port string Health server port (default "8080") - -h, --help help for flowlogs-pipeline - --log-level string Log level: debug, info, warning, error (default "error") - --pipeline.decode.aws string aws fields - --pipeline.decode.type string Decode type: aws, json, none - --pipeline.encode.kafka string Kafka encode API - --pipeline.encode.prom string Prometheus encode API - --pipeline.encode.type string Encode type: prom, json, kafka, none - --pipeline.extract.aggregates string Aggregates (see docs) - --pipeline.extract.type string Extract type: aggregates, none - --pipeline.ingest.collector string Ingest collector API - --pipeline.ingest.file.filename string Ingest filename (file) - --pipeline.ingest.kafka string Ingest Kafka API - --pipeline.ingest.type string Ingest type: file, collector,file_loop (required) - --pipeline.transform string Transforms (list) API (default "[{"type": "none"}]") - --pipeline.write.loki string Loki write API - --pipeline.write.type string Write type: stdout, none + --config string config file (default is $HOME/.flowlogs-pipeline) + --health.port string Health server port (default "8080") + -h, --help help for flowlogs-pipeline + --log-level string Log level: debug, info, warning, error (default "error") + --parameters string json of config file parameters field + --pipeline string json of config file pipeline field ``` diff --git a/cmd/flowlogs-pipeline/main.go b/cmd/flowlogs-pipeline/main.go index 89b6d99f3..be22c2733 100644 --- a/cmd/flowlogs-pipeline/main.go +++ b/cmd/flowlogs-pipeline/main.go @@ -125,31 +125,17 @@ func bindFlags(cmd *cobra.Command, v *viper.Viper) { func initFlags() { cobra.OnInitialize(initConfig) - rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", fmt.Sprintf("config file (default is $HOME/%s)", defaultLogFileName)) rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Type, "pipeline.ingest.type", "", "Ingest type: file, collector,file_loop (required)") rootCmd.PersistentFlags().StringVar(&config.Opt.Health.Port, "health.port", "8080", "Health server port") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.File.Filename, "pipeline.ingest.file.filename", "", "Ingest filename (file)") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Collector, "pipeline.ingest.collector", "", "Ingest collector API") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Kafka, "pipeline.ingest.kafka", "", "Ingest Kafka API") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Decode.Aws, "pipeline.decode.aws", "", "aws fields") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Decode.Type, "pipeline.decode.type", "", "Decode type: aws, json, none") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Transform, "pipeline.transform", "[{\"type\": \"none\"}]", "Transforms (list) API") - 
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Extract.Type, "pipeline.extract.type", "", "Extract type: aggregates, none") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Extract.Aggregates, "pipeline.extract.aggregates", "", "Aggregates (see docs)") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Write.Type, "pipeline.write.type", "", "Write type: stdout, none") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Write.Loki, "pipeline.write.loki", "", "Loki write API") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Encode.Type, "pipeline.encode.type", "", "Encode type: prom, json, kafka, none") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Encode.Prom, "pipeline.encode.prom", "", "Prometheus encode API") - rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Encode.Kafka, "pipeline.encode.kafka", "", "Kafka encode API") - - _ = rootCmd.MarkPersistentFlagRequired("pipeline.ingest.type") + rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine, "pipeline", "", "json of config file pipeline field") + rootCmd.PersistentFlags().StringVar(&config.Opt.Parameters, "parameters", "", "json of config file parameters field") } func main() { // Initialize flags (command line parameters) initFlags() + if err := rootCmd.Execute(); err != nil { fmt.Println(err) os.Exit(1) @@ -168,6 +154,12 @@ func run() { // Dump configuration dumpConfig() + err = config.ParseConfig() + if err != nil { + log.Errorf("error in parsing config file: %v", err) + os.Exit(1) + } + // Setup (threads) exit manager utils.SetupElegantExit() diff --git a/contrib/dashboards/jsonnet/dashboard_details.jsonnet b/contrib/dashboards/jsonnet/dashboard_details.jsonnet index 8dbb39dfc..5d4bd9636 100644 --- a/contrib/dashboards/jsonnet/dashboard_details.jsonnet +++ b/contrib/dashboards/jsonnet/dashboard_details.jsonnet @@ -182,7 +182,7 @@ dashboard.new( ) .addTarget( prometheus.target( - expr='topk(10,rate(flp_egress_per_namespace{aggregate=~".*pod.*"}[1m]))', + expr='topk(10,rate(flp_egress_per_namespace{aggregate=~".*Pod.*"}[1m]))', ) ), gridPos={ x: 0, diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index 45512efb1..393d1f0c2 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -1,9 +1,166 @@ # This file was generated automatically by flowlogs-pipeline confgenerator log-level: error -pipeline: - decode: +parameters: +- ingest: + collector: + hostname: 0.0.0.0 + port: 2055 + type: collector + name: ingest1 +- decode: type: json - encode: + name: decode1 +- name: transform1 + transform: + generic: + rules: + - input: SrcAddr + output: srcIP + - input: SrcPort + output: srcPort + - input: DstAddr + output: dstIP + - input: DstPort + output: dstPort + - input: Proto + output: proto + - input: Bytes + output: bytes + - input: TCPFlags + output: TCPFlags + - input: SrcAS + output: srcAS + - input: DstAS + output: dstAS + type: generic +- name: transform2 + transform: + network: + rules: + - input: dstPort + output: service + type: add_service + parameters: proto + - input: dstIP + output: dstSubnet24 + type: add_subnet + parameters: /24 + - input: srcIP + output: srcSubnet24 + type: add_subnet + parameters: /24 + - input: srcIP + output: srcSubnet + type: add_subnet + parameters: /16 + - input: '{{.srcIP}},{{.srcPort}},{{.dstIP}},{{.dstPort}},{{.proto}}' + output: isNewFlow + type: conn_tracking + parameters: "1" + - input: dstIP + output: dstSubnet + 
type: add_subnet + parameters: /16 + - input: srcIP + output: srcK8S + type: add_kubernetes + parameters: srcK8S_labels + - input: dstIP + output: dstLocation + type: add_location + parameters: "" + - input: bytes + output: mice + type: add_if + parameters: <512 + - input: bytes + output: elephant + type: add_if + parameters: '>=512' + type: network +- extract: + aggregates: + - Name: bandwidth_network_service + By: + - service + Operation: sum + RecordKey: bytes + - Name: bandwidth_source_destination_subnet + By: + - dstSubnet24 + - srcSubnet24 + Operation: sum + RecordKey: bytes + - Name: bandwidth_source_subnet + By: + - srcSubnet + Operation: sum + RecordKey: bytes + - Name: dest_connection_subnet_count + By: + - dstSubnet + Operation: sum + RecordKey: isNewFlow + - Name: src_connection_count + By: + - srcSubnet + Operation: count + RecordKey: "" + - Name: TCPFlags_count + By: + - TCPFlags + Operation: count + RecordKey: "" + - Name: dst_as_connection_count + By: + - dstAS + Operation: count + RecordKey: "" + - Name: src_as_connection_count + By: + - srcAS + Operation: count + RecordKey: "" + - Name: count_source_destination_subnet + By: + - dstSubnet24 + - srcSubnet24 + Operation: count + RecordKey: "" + - Name: bandwidth_destination_subnet + By: + - dstSubnet + Operation: sum + RecordKey: bytes + - Name: bandwidth_namespace + By: + - srcK8S_Namespace + - srcK8S_Type + Operation: sum + RecordKey: bytes + - Name: dest_connection_location_count + By: + - dstLocation_CountryName + Operation: count + RecordKey: "" + - Name: mice_count + By: + - mice_Evaluate + Operation: count + RecordKey: "" + - Name: elephant_count + By: + - elephant_Evaluate + Operation: count + RecordKey: "" + - Name: dest_service_count + By: + - service + Operation: count + RecordKey: "" + type: aggregates + name: extract1 +- encode: prom: metrics: - name: bandwidth_per_network_service @@ -114,161 +271,26 @@ pipeline: port: 9102 prefix: flp_ type: prom - extract: - aggregates: - - name: bandwidth_network_service - by: - - service - operation: sum - recordkey: bytes - - name: bandwidth_source_destination_subnet - by: - - dstSubnet24 - - srcSubnet24 - operation: sum - recordkey: bytes - - name: bandwidth_source_subnet - by: - - srcSubnet - operation: sum - recordkey: bytes - - name: dest_connection_subnet_count - by: - - dstSubnet - operation: sum - recordkey: isNewFlow - - name: src_connection_count - by: - - srcSubnet - operation: count - recordkey: "" - - name: TCPFlags_count - by: - - TCPFlags - operation: count - recordkey: "" - - name: dst_as_connection_count - by: - - dstAS - operation: count - recordkey: "" - - name: src_as_connection_count - by: - - srcAS - operation: count - recordkey: "" - - name: count_source_destination_subnet - by: - - dstSubnet24 - - srcSubnet24 - operation: count - recordkey: "" - - name: bandwidth_destination_subnet - by: - - dstSubnet - operation: sum - recordkey: bytes - - name: bandwidth_namespace - by: - - srcK8S_Namespace - - srcK8S_Type - operation: sum - recordkey: bytes - - name: dest_connection_location_count - by: - - dstLocation_CountryName - operation: count - recordkey: "" - - name: mice_count - by: - - mice_Evaluate - operation: count - recordkey: "" - - name: elephant_count - by: - - elephant_Evaluate - operation: count - recordkey: "" - - name: dest_service_count - by: - - service - operation: count - recordkey: "" - type: aggregates - ingest: - collector: - hostname: 0.0.0.0 - port: 2055 - type: collector - transform: - - generic: - rules: - - input: SrcAddr - output: 
srcIP - - input: SrcPort - output: srcPort - - input: DstAddr - output: dstIP - - input: DstPort - output: dstPort - - input: Proto - output: proto - - input: Bytes - output: bytes - - input: TCPFlags - output: TCPFlags - - input: SrcAS - output: srcAS - - input: DstAS - output: dstAS - type: generic - - network: - rules: - - input: dstPort - output: service - type: add_service - parameters: proto - - input: dstIP - output: dstSubnet24 - type: add_subnet - parameters: /24 - - input: srcIP - output: srcSubnet24 - type: add_subnet - parameters: /24 - - input: srcIP - output: srcSubnet - type: add_subnet - parameters: /16 - - input: '{{.srcIP}},{{.srcPort}},{{.dstIP}},{{.dstPort}},{{.proto}}' - output: isNewFlow - type: conn_tracking - parameters: "1" - - input: dstIP - output: dstSubnet - type: add_subnet - parameters: /16 - - input: srcIP - output: srcK8S - type: add_kubernetes - parameters: srcK8S_labels - - input: dstIP - output: dstLocation - type: add_location - parameters: "" - - input: bytes - output: mice - type: add_if - parameters: <512 - - input: bytes - output: elephant - type: add_if - parameters: '>=512' - type: network + name: encode1 +- name: write1 write: loki: url: http://loki.default.svc.cluster.local:3100 staticLabels: job: flowlogs-pipeline type: loki +pipeline: +- name: ingest1 +- follows: ingest1 + name: decode1 +- follows: decode1 + name: transform1 +- follows: transform1 + name: transform2 +- follows: transform2 + name: extract1 +- follows: extract1 + name: encode1 +- follows: transform2 + name: write1 diff --git a/docs/api.md b/docs/api.md index 709285e82..7c70ee973 100644 --- a/docs/api.md +++ b/docs/api.md @@ -109,4 +109,14 @@ Following is the supported API format for writing to loki: clientConfig: clientConfig timestampLabel: label to use for time indexing timestampScale: timestamp units scale (e.g. for UNIX = 1s) + +## Aggregate metrics API +Following is the supported API format for specifying metrics aggregations: + +
+ aggregates:
+         Name: description of aggregation result
+         By: list of fields on which to aggregate
+         Operation: sum, min, max, or avg
+         RecordKey: internal field on which to perform the operation
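For reference, the generated contrib/kubernetes/flowlogs-pipeline.conf.yaml earlier in this diff defines its aggregations in exactly this shape. A minimal Go sketch of one such definition, using the new api.AggregateDefinition type added in pkg/api/extract_aggregate.go below (the values are illustrative, borrowed from the bandwidth_source_subnet aggregate in the generated config):

```go
package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
)

func main() {
	// Sum the "bytes" field of all flow entries that share a srcSubnet,
	// mirroring the bandwidth_source_subnet aggregate in the generated config.
	def := api.AggregateDefinition{
		Name:      "bandwidth_source_subnet",
		By:        api.AggregateBy{"srcSubnet"},
		Operation: api.AggregateOperation("sum"),
		RecordKey: "bytes",
	}
	fmt.Printf("%+v\n", def)
}
```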
 
\ No newline at end of file diff --git a/go.mod b/go.mod index 22c39b264..e0ae962d5 100644 --- a/go.mod +++ b/go.mod @@ -10,8 +10,8 @@ require ( github.com/json-iterator/go v1.1.12 github.com/mitchellh/mapstructure v1.4.3 github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9 - github.com/netsampler/goflow2 v1.0.5-0.20220106210010-20e8e567090c - github.com/prometheus/client_golang v1.12.0 + github.com/netsampler/goflow2 v1.0.4 + github.com/prometheus/client_golang v1.12.1 github.com/prometheus/common v0.32.1 github.com/segmentio/kafka-go v0.4.28 github.com/sirupsen/logrus v1.8.1 @@ -19,13 +19,13 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.10.1 github.com/stretchr/testify v1.7.0 - golang.org/x/net v0.0.0-20211209124913-491a49abca63 + golang.org/x/net v0.0.0-20220225172249-27dd8689420f google.golang.org/protobuf v1.27.1 gopkg.in/yaml.v2 v2.4.0 honnef.co/go/netdb v0.0.0-20210921115105-e902e863d85d - k8s.io/api v0.23.2 - k8s.io/apimachinery v0.23.2 - k8s.io/client-go v0.23.2 + k8s.io/api v0.23.4 + k8s.io/apimachinery v0.23.4 + k8s.io/client-go v0.23.4 ) require ( @@ -38,7 +38,7 @@ require ( github.com/go-logr/logr v1.2.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/golang/snappy v0.0.4 // indirect + github.com/golang/snappy v0.0.3 // indirect github.com/google/go-cmp v0.5.6 // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/googleapis/gnostic v0.5.5 // indirect @@ -47,16 +47,14 @@ require ( github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/klauspost/compress v1.13.6 // indirect - github.com/libp2p/go-reuseport v0.1.0 // indirect + github.com/libp2p/go-reuseport v0.0.2 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect - github.com/onsi/ginkgo v1.16.2 // indirect - github.com/onsi/gomega v1.13.0 // indirect github.com/pelletier/go-toml v1.9.4 // indirect - github.com/pierrec/lz4 v2.6.1+incompatible // indirect + github.com/pierrec/lz4 v2.6.0+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect @@ -68,23 +66,20 @@ require ( github.com/stretchr/objx v0.2.0 // indirect github.com/subosito/gotenv v1.2.0 // indirect go.uber.org/atomic v1.9.0 // indirect - go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 // indirect golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect - golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b // indirect + golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect google.golang.org/grpc v1.43.0 // indirect - gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.66.2 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect k8s.io/klog/v2 v2.30.0 
// indirect k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect - k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b // indirect + k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect sigs.k8s.io/yaml v1.2.0 // indirect diff --git a/go.sum b/go.sum index 95be3817b..152f46f8f 100644 --- a/go.sum +++ b/go.sum @@ -402,6 +402,7 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -609,6 +610,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/libp2p/go-reuseport v0.0.2 h1:XSG94b1FJfGA01BUrT82imejHQyTxO4jEWqheyCXYvU= +github.com/libp2p/go-reuseport v0.0.2/go.mod h1:SPD+5RwGC7rcnzngoYC86GjPzjSywuQyMVAheVBD9nQ= github.com/libp2p/go-reuseport v0.1.0 h1:0ooKOx2iwyIkf339WCZ2HN3ujTDbkK0PjC7JVoP1AiM= github.com/libp2p/go-reuseport v0.1.0/go.mod h1:bQVn9hmfcTaoo0c9v5pBhOarsU1eNOBZdaAd2hzXRKU= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= @@ -697,6 +700,8 @@ github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9 h1:c2swm3EamzgjBq9idNbEs5bNz20FJo/HK6uxyigXekQ= github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9/go.mod h1:LHXpc5tjKvsfZn0pwLKrvlgEhZcCaw3Di9mUEZGAI4E= +github.com/netsampler/goflow2 v1.0.4 h1:xUW5ry7h9oYAsh35mgiIvL59HKKluyzchenvCEGnZ7o= +github.com/netsampler/goflow2 v1.0.4/go.mod h1:8ORfpaFZtAtvf6aRIACc7/M2hI9IT88wmw/JxT6s6nk= github.com/netsampler/goflow2 v1.0.5-0.20220106210010-20e8e567090c h1:1Hk5lzky++7t4O/cLwMoPXDXGgpja35Nf/U6rXXjEns= github.com/netsampler/goflow2 v1.0.5-0.20220106210010-20e8e567090c/go.mod h1:yqw2cLe+lbnDN1+JKBqxoj2FKOA83iB8wV0aCKnlesg= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= @@ -757,6 +762,7 @@ github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHu github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= github.com/pierrec/lz4 v2.6.0+incompatible/go.mod 
h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -779,9 +785,12 @@ github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeD github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_golang v1.6.0/go.mod h1:ZLOG9ck3JLRdB5MgO8f+lLTe83AXG6ro35rLTxvnIl4= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= +github.com/prometheus/client_golang v1.10.0/go.mod h1:WJM3cc3yu7XKBKa/I8WeZm+V3eltZnBwfENSU7mdogU= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.12.0 h1:C+UIj/QWtmqY13Arb8kwMt5j34/0Z2iKamrJ+ryC0Gg= github.com/prometheus/client_golang v1.12.0/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= +github.com/prometheus/client_golang v1.12.1 h1:ZiaPsmm9uiBeaSMRznKsCDNtPCS0T3JVDGF+06gjBzk= +github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -796,6 +805,7 @@ github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt2 github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.14.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= +github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= github.com/prometheus/common v0.30.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/common v0.31.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= @@ -1086,6 +1096,8 @@ golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211209124913-491a49abca63 h1:iocB37TsdFuN6IBRZ+ry36wrkoV51/tl5vOWqkcPGvY= golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= +golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1191,6 +1203,7 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys 
v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1219,6 +1232,8 @@ golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b h1:9zKuko04nR4gjZ4+DNjHqRlAJqbJETHwiNKDqTfOjfE= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1540,12 +1555,18 @@ honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 k8s.io/api v0.19.2/go.mod h1:IQpK0zFQ1xc5iNIQPqzgoOwuFugaYHK4iCknlAQP9nI= k8s.io/api v0.23.2 h1:62cpzreV3dCuj0hqPi8r4dyWh48ogMcyh+ga9jEGij4= k8s.io/api v0.23.2/go.mod h1:sYuDb3flCtRPI8ghn6qFrcK5ZBu2mhbElxRE95qpwlI= +k8s.io/api v0.23.4 h1:85gnfXQOWbJa1SiWGpE9EEtHs0UVvDyIsSMpEtl2D4E= +k8s.io/api v0.23.4/go.mod h1:i77F4JfyNNrhOjZF7OwwNJS5Y1S9dpwvb9iYRYRczfI= k8s.io/apimachinery v0.19.2/go.mod h1:DnPGDnARWFvYa3pMHgSxtbZb7gpzzAZ1pTfaUNDVlmA= k8s.io/apimachinery v0.23.2 h1:dBmjCOeYBdg2ibcQxMuUq+OopZ9fjfLIR5taP/XKeTs= k8s.io/apimachinery v0.23.2/go.mod h1:zDqeV0AK62LbCI0CI7KbWCAYdLg+E+8UXJ0rIz5gmS8= +k8s.io/apimachinery v0.23.4 h1:fhnuMd/xUL3Cjfl64j5ULKZ1/J9n8NuQEgNL+WXWfdM= +k8s.io/apimachinery v0.23.4/go.mod h1:BEuFMMBaIbcOqVIJqNZJXGFTP4W6AycEpb5+m/97hrM= k8s.io/client-go v0.19.2/go.mod h1:S5wPhCqyDNAlzM9CnEdgTGV4OqhsW3jGO1UM1epwfJA= k8s.io/client-go v0.23.2 h1:BNbOcxa99jxHH8mM1cPKGIrrKRnCSAfAtyonYGsbFtE= k8s.io/client-go v0.23.2/go.mod h1:k3YbsWg6GWdHF1THHTQP88X9RhB1DWPo3Dq7KfU/D1c= +k8s.io/client-go v0.23.4 h1:YVWvPeerA2gpUudLelvsolzH7c2sFoXXR5wM/sWqNFU= +k8s.io/client-go v0.23.4/go.mod h1:PKnIL4pqLuvYUK1WU7RLTMYKPiIh7MYShLshtRY9cj0= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= @@ -1562,6 +1583,8 @@ k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b h1:wxEMGetGMur3J1xuGLQY7GEQYg9bZxKn3tKo5k/eYcs= k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= +k8s.io/utils v0.0.0-20211116205334-6203023598ed 
h1:ck1fRPWPJWsMd8ZRFsWc6mh/zHp5fZ/shhbrgPUxDAE= +k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/pkg/api/api.go b/pkg/api/api.go index fa5f47fce..36290d2b2 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -24,12 +24,13 @@ const TagEnum = "enum" // Note: items beginning with doc: "## title" are top level items that get divided into sections inside api.md. type API struct { - PromEncode PromEncode `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"` - KafkaEncode EncodeKafka `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"` - IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the netflow collector:\n"` - IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"` - DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"` - TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"` - TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"` - WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"` + PromEncode PromEncode `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"` + KafkaEncode EncodeKafka `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"` + IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the netflow collector:\n"` + IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"` + DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"` + TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"` + TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"` + WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"` + ExtractAggregate AggregateDefinition `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"` } diff --git a/pkg/api/extract_aggregate.go b/pkg/api/extract_aggregate.go new file mode 100644 index 000000000..fac8d2b3c --- /dev/null +++ b/pkg/api/extract_aggregate.go @@ -0,0 +1,11 @@ +package api + +type AggregateBy []string +type AggregateOperation string + +type AggregateDefinition struct { + Name string `yaml:"Name" doc:"description of aggregation result"` + By AggregateBy `yaml:"By" doc:"list of fields on which to aggregate"` + Operation AggregateOperation `yaml:"Operation" doc:"sum, min, max, or avg"` + 
RecordKey string `yaml:"RecordKey" doc:"internal field on which to perform the operation"` +} diff --git a/pkg/confgen/confgen.go b/pkg/confgen/confgen.go index 2fc678988..f32771f14 100644 --- a/pkg/confgen/confgen.go +++ b/pkg/confgen/confgen.go @@ -87,9 +87,9 @@ func (cg *ConfGen) Run() error { cg.dedupe() - err = cg.generateFlowlogs2MetricsConfig(Opt.DestConfFile) + err = cg.generateFlowlogs2PipelineConfig(Opt.DestConfFile) if err != nil { - log.Debugf("cg.generateFlowlogs2MetricsConfig err: %v ", err) + log.Debugf("cg.generateFlowlogs2PipelineConfig err: %v ", err) return err } diff --git a/pkg/confgen/dedup.go b/pkg/confgen/dedup.go index e08c621ff..fdb016c39 100644 --- a/pkg/confgen/dedup.go +++ b/pkg/confgen/dedup.go @@ -51,9 +51,9 @@ func dedupeNetworkTransformRules(rules api.NetworkTransformRules) api.NetworkTra } // dedupeAggregateDefinitions is inefficient because we can't use a map to look for duplicates. -// The reason is that aggregate.Definition is not hashable due to its By field which is a slice. +// The reason is that aggregate.AggregateDefinition is not hashable due to its AggregateBy field which is a slice. func dedupeAggregateDefinitions(aggregateDefinitions aggregate.Definitions) aggregate.Definitions { - var dedpueSlice []aggregate.Definition + var dedpueSlice []api.AggregateDefinition for i, aggregateDefinition := range aggregateDefinitions { if containsAggregateDefinitions(dedpueSlice, aggregateDefinition) { // duplicate aggregateDefinition @@ -65,7 +65,7 @@ func dedupeAggregateDefinitions(aggregateDefinitions aggregate.Definitions) aggr return dedpueSlice } -func containsAggregateDefinitions(slice []aggregate.Definition, searchItem aggregate.Definition) bool { +func containsAggregateDefinitions(slice []api.AggregateDefinition, searchItem api.AggregateDefinition) bool { for _, item := range slice { if reflect.DeepEqual(item, searchItem) { return true diff --git a/pkg/confgen/dedup_test.go b/pkg/confgen/dedup_test.go index 30354a887..9a36e7ac1 100644 --- a/pkg/confgen/dedup_test.go +++ b/pkg/confgen/dedup_test.go @@ -43,17 +43,17 @@ func Test_dedupeNetworkTransformRules(t *testing.T) { func Test_dedupeAggregateDefinitions(t *testing.T) { slice := aggregate.Definitions{ - aggregate.Definition{Name: "n1", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o1")}, - aggregate.Definition{Name: "n1", By: aggregate.By{"a"}, Operation: aggregate.Operation("o1")}, - aggregate.Definition{Name: "n2", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o2")}, - aggregate.Definition{Name: "n3", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o3")}, - aggregate.Definition{Name: "n2", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o2")}, + api.AggregateDefinition{Name: "n1", By: api.AggregateBy{"a", "b"}, Operation: api.AggregateOperation("o1")}, + api.AggregateDefinition{Name: "n1", By: api.AggregateBy{"a"}, Operation: api.AggregateOperation("o1")}, + api.AggregateDefinition{Name: "n2", By: api.AggregateBy{"a", "b"}, Operation: api.AggregateOperation("o2")}, + api.AggregateDefinition{Name: "n3", By: api.AggregateBy{"a", "b"}, Operation: api.AggregateOperation("o3")}, + api.AggregateDefinition{Name: "n2", By: api.AggregateBy{"a", "b"}, Operation: api.AggregateOperation("o2")}, } expected := aggregate.Definitions{ - aggregate.Definition{Name: "n1", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o1")}, - aggregate.Definition{Name: "n1", By: aggregate.By{"a"}, Operation: aggregate.Operation("o1")}, - aggregate.Definition{Name: 
"n2", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o2")}, - aggregate.Definition{Name: "n3", By: aggregate.By{"a", "b"}, Operation: aggregate.Operation("o3")}, + api.AggregateDefinition{Name: "n1", By: api.AggregateBy{"a", "b"}, Operation: api.AggregateOperation("o1")}, + api.AggregateDefinition{Name: "n1", By: api.AggregateBy{"a"}, Operation: api.AggregateOperation("o1")}, + api.AggregateDefinition{Name: "n2", By: api.AggregateBy{"a", "b"}, Operation: api.AggregateOperation("o2")}, + api.AggregateDefinition{Name: "n3", By: api.AggregateBy{"a", "b"}, Operation: api.AggregateOperation("o3")}, } actual := dedupeAggregateDefinitions(slice) diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index b64960992..8ecba8a1c 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -23,49 +23,82 @@ import ( "io/ioutil" ) -func (cg *ConfGen) generateFlowlogs2MetricsConfig(fileName string) error { +func (cg *ConfGen) generateFlowlogs2PipelineConfig(fileName string) error { config := map[string]interface{}{ "log-level": "error", - "pipeline": map[string]interface{}{ - "ingest": map[string]interface{}{ - "type": "collector", - "collector": map[string]interface{}{ - "port": cg.config.Ingest.Collector.Port, - "hostname": cg.config.Ingest.Collector.HostName, + "pipeline": []map[string]string{ + {"name": "ingest1"}, + {"name": "decode1", + "follows": "ingest1", + }, + {"name": "transform1", + "follows": "decode1", + }, + {"name": "transform2", + "follows": "transform1", + }, + {"name": "extract1", + "follows": "transform2", + }, + {"name": "encode1", + "follows": "extract1", + }, + {"name": "write1", + "follows": "transform2", + }, + }, + "parameters": []map[string]interface{}{ + {"name": "ingest1", + "ingest": map[string]interface{}{ + "type": "collector", + "collector": map[string]interface{}{ + "port": cg.config.Ingest.Collector.Port, + "hostname": cg.config.Ingest.Collector.HostName, + }, }, }, - "decode": map[string]interface{}{ - "type": "json", + {"name": "decode1", + "decode": map[string]interface{}{ + "type": "json", + }, }, - "transform": []interface{}{ - map[string]interface{}{ + {"name": "transform1", + "transform": map[string]interface{}{ "type": "generic", "generic": map[string]interface{}{ "rules": cg.config.Transform.Generic.Rules, }, }, - map[string]interface{}{ + }, + {"name": "transform2", + "transform": map[string]interface{}{ "type": "network", "network": map[string]interface{}{ "rules": cg.transformRules, }, }, }, - "extract": map[string]interface{}{ - "type": "aggregates", - "aggregates": cg.aggregateDefinitions, + {"name": "extract1", + "extract": map[string]interface{}{ + "type": "aggregates", + "aggregates": cg.aggregateDefinitions, + }, }, - "encode": map[string]interface{}{ - "type": "prom", - "prom": map[string]interface{}{ - "port": cg.config.Encode.Prom.Port, - "prefix": cg.config.Encode.Prom.Prefix, - "metrics": cg.promMetrics, + {"name": "encode1", + "encode": map[string]interface{}{ + "type": "prom", + "prom": map[string]interface{}{ + "port": cg.config.Encode.Prom.Port, + "prefix": cg.config.Encode.Prom.Prefix, + "metrics": cg.promMetrics, + }, }, }, - "write": map[string]interface{}{ - "type": cg.config.Write.Type, - "loki": cg.config.Write.Loki, + {"name": "write1", + "write": map[string]interface{}{ + "type": cg.config.Write.Type, + "loki": cg.config.Write.Loki, + }, }, }, } diff --git a/pkg/config/config.go b/pkg/config/config.go index fd35af85e..cd07d9c0f 100644 --- 
a/pkg/config/config.go +++ b/pkg/config/config.go @@ -17,25 +17,40 @@ package config +import ( + "encoding/json" + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/sirupsen/logrus" +) + type GenericMap map[string]interface{} var ( - Opt = Options{} + Opt = Options{} + PipeLine []Stage + Parameters []StageParam ) type Options struct { - PipeLine Pipeline - Health Health + PipeLine string + Parameters string + Health Health } type Health struct { Port string } -type Pipeline struct { +type Stage struct { + Name string + Follows string +} + +type StageParam struct { + Name string Ingest Ingest Decode Decode - Transform string + Transform Transform Extract Extract Encode Encode Write Write @@ -44,8 +59,8 @@ type Pipeline struct { type Ingest struct { Type string File File - Collector string - Kafka string + Collector api.IngestCollector + Kafka api.IngestKafka } type File struct { @@ -58,21 +73,46 @@ type Aws struct { type Decode struct { Type string - Aws string + Aws api.DecodeAws +} + +type Transform struct { + Type string + Generic api.TransformGeneric + Network api.TransformNetwork } type Extract struct { Type string - Aggregates string + Aggregates []api.AggregateDefinition } type Encode struct { Type string - Prom string - Kafka string + Prom api.PromEncode + Kafka api.EncodeKafka } type Write struct { Type string - Loki string + Loki api.WriteLoki +} + +// ParseConfig creates the internal unmarshalled representation from the Pipeline and Parameters json +func ParseConfig() error { + logrus.Debugf("config.Opt.PipeLine = %v ", Opt.PipeLine) + err := json.Unmarshal([]byte(Opt.PipeLine), &PipeLine) + if err != nil { + logrus.Errorf("error when reading config file: %v", err) + return err + } + logrus.Debugf("stages = %v ", PipeLine) + + err = json.Unmarshal([]byte(Opt.Parameters), &Parameters) + if err != nil { + logrus.Errorf("error when reading config file: %v", err) + return err + } + logrus.Debugf("params = %v ", Parameters) + return nil } diff --git a/pkg/pipeline/decode/decode_aws.go b/pkg/pipeline/decode/decode_aws.go index d14bc11b1..b072405a9 100644 --- a/pkg/pipeline/decode/decode_aws.go +++ b/pkg/pipeline/decode/decode_aws.go @@ -18,8 +18,6 @@ package decode import ( - "encoding/json" - "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" log "github.com/sirupsen/logrus" "strings" @@ -72,23 +70,12 @@ func (c *decodeAws) Decode(in []interface{}) []config.GenericMap { } // NewDecodeAws create a new decode -func NewDecodeAws() (Decoder, error) { +func NewDecodeAws(params config.StageParam) (Decoder, error) { log.Debugf("entering NewDecodeAws") - var recordKeys []string - fieldsString := config.Opt.PipeLine.Decode.Aws - log.Debugf("fieldsString = %v", fieldsString) - if fieldsString != "" { - var awsFields api.DecodeAws - err := json.Unmarshal([]byte(fieldsString), &awsFields) - if err != nil { - log.Errorf("NewDecodeAws: error in unmarshalling fields: %v", err) - return nil, err - } - recordKeys = awsFields.Fields - } else { + recordKeys := params.Decode.Aws.Fields + if len(recordKeys) == 0 { recordKeys = defaultKeys } - log.Debugf("recordKeys = %v", recordKeys) return &decodeAws{ keyTags: recordKeys, diff --git a/pkg/pipeline/decode/decode_aws_test.go b/pkg/pipeline/decode/decode_aws_test.go index 142e41301..8ba08f95b 100644 --- a/pkg/pipeline/decode/decode_aws_test.go +++ b/pkg/pipeline/decode/decode_aws_test.go @@ -19,7 +19,6 @@ package decode import ( "bufio" - jsoniter "github.com/json-iterator/go" 
"github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/stretchr/testify/require" @@ -29,46 +28,55 @@ import ( const testConfig1 = ` pipeline: - decode: - type: aws + - name: decode1 +parameters: + - name: decode1 + decode: + type: aws ` const testConfig2 = ` pipeline: - decode: - type: aws - aws: - fields: - - version - - vpc-id - - subnet-id - - instance-id - - interface-id - - account-id - - type - - srcaddr - - dstaddr - - srcport - - dstport - - pkt-srcaddr - - pkt-dstaddr - - protocol - - bytes - - packets - - start - - end - - action - - tcp-flags - - log-status + - name: decode1 +parameters: + - name: decode1 + decode: + type: aws + aws: + fields: + - version + - vpc-id + - subnet-id + - instance-id + - interface-id + - account-id + - type + - srcaddr + - dstaddr + - srcport + - dstport + - pkt-srcaddr + - pkt-dstaddr + - protocol + - bytes + - packets + - start + - end + - action + - tcp-flags + - log-status ` const testConfigErr = ` pipeline: - decode: - type: aws - aws: - fields: - version - vpc-id + - name: decode1 +parameters: + - name: decode1 + decode: + type: aws + aws: + fields: + version + vpc-id ` // aws version 2 standard format @@ -89,21 +97,9 @@ const input2 = `3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-0123456789012 func initNewDecodeAws(t *testing.T, testConfig string) Decoder { v := test.InitConfig(t, testConfig) - val := v.Get("pipeline.decode.type") - require.Equal(t, "aws", val) - - tmp := v.Get("pipeline.decode.aws") - if tmp != nil { - var json = jsoniter.ConfigCompatibleWithStandardLibrary - b, err := json.Marshal(&tmp) - require.Equal(t, err, nil) - // perform initializations usually done in main.go - config.Opt.PipeLine.Decode.Aws = string(b) - } else { - config.Opt.PipeLine.Decode.Aws = "" - } + require.NotNil(t, v) - newDecode, err := NewDecodeAws() + newDecode, err := NewDecodeAws(config.Parameters[0]) require.Equal(t, nil, err) return newDecode } @@ -196,16 +192,5 @@ func TestDecodeAwsCustom(t *testing.T) { func TestConfigErr(t *testing.T) { v := test.InitConfig(t, testConfigErr) - val := v.Get("pipeline.decode.type") - require.Equal(t, "aws", val) - - tmp := v.Get("pipeline.decode.aws") - var json = jsoniter.ConfigCompatibleWithStandardLibrary - b, err := json.Marshal(&tmp) - require.Equal(t, err, nil) - // perform initializations usually done in main.go - config.Opt.PipeLine.Decode.Aws = string(b) - - newDecode, _ := NewDecodeAws() - require.Equal(t, nil, newDecode) + require.Nil(t, v) } diff --git a/pkg/pipeline/encode/encode.go b/pkg/pipeline/encode/encode.go index 05cdbcaa2..4d0a423dc 100644 --- a/pkg/pipeline/encode/encode.go +++ b/pkg/pipeline/encode/encode.go @@ -26,16 +26,12 @@ type encodeNone struct { } type Encoder interface { - Encode(in []config.GenericMap) []interface{} + Encode(in []config.GenericMap) []config.GenericMap } // Encode encodes a flow before being stored -func (t *encodeNone) Encode(in []config.GenericMap) []interface{} { - out := make([]interface{}, len(in)) - for i, v := range in { - out[i] = v - } - return out +func (t *encodeNone) Encode(in []config.GenericMap) []config.GenericMap { + return in } // NewEncodeNone create a new encode diff --git a/pkg/pipeline/encode/encode_json.go b/pkg/pipeline/encode/encode_json.go deleted file mode 100644 index abb7816d3..000000000 --- a/pkg/pipeline/encode/encode_json.go +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (C) 2021 IBM, Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package encode - -import ( - "encoding/json" - "github.com/netobserv/flowlogs-pipeline/pkg/config" - log "github.com/sirupsen/logrus" -) - -type encodeJson struct { -} - -// Encode encodes json to byte array -// All entries should be saved as strings -func (e *encodeJson) Encode(inputMetrics []config.GenericMap) []interface{} { - out := make([]interface{}, 0) - for _, metric := range inputMetrics { - log.Debugf("encodeJson, metric = %v", metric) - var line []byte - var err error - line, err = json.Marshal(metric) - if err != nil { - log.Errorf("encodeJson Decode: error marshalling a line: %v", err) - continue - } - out = append(out, line) - } - return out -} - -// NewEndcodeJson create a new encode -func NewEncodeJson() (Encoder, error) { - log.Debugf("entering NewEncodeJson") - return &encodeJson{}, nil -} diff --git a/pkg/pipeline/encode/encode_json_test.go b/pkg/pipeline/encode/encode_json_test.go deleted file mode 100644 index e49d9aff1..000000000 --- a/pkg/pipeline/encode/encode_json_test.go +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (C) 2022 IBM, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package encode - -import ( - "github.com/netobserv/flowlogs-pipeline/pkg/config" - "github.com/stretchr/testify/require" - "testing" -) - -func initNewEncodeJson(t *testing.T) Encoder { - newEncode, err := NewEncodeJson() - require.NoError(t, err) - return newEncode -} - -func TestEncodeJson(t *testing.T) { - newEncode := initNewEncodeJson(t) - encodeByteArray := newEncode.(*encodeJson) - map1 := config.GenericMap{ - "varInt": 12, - "varString": "testString1", - "varBool": true, - } - map2 := config.GenericMap{ - "varString": "testString2", - "varInt": 14, - "varBool": false, - } - map3 := config.GenericMap{} - var out []interface{} - var in []config.GenericMap - out = encodeByteArray.Encode(in) - require.Equal(t, 0, len(out)) - in = append(in, map1) - in = append(in, map2) - in = append(in, map3) - out = encodeByteArray.Encode(in) - require.Equal(t, len(in), len(out)) - expected1 := []byte(`{"varInt":12,"varBool":true,"varString":"testString1"}`) - expected2 := []byte(`{"varInt":14,"varBool":false,"varString":"testString2"}`) - expected3 := []byte(`{}`) - require.JSONEq(t, string(expected1), string(out[0].([]byte))) - require.JSONEq(t, string(expected2), string(out[1].([]byte))) - require.JSONEq(t, string(expected3), string(out[2].([]byte))) -} diff --git a/pkg/pipeline/encode/encode_kafka.go b/pkg/pipeline/encode/encode_kafka.go index 548360ddd..d032199f7 100644 --- a/pkg/pipeline/encode/encode_kafka.go +++ b/pkg/pipeline/encode/encode_kafka.go @@ -42,11 +42,11 @@ type encodeKafka struct { } // Encode writes entries to kafka topic -func (r *encodeKafka) Encode(in []config.GenericMap) []interface{} { +func (r *encodeKafka) Encode(in []config.GenericMap) []config.GenericMap { log.Debugf("entering encodeKafka Encode, #items = %d", len(in)) var msgs []kafkago.Message msgs = make([]kafkago.Message, 0) - out := make([]interface{}, 0) + out := make([]config.GenericMap, 0) for _, entry := range in { var entryByteArray []byte entryByteArray, _ = json.Marshal(entry) @@ -64,15 +64,9 @@ func (r *encodeKafka) Encode(in []config.GenericMap) []interface{} { } // NewEncodeKafka create a new writer to kafka -func NewEncodeKafka() (Encoder, error) { +func NewEncodeKafka(params config.StageParam) (Encoder, error) { log.Debugf("entering NewIngestKafka") - encodeKafkaString := config.Opt.PipeLine.Encode.Kafka - log.Debugf("encodeKafkaString = %s", encodeKafkaString) - var jsonEncodeKafka api.EncodeKafka - err := json.Unmarshal([]byte(encodeKafkaString), &jsonEncodeKafka) - if err != nil { - return nil, err - } + jsonEncodeKafka := params.Encode.Kafka var balancer kafkago.Balancer switch jsonEncodeKafka.Balancer { diff --git a/pkg/pipeline/encode/encode_kafka_test.go b/pkg/pipeline/encode/encode_kafka_test.go index 423355b3a..5eee7d781 100644 --- a/pkg/pipeline/encode/encode_kafka_test.go +++ b/pkg/pipeline/encode/encode_kafka_test.go @@ -19,7 +19,6 @@ package encode import ( "encoding/json" - jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/test" kafkago "github.com/segmentio/kafka-go" @@ -32,11 +31,14 @@ import ( const testKafkaConfig = `--- log-level: debug pipeline: - encode: - type: kafka - kafka: - address: 1.2.3.4:9092 - topic: topic1 + - name: encode1 +parameters: + - name: encode1 + encode: + type: kafka + kafka: + address: 1.2.3.4:9092 + topic: topic1 ` type fakeKafkaWriter struct { @@ -52,13 +54,9 @@ func (f *fakeKafkaWriter) WriteMessages(ctx context.Context, msg ...kafkago.Mess func 
initNewEncodeKafka(t *testing.T) Encoder { v := test.InitConfig(t, testKafkaConfig) - val := v.Get("pipeline.encode.kafka") - var json = jsoniter.ConfigCompatibleWithStandardLibrary - b, err := json.Marshal(&val) - require.NoError(t, err) + require.NotNil(t, v) - config.Opt.PipeLine.Encode.Kafka = string(b) - newEncode, err := NewEncodeKafka() + newEncode, err := NewEncodeKafka(config.Parameters[0]) require.NoError(t, err) return newEncode } diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index b05b7bb6e..558af7f0b 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -19,7 +19,6 @@ package encode import ( "container/list" - "encoding/json" "fmt" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" @@ -81,11 +80,11 @@ type encodeProm struct { } // Encode encodes a metric before being stored -func (e *encodeProm) Encode(metrics []config.GenericMap) []interface{} { +func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap { log.Debugf("entering encodeProm Encode") e.mu.Lock() defer e.mu.Unlock() - out := make([]interface{}, 0) + out := make([]config.GenericMap, 0) for _, metric := range metrics { // TODO: We may need different handling for histograms metricOut := e.EncodeMetric(metric) @@ -97,10 +96,10 @@ func (e *encodeProm) Encode(metrics []config.GenericMap) []interface{} { return out } -func (e *encodeProm) EncodeMetric(metric config.GenericMap) []interface{} { +func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap { log.Debugf("entering EncodeMetric metric = %v", metric) // TODO: We may need different handling for histograms - out := make([]interface{}, 0) + out := make([]config.GenericMap, 0) for metricName, mInfo := range e.metrics { metricValue, ok := metric[mInfo.input] if !ok { @@ -125,7 +124,12 @@ func (e *encodeProm) EncodeMetric(metric config.GenericMap) []interface{} { }, value: valueFloat, } - out = append(out, entry) + entryMap := map[string]interface{}{ + "Name": e.prefix + metricName, + "Labels": entryLabels, + "value": valueFloat, + } + out = append(out, entryMap) cEntry := e.saveEntryInCache(entry, entryLabels) cEntry.PromMetric.metricType = mInfo.PromMetric.metricType @@ -249,15 +253,8 @@ func startPrometheusInterface(w *encodeProm) { } } -func NewEncodeProm() (Encoder, error) { - encodePromString := config.Opt.PipeLine.Encode.Prom - log.Debugf("promEncodeString = %s", encodePromString) - var jsonEncodeProm api.PromEncode - err := json.Unmarshal([]byte(encodePromString), &jsonEncodeProm) - if err != nil { - return nil, err - } - +func NewEncodeProm(params config.StageParam) (Encoder, error) { + jsonEncodeProm := params.Encode.Prom portNum := jsonEncodeProm.Port promPrefix := jsonEncodeProm.Prefix expiryTime := int64(jsonEncodeProm.ExpiryTime) diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index 0c6db68cc..50f30cad2 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -19,7 +19,6 @@ package encode import ( "container/list" - jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/prometheus/client_golang/prometheus/testutil" @@ -32,42 +31,41 @@ import ( const testConfig = `--- log-level: debug pipeline: - encode: - type: prom - prom: - port: 9103 - prefix: test_ - expirytime: 1 - metrics: - - name: Bytes - type: 
gauge - valuekey: bytes - labels: - - srcAddr - - dstAddr - - srcPort - - name: Packets - type: counter - valuekey: packets - labels: - - srcAddr - - dstAddr - - dstPort - - name: subnetHistogram - type: histogram - valuekey: aggregate - labels: + - name: encode1 +parameters: + - name: encode1 + encode: + type: prom + prom: + port: 9103 + prefix: test_ + expirytime: 1 + metrics: + - name: Bytes + type: gauge + valuekey: bytes + labels: + - srcAddr + - dstAddr + - srcPort + - name: Packets + type: counter + valuekey: packets + labels: + - srcAddr + - dstAddr + - dstPort + - name: subnetHistogram + type: histogram + valuekey: aggregate + labels: ` func initNewEncodeProm(t *testing.T) Encoder { v := test.InitConfig(t, testConfig) - val := v.Get("pipeline.encode.prom") - var json = jsoniter.ConfigCompatibleWithStandardLibrary - b, err := json.Marshal(&val) - require.Equal(t, err, nil) + require.NotNil(t, v) - config.Opt.PipeLine.Encode.Prom = string(b) - newEncode, err := NewEncodeProm() + newEncode, err := NewEncodeProm(config.Parameters[0]) require.Equal(t, err, nil) return newEncode } @@ -104,32 +102,33 @@ func Test_NewEncodeProm(t *testing.T) { entryLabels2["srcAddr"] = "10.1.2.3" entryLabels2["dstAddr"] = "10.1.2.4" entryLabels2["dstPort"] = "39504" - gEntryInfo1 := entryInfo{ - value: float64(1234), - eInfo: entrySignature{ - Name: "test_Bytes", - Labels: entryLabels1, - }, + gEntryInfo1 := config.GenericMap{ + "Name": "test_Bytes", + "Labels": entryLabels1, + "value": float64(1234), } - gEntryInfo2 := entryInfo{ - eInfo: entrySignature{ - Name: "test_Packets", - Labels: entryLabels2, - }, - value: float64(34), + gEntryInfo2 := config.GenericMap{ + "Name": "test_Packets", + "Labels": entryLabels2, + "value": float64(34), } require.Contains(t, output, gEntryInfo1) require.Contains(t, output, gEntryInfo2) gaugeA, err := gInfo.promGauge.GetMetricWith(entryLabels1) require.Equal(t, nil, err) bytesA := testutil.ToFloat64(gaugeA) - require.Equal(t, gEntryInfo1.value, bytesA) + require.Equal(t, gEntryInfo1["value"], bytesA) // verify entries are in cache; one for the gauge and one for the counter entriesMap := encodeProm.mCache require.Equal(t, 2, len(entriesMap)) - eInfoBytes := generateCacheKey(&gEntryInfo1.eInfo) + eInfo := entrySignature{ + Name: "test_Bytes", + Labels: entryLabels1, + } + + eInfoBytes := generateCacheKey(&eInfo) encodeProm.mu.Lock() _, found := encodeProm.mCache[string(eInfoBytes)] encodeProm.mu.Unlock() @@ -171,17 +170,15 @@ func Test_EncodeAggregate(t *testing.T) { output := newEncode.Encode(metrics) - gEntryInfo1 := entryInfo{ - eInfo: entrySignature{ - Name: "test_gauge", - Labels: map[string]string{ - "by": "[dstIP srcIP]", - "aggregate": "20.0.0.2,10.0.0.1", - }, + gEntryInfo1 := config.GenericMap{ + "Name": "test_gauge", + "Labels": map[string]string{ + "by": "[dstIP srcIP]", + "aggregate": "20.0.0.2,10.0.0.1", }, - value: float64(7), + "value": float64(7), } - expectedOutput := []interface{}{gEntryInfo1} - require.Equal(t, output, expectedOutput) + expectedOutput := []config.GenericMap{gEntryInfo1} + require.Equal(t, expectedOutput, output) } diff --git a/pkg/pipeline/encode/encode_test.go b/pkg/pipeline/encode/encode_test.go index 4b4b67fad..91bd3d5fa 100644 --- a/pkg/pipeline/encode/encode_test.go +++ b/pkg/pipeline/encode/encode_test.go @@ -43,7 +43,7 @@ func TestEncodeNone(t *testing.T) { "varbool": false, } map3 := config.GenericMap{} - var out []interface{} + var out []config.GenericMap var in []config.GenericMap out = encodeNone.Encode(in) require.Equal(t, 
0, len(out)) diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 05ec9201a..f36843ed1 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -20,6 +20,7 @@ package aggregate import ( "fmt" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/api" log "github.com/sirupsen/logrus" "math" "sort" @@ -35,24 +36,14 @@ const ( OperationCount = "count" ) -type By []string -type Operation string - type Labels map[string]string type NormalizedValues string type Aggregate struct { - Definition Definition + Definition api.AggregateDefinition Groups map[NormalizedValues]*GroupState } -type Definition struct { - Name string - By By - Operation Operation - RecordKey string -} - type GroupState struct { normalizedValues NormalizedValues RecentRawValues []float64 diff --git a/pkg/pipeline/extract/aggregate/aggregate_test.go b/pkg/pipeline/extract/aggregate/aggregate_test.go index 24c53d4ab..f36e46c62 100644 --- a/pkg/pipeline/extract/aggregate/aggregate_test.go +++ b/pkg/pipeline/extract/aggregate/aggregate_test.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/test" + "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/stretchr/testify/require" "strconv" "testing" @@ -28,9 +29,9 @@ import ( func GetMockAggregate() Aggregate { aggregate := Aggregate{ - Definition: Definition{ + Definition: api.AggregateDefinition{ Name: "Avg by src and dst IP's", - By: By{"dstIP", "srcIP"}, + By: api.AggregateBy{"dstIP", "srcIP"}, Operation: "avg", RecordKey: "value", }, @@ -73,8 +74,8 @@ func Test_getNormalizedValues(t *testing.T) { func Test_LabelsFromEntry(t *testing.T) { aggregate := Aggregate{ - Definition: Definition{ - By: By{"dstIP", "srcIP"}, + Definition: api.AggregateDefinition{ + By: api.AggregateBy{"dstIP", "srcIP"}, Operation: "count", RecordKey: "", }, diff --git a/pkg/pipeline/extract/aggregate/aggregates.go b/pkg/pipeline/extract/aggregate/aggregates.go index 5623a52ba..58e1c38e8 100644 --- a/pkg/pipeline/extract/aggregate/aggregates.go +++ b/pkg/pipeline/extract/aggregate/aggregates.go @@ -18,15 +18,15 @@ package aggregate import ( - "encoding/json" "fmt" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/api" log "github.com/sirupsen/logrus" "reflect" ) type Aggregates []Aggregate -type Definitions []Definition +type Definitions []api.AggregateDefinition func (aggregates Aggregates) Evaluate(entries []config.GenericMap) error { for _, aggregate := range aggregates { @@ -50,7 +50,7 @@ func (aggregates Aggregates) GetMetrics() []config.GenericMap { return metrics } -func (aggregates Aggregates) AddAggregate(aggregateDefinition Definition) Aggregates { +func (aggregates Aggregates) AddAggregate(aggregateDefinition api.AggregateDefinition) Aggregates { aggregate := Aggregate{ Definition: aggregateDefinition, Groups: map[NormalizedValues]*GroupState{}, @@ -60,29 +60,18 @@ func (aggregates Aggregates) AddAggregate(aggregateDefinition Definition) Aggreg return appendedAggregates } -func (aggregates Aggregates) RemoveAggregate(by By) (Aggregates, error) { +func (aggregates Aggregates) RemoveAggregate(by api.AggregateBy) (Aggregates, error) { for i, other := range aggregates { if reflect.DeepEqual(other.Definition.By, by) { return append(aggregates[:i], aggregates[i+1:]...), nil } } - return aggregates, fmt.Errorf("can't 
find By = %v", by) + return aggregates, fmt.Errorf("can't find AggregateBy = %v", by) } -func NewAggregatesFromConfig() (Aggregates, error) { - var definitions Definitions +func NewAggregatesFromConfig(definitions []api.AggregateDefinition) (Aggregates, error) { aggregates := Aggregates{} - definitionsAsString := config.Opt.PipeLine.Extract.Aggregates - log.Debugf("aggregatesString = %s", definitionsAsString) - - err := json.Unmarshal([]byte(definitionsAsString), &definitions) - if err != nil { - log.Errorf("error in unmarshalling yaml: %v", err) - return nil, nil - - } - for _, aggregateDefinition := range definitions { aggregates = aggregates.AddAggregate(aggregateDefinition) } diff --git a/pkg/pipeline/extract/aggregate/aggregates_test.go b/pkg/pipeline/extract/aggregate/aggregates_test.go index 4a7c7d50e..7b071aca9 100644 --- a/pkg/pipeline/extract/aggregate/aggregates_test.go +++ b/pkg/pipeline/extract/aggregate/aggregates_test.go @@ -18,50 +18,35 @@ package aggregate import ( - "bytes" - jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/config" - "github.com/spf13/viper" + "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/stretchr/testify/require" "testing" ) func Test_NewAggregatesFromConfig(t *testing.T) { - var yamlConfig = []byte(` + var yamlConfig = ` log-level: debug pipeline: - ingest: - type: collector - decode: - type: json - extract: - type: aggregates - aggregates: - - Name: "Avg by src and dst IP's" - By: - - "dstIP" - - "srcIP" - Operation: "avg" - RecordKey: "value" - encode: - type: prom - write: - type: stdout -`) - v := viper.New() - v.SetConfigType("yaml") - r := bytes.NewReader(yamlConfig) - err := v.ReadConfig(r) - require.Equal(t, err, nil) - val := v.Get("pipeline.extract.aggregates") - var json = jsoniter.ConfigCompatibleWithStandardLibrary - b, err := json.Marshal(&val) - require.Equal(t, err, nil) - config.Opt.PipeLine.Extract.Aggregates = string(b) + - name: extract1 +parameters: + - name: extract1 + extract: + type: aggregates + aggregates: + - Name: "Avg by src and dst IP's" + By: + - "dstIP" + - "srcIP" + Operation: "avg" + RecordKey: "value" +` + v := test.InitConfig(t, yamlConfig) + require.NotNil(t, v) expectedAggregate := GetMockAggregate() - aggregates, err := NewAggregatesFromConfig() + aggregates, err := NewAggregatesFromConfig(config.Parameters[0].Extract.Aggregates) - require.Equal(t, err, nil) + require.NoError(t, err) require.Equal(t, aggregates[0].Definition, expectedAggregate.Definition) } diff --git a/pkg/pipeline/extract/extract_aggregate.go b/pkg/pipeline/extract/extract_aggregate.go index eda135eca..b63fc74c6 100644 --- a/pkg/pipeline/extract/extract_aggregate.go +++ b/pkg/pipeline/extract/extract_aggregate.go @@ -40,9 +40,9 @@ func (ea *extractAggregate) Extract(entries []config.GenericMap) []config.Generi } // NewExtractAggregate creates a new extractor -func NewExtractAggregate() (Extractor, error) { +func NewExtractAggregate(params config.StageParam) (Extractor, error) { log.Debugf("entering NewExtractAggregate") - aggregates, err := aggregate.NewAggregatesFromConfig() + aggregates, err := aggregate.NewAggregatesFromConfig(params.Extract.Aggregates) if err != nil { log.Errorf("error in NewAggregatesFromConfig: %v", err) return nil, err diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index 5982b35e9..0824c551a 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -19,11 
+19,11 @@ package extract import ( "fmt" - jsoniter "github.com/json-iterator/go" + "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" + "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/stretchr/testify/require" - "gopkg.in/yaml.v2" "testing" ) @@ -35,7 +35,7 @@ func createAgg(name, recordKey, by, agg, op string, value float64, count int, rr "by": by, "aggregate": agg, by: agg, - "operation": aggregate.Operation(op), + "operation": api.AggregateOperation(op), "value": valueString, fmt.Sprintf("%v_value", name): valueString, "recentRawValues": rrv, @@ -49,47 +49,49 @@ func createAgg(name, recordKey, by, agg, op string, value float64, count int, rr func Test_Extract(t *testing.T) { // Setup yamlConfig := ` -aggregates: -- name: bandwidth_count - by: - - service - operation: count - recordkey: "" +pipeline: + - name: extract1 +parameters: + - name: extract1 + extract: + type: aggregates + aggregates: + - name: bandwidth_count + by: + - service + operation: count + recordkey: "" -- name: bandwidth_sum - by: - - service - operation: sum - recordkey: bytes + - name: bandwidth_sum + by: + - service + operation: sum + recordkey: bytes -- name: bandwidth_max - by: - - service - operation: max - recordkey: bytes + - name: bandwidth_max + by: + - service + operation: max + recordkey: bytes -- name: bandwidth_min - by: - - service - operation: min - recordkey: bytes + - name: bandwidth_min + by: + - service + operation: min + recordkey: bytes -- name: bandwidth_avg - by: - - service - operation: avg - recordkey: bytes + - name: bandwidth_avg + by: + - service + operation: avg + recordkey: bytes ` var err error - yamlData := make(map[string]interface{}) - err = yaml.Unmarshal([]byte(yamlConfig), &yamlData) - require.NoError(t, err) - var json = jsoniter.ConfigCompatibleWithStandardLibrary - jsonBytes, err := json.Marshal(yamlData["aggregates"]) - require.NoError(t, err) - config.Opt.PipeLine.Extract.Aggregates = string(jsonBytes) - extractAggregate, err := NewExtractAggregate() + v := test.InitConfig(t, yamlConfig) + require.NotNil(t, v) + + extractAggregate, err := NewExtractAggregate(config.Parameters[0]) require.NoError(t, err) // Test cases diff --git a/pkg/pipeline/ingest/ingest_collector.go b/pkg/pipeline/ingest/ingest_collector.go index 96f9b02ed..29e1e2982 100644 --- a/pkg/pipeline/ingest/ingest_collector.go +++ b/pkg/pipeline/ingest/ingest_collector.go @@ -22,7 +22,6 @@ import ( "encoding/binary" "encoding/json" "fmt" - "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" "net" @@ -91,20 +90,20 @@ func (w *TransportWrapper) Send(_, data []byte) error { } // Ingest ingests entries from a network collector using goflow2 library (https://github.com/netsampler/goflow2) -func (r *ingestCollector) Ingest(process ProcessFunction) { +func (ingestC *ingestCollector) Ingest(process ProcessFunction) { ctx := context.Background() - r.in = make(chan map[string]interface{}, channelSize) + ingestC.in = make(chan map[string]interface{}, channelSize) // initialize background listeners (a.k.a.netflow+legacy collector) - r.initCollectorListener(ctx) + ingestC.initCollectorListener(ctx) // forever process log lines received by collector - r.processLogLines(process) + ingestC.processLogLines(process) } -func (r *ingestCollector) initCollectorListener(ctx 
context.Context) { - transporter := NewWrapper(r.in) +func (ingestC *ingestCollector) initCollectorListener(ctx context.Context) { + transporter := NewWrapper(ingestC.in) go func() { formatter, err := goflowFormat.FindFormat(ctx, "pb") if err != nil { @@ -116,8 +115,8 @@ func (r *ingestCollector) initCollectorListener(ctx context.Context) { Logger: log.StandardLogger(), } - log.Infof("listening for netflow on host %s, port = %d", r.hostname, r.port) - err = sNF.FlowRoutine(1, r.hostname, r.port, false) + log.Infof("listening for netflow on host %s, port = %d", ingestC.hostname, ingestC.port) + err = sNF.FlowRoutine(1, ingestC.hostname, ingestC.port, false) log.Fatal(err) }() @@ -133,21 +132,21 @@ func (r *ingestCollector) initCollectorListener(ctx context.Context) { Logger: log.StandardLogger(), } - log.Infof("listening for legacy netflow on host %s, port = %d", r.hostname, r.port+1) - err = sLegacyNF.FlowRoutine(1, r.hostname, r.port+1, false) + log.Infof("listening for legacy netflow on host %s, port = %d", ingestC.hostname, ingestC.port+1) + err = sLegacyNF.FlowRoutine(1, ingestC.hostname, ingestC.port+1, false) log.Fatal(err) }() } -func (r *ingestCollector) processLogLines(process ProcessFunction) { +func (ingestC *ingestCollector) processLogLines(process ProcessFunction) { var records []interface{} for { select { - case <-r.exitChan: + case <-ingestC.exitChan: log.Debugf("exiting ingestCollector because of signal") return - case record := <-r.in: + case record := <-ingestC.in: recordAsBytes, _ := json.Marshal(record) records = append(records, string(recordAsBytes)) case <-time.After(time.Millisecond * batchMaxTimeInMilliSecs): // Maximum batch time for each batch @@ -161,14 +160,8 @@ func (r *ingestCollector) processLogLines(process ProcessFunction) { } // NewIngestCollector create a new ingester -func NewIngestCollector() (Ingester, error) { - ingestCollectorString := config.Opt.PipeLine.Ingest.Collector - log.Debugf("ingestCollectorString = %s", ingestCollectorString) - var jsonIngestCollector api.IngestCollector - err := json.Unmarshal([]byte(ingestCollectorString), &jsonIngestCollector) - if err != nil { - return nil, err - } +func NewIngestCollector(params config.StageParam) (Ingester, error) { + jsonIngestCollector := params.Ingest.Collector if jsonIngestCollector.HostName == "" { return nil, fmt.Errorf("ingest hostname not specified") diff --git a/pkg/pipeline/ingest/ingest_file.go b/pkg/pipeline/ingest/ingest_file.go index 55794621b..ec157c14a 100644 --- a/pkg/pipeline/ingest/ingest_file.go +++ b/pkg/pipeline/ingest/ingest_file.go @@ -28,7 +28,7 @@ import ( ) type IngestFile struct { - fileName string + params config.Ingest exitChan chan bool PrevRecords []interface{} } @@ -36,9 +36,9 @@ type IngestFile struct { const delaySeconds = 10 // Ingest ingests entries from a file and resends the same data every delaySeconds seconds -func (r *IngestFile) Ingest(process ProcessFunction) { +func (ingestF *IngestFile) Ingest(process ProcessFunction) { lines := make([]interface{}, 0) - file, err := os.Open(r.fileName) + file, err := os.Open(ingestF.params.File.Filename) if err != nil { log.Fatal(err) } @@ -52,22 +52,22 @@ func (r *IngestFile) Ingest(process ProcessFunction) { log.Debugf("%s", text) lines = append(lines, text) } - log.Debugf("Ingesting %d log lines from %s", len(lines), r.fileName) - switch config.Opt.PipeLine.Ingest.Type { + log.Debugf("Ingesting %d log lines from %s", len(lines), ingestF.params.File.Filename) + switch ingestF.params.Type { case "file": - r.PrevRecords = 
lines + ingestF.PrevRecords = lines process(lines) case "file_loop": // loop forever ticker := time.NewTicker(time.Duration(delaySeconds) * time.Second) for { select { - case <-r.exitChan: + case <-ingestF.exitChan: log.Debugf("exiting ingestFile because of signal") return case <-ticker.C: log.Debugf("ingestFile; for loop; before process") - r.PrevRecords = lines + ingestF.PrevRecords = lines process(lines) } } @@ -75,18 +75,18 @@ func (r *IngestFile) Ingest(process ProcessFunction) { } // NewIngestFile create a new ingester -func NewIngestFile() (Ingester, error) { +func NewIngestFile(params config.StageParam) (Ingester, error) { log.Debugf("entering NewIngestFile") - if config.Opt.PipeLine.Ingest.File.Filename == "" { + if params.Ingest.File.Filename == "" { return nil, fmt.Errorf("ingest filename not specified") } - log.Infof("input file name = %s", config.Opt.PipeLine.Ingest.File.Filename) + log.Debugf("input file name = %s", params.Ingest.File.Filename) ch := make(chan bool, 1) utils.RegisterExitChannel(ch) return &IngestFile{ - fileName: config.Opt.PipeLine.Ingest.File.Filename, + params: params.Ingest, exitChan: ch, }, nil } diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index 26c6d528b..1687358aa 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -18,7 +18,6 @@ package ingest import ( - "encoding/json" "errors" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" @@ -46,17 +45,17 @@ const channelSizeKafka = 1000 const defaultBatchReadTimeout = int64(100) // Ingest ingests entries from kafka topic -func (r *ingestKafka) Ingest(process ProcessFunction) { +func (ingestK *ingestKafka) Ingest(process ProcessFunction) { // initialize background listener - r.kafkaListener() + ingestK.kafkaListener() // forever process log lines received by collector - r.processLogLines(process) + ingestK.processLogLines(process) } // background thread to read kafka messages; place received items into ingestKafka input channel -func (r *ingestKafka) kafkaListener() { +func (ingestK *ingestKafka) kafkaListener() { log.Debugf("entering kafkaListener") go func() { @@ -65,13 +64,13 @@ func (r *ingestKafka) kafkaListener() { for { // block until a message arrives log.Debugf("before ReadMessage") - kafkaMessage, err = r.kafkaReader.ReadMessage(context.Background()) + kafkaMessage, err = ingestK.kafkaReader.ReadMessage(context.Background()) if err != nil { log.Errorln(err) } log.Debugf("string(kafkaMessage) = %s\n", string(kafkaMessage.Value)) if len(kafkaMessage.Value) > 0 { - r.in <- string(kafkaMessage.Value) + ingestK.in <- string(kafkaMessage.Value) } } }() @@ -79,22 +78,22 @@ func (r *ingestKafka) kafkaListener() { } // read items from ingestKafka input channel, pool them, and send down the pipeline -func (r *ingestKafka) processLogLines(process ProcessFunction) { +func (ingestK *ingestKafka) processLogLines(process ProcessFunction) { var records []interface{} - duration := time.Duration(r.kafkaParams.BatchReadTimeout) * time.Millisecond + duration := time.Duration(ingestK.kafkaParams.BatchReadTimeout) * time.Millisecond for { select { - case <-r.exitChan: + case <-ingestK.exitChan: log.Debugf("exiting ingestKafka because of signal") return - case record := <-r.in: + case record := <-ingestK.in: records = append(records, record) case <-time.After(duration): // Maximum batch time for each batch // Process batch of records (if not empty) if len(records) > 0 { process(records) - 
r.prevRecords = records - log.Debugf("prevRecords = %v", r.prevRecords) + ingestK.prevRecords = records + log.Debugf("prevRecords = %v", ingestK.prevRecords) } records = []interface{}{} } @@ -102,15 +101,9 @@ func (r *ingestKafka) processLogLines(process ProcessFunction) { } // NewIngestKafka create a new ingester -func NewIngestKafka() (Ingester, error) { +func NewIngestKafka(params config.StageParam) (Ingester, error) { log.Debugf("entering NewIngestKafka") - ingestKafkaString := config.Opt.PipeLine.Ingest.Kafka - log.Debugf("ingestKafkaString = %s", ingestKafkaString) - var jsonIngestKafka api.IngestKafka - err := json.Unmarshal([]byte(ingestKafkaString), &jsonIngestKafka) - if err != nil { - return nil, err - } + jsonIngestKafka := params.Ingest.Kafka // connect to the kafka server startOffsetString := jsonIngestKafka.StartOffset diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go index cf9723a43..9a77347e0 100644 --- a/pkg/pipeline/ingest/ingest_kafka_test.go +++ b/pkg/pipeline/ingest/ingest_kafka_test.go @@ -18,7 +18,6 @@ package ingest import ( - jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/test" kafkago "github.com/segmentio/kafka-go" @@ -32,61 +31,42 @@ import ( const testConfig1 = `--- log-level: debug pipeline: - ingest: - type: kafka - kafka: - brokers: ["1.1.1.1:9092"] - topic: topic1 - groupid: group1 - startoffset: FirstOffset - groupbalancers: ["range", "roundRobin"] - batchReadTimeout: 300 - decode: - type: json - transform: - - type: none - extract: - type: none - encode: - type: none - write: - type: none + - name: ingest1 +parameters: + - name: ingest1 + ingest: + type: kafka + kafka: + brokers: ["1.1.1.1:9092"] + topic: topic1 + groupid: group1 + startoffset: FirstOffset + groupbalancers: ["range", "roundRobin"] + batchReadTimeout: 300 ` const testConfig2 = `--- log-level: debug pipeline: - ingest: - type: kafka - kafka: - brokers: ["1.1.1.2:9092"] - topic: topic2 - groupid: group2 - startoffset: LastOffset - groupbalancers: ["rackAffinity"] - decode: - type: none - transform: - - type: none - extract: - type: none - encode: - type: none - write: - type: none + - name: ingest1 +parameters: + - name: ingest1 + ingest: + type: kafka + kafka: + brokers: ["1.1.1.2:9092"] + topic: topic2 + groupid: group2 + startoffset: LastOffset + groupbalancers: ["rackAffinity"] ` func initNewIngestKafka(t *testing.T, configTemplate string) Ingester { v := test.InitConfig(t, configTemplate) - val := v.Get("pipeline.ingest.kafka") - var json = jsoniter.ConfigCompatibleWithStandardLibrary - b, err := json.Marshal(&val) - require.Equal(t, err, nil) - - config.Opt.PipeLine.Ingest.Kafka = string(b) - config.Opt.PipeLine.Ingest.Type = "kafka" - newIngest, err := NewIngestKafka() - require.Equal(t, err, nil) + require.NotNil(t, v) + + newIngest, err := NewIngestKafka(config.Parameters[0]) + require.NoError(t, err) return newIngest } diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 48282252c..26826d1b1 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -28,152 +28,319 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write" log "github.com/sirupsen/logrus" + "os" ) -// CfgRoot configuration root path - // interface definitions of pipeline components +const ( + StageIngest = "ingest" + StageDecode = "decode" + StageTransform = "transform" + 
StageExtract = "extract"
+	StageEncode = "encode"
+	StageWrite = "write"
+)

 // Pipeline manager
 type Pipeline struct {
-	Ingester     ingest.Ingester
-	Decoder      decode.Decoder
-	Transformers []transform.Transformer
-	Writer       write.Writer
-	Extractor    extract.Extractor
-	Encoder      encode.Encoder
-	IsRunning    bool
+	IsRunning      bool
+	pipelineStages []*PipelineEntry
+	configStages   []config.Stage
+	configParams   []config.StageParam
 }

-func getIngester() (ingest.Ingester, error) {
+type PipelineEntry struct {
+	configStage config.Stage
+	stageType   string
+	Ingester    ingest.Ingester
+	Decoder     decode.Decoder
+	Transformer transform.Transformer
+	Extractor   extract.Extractor
+	Encoder     encode.Encoder
+	Writer      write.Writer
+	nextStages  []*PipelineEntry
+}
+
+var pipelineEntryMap map[string]*PipelineEntry
+var firstStage *PipelineEntry
+
+func getIngester(stage config.Stage, params config.StageParam) (ingest.Ingester, error) {
 	var ingester ingest.Ingester
 	var err error
-	log.Debugf("entering getIngester, type = %v", config.Opt.PipeLine.Ingest.Type)
-	switch config.Opt.PipeLine.Ingest.Type {
+	switch params.Ingest.Type {
 	case "file", "file_loop":
-		ingester, err = ingest.NewIngestFile()
+		ingester, err = ingest.NewIngestFile(params)
 	case "collector":
-		ingester, err = ingest.NewIngestCollector()
+		ingester, err = ingest.NewIngestCollector(params)
 	case "kafka":
-		ingester, err = ingest.NewIngestKafka()
+		ingester, err = ingest.NewIngestKafka(params)
 	default:
-		panic("`ingester` not defined")
+		panic(fmt.Sprintf("`ingest` type %s not defined", params.Ingest.Type))
 	}
 	return ingester, err
 }

-func getDecoder() (decode.Decoder, error) {
+func getDecoder(stage config.Stage, params config.StageParam) (decode.Decoder, error) {
 	var decoder decode.Decoder
 	var err error
-	switch config.Opt.PipeLine.Decode.Type {
+	switch params.Decode.Type {
 	case "json":
 		decoder, err = decode.NewDecodeJson()
 	case "aws":
-		decoder, err = decode.NewDecodeAws()
+		decoder, err = decode.NewDecodeAws(params)
 	case "none":
 		decoder, err = decode.NewDecodeNone()
 	default:
-		panic("`decode` not defined; if no decoder needed, specify `none`")
+		panic(fmt.Sprintf("`decode` type %s not defined; if no decoder needed, specify `none`", params.Decode.Type))
 	}
 	return decoder, err
 }

-func getTransformers() ([]transform.Transformer, error) {
-	return transform.GetTransformers()
-}
-
-func getWriter() (write.Writer, error) {
+func getWriter(stage config.Stage, params config.StageParam) (write.Writer, error) {
 	var writer write.Writer
 	var err error
-	switch config.Opt.PipeLine.Write.Type {
+	switch params.Write.Type {
 	case "stdout":
-		writer, _ = write.NewWriteStdout()
+		writer, err = write.NewWriteStdout()
 	case "none":
-		writer, _ = write.NewWriteNone()
+		writer, err = write.NewWriteNone()
 	case "loki":
-		writer, _ = write.NewWriteLoki()
+		writer, err = write.NewWriteLoki(params)
 	default:
-		panic("`write` not defined; if no writer needed, specify `none`")
+		panic(fmt.Sprintf("`write` type %s not defined; if no writer needed, specify `none`", params.Write.Type))
 	}
 	return writer, err
 }

-func getExtractor() (extract.Extractor, error) {
+func getTransformer(stage config.Stage, params config.StageParam) (transform.Transformer, error) {
+	var transformer transform.Transformer
+	var err error
+	switch params.Transform.Type {
+	case transform.OperationGeneric:
+		transformer, err = transform.NewTransformGeneric(params)
+	case transform.OperationNetwork:
+		transformer, err = transform.NewTransformNetwork(params)
+	case transform.OperationNone:
+		transformer, err = transform.NewTransformNone()
+	default:
+		panic(fmt.Sprintf("`transform` type %s not defined; if no transform needed, specify `none`", params.Transform.Type))
+	}
+	return transformer, err
+}
+
+func getExtractor(stage config.Stage, params config.StageParam) (extract.Extractor, error) {
 	var extractor extract.Extractor
 	var err error
-	switch config.Opt.PipeLine.Extract.Type {
+	switch params.Extract.Type {
 	case "none":
 		extractor, _ = extract.NewExtractNone()
 	case "aggregates":
-		extractor, _ = extract.NewExtractAggregate()
+		extractor, err = extract.NewExtractAggregate(params)
 	default:
-		panic("`extract` not defined; if no extractor needed, specify `none`")
+		panic(fmt.Sprintf("`extract` type %s not defined; if no extractor needed, specify `none`", params.Extract.Type))
 	}
 	return extractor, err
 }

-func getEncoder() (encode.Encoder, error) {
+func getEncoder(stage config.Stage, params config.StageParam) (encode.Encoder, error) {
 	var encoder encode.Encoder
 	var err error
-	switch config.Opt.PipeLine.Encode.Type {
+	switch params.Encode.Type {
 	case "prom":
-		encoder, _ = encode.NewEncodeProm()
-	case "json":
-		encoder, _ = encode.NewEncodeJson()
+		encoder, err = encode.NewEncodeProm(params)
 	case "kafka":
-		encoder, _ = encode.NewEncodeKafka()
+		encoder, err = encode.NewEncodeKafka(params)
 	case "none":
 		encoder, _ = encode.NewEncodeNone()
 	default:
-		panic("`encode` not defined; if no encoder needed, specify `none`")
+		panic(fmt.Sprintf("`encode` type %s not defined; if no encoder needed, specify `none`", params.Encode.Type))
 	}
 	return encoder, err
 }

+// findStageParameters finds the matching config.param structure and identifies the stage type
+func findStageParameters(stage config.Stage, configParams []config.StageParam) (*config.StageParam, string) {
+	log.Debugf("findStageParameters: stage = %v", stage)
+	for _, param := range configParams {
+		log.Debugf("findStageParameters: param = %v", param)
+		if stage.Name == param.Name {
+			var stageType string
+			if param.Ingest.Type != "" {
+				stageType = StageIngest
+			}
+			if param.Decode.Type != "" {
+				stageType = StageDecode
+			}
+			if param.Transform.Type != "" {
+				stageType = StageTransform
+			}
+			if param.Extract.Type != "" {
+				stageType = StageExtract
+			}
+			if param.Encode.Type != "" {
+				stageType = StageEncode
+			}
+			if param.Write.Type != "" {
+				stageType = StageWrite
+			}
+			return &param, stageType
+		}
+	}
+	return nil, ""
+}
+
 // NewPipeline defines the pipeline elements
 func NewPipeline() (*Pipeline, error) {
 	log.Debugf("entering NewPipeline")
-	ingester, _ := getIngester()
-	decoder, _ := getDecoder()
-	transformers, _ := getTransformers()
-	writer, _ := getWriter()
-	extractor, _ := getExtractor()
-	encoder, _ := getEncoder()
+	pipelineEntryMap = make(map[string]*PipelineEntry)

-	p := &Pipeline{
-		Ingester:     ingester,
-		Decoder:      decoder,
-		Transformers: transformers,
-		Extractor:    extractor,
-		Encoder:      encoder,
-		Writer:       writer,
+	stages := config.PipeLine
+	log.Debugf("stages = %v ", stages)
+	configParams := config.Parameters
+	log.Debugf("configParams = %v ", configParams)
+	pipeline := Pipeline{
+		pipelineStages: make([]*PipelineEntry, 0),
+		configStages:   stages,
+		configParams:   configParams,
 	}
-	return p, nil
+	for _, stage := range stages {
+		log.Debugf("stage = %v", stage)
+		params, stageType := findStageParameters(stage, configParams)
+		if params == nil {
+			err := fmt.Errorf("parameters not defined for stage %s", stage.Name)
+			log.Errorf("%v", err)
+			return nil, err
+		}
+		pEntry := PipelineEntry{
+			configStage: stage,
+			stageType:   stageType,
+			nextStages:  make([]*PipelineEntry, 0),
+		}
+		switch pEntry.stageType {
+		case StageIngest:
+			pEntry.Ingester, _ = getIngester(stage, *params)
+		case StageDecode:
+			pEntry.Decoder, _ = getDecoder(stage, *params)
+		case StageTransform:
+			pEntry.Transformer, _ = getTransformer(stage, *params)
+		case StageExtract:
+			pEntry.Extractor, _ = getExtractor(stage, *params)
+		case StageEncode:
+			pEntry.Encoder, _ = getEncoder(stage, *params)
+		case StageWrite:
+			pEntry.Writer, _ = getWriter(stage, *params)
+		}
+		pipelineEntryMap[stage.Name] = &pEntry
+		pipeline.pipelineStages = append(pipeline.pipelineStages, &pEntry)
+		log.Debugf("pipeline = %v", pipeline.pipelineStages)
+	}
+	log.Debugf("pipeline = %v", pipeline.pipelineStages)
+
+	// connect the stages one to another; find the beginning of the pipeline
+	setPipelineFlow(pipeline)
+
+	return &pipeline, nil
+}
+
+func setPipelineFlow(pipeline Pipeline) {
+	// verify and identify single Ingester. Assume a single ingester and single decoder for now
+	for i := range pipeline.pipelineStages {
+		stagei := pipeline.pipelineStages[i]
+		if stagei.stageType == StageIngest {
+			firstStage = stagei
+			// verify that no other stages are Ingester
+			for j := i + 1; j < len(pipeline.pipelineStages); j++ {
+				if pipeline.pipelineStages[j].stageType == StageIngest {
+					log.Errorf("only a single ingest stage is allowed")
+					os.Exit(1)
+				}
+			}
+		} else {
+			// set the follows field
+			follows, ok := pipelineEntryMap[stagei.configStage.Follows]
+			if !ok {
+				log.Errorf("follows stage %s is not yet defined for %s", stagei.configStage.Follows, stagei.configStage.Name)
+				os.Exit(1)
+			}
+			follows.nextStages = append(follows.nextStages, stagei)
+			log.Debugf("adding stage pEntry = %v to nextStages of follows = %v", stagei, follows)
+		}
+	}
+	if firstStage == nil {
+		log.Errorf("no ingest stage found")
+		os.Exit(1)
+	}
+	log.Debugf("firstStage = %v", firstStage)
+	if len(firstStage.nextStages) != 1 || firstStage.nextStages[0].stageType != StageDecode {
+		log.Errorf("second stage is not decoder")
+	}
 }

 func (p *Pipeline) Run() {
 	p.IsRunning = true
-	p.Ingester.Ingest(p.Process)
+	firstStage.Ingester.Ingest(p.Process)
 	p.IsRunning = false
 }

+func (p Pipeline) invokeStage(stage *PipelineEntry, entries []config.GenericMap) []config.GenericMap {
+	log.Debugf("entering invokeStage, stage = %s, type = %s", stage.configStage.Name, stage.stageType)
+	var out []config.GenericMap
+	switch stage.stageType {
+	case StageTransform:
+		out = transform.ExecuteTransform(stage.Transformer, entries)
+	case StageExtract:
+		out = stage.Extractor.Extract(entries)
+	case StageEncode:
+		out = stage.Encoder.Encode(entries)
+	case StageWrite:
+		out = stage.Writer.Write(entries)
+	}
+	return out
+}
+
+func (p Pipeline) processStage(stage *PipelineEntry, entries []config.GenericMap) {
+	log.Debugf("entering processStage, stage = %s", stage.configStage.Name)
+	out := p.invokeStage(stage, entries)
+	if len(stage.nextStages) == 0 {
+		return
+	} else if len(stage.nextStages) == 1 {
+		p.processStage(stage.nextStages[0], out)
+		return
+	}
+	for nextStage := range stage.nextStages {
+		// make a separate copy of the input data for each following stage
+		entriesCopy := make([]config.GenericMap, len(out))
+		copy(entriesCopy, out)
+		p.processStage(stage.nextStages[nextStage], entriesCopy)
+	}
+}
+
 // Process is called by the Ingester function
 func (p Pipeline) Process(entries []interface{}) {
 	log.Debugf("entering pipeline.Process")
 	log.Debugf("number
of entries = %d", len(entries)) - decoded := p.Decoder.Decode(entries) - transformed := make([]config.GenericMap, 0) - var flowEntry config.GenericMap - for _, entry := range decoded { - flowEntry = transform.ExecuteTransforms(p.Transformers, entry) - transformed = append(transformed, flowEntry) + // already checked first item is an ingester and it has a single follower a decoder + // Assume for now we have a single decoder + log.Debugf("firstStage = %v", firstStage) + secondStage := firstStage.nextStages[0] + log.Debugf("secondStage = %v", secondStage) + log.Debugf("number of next stages = %d", len(secondStage.nextStages)) + out := secondStage.Decoder.Decode(entries) + if len(secondStage.nextStages) == 0 { + return + } else if len(secondStage.nextStages) == 1 { + p.processStage(secondStage.nextStages[0], out) + return + } + log.Debugf(" pipeline.Process, before for loop") + for nextStage := range secondStage.nextStages { + // make a separate copy of the input data for each following stage + entriesCopy := make([]config.GenericMap, len(out)) + copy(entriesCopy, out) + p.processStage(secondStage.nextStages[nextStage], entriesCopy) } - - _ = p.Writer.Write(transformed) - - extracted := p.Extractor.Extract(transformed) - _ = p.Encoder.Encode(extracted) } func (p *Pipeline) IsReady() healthcheck.Check { diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go index a144b6210..98cf9c64f 100644 --- a/pkg/pipeline/pipeline_test.go +++ b/pkg/pipeline/pipeline_test.go @@ -18,7 +18,6 @@ package pipeline import ( - "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest" @@ -29,6 +28,17 @@ import ( "testing" ) +var yamlConfigNoParams = ` +log-level: debug +pipeline: + - name: write1 +parameters: + - name: write1 + write: + type: loki + loki: +` + func Test_transformToLoki(t *testing.T) { var transformed []config.GenericMap input := config.GenericMap{"key": "value"} @@ -36,23 +46,34 @@ func Test_transformToLoki(t *testing.T) { require.NoError(t, err) transformed = append(transformed, transform.Transform(input)) - config.Opt.PipeLine.Write.Loki = "{}" - loki, err := write.NewWriteLoki() - loki.Write(transformed) + v := test.InitConfig(t, yamlConfigNoParams) + require.NotNil(t, v) + + loki, err := write.NewWriteLoki(config.Parameters[0]) require.NoError(t, err) + loki.Write(transformed) } const configTemplate = `--- log-level: debug pipeline: - ingest: - type: file - file: - filename: ../../hack/examples/ocp-ipfix-flowlogs.json - decode: - type: json - transform: - - type: generic + - name: ingest1 + - name: decode1 + follows: ingest1 + - name: transform1 + follows: decode1 +parameters: + - name: ingest1 + ingest: + type: file + file: + filename: ../../hack/examples/ocp-ipfix-flowlogs.json + - name: decode1 + decode: + type: json + - name: transform1 + transform: + type: generic generic: - input: Bytes output: flp_bytes @@ -66,31 +87,12 @@ pipeline: output: flp_srcAddr - input: SrcPort output: flp_srcPort - extract: - type: none - encode: - type: none - write: - type: none ` func Test_SimplePipeline(t *testing.T) { - var json = jsoniter.ConfigCompatibleWithStandardLibrary var mainPipeline *Pipeline var err error - var b []byte - v := test.InitConfig(t, configTemplate) - config.Opt.PipeLine.Ingest.Type = "file" - config.Opt.PipeLine.Decode.Type = "json" - config.Opt.PipeLine.Extract.Type = "none" - config.Opt.PipeLine.Encode.Type = "none" - 
config.Opt.PipeLine.Write.Type = "none" - config.Opt.PipeLine.Ingest.File.Filename = "../../hack/examples/ocp-ipfix-flowlogs.json" - - val := v.Get("pipeline.transform\n") - b, err = json.Marshal(&val) - require.NoError(t, err) - config.Opt.PipeLine.Transform = string(b) + test.InitConfig(t, configTemplate) mainPipeline, err = NewPipeline() require.NoError(t, err) @@ -99,9 +101,7 @@ func Test_SimplePipeline(t *testing.T) { // So we don't need to run it in a separate go-routine mainPipeline.Run() // What is there left to check? Check length of saved data of each stage in private structure. - ingester := mainPipeline.Ingester.(*ingest.IngestFile) - decoder := mainPipeline.Decoder.(*decode.DecodeJson) - writer := mainPipeline.Writer.(*write.WriteNone) + ingester := mainPipeline.pipelineStages[0].Ingester.(*ingest.IngestFile) + decoder := mainPipeline.pipelineStages[1].Decoder.(*decode.DecodeJson) require.Equal(t, len(ingester.PrevRecords), len(decoder.PrevRecords)) - require.Equal(t, len(ingester.PrevRecords), len(writer.PrevRecords)) } diff --git a/pkg/pipeline/transform/transform.go b/pkg/pipeline/transform/transform.go index 1a9621f29..a92725375 100644 --- a/pkg/pipeline/transform/transform.go +++ b/pkg/pipeline/transform/transform.go @@ -18,11 +18,9 @@ package transform import ( - "encoding/json" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" log "github.com/sirupsen/logrus" - "os" ) type Transformer interface { @@ -57,38 +55,12 @@ const ( OperationNone = "none" ) -func GetTransformers() ([]Transformer, error) { - log.Debugf("entering GetTransformers") - var transformers []Transformer - transformDefinitions := Definitions{} - transformListString := config.Opt.PipeLine.Transform - log.Debugf("transformListString = %v", transformListString) - err := json.Unmarshal([]byte(transformListString), &transformDefinitions) - if err != nil { - log.Errorf("error in unmarshalling yaml: %v", err) - os.Exit(1) +func ExecuteTransform(transformer Transformer, in []config.GenericMap) []config.GenericMap { + out := make([]config.GenericMap, 0) + var flowEntry config.GenericMap + for _, entry := range in { + flowEntry = transformer.Transform(entry) + out = append(out, flowEntry) } - log.Debugf("transformDefinitions = %v", transformDefinitions) - for _, item := range transformDefinitions { - var transformer Transformer - log.Debugf("item.Type = %v", item.Type) - switch item.Type { - case OperationGeneric: - transformer, _ = NewTransformGeneric(item.Generic) - case OperationNetwork: - transformer, _ = NewTransformNetwork(item.Network) - case OperationNone: - transformer, _ = NewTransformNone() - } - transformers = append(transformers, transformer) - } - log.Debugf("transformers = %v", transformers) - return transformers, nil -} - -func ExecuteTransforms(transformers []Transformer, flowEntry config.GenericMap) config.GenericMap { - for _, transformer := range transformers { - flowEntry = transformer.Transform(flowEntry) - } - return flowEntry + return out } diff --git a/pkg/pipeline/transform/transform_generic.go b/pkg/pipeline/transform/transform_generic.go index cc71cc951..a4ff23b49 100644 --- a/pkg/pipeline/transform/transform_generic.go +++ b/pkg/pipeline/transform/transform_generic.go @@ -24,7 +24,7 @@ import ( ) type Generic struct { - api.TransformGeneric + Rules []api.GenericTransformRule } // Transform transforms a flow to a new set of keys @@ -40,12 +40,10 @@ func (g *Generic) Transform(f config.GenericMap) config.GenericMap { } // 
NewTransformGeneric create a new transform -func NewTransformGeneric(generic api.TransformGeneric) (Transformer, error) { +func NewTransformGeneric(params config.StageParam) (Transformer, error) { log.Debugf("entering NewTransformGeneric") - log.Debugf("rules = %v", generic.Rules) - return &Generic{ - api.TransformGeneric{ - Rules: generic.Rules, - }, - }, nil + transformGeneric := &Generic{ + Rules: params.Transform.Generic.Rules, + } + return transformGeneric, nil } diff --git a/pkg/pipeline/transform/transform_generic_test.go b/pkg/pipeline/transform/transform_generic_test.go index 678b09430..d06811a1a 100644 --- a/pkg/pipeline/transform/transform_generic_test.go +++ b/pkg/pipeline/transform/transform_generic_test.go @@ -18,7 +18,6 @@ package transform import ( - "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/stretchr/testify/require" @@ -28,8 +27,11 @@ import ( const testConfigTransformGeneric = `--- log-level: debug pipeline: - transform: - - type: generic + - name: transform1 +parameters: + - name: transform1 + transform: + type: generic generic: rules: - input: srcIP @@ -58,9 +60,9 @@ func getGenericExpectedOutput() config.GenericMap { } func TestNewTransformGeneric(t *testing.T) { - newTransform := InitNewTransform(t, testConfigTransformGeneric) + newTransform := InitNewTransformGeneric(t, testConfigTransformGeneric) transformGeneric := newTransform.(*Generic) - require.Equal(t, len(transformGeneric.Rules), 6) + require.Len(t, transformGeneric.Rules, 6) input := test.GetIngestMockEntry(false) output := transformGeneric.Transform(input) @@ -68,17 +70,12 @@ func TestNewTransformGeneric(t *testing.T) { require.Equal(t, output, expectedOutput) } -func InitNewTransform(t *testing.T, configFile string) Transformer { +func InitNewTransformGeneric(t *testing.T, configFile string) Transformer { v := test.InitConfig(t, configFile) - val := v.Get("pipeline.transform") - var json = jsoniter.ConfigCompatibleWithStandardLibrary - b, err := json.Marshal(&val) - require.Equal(t, err, nil) + require.NotNil(t, v) - // perform initializations usually done in main.go - config.Opt.PipeLine.Transform = string(b) - - newTransforms, err := GetTransformers() - require.Equal(t, err, nil) - return newTransforms[0] + config := config.Parameters[0] + newTransform, err := NewTransformGeneric(config) + require.NoError(t, err) + return newTransform } diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index 291705848..80a1da0aa 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -152,11 +152,12 @@ func (n *Network) Transform(inputEntry config.GenericMap) config.GenericMap { } // NewTransformNetwork create a new transform -func NewTransformNetwork(jsonNetworkTransform api.TransformNetwork) (Transformer, error) { +func NewTransformNetwork(params config.StageParam) (Transformer, error) { var needToInitLocationDB = false var needToInitKubeData = false var needToInitConnectionTracking = false + jsonNetworkTransform := params.Transform.Network for _, rule := range jsonNetworkTransform.Rules { switch rule.Type { case api.TransformNetworkOperationName("AddLocation"): diff --git a/pkg/pipeline/transform/transform_network_test.go b/pkg/pipeline/transform/transform_network_test.go index 5e0c6ef88..4e32236f7 100644 --- a/pkg/pipeline/transform/transform_network_test.go +++ b/pkg/pipeline/transform/transform_network_test.go 
@@ -18,7 +18,6 @@ package transform import ( - jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/location" @@ -166,26 +165,25 @@ func Test_NewTransformNetwork(t *testing.T) { var yamlConfig = []byte(` log-level: debug pipeline: - transform: - - type: network + - name: transform1 + - name: write1 + follows: transform1 +parameters: + - name: transform1 + transform: + type: network network: rules: - input: srcIP output: subnetSrcIP type: add_subnet parameters: /24 - write: - type: stdout + - name: write1 + write: + type: stdout `) - v := test.InitConfig(t, string(yamlConfig)) - val := v.Get("pipeline.transform") - var json = jsoniter.ConfigCompatibleWithStandardLibrary - b, err := json.Marshal(&val) - require.Equal(t, err, nil) - config.Opt.PipeLine.Transform = string(b) - - newNetworkTransform := InitNewTransform(t, string(yamlConfig)).(*Network) - require.Equal(t, err, nil) + newNetworkTransform := InitNewTransformNetwork(t, string(yamlConfig)).(*Network) + require.NotNil(t, newNetworkTransform) entry := test.GetIngestMockEntry(false) output := newNetworkTransform.Transform(entry) @@ -194,22 +192,37 @@ pipeline: require.Equal(t, "10.0.0.0/24", output["subnetSrcIP"]) } +func InitNewTransformNetwork(t *testing.T, configFile string) Transformer { + v := test.InitConfig(t, configFile) + require.NotNil(t, v) + config := config.Parameters[0] + newTransform, err := NewTransformNetwork(config) + require.NoError(t, err) + return newTransform +} + func Test_ConnTrackingTransformNetwork(t *testing.T) { var yamlConfig = []byte(` log-level: debug pipeline: - transform: - - type: network + - name: transform1 + - name: write1 + follows: transform1 +parameters: + - name: transform1 + transform: + type: network network: rules: - input: "{{.srcIP}},{{.srcPort}},{{.dstIP}},{{.dstPort}},{{.protocol}}" output: isNewFlow type: conn_tracking parameters: "777" - write: - type: stdout + - name: write1 + write: + type: stdout `) - newNetworkTransform := InitNewTransform(t, string(yamlConfig)).(*Network) + newNetworkTransform := InitNewTransformNetwork(t, string(yamlConfig)).(*Network) require.NotNil(t, newNetworkTransform) // first time flow is new @@ -227,8 +240,13 @@ func Test_TransformNetworkDependentRulesAddRegExIf(t *testing.T) { var yamlConfig = []byte(` log-level: debug pipeline: - transform: - - type: network + - name: transform1 + - name: write1 + follows: transform1 +parameters: + - name: transform1 + transform: + type: network network: rules: - input: srcIP @@ -243,18 +261,12 @@ pipeline: output: match-11.0.* type: add_regex_if parameters: 11.0.* - write: - type: stdout + - name: write1 + write: + type: stdout `) - v := test.InitConfig(t, string(yamlConfig)) - val := v.Get("pipeline.transform") - var json = jsoniter.ConfigCompatibleWithStandardLibrary - b, err := json.Marshal(&val) - require.Equal(t, err, nil) - config.Opt.PipeLine.Transform = string(b) - - newNetworkTransform := InitNewTransform(t, string(yamlConfig)).(*Network) - require.Equal(t, err, nil) + newNetworkTransform := InitNewTransformNetwork(t, string(yamlConfig)).(*Network) + require.NotNil(t, newNetworkTransform) entry := test.GetIngestMockEntry(false) output := newNetworkTransform.Transform(entry) diff --git a/pkg/pipeline/transform/transform_multiple_test.go b/pkg/pipeline/transform_multiple_test.go similarity index 53% rename from pkg/pipeline/transform/transform_multiple_test.go 
rename to pkg/pipeline/transform_multiple_test.go index d44827443..a862251e4 100644 --- a/pkg/pipeline/transform/transform_multiple_test.go +++ b/pkg/pipeline/transform_multiple_test.go @@ -15,11 +15,9 @@ * */ -package transform +package pipeline import ( - "github.com/json-iterator/go" - "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/stretchr/testify/require" "testing" @@ -28,8 +26,27 @@ import ( const testConfigTransformMultiple = `--- log-level: debug pipeline: - transform: - - type: generic + - name: ingest1 + - name: decode1 + follows: ingest1 + - name: transform1 + follows: decode1 + - name: transform2 + follows: transform1 + - name: transform3 + follows: transform2 +parameters: + - name: ingest1 + ingest: + type: file + file: + filename: ../../hack/examples/ocp-ipfix-flowlogs.json + - name: decode1 + decode: + type: json + - name: transform1 + transform: + type: generic generic: rules: - input: srcIP @@ -42,8 +59,12 @@ pipeline: output: SrcPort - input: protocol output: Protocol - - type: none - - type: generic + - name: transform2 + transform: + type: none + - name: transform3 + transform: + type: generic generic: rules: - input: SrcAddr @@ -58,37 +79,17 @@ pipeline: output: Protocol2 ` -func getMultipleExpectedOutput() config.GenericMap { - return config.GenericMap{ - "SrcAddr2": "10.0.0.1", - "SrcPort2": 11777, - "Protocol2": "tcp", - "DstAddr2": "20.0.0.2", - "DstPort2": 22, - } -} - -func InitMultipleTransforms(t *testing.T, configFile string) []Transformer { - v := test.InitConfig(t, configFile) - val := v.Get("pipeline.transform") - var json = jsoniter.ConfigCompatibleWithStandardLibrary - b, err := json.Marshal(&val) - require.Equal(t, err, nil) - - // perform initializations usually done in main.go - config.Opt.PipeLine.Transform = string(b) - - newTransforms, err := GetTransformers() - require.Equal(t, err, nil) - return newTransforms -} +func TestTransformMultiple(t *testing.T) { + var mainPipeline *Pipeline + var err error + v := test.InitConfig(t, testConfigTransformMultiple) + require.NotNil(t, v) -func TestNewTransformMultiple(t *testing.T) { - newTransforms := InitMultipleTransforms(t, testConfigTransformMultiple) - require.Equal(t, len(newTransforms), 3) + mainPipeline, err = NewPipeline() + require.NoError(t, err) - input := test.GetIngestMockEntry(false) - output := ExecuteTransforms(newTransforms, input) - expectedOutput := getMultipleExpectedOutput() - require.Equal(t, output, expectedOutput) + // The file ingester reads the entire file, pushes it down the pipeline, and then exits + // So we don't need to run it in a separate go-routine + mainPipeline.Run() + // TODO: We should test the final outcome to see that it is reasonable } diff --git a/pkg/pipeline/utils/exit.go b/pkg/pipeline/utils/exit.go index f524bb49c..ce52e43c4 100644 --- a/pkg/pipeline/utils/exit.go +++ b/pkg/pipeline/utils/exit.go @@ -1,3 +1,20 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package utils import ( diff --git a/pkg/pipeline/utils/params_parse.go b/pkg/pipeline/utils/params_parse.go new file mode 100644 index 000000000..ae028238f --- /dev/null +++ b/pkg/pipeline/utils/params_parse.go @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package utils + +import ( + "encoding/json" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + log "github.com/sirupsen/logrus" +) + +// ParamString returns its corresponding (json) string from config.parameters for specified params structure +func ParamString(params config.StageParam, stage string, stageType string) string { + log.Debugf("entering paramString") + log.Debugf("params = %v, stage = %s, stageType = %s", params, stage, stageType) + + var configMap []map[string]interface{} + var err error + err = json.Unmarshal([]byte(config.Opt.Parameters), &configMap) + if err != nil { + return "" + } + log.Debugf("configMap = %v", configMap) + + var returnBytes []byte + for index := range config.Parameters { + paramsEntry := &config.Parameters[index] + if params.Name == paramsEntry.Name { + log.Debugf("paramsEntry = %v", paramsEntry) + log.Debugf("data[index][stage] = %v", configMap[index][stage]) + // convert back to string + subField := configMap[index][stage].(map[string]interface{}) + log.Debugf("subField = %v", subField) + returnBytes, err = json.Marshal(subField[stageType]) + if err != nil { + return "" + } + break + } + } + log.Debugf("returnBytes = %s", string(returnBytes)) + return string(returnBytes) +} diff --git a/pkg/pipeline/write/write_loki.go b/pkg/pipeline/write/write_loki.go index 536153f78..8eb696663 100644 --- a/pkg/pipeline/write/write_loki.go +++ b/pkg/pipeline/write/write_loki.go @@ -228,10 +228,10 @@ func (l *Loki) processRecords() { } // NewWriteLoki creates a Loki writer from configuration -func NewWriteLoki() (*Loki, error) { +func NewWriteLoki(params config.StageParam) (*Loki, error) { log.Debugf("entering NewWriteLoki") - writeLokiString := config.Opt.PipeLine.Write.Loki + writeLokiString := pUtils.ParamString(params, "write", "loki") log.Debugf("writeLokiString = %s", writeLokiString) var jsonWriteLoki = api.GetWriteLokiDefaults() err := json.Unmarshal([]byte(writeLokiString), &jsonWriteLoki) @@ -239,6 +239,7 @@ func NewWriteLoki() (*Loki, error) { return nil, err } + // need to combine defaults with parameters that are provided in the config yaml file if err = jsonWriteLoki.Validate(); err != nil { return nil, fmt.Errorf("the provided config is not valid: %w", err) } diff --git a/pkg/pipeline/write/write_loki_test.go b/pkg/pipeline/write/write_loki_test.go index cb4a1db8d..6a3319300 100644 --- a/pkg/pipeline/write/write_loki_test.go +++ b/pkg/pipeline/write/write_loki_test.go @@ -18,12 +18,10 @@ package write import ( - "bytes" "encoding/json" "fmt" - jsoniter "github.com/json-iterator/go" 
"github.com/netobserv/flowlogs-pipeline/pkg/config" - "github.com/spf13/viper" + "github.com/netobserv/flowlogs-pipeline/pkg/test" "testing" "time" @@ -42,40 +40,31 @@ func (f *fakeEmitter) Handle(labels model.LabelSet, timestamp time.Time, record return a.Error(0) } -func initFromString(t *testing.T, yamlConfig []byte) string { - v := viper.New() - v.SetConfigType("yaml") - r := bytes.NewReader(yamlConfig) - err := v.ReadConfig(r) - require.Equal(t, err, nil) - val := v.Get("pipeline.write.loki") - var json = jsoniter.ConfigCompatibleWithStandardLibrary - b, err := json.Marshal(&val) - require.Equal(t, err, nil) - return string(b) -} - func Test_buildLokiConfig(t *testing.T) { - var yamlConfig = []byte(`log-level: debug + var yamlConfig = ` +log-level: debug pipeline: - write: - type: loki - loki: - tenantID: theTenant - url: "https://foo:8888/" - batchWait: 1m - minBackOff: 5s - labels: - - foo - - bar - staticLabels: - baz: bae - tiki: taka -`) - - config.Opt.PipeLine.Write.Loki = initFromString(t, yamlConfig) - - loki, err := NewWriteLoki() + - name: write1 +parameters: + - name: write1 + write: + type: loki + loki: + tenantID: theTenant + url: "https://foo:8888/" + batchWait: 1m + minBackOff: 5s + labels: + - foo + - bar + staticLabels: + baz: bae + tiki: taka +` + v := test.InitConfig(t, yamlConfig) + require.NotNil(t, v) + + loki, err := NewWriteLoki(config.Parameters[0]) require.NoError(t, err) assert.Equal(t, "https://foo:8888/loki/api/v1/push", loki.lokiConfig.URL.String()) @@ -88,23 +77,28 @@ pipeline: } func TestLoki_ProcessRecord(t *testing.T) { - var yamlConfig = []byte(`log-level: debug + var yamlConfig = ` +log-level: debug pipeline: - write: - type: loki - loki: - timestampLabel: ts - ignoreList: - - ignored - staticLabels: - static: label - labels: - - foo - - bar -`) - - config.Opt.PipeLine.Write.Loki = initFromString(t, yamlConfig) - loki, err := NewWriteLoki() + - name: write1 +parameters: + - name: write1 + write: + type: loki + loki: + timestampLabel: ts + ignoreList: + - ignored + staticLabels: + static: label + labels: + - foo + - bar +` + v := test.InitConfig(t, yamlConfig) + require.NotNil(t, v) + + loki, err := NewWriteLoki(config.Parameters[0]) require.NoError(t, err) fe := fakeEmitter{} @@ -146,13 +140,18 @@ func TestTimestampScale(t *testing.T) { t.Run(fmt.Sprintf("unit %v", testCase.unit), func(t *testing.T) { yamlConf := fmt.Sprintf(`log-level: debug pipeline: - write: - type: loki - loki: - timestampScale: %s + - name: write1 +parameters: + - name: write1 + write: + type: loki + loki: + timestampScale: %s `, testCase.unit) - config.Opt.PipeLine.Write.Loki = initFromString(t, []byte(yamlConf)) - loki, err := NewWriteLoki() + v := test.InitConfig(t, yamlConf) + require.NotNil(t, v) + + loki, err := NewWriteLoki(config.Parameters[0]) require.NoError(t, err) fe := fakeEmitter{} @@ -166,6 +165,17 @@ pipeline: } } +var yamlConfigNoParams = ` +log-level: debug +pipeline: + - name: write1 +parameters: + - name: write1 + write: + type: loki + loki: +` + // Tests those cases where the timestamp can't be extracted and reports the current time func TestTimestampExtraction_LocalTime(t *testing.T) { for _, testCase := range []struct { @@ -179,8 +189,10 @@ func TestTimestampExtraction_LocalTime(t *testing.T) { {name: "zero ts value", tsLabel: "ts", input: map[string]interface{}{"ts": 0}}, } { t.Run(testCase.name, func(t *testing.T) { - config.Opt.PipeLine.Write.Loki = initFromString(t, []byte("")) - loki, err := NewWriteLoki() + v := test.InitConfig(t, 
yamlConfigNoParams) + require.NotNil(t, v) + + loki, err := NewWriteLoki(config.Parameters[0]) require.NoError(t, err) loki.apiConfig.TimestampLabel = testCase.tsLabel @@ -203,19 +215,25 @@ func TestTimestampExtraction_LocalTime(t *testing.T) { // Tests that labels are sanitized before being sent to loki. // Labels that are invalid even if sanitized are ignored func TestSanitizedLabels(t *testing.T) { - var yamlConfig = []byte(`log-level: debug + var yamlConfig = ` +log-level: debug pipeline: - write: - type: loki - loki: - labels: - - "fo.o" - - "ba-r" - - "ba/z" - - "ignored?" -`) - config.Opt.PipeLine.Write.Loki = initFromString(t, yamlConfig) - loki, err := NewWriteLoki() + - name: write1 +parameters: + - name: write1 + write: + type: loki + loki: + labels: + - "fo.o" + - "ba-r" + - "ba/z" + - "ignored?" +` + v := test.InitConfig(t, yamlConfig) + require.NotNil(t, v) + + loki, err := NewWriteLoki(config.Parameters[0]) require.NoError(t, err) fe := fakeEmitter{} diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 77c32f9ee..a651f35e4 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -19,9 +19,12 @@ package test import ( "bytes" + "fmt" + jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/spf13/viper" "github.com/stretchr/testify/require" + "reflect" "testing" ) @@ -47,12 +50,53 @@ func GetIngestMockEntry(missingKey bool) config.GenericMap { } func InitConfig(t *testing.T, conf string) *viper.Viper { + var json = jsoniter.ConfigCompatibleWithStandardLibrary yamlConfig := []byte(conf) v := viper.New() v.SetConfigType("yaml") r := bytes.NewReader(yamlConfig) err := v.ReadConfig(r) - require.Equal(t, err, nil) + require.NoError(t, err) + + // set up global config info + // first clear out the config structures in case they were set by a previous instantiation + p1 := reflect.ValueOf(&config.PipeLine).Elem() + p1.Set(reflect.Zero(p1.Type())) + p2 := reflect.ValueOf(&config.Parameters).Elem() + p2.Set(reflect.Zero(p2.Type())) + + var b []byte + pipelineStr := v.Get("pipeline") + b, err = json.Marshal(&pipelineStr) + if err != nil { + fmt.Printf("error marshaling: %v\n", err) + return nil + } + config.Opt.PipeLine = string(b) + parametersStr := v.Get("parameters") + b, err = json.Marshal(¶metersStr) + if err != nil { + fmt.Printf("error marshaling: %v\n", err) + return nil + } + config.Opt.Parameters = string(b) + err = json.Unmarshal([]byte(config.Opt.PipeLine), &config.PipeLine) + if err != nil { + fmt.Printf("error unmarshaling: %v\n", err) + return nil + } + err = json.Unmarshal([]byte(config.Opt.Parameters), &config.Parameters) + if err != nil { + fmt.Printf("error unmarshaling: %v\n", err) + return nil + } + + err = config.ParseConfig() + if err != nil { + fmt.Printf("error in parsing config file: %v \n", err) + return nil + } + return v } diff --git a/playground/goflow3.yml b/playground/goflow3.yml index f84c70c40..4b30f926d 100644 --- a/playground/goflow3.yml +++ b/playground/goflow3.yml @@ -1,36 +1,45 @@ log-level: debug pipeline: - ingest: - type: file - file: - filename: playground/goflow2_input.txt - decode: - type: json - transform: - - type: generic + - name: ingest1 + - name: decode1 + follows: ingest1 + - name: generic1 + follows: decode1 + - name: write1 + follows: generic1 +parameters: + - name: ingest1 + ingest: + type: file + file: + filename: playground/goflow2_input.txt + - name: decode1 + decode: + type: json + - name: generic1 + transform: + type: generic generic: - - input: Bytes - output: flp_bytes - - 
input: DstAddr - output: flp_dstAddr - - input: DstHostIP - output: flp_dstHostIP - - input: DstPort - output: flp_dstPort - - input: Packets - output: flp_packets - - input: SrcAddr - output: flp_srcAddr - - input: SrcHostIP - output: flp_srcHostIP - - input: SrcPort - output: flp_srcPort - - input: TimeReceived - output: flp_timestamp - extract: - type: none - encode: - type: none - write: - type: stdout + rules: + - input: Bytes + output: flp_bytes + - input: DstAddr + output: flp_dstAddr + - input: DstHostIP + output: flp_dstHostIP + - input: DstPort + output: flp_dstPort + - input: Packets + output: flp_packets + - input: SrcAddr + output: flp_srcAddr + - input: SrcHostIP + output: flp_srcHostIP + - input: SrcPort + output: flp_srcPort + - input: TimeReceived + output: flp_timestamp + - name: write1 + write: + type: stdout diff --git a/playground/goflow7.yml b/playground/goflow7.yml new file mode 100644 index 000000000..65474487e --- /dev/null +++ b/playground/goflow7.yml @@ -0,0 +1,60 @@ +log-level: info +pipeline: + - stage: ingest + name: ingest1 + - stage: decode + name: decode1 + follows: ingest1 + - stage: transform + name: generic1 + follows: decode1 + - stage: write + name: write1 + follows: generic1 + - stage: transform + name: generic2 + follows: decode1 + - stage: write + name: write2 + follows: generic2 +parameters: + - name: decode1 + decode: + type: json + - name: generic1 + transform: + type: generic + generic: + rules: + - input: Bytes + output: v1_bytes + - input: DstAddr + output: v1_dstAddr + - input: Packets + output: v1_packets + - input: SrcPort + output: v1_srcPort + - name: write1 + write: + type: stdout + - name: generic2 + transform: + type: generic + generic: + rules: + - input: Bytes + output: v2_bytes + - input: DstAddr + output: v2_dstAddr + - input: Packets + output: v2_packets + - input: SrcPort + output: v2_srcPort + - name: ingest1 + ingest: + type: file_loop + file: + filename: playground/goflow2_input.txt + - name: write2 + write: + type: stdout diff --git a/playground/goflow8.yml b/playground/goflow8.yml new file mode 100644 index 000000000..a3bd348fa --- /dev/null +++ b/playground/goflow8.yml @@ -0,0 +1,54 @@ +log-level: info +pipeline: + - name: ingest1 + - name: decode1 + follows: ingest1 + - name: generic1 + follows: decode1 + - name: write1 + follows: generic1 + - name: generic2 + follows: decode1 + - name: write2 + follows: generic2 +parameters: + - name: decode1 + decode: + type: json + - name: generic1 + transform: + type: generic + generic: + rules: + - input: Bytes + output: v1_bytes + - input: DstAddr + output: v1_dstAddr + - input: Packets + output: v1_packets + - input: SrcPort + output: v1_srcPort + - name: write1 + write: + type: stdout + - name: generic2 + transform: + type: generic + generic: + rules: + - input: Bytes + output: v2_bytes + - input: DstAddr + output: v2_dstAddr + - input: Packets + output: v2_packets + - input: SrcPort + output: v2_srcPort + - name: ingest1 + ingest: + type: file_loop + file: + filename: playground/goflow2_input.txt + - name: write2 + write: + type: stdout diff --git a/playground/prom1.yml b/playground/prom1.yml index 8d2a7248e..55880bf40 100644 --- a/playground/prom1.yml +++ b/playground/prom1.yml @@ -1,13 +1,26 @@ log-level: debug pipeline: - ingest: - type: file_loop - file: - filename: playground/goflow2_input.txt - decode: - type: json - transform: - - type: generic + - name: ingest1 + - name: decode1 + follows: ingest1 + - name: transform1 + follows: decode1 + - name: encode1 + follows: 
transform1 + - name: write1 + follows: encode1 +parameters: + - name: ingest1 + ingest: + type: file_loop + file: + filename: playground/goflow2_input.txt + - name: decode1 + decode: + type: json + - name: transform1 + transform: + type: generic generic: rules: - input: Bytes @@ -28,33 +41,33 @@ pipeline: output: srcPort - input: TimeReceived output: timestamp - extract: - type: none - encode: - type: prom - prom: - port: 9102 - prefix: flp_ - expirytime: 15 - metrics: - - name: totalBytes - type: gauge - valuekey: bytes - labels: - - srcAddr - - dstAddr - - srcPort - - name: totalPackets - type: gauge - valuekey: packets - labels: - - srcAddr - - dstAddr - - dstPort - - name: subnetHistogram - type: histogram - valuekey: aggregate - labels: - write: - type: stdout + - name: encode1 + encode: + type: prom + prom: + port: 9102 + prefix: fl2m_ + expirytime: 15 + metrics: + - name: totalBytes + type: gauge + valuekey: bytes + labels: + - srcAddr + - dstAddr + - srcPort + - name: totalPackets + type: gauge + valuekey: packets + labels: + - srcAddr + - dstAddr + - dstPort + - name: subnetHistogram + type: histogram + valuekey: aggregate + labels: + - name: write1 + write: + type: stdout
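
Note on usage (a sketch, not part of the patch): a minimal test written against the new two-section config, using the test.InitConfig helper and the per-stage NewWriteLoki signature introduced above. The stage name "write1" and the localhost URL are illustrative assumptions, not values taken from this patch.

func TestWriteLoki_MinimalConfig(t *testing.T) {
	// pipeline lists the stages; parameters configures each stage by name.
	yamlConfig := `
log-level: debug
pipeline:
  - name: write1
parameters:
  - name: write1
    write:
      type: loki
      loki:
        url: "http://localhost:3100/"
`
	v := test.InitConfig(t, yamlConfig)
	require.NotNil(t, v)

	// NewWriteLoki now receives the parameters of a single named stage
	// rather than reading a global pipeline.write.loki string.
	loki, err := NewWriteLoki(config.Parameters[0])
	require.NoError(t, err)
	require.NotNil(t, loki)
}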
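Note on InitConfig (a sketch, not part of the patch): the reflect calls exist only to reset the package-level config between test cases, since each test re-parses a fresh YAML document into the same globals. Assuming config.PipeLine and config.Parameters are slices (they are unmarshaled from YAML lists), the reset is equivalent to plain reassignment:

// Equivalent reset under the slice assumption; the reflect.Zero form used in
// the patch works for a value of any type, so it survives future changes to
// the shape of these globals.
config.PipeLine = nil
config.Parameters = nil

The JSON round-trip that follows mirrors what the CLI now accepts through the --pipeline and --parameters flags: each YAML section is serialized to the JSON string form those flags take, then unmarshaled into the global structures that config.ParseConfig() processes.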
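Note on the generic transform (a sketch, not part of the patch): the playground conversions also pick up an API change to the generic transform, whose input/output pairs now sit under an explicit rules key instead of directly under generic. Assuming the obvious struct shape (the canonical definition lives in the repo, not here):

// Hypothetical mirror of the transform.generic parameters after this change;
// field names are copied from the YAML keys used in the playground files.
type TransformGeneric struct {
	Rules []GenericTransformRule `yaml:"rules" json:"rules"`
}

type GenericTransformRule struct {
	Input  string `yaml:"input" json:"input"`
	Output string `yaml:"output" json:"output"`
}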
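Note on the new playground files (a sketch, not part of the patch): goflow7.yml and goflow8.yml exercise the main payoff of the new format: a stage may have more than one successor, which the old flat pipeline.* flags could not express. A hypothetical Go mirror of the pipeline section, with field names copied from the YAML keys:

// Illustrative only. goflow8.yml omits the optional "stage" key that
// goflow7.yml uses to tag each entry with its stage type.
type Stage struct {
	Name    string `yaml:"name" json:"name"`
	Follows string `yaml:"follows,omitempty" json:"follows,omitempty"`
}

// goflow8.yml expressed as data: decode1 fans out into two independent
// transform -> write branches fed by the same ingest/decode pair.
var goflow8Pipeline = []Stage{
	{Name: "ingest1"},
	{Name: "decode1", Follows: "ingest1"},
	{Name: "generic1", Follows: "decode1"},
	{Name: "write1", Follows: "generic1"},
	{Name: "generic2", Follows: "decode1"},
	{Name: "write2", Follows: "generic2"},
}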