Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/pipeline/conntrack/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (agg *aSum) update(conn connection, flowLog config.GenericMap, d direction)
outputField := agg.getOutputField(d)
v, err := agg.getInputFieldValue(flowLog)
if err != nil {
log.Errorf("error updating connection %v: %v", string(conn.getHash().hashTotal), err)
log.Errorf("error updating connection %x: %v", conn.getHash().hashTotal, err)
return
}
conn.updateAggValue(outputField, func(curr float64) float64 {
Expand All @@ -141,7 +141,7 @@ func (agg *aMin) update(conn connection, flowLog config.GenericMap, d direction)
outputField := agg.getOutputField(d)
v, err := agg.getInputFieldValue(flowLog)
if err != nil {
log.Errorf("error updating connection %v: %v", string(conn.getHash().hashTotal), err)
log.Errorf("error updating connection %x: %v", conn.getHash().hashTotal, err)
return
}

Expand All @@ -154,7 +154,7 @@ func (agg *aMax) update(conn connection, flowLog config.GenericMap, d direction)
outputField := agg.getOutputField(d)
v, err := agg.getInputFieldValue(flowLog)
if err != nil {
log.Errorf("error updating connection %v: %v", string(conn.getHash().hashTotal), err)
log.Errorf("error updating connection %x: %v", conn.getHash().hashTotal, err)
return
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/pipeline/conntrack/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (c *connType) toGenericMap() config.GenericMap {
}

func (c *connType) getHash() totalHashType {
return copyTotalHash(c.hash)
return c.hash

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a private type and attribute, maybe the getHash() method is not needed anymore and you can access c.hash directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want the API of connection to have the property that once a connection object is created, its hash is read-only.
The other types that interact with connection (e.g. conntrackImpl and aggregators) are doing so using the connection interface rather than the connType struct. So in the current implementation I can't remove the method.

}

// TODO: Should connBuilder get a file of its own?
Expand All @@ -100,8 +100,8 @@ func NewConnBuilder() *connBuilder {
}
}

func (cb *connBuilder) Hash(h *totalHashType) *connBuilder {
cb.conn.hash = copyTotalHash(*h)
func (cb *connBuilder) Hash(h totalHashType) *connBuilder {
cb.conn.hash = h
return cb
}

Expand Down
43 changes: 20 additions & 23 deletions pkg/pipeline/conntrack/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package conntrack

import (
"container/list"
"encoding/hex"
"fmt"
"hash"
"hash/fnv"
Expand Down Expand Up @@ -52,35 +51,34 @@ type ConnectionTracker interface {
// connectionStore provides both retrieving a connection by its hash and iterating connections sorted by their last
// update time.
type connectionStore struct {
// TODO: should the key of the map be a custom hashStrType instead of string?
hash2conn map[string]*list.Element
hash2conn map[uint64]*list.Element
connList *list.List
}

type processConnF func(connection) (shouldDelete, shouldStop bool)

func (ct *connectionStore) addConnection(hashStr string, conn connection) {
_, ok := ct.getConnection(hashStr)
func (ct *connectionStore) addConnection(hashId uint64, conn connection) {
_, ok := ct.getConnection(hashId)
if ok {
log.Errorf("BUG. connection with hash %v already exists in store. %v", hashStr, conn)
log.Errorf("BUG. connection with hash %x already exists in store. %v", hashId, conn)
}
e := ct.connList.PushBack(conn)
ct.hash2conn[hashStr] = e
ct.hash2conn[hashId] = e
}

func (ct *connectionStore) getConnection(hashStr string) (connection, bool) {
elem, ok := ct.hash2conn[hashStr]
func (ct *connectionStore) getConnection(hashId uint64) (connection, bool) {
elem, ok := ct.hash2conn[hashId]
if ok {
conn := elem.Value.(connection)
return conn, ok
}
return nil, ok
}

func (ct *connectionStore) updateConnectionTime(hashStr string, t time.Time) {
elem, ok := ct.hash2conn[hashStr]
func (ct *connectionStore) updateConnectionTime(hashId uint64, t time.Time) {
elem, ok := ct.hash2conn[hashId]
if !ok {
log.Errorf("BUG. connection hash %v doesn't exist", hashStr)
log.Errorf("BUG. connection hash %x doesn't exist", hashId)
}
elem.Value.(connection).setLastUpdate(t)
// move to end of list
Expand All @@ -92,7 +90,7 @@ func (ct *connectionStore) iterateOldToNew(f processConnF) {
conn := e.Value.(connection)
shouldDelete, shouldStop := f(conn)
if shouldDelete {
delete(ct.hash2conn, hex.EncodeToString(conn.getHash().hashTotal))
delete(ct.hash2conn, conn.getHash().hashTotal)
ct.connList.Remove(e)
}
if shouldStop {
Expand All @@ -103,7 +101,7 @@ func (ct *connectionStore) iterateOldToNew(f processConnF) {

func newConnectionStore() *connectionStore {
return &connectionStore{
hash2conn: make(map[string]*list.Element),
hash2conn: make(map[uint64]*list.Element),
connList: list.New(),
}
}
Expand All @@ -113,7 +111,7 @@ func newConnectionStore() *connectionStore {
type conntrackImpl struct {
clock clock.Clock
config api.ConnTrack
hashProvider func() hash.Hash32
hashProvider func() hash.Hash64
connStore *connectionStore
aggregators []aggregator
shouldOutputFlowLogs bool
Expand All @@ -132,16 +130,15 @@ func (ct *conntrackImpl) Track(flowLogs []config.GenericMap) []config.GenericMap
log.Warningf("skipping flow log %v: %v", fl, err)
continue
}
hashStr := hex.EncodeToString(computedHash.hashTotal)
conn, exists := ct.connStore.getConnection(hashStr)
conn, exists := ct.connStore.getConnection(computedHash.hashTotal)
if !exists {
builder := NewConnBuilder()
conn = builder.
Hash(computedHash).
KeysFrom(fl, ct.config.KeyDefinition).
Aggregators(ct.aggregators).
Build()
ct.connStore.addConnection(hashStr, conn)
ct.connStore.addConnection(computedHash.hashTotal, conn)
ct.updateConnection(conn, fl, computedHash)
if ct.shouldOutputNewConnection {
outputRecords = append(outputRecords, conn.toGenericMap())
Expand Down Expand Up @@ -181,18 +178,18 @@ func (ct *conntrackImpl) popEndConnections() []config.GenericMap {
return outputRecords
}

func (ct *conntrackImpl) updateConnection(conn connection, flowLog config.GenericMap, flowLogHash *totalHashType) {
func (ct *conntrackImpl) updateConnection(conn connection, flowLog config.GenericMap, flowLogHash totalHashType) {
d := ct.getFlowLogDirection(conn, flowLogHash)
for _, agg := range ct.aggregators {
agg.update(conn, flowLog, d)
}
ct.connStore.updateConnectionTime(hex.EncodeToString(flowLogHash.hashTotal), ct.clock.Now())
ct.connStore.updateConnectionTime(flowLogHash.hashTotal, ct.clock.Now())
}

func (ct *conntrackImpl) getFlowLogDirection(conn connection, flowLogHash *totalHashType) direction {
func (ct *conntrackImpl) getFlowLogDirection(conn connection, flowLogHash totalHashType) direction {
d := dirNA
if ct.config.KeyDefinition.Hash.FieldGroupARef != "" {
if areHashEqual(conn.getHash().hashA, flowLogHash.hashA) {
if conn.getHash().hashA == flowLogHash.hashA {
// A -> B
d = dirAB
} else {
Expand Down Expand Up @@ -233,7 +230,7 @@ func NewConnectionTrack(config api.ConnTrack, clock clock.Clock) (ConnectionTrac
clock: clock,
connStore: newConnectionStore(),
config: config,
hashProvider: fnv.New32a,
hashProvider: fnv.New64a,
aggregators: aggregators,
shouldOutputFlowLogs: shouldOutputFlowLogs,
shouldOutputNewConnection: shouldOutputNewConnection,
Expand Down
69 changes: 24 additions & 45 deletions pkg/pipeline/conntrack/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package conntrack

import (
"bytes"
"encoding/binary"
"encoding/gob"
"fmt"
"hash"
Expand All @@ -28,78 +29,50 @@ import (
log "github.com/sirupsen/logrus"
)

type hashType []byte

// TODO: what's a better name for this struct?
type totalHashType struct {
hashA hashType
hashB hashType
hashTotal hashType
}

func areHashEqual(h1, h2 hashType) bool {
if len(h1) != len(h2) {
return false
}
for i := range h1 {
if h1[i] != h2[i] {
return false
}
}
return true
}

func copyTotalHash(h totalHashType) totalHashType {
newHashA := make([]byte, len(h.hashA))
newHashB := make([]byte, len(h.hashB))
newHashTotal := make([]byte, len(h.hashTotal))
copy(newHashA, h.hashA)
copy(newHashB, h.hashB)
copy(newHashTotal, h.hashTotal)
return totalHashType{
hashA: newHashA,
hashB: newHashB,
hashTotal: newHashTotal,
}
hashA uint64
hashB uint64
hashTotal uint64
}

// ComputeHash computes the hash of a flow log according to keyDefinition.
// Two flow logs will have the same hash if they belong to the same connection.
func ComputeHash(flowLog config.GenericMap, keyDefinition api.KeyDefinition, hasher hash.Hash) (*totalHashType, error) {
fieldGroup2hash := make(map[string]hashType)
func ComputeHash(flowLog config.GenericMap, keyDefinition api.KeyDefinition, hasher hash.Hash64) (totalHashType, error) {
fieldGroup2hash := make(map[string]uint64)

// Compute the hash of each field group
for _, fg := range keyDefinition.FieldGroups {
h, err := computeHashFields(flowLog, fg.Fields, hasher)
if err != nil {
return nil, fmt.Errorf("compute hash: %w", err)
return totalHashType{}, fmt.Errorf("compute hash: %w", err)
}
fieldGroup2hash[fg.Name] = h
}

// Compute the total hash
th := &totalHashType{}
th := totalHashType{}
hasher.Reset()
for _, fgName := range keyDefinition.Hash.FieldGroupRefs {
hasher.Write(fieldGroup2hash[fgName])
hasher.Write(uint64ToBytes(fieldGroup2hash[fgName]))
}
if keyDefinition.Hash.FieldGroupARef != "" {
th.hashA = fieldGroup2hash[keyDefinition.Hash.FieldGroupARef]
th.hashB = fieldGroup2hash[keyDefinition.Hash.FieldGroupBRef]
// Determine order between A's and B's hash to get the same hash for both flow logs from A to B and from B to A.
if bytes.Compare(th.hashA, th.hashB) < 0 {
hasher.Write(th.hashA)
hasher.Write(th.hashB)
if th.hashA < th.hashB {
hasher.Write(uint64ToBytes(th.hashA))
hasher.Write(uint64ToBytes(th.hashB))
} else {
hasher.Write(th.hashB)
hasher.Write(th.hashA)
hasher.Write(uint64ToBytes(th.hashB))
hasher.Write(uint64ToBytes(th.hashA))
}
}
th.hashTotal = hasher.Sum([]byte{})
th.hashTotal = hasher.Sum64()
return th, nil
}

func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher hash.Hash) (hashType, error) {
func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher hash.Hash64) (uint64, error) {
hasher.Reset()
for _, fn := range fieldNames {
f, ok := flowLog[fn]
Expand All @@ -109,11 +82,17 @@ func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher ha
}
bytes, err := toBytes(f)
if err != nil {
return nil, err
return 0, err
}
hasher.Write(bytes)
}
return hasher.Sum([]byte{}), nil
return hasher.Sum64(), nil
}

func uint64ToBytes(data uint64) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, data)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the encoding order (little vs big endian) does not matter here, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

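Right. The bytes are only written back into the hasher, so either byte order works as long as the same one is used consistently.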

return b
}

func toBytes(data interface{}) ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/conntrack/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/stretchr/testify/require"
)

var testHasher = fnv.New32a()
var testHasher = fnv.New64a()

func TestComputeHash_Unidirectional(t *testing.T) {
keyDefinition := api.KeyDefinition{
Expand Down