-
Notifications
You must be signed in to change notification settings - Fork 34
Connection tracking - hash mechanism #201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
da38508
957823e
1da5f24
a6776a0
3bd4bfc
5c21b9f
ebe9aa8
a68f132
8a0532b
747a19b
74e8a18
909ef58
1a052c5
487643c
3e909c1
acb30a0
bb8c08a
862a303
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| /* | ||
| * Copyright (C) 2022 IBM, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| * | ||
| */ | ||
|
|
||
| package api | ||
|
|
||
| type ConnTrack struct { | ||
| KeyDefinition KeyDefinition `yaml:"keyDefinition" doc:"fields that are used to identify the connection"` | ||
| OutputRecordTypes []string `yaml:"outputRecordTypes" doc:"output record types to emit"` | ||
| OutputFields []OutputField `yaml:"outputFields" doc:"list of output fields"` | ||
| } | ||
|
|
||
| type KeyDefinition struct { | ||
| FieldGroups []FieldGroup `yaml:"fieldGroups" doc:"list of field group definitions"` | ||
| Hash ConnTrackHash `yaml:"hash" doc:"how to build the connection hash"` | ||
| } | ||
|
|
||
// FieldGroup is a named list of flow log fields. Its name is what the hash
// configuration uses to refer to the group.
type FieldGroup struct {
	Name   string   `yaml:"name" doc:"field group name"`
	Fields []string `yaml:"fields" doc:"list of fields in the group"`
}
|
|
||
// ConnTrackHash describes how the connection hash is computed.
// A and B denote the two endpoints of the connection.
// If FieldGroupARef and FieldGroupBRef are set, the hash is built so that
// flow logs from A to B and flow logs from B to A share the same hash.
// If they are not set, A->B and B->A produce different hashes and are
// tracked as separate connections.
type ConnTrackHash struct {
	FieldGroupRefs []string `yaml:"fieldGroupRefs" doc:"list of field group names to build the hash"`
	FieldGroupARef string   `yaml:"fieldGroupARef" doc:"field group name of endpoint A"`
	FieldGroupBRef string   `yaml:"fieldGroupBRef" doc:"field group name of endpoint B"`
}
|
|
||
// OutputField configures one aggregated field of the output record: its name,
// the aggregate operation, whether it is split per direction, and the input
// field the operation reads from.
type OutputField struct {
	Name      string `yaml:"name" doc:"output field name"`
	Operation string `yaml:"operation" doc:"aggregate operation on the field value"`
	SplitAB   bool   `yaml:"splitAB" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
	Input     string `yaml:"input" doc:"The input field to base the operation on. When omitted, 'name' is used"`
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,93 @@ | ||
| /* | ||
| * Copyright (C) 2022 IBM, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| * | ||
| */ | ||
|
|
||
| package conntrack | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "encoding/gob" | ||
| "fmt" | ||
| "hash" | ||
|
|
||
| "github.com/netobserv/flowlogs-pipeline/pkg/api" | ||
| "github.com/netobserv/flowlogs-pipeline/pkg/config" | ||
| log "github.com/sirupsen/logrus" | ||
| ) | ||
|
|
||
// hashType is the raw digest returned by ComputeHash and computeHashFields.
type hashType []byte
|
|
||
| // ComputeHash computes the hash of a flow log according to keyDefinition. | ||
| // Two flow logs will have the same hash if they belong to the same connection. | ||
| func ComputeHash(flowLog config.GenericMap, keyDefinition api.KeyDefinition, hasher hash.Hash) (hashType, error) { | ||
| fieldGroup2hash := make(map[string]hashType) | ||
|
|
||
| // Compute the hash of each field group | ||
| for _, fg := range keyDefinition.FieldGroups { | ||
| h, err := computeHashFields(flowLog, fg.Fields, hasher) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("compute hash: %w", err) | ||
| } | ||
| fieldGroup2hash[fg.Name] = h | ||
| } | ||
|
|
||
| // Compute the total hash | ||
| hasher.Reset() | ||
| for _, fgName := range keyDefinition.Hash.FieldGroupRefs { | ||
| hasher.Write(fieldGroup2hash[fgName]) | ||
| } | ||
| if keyDefinition.Hash.FieldGroupARef != "" { | ||
| hashA := fieldGroup2hash[keyDefinition.Hash.FieldGroupARef] | ||
| hashB := fieldGroup2hash[keyDefinition.Hash.FieldGroupBRef] | ||
| // Determine order between A's and B's hash to get the same hash for both flow logs from A to B and from B to A. | ||
| if bytes.Compare(hashA, hashB) < 0 { | ||
eranra marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| hasher.Write(hashA) | ||
| hasher.Write(hashB) | ||
| } else { | ||
| hasher.Write(hashB) | ||
| hasher.Write(hashA) | ||
| } | ||
| } | ||
| return hasher.Sum([]byte{}), nil | ||
| } | ||
|
|
||
| func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher hash.Hash) (hashType, error) { | ||
| hasher.Reset() | ||
| for _, fn := range fieldNames { | ||
| f, ok := flowLog[fn] | ||
| if !ok { | ||
| log.Warningf("Missing field %v", fn) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could be risky. It could end up flooding the log file. Is there a way to avoid repeated logs?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can replace it with a prometheus metric. WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't believe this needs to be tracked as a metric. It doesn't seem like it's likely to occur but can if someone wants to be malicious. For now, maybe let it go, but we should have a general solution to avoid spamming the log file. |
||
| continue | ||
| } | ||
| bytes, err := toBytes(f) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| hasher.Write(bytes) | ||
| } | ||
| return hasher.Sum([]byte{}), nil | ||
| } | ||
|
|
||
// toBytes serializes data with encoding/gob and returns the encoded bytes.
// A fresh encoder is used on every call, so equal values of the same type
// always produce the same bytes.
// It returns an error, wrapped with the value's type, when gob cannot encode
// data (for example an untyped nil).
func toBytes(data interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(data); err != nil {
		return nil, fmt.Errorf("gob encode value of type %T: %w", data, err)
	}
	return buf.Bytes(), nil
}
Uh oh!
There was an error while loading. Please reload this page.