diff --git a/README.md b/README.md
index badc43868..1cec31441 100644
--- a/README.md
+++ b/README.md
@@ -655,6 +655,7 @@
 kubernetes
         kind
                 create-kind-cluster        Create cluster
                 delete-kind-cluster        Delete cluster
+                kind-load-image            Load image to kind
         metrics
                 generate-configuration     Generate metrics configuration
diff --git a/pkg/api/conn_track.go b/pkg/api/conn_track.go
new file mode 100644
index 000000000..7b9517aeb
--- /dev/null
+++ b/pkg/api/conn_track.go
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2022 IBM, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package api
+
+type ConnTrack struct {
+	KeyDefinition     KeyDefinition `yaml:"keyDefinition" doc:"fields that are used to identify the connection"`
+	OutputRecordTypes []string      `yaml:"outputRecordTypes" doc:"output record types to emit"`
+	OutputFields      []OutputField `yaml:"outputFields" doc:"list of output fields"`
+}
+
+type KeyDefinition struct {
+	FieldGroups []FieldGroup  `yaml:"fieldGroups" doc:"list of field group definitions"`
+	Hash        ConnTrackHash `yaml:"hash" doc:"how to build the connection hash"`
+}
+
+type FieldGroup struct {
+	Name   string   `yaml:"name" doc:"field group name"`
+	Fields []string `yaml:"fields" doc:"list of fields in the group"`
+}
+
+// ConnTrackHash determines how to compute the connection hash.
+// A and B are treated as the endpoints of the connection.
+// When FieldGroupARef and FieldGroupBRef are set, the hash is computed in a way
+// that flow logs from A to B will have the same hash as flow logs from B to A.
+// When they are not set, a different hash will be computed for A->B and B->A,
+// and they are tracked as different connections.
+type ConnTrackHash struct {
+	FieldGroupRefs []string `yaml:"fieldGroupRefs" doc:"list of field group names to build the hash"`
+	FieldGroupARef string   `yaml:"fieldGroupARef" doc:"field group name of endpoint A"`
+	FieldGroupBRef string   `yaml:"fieldGroupBRef" doc:"field group name of endpoint B"`
+}
+
+type OutputField struct {
+	Name      string `yaml:"name" doc:"output field name"`
+	Operation string `yaml:"operation" doc:"aggregate operation on the field value"`
+	SplitAB   bool   `yaml:"splitAB" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
+	Input     string `yaml:"input" doc:"The input field to base the operation on. When omitted, 'name' is used"`
+}
diff --git a/pkg/pipeline/conntrack/hash.go b/pkg/pipeline/conntrack/hash.go
new file mode 100644
index 000000000..331b94d7f
--- /dev/null
+++ b/pkg/pipeline/conntrack/hash.go
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2022 IBM, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package conntrack
+
+import (
+	"bytes"
+	"encoding/gob"
+	"fmt"
+	"hash"
+
+	"github.com/netobserv/flowlogs-pipeline/pkg/api"
+	"github.com/netobserv/flowlogs-pipeline/pkg/config"
+	log "github.com/sirupsen/logrus"
+)
+
+type hashType []byte
+
+// ComputeHash computes the hash of a flow log according to keyDefinition.
+// Two flow logs will have the same hash if they belong to the same connection.
+func ComputeHash(flowLog config.GenericMap, keyDefinition api.KeyDefinition, hasher hash.Hash) (hashType, error) {
+	fieldGroup2hash := make(map[string]hashType)
+
+	// Compute the hash of each field group
+	for _, fg := range keyDefinition.FieldGroups {
+		h, err := computeHashFields(flowLog, fg.Fields, hasher)
+		if err != nil {
+			return nil, fmt.Errorf("compute hash: %w", err)
+		}
+		fieldGroup2hash[fg.Name] = h
+	}
+
+	// Compute the total hash
+	hasher.Reset()
+	for _, fgName := range keyDefinition.Hash.FieldGroupRefs {
+		hasher.Write(fieldGroup2hash[fgName])
+	}
+	if keyDefinition.Hash.FieldGroupARef != "" {
+		hashA := fieldGroup2hash[keyDefinition.Hash.FieldGroupARef]
+		hashB := fieldGroup2hash[keyDefinition.Hash.FieldGroupBRef]
+		// Determine order between A's and B's hash to get the same hash for both flow logs from A to B and from B to A.
+		if bytes.Compare(hashA, hashB) < 0 {
+			hasher.Write(hashA)
+			hasher.Write(hashB)
+		} else {
+			hasher.Write(hashB)
+			hasher.Write(hashA)
+		}
+	}
+	return hasher.Sum([]byte{}), nil
+}
+
+func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher hash.Hash) (hashType, error) {
+	hasher.Reset()
+	for _, fn := range fieldNames {
+		f, ok := flowLog[fn]
+		if !ok {
+			log.Warningf("Missing field %v", fn)
+			continue
+		}
+		bytes, err := toBytes(f)
+		if err != nil {
+			return nil, err
+		}
+		hasher.Write(bytes)
+	}
+	return hasher.Sum([]byte{}), nil
+}
+
+func toBytes(data interface{}) ([]byte, error) {
+	var buf bytes.Buffer
+	enc := gob.NewEncoder(&buf)
+	err := enc.Encode(data)
+	if err != nil {
+		return nil, err
+	}
+	bytes := buf.Bytes()
+	return bytes, nil
+}
diff --git a/pkg/pipeline/conntrack/hash_test.go b/pkg/pipeline/conntrack/hash_test.go
new file mode 100644
index 000000000..68cb0839c
--- /dev/null
+++ b/pkg/pipeline/conntrack/hash_test.go
@@ -0,0 +1,244 @@
+/*
+ * Copyright (C) 2022 IBM, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package conntrack
+
+import (
+	"hash/fnv"
+	"testing"
+
+	"github.com/netobserv/flowlogs-pipeline/pkg/api"
+	"github.com/netobserv/flowlogs-pipeline/pkg/config"
+	"github.com/stretchr/testify/require"
+)
+
+func NewFlowLog(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, bytes, packets int) config.GenericMap {
+	return config.GenericMap{
+		"SrcAddr": srcIP,
+		"SrcPort": srcPort,
+		"DstAddr": dstIP,
+		"DstPort": dstPort,
+		"Proto":   protocol,
+		"Bytes":   bytes,
+		"Packets": packets,
+	}
+}
+
+var hasher = fnv.New32a()
+
+func TestComputeHash_Unidirectional(t *testing.T) {
+	keyDefinition := api.KeyDefinition{
+		FieldGroups: []api.FieldGroup{
+			{
+				Name: "src",
+				Fields: []string{
+					"SrcAddr",
+					"SrcPort",
+				},
+			},
+			{
+				Name: "dst",
+				Fields: []string{
+					"DstAddr",
+					"DstPort",
+				},
+			},
+			{
+				Name: "protocol",
+				Fields: []string{
+					"Proto",
+				},
+			},
+		},
+		Hash: api.ConnTrackHash{
+			FieldGroupRefs: []string{"src", "dst", "protocol"},
+		},
+	}
+	ipA := "10.0.0.1"
+	ipB := "10.0.0.2"
+	portA := 9001
+	portB := 9002
+	protocolA := 6
+	protocolB := 7
+	table := []struct {
+		name     string
+		flowLog1 config.GenericMap
+		flowLog2 config.GenericMap
+		sameHash bool
+	}{
+		{
+			"Same IP, port and protocol",
+			NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
+			NewFlowLog(ipA, portA, ipB, portB, protocolA, 222, 11),
+			true,
+		},
+		{
+			"Alternating ip+port",
+			NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
+			NewFlowLog(ipB, portB, ipA, portA, protocolA, 222, 11),
+			false,
+		},
+		{
+			"Alternating ip",
+			NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
+			NewFlowLog(ipB, portA, ipA, portB, protocolA, 222, 11),
+			false,
+		},
+		{
+			"Alternating port",
+			NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
+			NewFlowLog(ipA, portB, ipB, portA, protocolA, 222, 11),
+			false,
+		},
+		{
+			"Same IP+port, different protocol",
+			NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
+			NewFlowLog(ipA, portA, ipB, portB, protocolB, 222, 11),
+			false,
+		},
+	}
+	for _, test := range table {
+		t.Run(test.name, func(t *testing.T) {
+			h1, err1 := ComputeHash(test.flowLog1, keyDefinition, hasher)
+			h2, err2 := ComputeHash(test.flowLog2, keyDefinition, hasher)
+			require.NoError(t, err1)
+			require.NoError(t, err2)
+			if test.sameHash {
+				require.Equal(t, h1, h2)
+			} else {
+				require.NotEqual(t, h1, h2)
+			}
+		})
+	}
+}
+
+func TestComputeHash_Bidirectional(t *testing.T) {
+	keyDefinition := api.KeyDefinition{
+		FieldGroups: []api.FieldGroup{
+			{
+				Name: "src",
+				Fields: []string{
+					"SrcAddr",
+					"SrcPort",
+				},
+			},
+			{
+				Name: "dst",
+				Fields: []string{
+					"DstAddr",
+					"DstPort",
+				},
+			},
+			{
+				Name: "protocol",
+				Fields: []string{
+					"Proto",
+				},
+			},
+		},
+		Hash: api.ConnTrackHash{
+			FieldGroupRefs: []string{"protocol"},
+			FieldGroupARef: "src",
+			FieldGroupBRef: "dst",
+		},
+	}
+	ipA := "10.0.0.1"
+	ipB := "10.0.0.2"
+	portA := 1
+	portB := 9002
+	protocolA := 6
+	protocolB := 7
+	table := []struct {
+		name     string
+		flowLog1 config.GenericMap
+		flowLog2 config.GenericMap
+		sameHash bool
+	}{
+		{
+			"Same IP, port and protocol",
+			NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
+			NewFlowLog(ipA, portA, ipB, portB, protocolA, 222, 11),
+			true,
+		},
+		{
+			"Alternating ip+port",
+			NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
+			NewFlowLog(ipB, portB, ipA, portA, protocolA, 222, 11),
+			true,
+		},
+		{
+			"Alternating ip",
+			NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
+			NewFlowLog(ipB, portA, ipA, portB, protocolA, 222, 11),
+			false,
+		},
+		{
+			"Alternating port",
+			NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
+			NewFlowLog(ipA, portB, ipB, portA, protocolA, 222, 11),
+			false,
+		},
+		{
+			"Same IP+port, different protocol",
+			NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
+			NewFlowLog(ipA, portA, ipB, portB, protocolB, 222, 11),
+			false,
+		},
+	}
+	for _, test := range table {
+		t.Run(test.name, func(t *testing.T) {
+			h1, err1 := ComputeHash(test.flowLog1, keyDefinition, hasher)
+			h2, err2 := ComputeHash(test.flowLog2, keyDefinition, hasher)
+			require.NoError(t, err1)
+			require.NoError(t, err2)
+			if test.sameHash {
+				require.Equal(t, h1, h2)
+			} else {
+				require.NotEqual(t, h1, h2)
+			}
+		})
+	}
+}
+
+func TestComputeHash_MissingField(t *testing.T) {
+	keyDefinition := api.KeyDefinition{
+		FieldGroups: []api.FieldGroup{
+			{
+				Name: "src",
+				Fields: []string{
+					"SrcAddr",
+					"Missing",
+				},
+			},
+		},
+		Hash: api.ConnTrackHash{
+			FieldGroupRefs: []string{"src"},
+		},
+	}
+
+	ipA := "10.0.0.1"
+	ipB := "10.0.0.2"
+	portA := 1
+	portB := 9002
+	protocolA := 6
+
+	fl := NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22)
+
+	h, err := ComputeHash(fl, keyDefinition, hasher)
+	require.NoError(t, err)
+	require.NotNil(t, h)
+}
diff --git a/pkg/pipeline/transform/connection_tracking/connection_tracking.go b/pkg/pipeline/transform/connection_tracking/connection_tracking.go
index b5fb5a67a..0e8f045b6 100644
--- a/pkg/pipeline/transform/connection_tracking/connection_tracking.go
+++ b/pkg/pipeline/transform/connection_tracking/connection_tracking.go
@@ -15,6 +15,8 @@
  *
  */
 
+// TODO: Delete this package once the connection tracking module is done.
+
 package connection_tracking
 
 import (