-
Notifications
You must be signed in to change notification settings - Fork 29
/
grpc_proto.go
98 lines (88 loc) · 2.97 KB
/
grpc_proto.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package exporter
import (
"context"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/grpc"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
)
// glog is the package-level log entry for this exporter; every message logged
// through it carries the "component" field below.
var glog = logrus.WithFields(logrus.Fields{
	"component": "exporter/GRPCProto",
})
// GRPCProto is a flow exporter. Its ExportFlows method receives slices of
// *flow.Record from its input channel, converts each slice into a
// *pbflow.Records message, and submits it to the collector.
type GRPCProto struct {
// hostPort is the collector address given to StartGRPCProto. It is passed to
// grpc.ConnectClient and attached to log entries as the "collector" field.
hostPort string
// clientConn is the connection returned by grpc.ConnectClient. ExportFlows
// sends records through it and closes it once its input channel is closed.
clientConn *grpc.ClientConnection
}
// StartGRPCProto creates a GRPCProto exporter for the collector at hostPort.
// It calls grpc.ConnectClient(hostPort) and, if that fails, returns a nil
// exporter together with the unmodified error. On success the returned
// exporter holds both the address and the client connection.
func StartGRPCProto(hostPort string) (*GRPCProto, error) {
	conn, err := grpc.ConnectClient(hostPort)
	if err != nil {
		return nil, err
	}
	exporter := &GRPCProto{hostPort: hostPort, clientConn: conn}
	return exporter, nil
}
// ExportFlows consumes batches of *flow.Record from input until the channel is
// closed. Each batch is converted to a *pbflow.Records message and sent to the
// collector. A failed send is logged and the loop moves on to the next batch.
// Once input is closed, the client connection is closed; a failure to close
// is logged as a warning.
func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) {
	clog := glog.WithField("collector", g.hostPort)
	for batch := range input {
		// Convert the whole batch up front so it travels in a single message.
		pbRecords := make([]*pbflow.Record, len(batch))
		for i := range batch {
			pbRecords[i] = flowToPB(batch[i])
		}
		clog.Debugf("sending %d records", len(pbRecords))
		message := &pbflow.Records{Entries: pbRecords}
		_, err := g.clientConn.Client().Send(context.TODO(), message)
		if err != nil {
			clog.WithError(err).Error("couldn't send flow records to collector")
		}
	}
	// The input channel was closed: no more batches will arrive.
	if err := g.clientConn.Close(); err != nil {
		clog.WithError(err).Warn("couldn't close flow export client")
	}
}
func flowToPB(fr *flow.Record) *pbflow.Record {
return &pbflow.Record{
EthProtocol: uint32(fr.Protocol),
Direction: pbflow.Direction(fr.Direction),
DataLink: &pbflow.DataLink{
SrcMac: macToUint64(&fr.DataLink.SrcMac),
DstMac: macToUint64(&fr.DataLink.DstMac),
},
// TODO: change this when se support IPV6 addresses
Network: &pbflow.Network{
SrcAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv4{Ipv4: uint32(fr.Network.SrcAddr)}},
DstAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv4{Ipv4: uint32(fr.Network.DstAddr)}},
},
Transport: &pbflow.Transport{
Protocol: uint32(fr.Transport.Protocol),
SrcPort: uint32(fr.Transport.SrcPort),
DstPort: uint32(fr.Transport.DstPort),
},
Bytes: uint64(fr.Bytes),
TimeFlowStart: ×tamppb.Timestamp{
Seconds: fr.TimeFlowStart.Unix(),
Nanos: int32(fr.TimeFlowStart.Nanosecond()),
},
TimeFlowEnd: ×tamppb.Timestamp{
Seconds: fr.TimeFlowEnd.Unix(),
Nanos: int32(fr.TimeFlowEnd.Nanosecond()),
},
Packets: uint64(fr.Packets),
Interface: fr.Interface,
}
}
// macToUint64 packs the first six bytes of a MAC address into the low 48 bits
// of a uint64, keeping the byte order of the array. That is, a MAC like
// 11:22:33:44:55:66 is encoded as 0x112233445566.
func macToUint64(m *flow.MacAddr) uint64 {
	var encoded uint64
	for i := 0; i < 6; i++ {
		// Make room for the next byte, then place it in the lowest 8 bits.
		encoded = encoded<<8 | uint64(m[i])
	}
	return encoded
}