Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ A possible output would look like:
{
"_RecordType": "endConnection",
"_HashId": "3e8ba98164baecaf",
"_IsFirst": true,
"SrcAddr": "10.0.0.1",
"SrcPort": 1234,
"DstAddr": "10.0.0.2",
Expand Down Expand Up @@ -646,6 +647,11 @@ Output fields that set `splitAB: true` (like in `Bytes`) are split into 2 fields
aggregate values separately based on direction A->B and B->A respectively.
When `splitAB` is absent, its default value is `false`.

The boolean field `_IsFirst` exists only in records of type `newConnection`, `updateConnection` and `endConnection`.
It is set to true only on the first record of the connection.
The `_IsFirst` fields is useful in cases where `newConnection` records are not outputted (to reduce the number output records)
and there is a need to count the total number of connections: simply counting `_IsFirst=true`

### Timebased TopK

It is sometimes desirable to return only a subset of records, such as those connections that use the most bandwidth.
Expand Down
1 change: 1 addition & 0 deletions pkg/api/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
const (
HashIdFieldName = "_HashId"
RecordTypeFieldName = "_RecordType"
IsFirstFieldName = "_IsFirst"
)

type ConnTrack struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/conntrack_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func TestConnTrack(t *testing.T) {
"TimeFlowStart": 1_637_501_079.0,
"_HashId": "d28db42bcd8aea8f",
"_RecordType": "endConnection",
"_IsFirst": false,
"numFlowLogs": 5.0,
}
// Wait for the record to be eventually forwarded to the writer
Expand Down
16 changes: 14 additions & 2 deletions pkg/pipeline/extract/conntrack/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type connection interface {
getNextUpdateReportTime() time.Time
toGenericMap() config.GenericMap
getHash() totalHashType
// markReported marks the connection as has been reported. That is, at least one connection record has been emitted
// for this connection (i.e. newConnection, updateConnection, endConnection).
// It returns true on the first invocation to indicate the first report. Otherwise, it returns false.
markReported() bool
}

type connType struct {
Expand All @@ -44,6 +48,7 @@ type connType struct {
aggFields map[string]float64
expiryTime time.Time
nextUpdateReportTime time.Time
isReported bool
}

func (c *connType) addAgg(fieldName string, initValue float64) {
Expand Down Expand Up @@ -95,15 +100,22 @@ func (c *connType) getHash() totalHashType {
return c.hash
}

func (c *connType) markReported() bool {
isFirst := !c.isReported
c.isReported = true
return isFirst
}

type connBuilder struct {
conn *connType
}

func NewConnBuilder() *connBuilder {
return &connBuilder{
conn: &connType{
aggFields: make(map[string]float64),
keys: config.GenericMap{},
aggFields: make(map[string]float64),
keys: config.GenericMap{},
isReported: false,
},
}
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/pipeline/extract/conntrack/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM
record := conn.toGenericMap()
addHashField(record, computedHash.hashTotal)
addTypeField(record, api.ConnTrackOutputRecordTypeName("NewConnection"))
isFirst := conn.markReported()
addIsFirstField(record, isFirst)
outputRecords = append(outputRecords, record)
ct.metrics.outputRecords.WithLabelValues("newConnection").Inc()
}
Expand Down Expand Up @@ -123,6 +125,11 @@ func (ct *conntrackImpl) popEndConnections() []config.GenericMap {
record := conn.toGenericMap()
addHashField(record, conn.getHash().hashTotal)
addTypeField(record, api.ConnTrackOutputRecordTypeName("EndConnection"))
var isFirst bool
if ct.shouldOutputEndConnection {
isFirst = conn.markReported()
}
addIsFirstField(record, isFirst)
outputRecords = append(outputRecords, record)
shouldDelete, shouldStop = true, false
} else {
Expand All @@ -144,6 +151,11 @@ func (ct *conntrackImpl) prepareUpdateConnectionRecords() []config.GenericMap {
record := conn.toGenericMap()
addHashField(record, conn.getHash().hashTotal)
addTypeField(record, api.ConnTrackOutputRecordTypeName("UpdateConnection"))
var isFirst bool
if ct.shouldOutputUpdateConnection {
isFirst = conn.markReported()
}
addIsFirstField(record, isFirst)
outputRecords = append(outputRecords, record)
newNextUpdate := ct.clock.Now().Add(ct.config.UpdateConnectionInterval.Duration)
ct.connStore.updateNextReportTime(conn.getHash().hashTotal, newNextUpdate)
Expand Down Expand Up @@ -236,3 +248,7 @@ func addHashField(record config.GenericMap, hashId uint64) {
func addTypeField(record config.GenericMap, recordType string) {
record[api.RecordTypeFieldName] = recordType
}

func addIsFirstField(record config.GenericMap, isFirst bool) {
record[api.IsFirstFieldName] = isFirst
}
Loading