Skip to content

Commit

Permalink
NETOBSERV-889: fix too many colons in address error (netobserv#98)
Browse files Browse the repository at this point in the history
Signed-off-by: msherif1234 <mmahmoud@redhat.com>
  • Loading branch information
msherif1234 authored and shach33 committed Apr 6, 2023
1 parent e04d40c commit d0129ca
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 31 deletions.
11 changes: 3 additions & 8 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
}
target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort)
grpcExporter, err := exporter.StartGRPCProto(target, cfg.GRPCMessageMaxFlows)
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -262,9 +261,7 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
}
target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort)

ipfix, err := exporter.StartIPFIXExporter(target, "udp")
ipfix, err := exporter.StartIPFIXExporter(cfg.TargetHost, cfg.TargetPort, "udp")
if err != nil {
return nil, err
}
Expand All @@ -274,9 +271,7 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
}
target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort)

ipfix, err := exporter.StartIPFIXExporter(target, "tcp")
ipfix, err := exporter.StartIPFIXExporter(cfg.TargetHost, cfg.TargetPort, "tcp")
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ func fromExternal(ipType string) (net.IP, error) {
// This will just establish an external dialer where we can pickup the external
// host address
addrStr := "8.8.8.8:80"
if ipType == IPTypeIPV6 {
// When IPType is "any" and we have interface with IPv6 address only then use ipv6 dns address
ip, _ := fromLocal(IPTypeIPV4)
if ipType == IPTypeIPV6 || (ipType == IPTypeAny && ip == nil) {
addrStr = "[2001:4860:4860::8888]:80"
}
conn, err := dial("udp", addrStr)
Expand Down
12 changes: 8 additions & 4 deletions pkg/exporter/grpc_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/grpc"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
"github.com/sirupsen/logrus"
)

Expand All @@ -14,20 +15,22 @@ var glog = logrus.WithField("component", "exporter/GRPCProto")
// by its input channel, converts them to *pbflow.Records instances, and submits
// them to the collector.
type GRPCProto struct {
hostPort string
hostIP string
hostPort int
clientConn *grpc.ClientConnection
// maxFlowsPerMessage limits the maximum number of flows per GRPC message.
// If a message contains more flows than this number, the GRPC message will be split into
// multiple messages.
maxFlowsPerMessage int
}

func StartGRPCProto(hostPort string, maxFlowsPerMessage int) (*GRPCProto, error) {
clientConn, err := grpc.ConnectClient(hostPort)
func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int) (*GRPCProto, error) {
clientConn, err := grpc.ConnectClient(hostIP, hostPort)
if err != nil {
return nil, err
}
return &GRPCProto{
hostIP: hostIP,
hostPort: hostPort,
clientConn: clientConn,
maxFlowsPerMessage: maxFlowsPerMessage,
Expand All @@ -37,7 +40,8 @@ func StartGRPCProto(hostPort string, maxFlowsPerMessage int) (*GRPCProto, error)
// ExportFlows accepts slices of *flow.Record by its input channel, converts them
// to *pbflow.Records instances, and submits them to the collector.
func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) {
log := glog.WithField("collector", g.hostPort)
socket := utils.GetSocket(g.hostIP, g.hostPort)
log := glog.WithField("collector", socket)
for inputRecords := range input {
for _, pbRecords := range flowsToPB(inputRecords, g.maxFlowsPerMessage) {
log.Debugf("sending %d records", len(pbRecords.Entries))
Expand Down
49 changes: 45 additions & 4 deletions pkg/exporter/grpc_proto_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package exporter

import (
"fmt"
"net"
"testing"
"time"
Expand All @@ -19,7 +18,7 @@ import (

const timeout = 2 * time.Second

func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
func TestIPv4GRPCProto_ExportFlows_AgentIP(t *testing.T) {
// start remote ingestor
port, err := test.FreeTCPPort()
require.NoError(t, err)
Expand All @@ -29,7 +28,7 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
defer coll.Close()

// Start GRPCProto exporter stage
exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port), 1000)
exporter, err := StartGRPCProto("127.0.0.1", port, 1000)
require.NoError(t, err)

// Send some flows to the input of the exporter stage
Expand Down Expand Up @@ -61,6 +60,48 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
}
}

func TestIPv6GRPCProto_ExportFlows_AgentIP(t *testing.T) {
// start remote ingestor
port, err := test.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
coll, err := grpc.StartCollector(port, serverOut)
require.NoError(t, err)
defer coll.Close()

// Start GRPCProto exporter stage
exporter, err := StartGRPCProto("::1", port, 1000)
require.NoError(t, err)

// Send some flows to the input of the exporter stage
flows := make(chan []*flow.Record, 10)
flows <- []*flow.Record{
{AgentIP: net.ParseIP("10.11.12.13")},
}
flows <- []*flow.Record{
{RawRecord: flow.RawRecord{Id: ebpf.BpfFlowId{EthProtocol: flow.IPv6Type}},
AgentIP: net.ParseIP("9999::2222")},
}
go exporter.ExportFlows(flows)

rs := test2.ReceiveTimeout(t, serverOut, timeout)
assert.Len(t, rs.Entries, 1)
r := rs.Entries[0]
assert.EqualValues(t, 0x0a0b0c0d, r.GetAgentIp().GetIpv4())

rs = test2.ReceiveTimeout(t, serverOut, timeout)
assert.Len(t, rs.Entries, 1)
r = rs.Entries[0]
assert.EqualValues(t, net.ParseIP("9999::2222"), r.GetAgentIp().GetIpv6())

select {
case rs = <-serverOut:
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
default:
//ok!
}
}

func TestGRPCProto_SplitLargeMessages(t *testing.T) {
// start remote ingestor
port, err := test.FreeTCPPort()
Expand All @@ -72,7 +113,7 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {

const msgMaxLen = 10000
// Start GRPCProto exporter stage
exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port), msgMaxLen)
exporter, err := StartGRPCProto("127.0.0.1", port, msgMaxLen)
require.NoError(t, err)

// Send a message much longer than the limit length
Expand Down
19 changes: 12 additions & 7 deletions pkg/exporter/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"

"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
"github.com/sirupsen/logrus"
"github.com/vmware/go-ipfix/pkg/entities"
ipfixExporter "github.com/vmware/go-ipfix/pkg/exporter"
Expand All @@ -16,7 +17,8 @@ var ilog = logrus.WithField("component", "exporter/IPFIXProto")
// compatible with OVN-K.

type IPFIX struct {
hostPort string
hostIP string
hostPort int
exporter *ipfixExporter.ExportingProcess
templateIDv4 uint16
templateIDv6 uint16
Expand Down Expand Up @@ -199,23 +201,24 @@ func SendTemplateRecordv6(log *logrus.Entry, exporter *ipfixExporter.ExportingPr
}

// Sends out Template record to the IPFIX collector
func StartIPFIXExporter(hostPort string, transportProto string) (*IPFIX, error) {
log := ilog.WithField("collector", hostPort)
func StartIPFIXExporter(hostIP string, hostPort int, transportProto string) (*IPFIX, error) {
socket := utils.GetSocket(hostIP, hostPort)
log := ilog.WithField("collector", socket)

registry.LoadRegistry()
// Create exporter using local server info
input := ipfixExporter.ExporterInput{
CollectorAddress: hostPort,
CollectorAddress: socket,
CollectorProtocol: transportProto,
ObservationDomainID: 1,
TempRefTimeout: 1,
}
exporter, err := ipfixExporter.InitExportingProcess(input)
if err != nil {
log.Fatalf("Got error when connecting to local server %s: %v", hostPort, err)
log.Fatalf("Got error when connecting to local server %s: %v", socket, err)
return nil, err
}
log.Infof("Created exporter connecting to local server with address: %s", hostPort)
log.Infof("Created exporter connecting to local server with address: %s", socket)

templateIDv4, entitiesV4, err := SendTemplateRecordv4(log, exporter)
if err != nil {
Expand All @@ -232,6 +235,7 @@ func StartIPFIXExporter(hostPort string, transportProto string) (*IPFIX, error)
log.Infof("entities v6 %+v", entitiesV6)

return &IPFIX{
hostIP: hostIP,
hostPort: hostPort,
exporter: exporter,
templateIDv4: templateIDv4,
Expand Down Expand Up @@ -338,7 +342,8 @@ func (ipf *IPFIX) sendDataRecord(log *logrus.Entry, record *flow.Record, v6 bool
// ExportFlows accepts slices of *flow.Record by its input channel, converts them
// to IPFIX Records, and submits them to the collector.
func (ipf *IPFIX) ExportFlows(input <-chan []*flow.Record) {
log := ilog.WithField("collector", ipf.hostPort)
socket := utils.GetSocket(ipf.hostIP, ipf.hostPort)
log := ilog.WithField("collector", socket)
for inputRecords := range input {
for _, record := range inputRecords {
if record.Id.EthProtocol == flow.IPv6Type {
Expand Down
6 changes: 4 additions & 2 deletions pkg/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpc

import (
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
Expand All @@ -13,9 +14,10 @@ type ClientConnection struct {
conn *grpc.ClientConn
}

func ConnectClient(address string) (*ClientConnection, error) {
func ConnectClient(hostIP string, hostPort int) (*ClientConnection, error) {
// TODO: allow configuring some options (keepalive, backoff...)
conn, err := grpc.Dial(address,
socket := utils.GetSocket(hostIP, hostPort)
conn, err := grpc.Dial(socket,
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
Expand Down
58 changes: 53 additions & 5 deletions pkg/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package grpc

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -22,7 +21,7 @@ func TestGRPCCommunication(t *testing.T) {
serverOut := make(chan *pbflow.Records)
_, err = StartCollector(port, serverOut)
require.NoError(t, err)
cc, err := ConnectClient(fmt.Sprintf("127.0.0.1:%d", port))
cc, err := ConnectClient("127.0.0.1", port)
require.NoError(t, err)
client := cc.Client()

Expand Down Expand Up @@ -110,7 +109,7 @@ func TestConstructorOptions(t *testing.T) {
return handler(ctx, req)
})))
require.NoError(t, err)
cc, err := ConnectClient(fmt.Sprintf("127.0.0.1:%d", port))
cc, err := ConnectClient("127.0.0.1", port)
require.NoError(t, err)
client := cc.Client()

Expand All @@ -127,14 +126,14 @@ func TestConstructorOptions(t *testing.T) {
}
}

func BenchmarkGRPCCommunication(b *testing.B) {
func BenchmarkIPv4GRPCCommunication(b *testing.B) {
port, err := test.FreeTCPPort()
require.NoError(b, err)
serverOut := make(chan *pbflow.Records, 1000)
collector, err := StartCollector(port, serverOut)
require.NoError(b, err)
defer collector.Close()
cc, err := ConnectClient(fmt.Sprintf("127.0.0.1:%d", port))
cc, err := ConnectClient("127.0.0.1", port)
require.NoError(b, err)
defer cc.Close()
client := cc.Client()
Expand Down Expand Up @@ -175,3 +174,52 @@ func BenchmarkGRPCCommunication(b *testing.B) {
<-serverOut
}
}

func BenchmarkIPv6GRPCCommunication(b *testing.B) {
port, err := test.FreeTCPPort()
require.NoError(b, err)
serverOut := make(chan *pbflow.Records, 1000)
collector, err := StartCollector(port, serverOut)
require.NoError(b, err)
defer collector.Close()
cc, err := ConnectClient("::1", port)
require.NoError(b, err)
defer cc.Close()
client := cc.Client()

f := &pbflow.Record{
EthProtocol: 2048,
Bytes: 456,
Flags: 1,
Direction: pbflow.Direction_EGRESS,
TimeFlowStart: timestamppb.Now(),
TimeFlowEnd: timestamppb.Now(),
Network: &pbflow.Network{
SrcAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv6{Ipv6: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}},
},
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv6{Ipv6: []byte{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 24, 24, 25, 26}},
},
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
SrcMac: 0x665544332211,
},
Transport: &pbflow.Transport{
Protocol: 1,
SrcPort: 23000,
DstPort: 443,
},
}
records := &pbflow.Records{}
for i := 0; i < 100; i++ {
records.Entries = append(records.Entries, f)
}
for i := 0; i < b.N; i++ {
if _, err := client.Send(context.Background(), records); err != nil {
require.Fail(b, "error", err)
}
<-serverOut
}
}
16 changes: 16 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package utils

import (
"fmt"
"net"
)

// GetSocket returns socket string in the correct format based on address family
func GetSocket(hostIP string, hostPort int) string {
socket := fmt.Sprintf("%s:%d", hostIP, hostPort)
ipAddr := net.ParseIP(hostIP)
if ipAddr != nil && ipAddr.To4() == nil {
socket = fmt.Sprintf("[%s]:%d", hostIP, hostPort)
}
return socket
}

0 comments on commit d0129ca

Please sign in to comment.