Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ Following is the supported API format for the kafka ingest:
startoffset: FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition
batchreadtimeout: how often (in milliseconds) to process input
</pre>
## Ingest GRPC from Network Observability eBPF Agent
Following is the supported API format for the Network Observability eBPF ingest:

<pre>
grpc:
port: the port number to listen on
buffer_length: the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)
</pre>
## Aws ingest API
Following is the supported API format for Aws flow entries:

Expand Down Expand Up @@ -126,6 +134,13 @@ Following is the supported API format for writing to loki:
timestampLabel: label to use for time indexing
timestampScale: timestamp units scale (e.g. for UNIX = 1s)
</pre>
## Write Standard Output
Following is the supported API format for writing to standard output:

<pre>
stdout:
format: the format of each line: printf (default) or json
</pre>
## Aggregate metrics API
Following is the supported API format for specifying metrics aggregations:

Expand Down
13 changes: 5 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ require (
github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb
github.com/ip2location/ip2location-go/v9 v9.2.0
github.com/json-iterator/go v1.1.12
github.com/mariomac/guara v0.0.0-20220321135847-54b7fb6a8657
github.com/mitchellh/mapstructure v1.4.3
github.com/netobserv/gopipes v0.1.0
github.com/netobserv/gopipes v0.1.1
github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9
github.com/netobserv/netobserv-agent v0.0.0-20220328101628-406b2999d580
github.com/netsampler/goflow2 v1.0.5-0.20220106210010-20e8e567090c
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/common v0.32.1
Expand All @@ -20,7 +22,7 @@ require (
github.com/spf13/cobra v1.3.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.10.1
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
github.com/vladimirvivien/gexe v0.1.1
github.com/vmware/go-ipfix v0.5.12
golang.org/x/net v0.0.0-20220225172249-27dd8689420f
Expand All @@ -34,13 +36,10 @@ require (
)

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/alessio/shellescape v1.4.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.2.0 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-kit/log v0.2.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
Expand All @@ -57,7 +56,6 @@ require (
github.com/klauspost/compress v1.13.6 // indirect
github.com/libp2p/go-reuseport v0.1.0 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand All @@ -82,7 +80,7 @@ require (
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.43.0 // indirect
google.golang.org/grpc v1.45.0 // indirect
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand All @@ -93,7 +91,6 @@ require (
k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect
sigs.k8s.io/controller-runtime v0.11.0 // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
sigs.k8s.io/kind v0.11.0 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
202 changes: 10 additions & 192 deletions go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ type API struct {
KafkaEncode EncodeKafka `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"`
IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the netflow collector:\n"`
IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"`
DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"`
TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"`
WriteStdout WriteStdout `yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"`
ExtractAggregate AggregateDefinition `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"`
}
6 changes: 6 additions & 0 deletions pkg/api/ingest_grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package api

// IngestGRPCProto is the configuration for the "grpc" ingest stage, which
// receives flows from the Network Observability eBPF Agent. The `doc` tags
// feed the generated "Ingest GRPC" section of docs/api.md.
type IngestGRPCProto struct {
	// Port the gRPC collector listens on; required (0 is rejected by the stage).
	Port int `yaml:"port" doc:"the port number to listen on"`
	// BufferLen sizes the ingest channel in groups of flows; 0 means use the
	// stage's default of 100.
	BufferLen int `yaml:"buffer_length" doc:"the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)"`
}
5 changes: 5 additions & 0 deletions pkg/api/write_stdout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package api

// WriteStdout is the configuration for the "stdout" write stage. The `doc`
// tag feeds the generated "Write Standard Output" section of docs/api.md.
type WriteStdout struct {
	// Format selects how each record line is rendered: "printf" (default) or "json".
	Format string `yaml:"format" doc:"the format of each line: printf (default) or json"`
}
6 changes: 4 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Ingest struct {
File File
Collector api.IngestCollector
Kafka api.IngestKafka
GRPC api.IngestGRPCProto
}

type File struct {
Expand Down Expand Up @@ -98,8 +99,9 @@ type Encode struct {
}

type Write struct {
Type string
Loki api.WriteLoki
Type string
Loki api.WriteLoki
Stdout api.WriteStdout
}

// ParseConfig creates the internal unmarshalled representation from the Pipeline and Parameters json
Expand Down
85 changes: 85 additions & 0 deletions pkg/pipeline/decode/decode_protobuf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package decode

import (
"fmt"
"net"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/netobserv-agent/pkg/pbflow"
"github.com/sirupsen/logrus"
)

// pflog is the package-scoped structured logger for the protobuf decoder.
var pflog = logrus.WithField("component", "Protobuf")

// Protobuf decodes protobuf flow record definitions, as forwarded by the
// gRPC ingest stage (ingest.GRPCProtobuf), into a GenericMap that follows
// the same naming conventions as the IPFIX flows from ingest.IngestCollector.
// The type is stateless; a single instance is safe to reuse across calls.
type Protobuf struct {
}

// NewProtobuf instantiates the protobuf flow decoder. It never fails; the
// error return value exists only to satisfy the decoder factory signature
// (it is always nil here).
func NewProtobuf() (Decoder, error) {
	var d Protobuf
	return &d, nil
}

// Decode converts a batch of ingested items into flow entries. It expects a
// single *pbflow.Records element (as produced by ingest.GRPCProtobuf) and
// flattens each contained record into a GenericMap. Malformed input is
// logged and skipped, yielding an empty result.
func (p *Protobuf) Decode(in []interface{}) []config.GenericMap {
	if len(in) == 0 {
		pflog.Warn("empty input. Skipping")
		return []config.GenericMap{}
	}
	pb, ok := in[0].(*pbflow.Records)
	if !ok {
		// BUG FIX: the original only warned and fell through, then
		// dereferenced the nil *pbflow.Records below (pb.Entries), panicking.
		// It also logged %T of pb (always *pbflow.Records) rather than the
		// actual runtime type of the offending input.
		pflog.WithField("type", fmt.Sprintf("%T", in[0])).
			Warn("expecting input to be *pbflow.Records. Skipping")
		return []config.GenericMap{}
	}
	out := make([]config.GenericMap, 0, len(pb.Entries))
	for _, entry := range pb.Entries {
		out = append(out, pbFlowToMap(entry))
	}
	return out
}

func pbFlowToMap(flow *pbflow.Record) config.GenericMap {
if flow == nil {
return config.GenericMap{}
}
out := config.GenericMap{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be built somehow dynamic from https://github.com/netobserv/netobserv-agent/blob/main/proto/flow.proto ????

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would involve flattening/changing the flow.proto definition to match the map structure, and then create some sort of code generation or use reflection to create the map. Since the number of fields is still relatively small, I'd keep using this explicit conversion, and consider another approach in the future if we feel that the fields grow until making this conversion unmaintainable.

Copy link
Collaborator

@eranra eranra Mar 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mariomac this is exactly my fear :-( .... that this will grow and no one will be able to find where this came from and why this is not working. Any chance to someone connect that to the source (i.e. to the https://github.com/netobserv/netobserv-agent) project ... if not in code then at least with some remarks maybe?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, actually the *pbflow.Records type is imported from the github.com/netobserv/netobserv-agent directly, so IDE navigation leads you to the original protobuf definition (at least in Goland IDE).

But I agree that we should investigate a more automatic way to map protobuf --> generic map fields that does not penalize readability nor performance.

"FlowDirection": int(flow.Direction.Number()),
"Bytes": flow.Bytes,
"SrcAddr": ipToStr(flow.Network.GetSrcAddr()),
"DstAddr": ipToStr(flow.Network.GetDstAddr()),
"SrcMac": macToStr(flow.DataLink.GetSrcMac()),
"DstMac": macToStr(flow.DataLink.GetDstMac()),
"SrcPort": flow.Transport.GetSrcPort(),
"DstPort": flow.Transport.GetDstPort(),
"Etype": flow.EthProtocol,
"Packets": flow.Packets,
"Proto": flow.Transport.GetProtocol(),
"TimeFlowStart": flow.TimeFlowStart.GetSeconds(),
"TimeFlowEnd": flow.TimeFlowEnd.GetSeconds(),
"TimeReceived": time.Now().Unix(),
"Interface": flow.Interface,
}
return out
}

func ipToStr(ip *pbflow.IP) string {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ipToStr and macToStr are generic, maybe move to some utility or common file

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept them as private functions because they rely on very concrete implementations of the encoding, as they were defined by us. I can move it to public functions if you think it's better to do it, but maybe they won't be used anywhere else and will increase the surface of the public API.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mariomac ok ... so if they are specific to the protocol you are correct and they should be kept in this context ... the only sub-idea would be to move to a file called decode_protobuf_utils.go but this is really minor idea. resolving the conversation :-)

if ip.GetIpv6() != nil {
return net.IP(ip.GetIpv6()).String()
} else {
n := ip.GetIpv4()
return fmt.Sprintf("%d.%d.%d.%d",
byte(n>>24), byte(n>>16), byte(n>>8), byte(n))
}
}

// macToStr renders a 48-bit MAC address, stored in the low six bytes of a
// uint64, as colon-separated uppercase hex (e.g. "01:02:03:04:05:06").
func macToStr(mac uint64) string {
	// Extract the six octets from most to least significant.
	var b [6]byte
	for i := range b {
		b[i] = byte(mac >> (40 - 8*i))
	}
	return fmt.Sprintf("%02X:%02X:%02X:%02X:%02X:%02X",
		b[0], b[1], b[2], b[3], b[4], b[5])
}
66 changes: 66 additions & 0 deletions pkg/pipeline/decode/decode_protobuf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package decode

import (
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/netobserv-agent/pkg/pbflow"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
)

// TestDecodePBFlows verifies that a protobuf flow record is flattened into a
// GenericMap carrying the IPFIX-style field names used by the rest of the
// pipeline.
func TestDecodePBFlows(t *testing.T) {
	dec := Protobuf{}

	now := time.Now()
	record := &pbflow.Record{
		EthProtocol:   2048,
		Direction:     pbflow.Direction_EGRESS,
		Bytes:         456,
		Packets:       123,
		Interface:     "eth0",
		TimeFlowStart: timestamppb.New(now),
		TimeFlowEnd:   timestamppb.New(now),
		DataLink: &pbflow.DataLink{
			SrcMac: 0x010203040506,
			DstMac: 0x112233445566,
		},
		Network: &pbflow.Network{
			SrcAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304}},
			DstAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708}},
		},
		Transport: &pbflow.Transport{
			Protocol: 1,
			SrcPort:  23000,
			DstPort:  443,
		},
	}

	out := dec.Decode([]interface{}{&pbflow.Records{Entries: []*pbflow.Record{record}}})
	require.Len(t, out, 1)

	// TimeReceived is stamped with the wall clock at decode time, so only
	// assert its presence, then drop it before comparing the rest.
	assert.NotZero(t, out[0]["TimeReceived"])
	delete(out[0], "TimeReceived")

	expected := config.GenericMap{
		"FlowDirection": 1,
		"Bytes":         uint64(456),
		"Packets":       uint64(123),
		"Etype":         uint32(2048),
		"Proto":         uint32(1),
		"SrcAddr":       "1.2.3.4",
		"DstAddr":       "5.6.7.8",
		"SrcMac":        "01:02:03:04:05:06",
		"DstMac":        "11:22:33:44:55:66",
		"SrcPort":       uint32(23000),
		"DstPort":       uint32(443),
		"TimeFlowStart": now.Unix(),
		"TimeFlowEnd":   now.Unix(),
		"Interface":     "eth0",
	}
	assert.Equal(t, expected, out[0])
}
49 changes: 49 additions & 0 deletions pkg/pipeline/ingest/ingest_grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ingest

import (
"fmt"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/netobserv-agent/pkg/grpc"
"github.com/netobserv/netobserv-agent/pkg/pbflow"
)

// defaultBufferLen is the capacity, in groups of flow records, of the ingest
// channel when the buffer_length configuration parameter is unset (zero).
const defaultBufferLen = 100

// GRPCProtobuf ingests data from the NetObserv eBPF Agent, using Protocol Buffers over gRPC
type GRPCProtobuf struct {
	// collector is the gRPC server receiving flows from the agent; owned
	// here so Close can shut it down.
	collector *grpc.CollectorServer
	// flowPackets buffers received flow-record groups between the gRPC
	// handler and the Ingest loop.
	flowPackets chan *pbflow.Records
}

// NewGRPCProtobuf creates a gRPC ingester for the NetObserv eBPF Agent.
// It requires a non-zero port in the stage parameters and starts the
// collector server immediately; received flow-record groups are buffered in
// a channel whose capacity comes from buffer_length (defaultBufferLen when
// unset).
func NewGRPCProtobuf(params config.StageParam) (*GRPCProtobuf, error) {
	cfg := params.Ingest.GRPC
	if cfg.Port == 0 {
		return nil, fmt.Errorf("ingest port not specified")
	}
	bufLen := defaultBufferLen
	if cfg.BufferLen != 0 {
		bufLen = cfg.BufferLen
	}
	packets := make(chan *pbflow.Records, bufLen)
	server, err := grpc.StartCollector(cfg.Port, packets)
	if err != nil {
		return nil, err
	}
	return &GRPCProtobuf{
		collector:   server,
		flowPackets: packets,
	}, nil
}

// Ingest forwards each received group of flow records to the next pipeline
// stage. It blocks until the flowPackets channel is closed (see Close).
func (no *GRPCProtobuf) Ingest(out chan<- []interface{}) {
	for {
		records, ok := <-no.flowPackets
		if !ok {
			return
		}
		out <- []interface{}{records}
	}
}

// Close shuts down the gRPC collector and then closes the ingest channel,
// which unblocks the Ingest loop. The channel is closed only after the
// collector is stopped; closing it first could panic on a send from the
// server. NOTE(review): this assumes collector.Close stops all senders
// before returning — confirm against the netobserv-agent grpc package.
func (no *GRPCProtobuf) Close() error {
	err := no.collector.Close()
	close(no.flowPackets)
	return err
}
6 changes: 5 additions & 1 deletion pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ func getIngester(params config.StageParam) (ingest.Ingester, error) {
ingester, err = ingest.NewIngestCollector(params)
case "kafka":
ingester, err = ingest.NewIngestKafka(params)
case "grpc":
ingester, err = ingest.NewGRPCProtobuf(params)
default:
panic(fmt.Sprintf("`ingest` type %s not defined", params.Ingest.Type))
}
Expand All @@ -287,6 +289,8 @@ func getDecoder(params config.StageParam) (decode.Decoder, error) {
decoder, err = decode.NewDecodeJson()
case "aws":
decoder, err = decode.NewDecodeAws(params)
case "protobuf":
decoder, err = decode.NewProtobuf()
case "none":
decoder, err = decode.NewDecodeNone()
default:
Expand All @@ -300,7 +304,7 @@ func getWriter(params config.StageParam) (write.Writer, error) {
var err error
switch params.Write.Type {
case "stdout":
writer, err = write.NewWriteStdout()
writer, err = write.NewWriteStdout(params)
case "none":
writer, err = write.NewWriteNone()
case "loki":
Expand Down
Loading