/
common.go
390 lines (334 loc) · 12.8 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
// Package common providers common OTG flow helper functions.
package common
import (
"fmt"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/open-traffic-generator/snappi/gosnappi/otg"
"github.com/openconfig/magna/lwotg"
"k8s.io/klog/v2"
)
var (
// packetBytes is the number of bytes to read from an input packet.
packetBytes int = 100
)
// hdrsFunc is a function which specifies which packet headers to create. It is
// also used to determine the correctness of the headers received.
type hdrsFunc func(*otg.Flow) ([]gopacket.SerializableLayer, error)
// matchFunc is a function which determines if a packet p matches the headers
// hdrs.
type matchFunc func([]gopacket.SerializableLayer, gopacket.Packet) bool
// bpfFunc is a function that generates a BPF filter that matches packets in
// hdrs.
type bpfFunc func([]gopacket.SerializableLayer) (string, error)
// HandleCreator is an interface for creating handles.
type HandleCreator interface {
CreateHandle(name string) (Port, error)
}
// Port is interface for reading and writing to a port.
type Port interface {
gopacket.PacketDataSource
// WritePacketData writes the frame to the port.
WritePacketData(data []byte) error
// Close closes the port.
Close()
// LinkType specifies the layer at which packet written to the port are processed.
LinkType() layers.LinkType
// SetBPFFilter sets a Berkeley Packet Filtering (BPF) filter for the port.
// When specified, only packets matching the BPF filter are read from the port.
SetBPFFilter(string) error
}
// pcapPort contains both an inactive and active handle.
type pcapPort struct {
*pcap.Handle
ih *pcap.InactiveHandle
}
// Close cleans up the inactive handle and closes the active one.
func (p *pcapPort) Close() {
p.ih.CleanUp()
p.Handle.Close()
}
// pcapCreator implements the PortCreator interface for pcap handles.
type pcapCreator struct {
}
var _ HandleCreator = &pcapCreator{}
// CreateHandle creates a new port using a pcap handle.
func (p *pcapCreator) CreateHandle(name string) (Port, error) {
ih, err := pcap.NewInactiveHandle(name)
if err != nil {
return nil, fmt.Errorf("cannot create handle, err: %v", err)
}
if err := ih.SetImmediateMode(true); err != nil {
return nil, fmt.Errorf("cannot set immediate mode on handle, err: %v", err)
}
if err := ih.SetPromisc(false); err != nil {
return nil, fmt.Errorf("cannot set promiscous mode, err: %v", err)
}
if err := ih.SetSnapLen(packetBytes); err != nil {
return nil, fmt.Errorf("cannot set packet length, err: %v", err)
}
handle, err := ih.Activate()
if err != nil {
return nil, fmt.Errorf("%s Tx error: %v", name, err)
}
return &pcapPort{
Handle: handle,
ih: ih,
}, nil
}
var handleCreator HandleCreator = &pcapCreator{}
// Handler creates a new flow generator function based on the header and match
// function provided.
func Handler(fn hdrsFunc, bpfFn bpfFunc, match matchFunc, reporter *Reporter) lwotg.FlowGeneratorFn {
return func(flow *otg.Flow, intfs []*lwotg.OTGIntf) (lwotg.TXRXFn, bool, error) {
hdrs, err := fn(flow)
if err != nil {
return nil, false, err
}
fc := NewCounters()
fc.Headers = hdrs
pps, err := Rate(flow, hdrs)
if err != nil {
return nil, false, fmt.Errorf("cannot calculate rate, %v", err)
}
packetsToSend, err := flowPackets(flow)
if err != nil {
return nil, false, fmt.Errorf("cannot extract number of flow packets, %v", err)
}
tx, rx, err := Ports(flow, intfs)
if err != nil {
return nil, false, fmt.Errorf("cannot determine ports, %v", err)
}
fc.Name = &val{s: flow.GetName(), ts: flowTimeFn()}
klog.Infof("generating flow %s: tx: %s, rx: %s, rate: %d pps", flow.GetName(), tx, rx, pps)
reporter.AddFlow(flow.GetName(), fc)
// TODO(robjs): In the future we should wrap the PCAP handle in a library so that we can test our
// logic by writing into a test. Today, we're relying on integration test coverage here.
genFunc := func(controllerID string, stop, rxReady chan struct{}) {
// Don't proceed to set up the transmit function until the listener has already been created
// and is listening, this avoids us sending packets into the void when we know no-one is listening
// for them to account the flow.
<-rxReady
f := reporter.Flow(flow.GetName())
klog.Infof("%s send function started.", flow.GetName())
f.clearStats(time.Now().UnixNano())
buf := gopacket.NewSerializeBuffer()
gopacket.SerializeLayers(buf, gopacket.SerializeOptions{
FixLengths: true,
ComputeChecksums: true,
}, hdrs...)
size := len(buf.Bytes())
klog.Infof("%s Tx interface %s", flow.GetName(), tx)
handle, err := handleCreator.CreateHandle(tx)
if err != nil {
klog.Errorf("failed to create handle, err: %v", err)
}
defer handle.Close()
f.setTransmit(true)
// runSentPackets is the total number of packets that we have sent this run - i.e., since we
// were asked to start transmitting.
runSentPackets := uint32(0)
// stopFlow indicates whether we should stop sending packets on the flow, it is set
// when the flow specification says that we should only send a limited number of
// packets.
var stopFlow bool
for {
select {
case <-stop:
klog.Infof("controller ID %s, flow %s, exiting on %s", controllerID, flow.GetName(), tx)
f.setTransmit(false)
return
default:
switch stopFlow {
case true:
// avoid busy looping.
time.Sleep(100 * time.Millisecond)
default:
klog.Infof("%s sending %d packets", flow.GetName(), pps)
sendStart := time.Now()
loopSentPackets := 0
for i := 1; i <= int(pps); i++ {
// packetsToSend == 0 means that we need to keep sending, as there is no limit specified
// by the user.
if packetsToSend != 0 && runSentPackets >= packetsToSend {
klog.Infof("%s: finished sending, sent %d packets", flow.GetName(), runSentPackets)
stopFlow = true
break
}
if err := handle.WritePacketData(buf.Bytes()); err != nil {
klog.Errorf("%s cannot write packet on interface %s, %v", flow.GetName(), tx, err)
return
}
runSentPackets += 1
loopSentPackets += 1
}
klog.Infof("%s: sent %d packets (total: %d) in %s", flow.GetName(), loopSentPackets, runSentPackets, time.Since(sendStart))
f.updateTx(int(loopSentPackets), size)
sleepDur := (1 * time.Second) - time.Since(sendStart)
time.Sleep(sleepDur)
}
}
}
}
recvFunc := func(controllerID string, stop, readyForTx chan struct{}) {
klog.Infof("%s receive function started on interface %s", flow.GetName(), rx)
handle, err := handleCreator.CreateHandle(rx)
if err != nil {
klog.Errorf("cannot create handle, err: %v", err)
return
}
defer handle.Close()
// Set a BPF filter on the handle, this avoids our code needing to compare every
// packet that it receives, and rather asks the PCAP library to filter for us.
// This avoids additional CPU being used by our code.
filter, err := bpfFn(hdrs)
switch {
case err != nil:
klog.Errorf("%s: cannot generate BPF filter, running unfiltered: %v", flow.GetName(), err)
case filter == "":
klog.Warningf("%s: filter was nil, all goroutines will receive all packets, possible scale reduction", flow.GetName())
default:
if err := handle.SetBPFFilter(filter); err != nil {
klog.Errorf("%s: cannot set packet filter, err: %v", flow.GetName(), err)
return
}
}
ps := gopacket.NewPacketSource(handle, handle.LinkType())
packetCh := ps.Packets()
f := reporter.Flow(flow.GetName())
// Close the readyForTx channel so that the transmitter knows that we are ready to
// receive packets.
close(readyForTx)
for {
select {
case <-stop:
klog.Infof("controller ID %s, flow %s, exiting on %s", controllerID, flow.GetName(), rx)
return
case p := <-packetCh:
if err := rxPacket(f, p, match(hdrs, p)); err != nil {
klog.Errorf("%s cannot receive packet on interface %s, %v", flow.GetName(), rx, err)
}
}
}
}
return func(tx, rx *lwotg.FlowController) {
// Make the channel that is used for co-ordination between the sender and receiver.
ch := make(chan struct{})
// TODO(robjs): Currently, we potentially leave a packet in the channel that is being
// read from since rx.Stop is acted on immediately. This can cause send+recv numbers
// not to match exactly just based on this timing. See https://github.com/openconfig/magna/pull/39
// for discussion of whether this is an issue.
go genFunc(tx.ID, tx.Stop, ch)
go recvFunc(rx.ID, rx.Stop, ch)
}, true, nil
}
}
// Ports returns the transmit and receive ports in terms of the
// physical interfaces of the underlying system for a flow.
func Ports(flow *otg.Flow, intfs []*lwotg.OTGIntf) (tx string, rx string, err error) {
if flow.GetTxRx() == nil || flow.GetTxRx().GetChoice() != otg.FlowTxRx_Choice_port {
return "", "", fmt.Errorf("unsupported type of Tx/Rx specification, %v", flow.GetTxRx())
}
txName := flow.GetTxRx().GetPort().GetTxName()
var rxName string
switch rxList := flow.GetTxRx().GetPort().GetRxNames(); len(rxList) {
case 0:
rxName = flow.GetTxRx().GetPort().GetRxName()
if rxName == "" {
return "", "", fmt.Errorf("flows specified single port, but it was not specified")
}
case 1:
rxName = flow.GetTxRx().GetPort().GetRxNames()[0]
default:
return "", "", fmt.Errorf("flows received at multiple ports are not supported, got: %d ports (%v)", len(rxList), rxList)
}
for _, i := range intfs {
if i.OTGPortName == txName {
tx = i.SystemName
}
if i.OTGPortName == rxName {
rx = i.SystemName
}
}
if tx == "" || rx == "" {
return "", "", fmt.Errorf("unknown interface, tx: %q, rx: %q", tx, rx)
}
return tx, rx, nil
}
// Rate returns the number of packets per second that should be sent
// for the flow to meet the specified rate. The specified headers are
// used where packet size calculations are required.
//
// It returns a default rate of 1000 packets per second per the OTG
// specification if there is no rate specified.
//
// TODO(robjs): support specifications other than simple PPS.
func Rate(flow *otg.Flow, hdrs []gopacket.SerializableLayer) (uint64, error) {
if flowT := flow.GetRate().GetChoice(); flowT != otg.FlowRate_Choice_pps && flowT != otg.FlowRate_Choice_unspecified {
return 0, fmt.Errorf("unsupported flow rate specification, %v", flowT)
}
pps := flow.GetRate().GetPps()
if pps == 0 {
return 1000, nil
}
return pps, nil
}
// flowPackets returns the number of packets that should be sent for a
// particular flow. If the specification is not provided it returns 0,
// which should be interpreted as sending continuous packets.
//
// This function does not support delay or gap specifications that are
// included in OTG, and will return an error for each.
func flowPackets(flow *otg.Flow) (uint32, error) {
if durT := flow.GetDuration().GetChoice(); durT != otg.FlowDuration_Choice_fixed_packets && durT != otg.FlowDuration_Choice_unspecified {
return 0, fmt.Errorf("unsupported flow duration %s", durT)
}
// 12 is the OTG default for the packet gap.
if (flow.GetDuration().GetFixedPackets().GetGap() != 0 && flow.GetDuration().GetFixedPackets().GetGap() != 12) || flow.GetDuration().GetFixedPackets().GetDelay() != nil {
return 0, fmt.Errorf("gap and delay specifications are unsupported, got gap: %v, delay: %v", flow.GetDuration().GetFixedPackets().GetGap(), flow.GetDuration().GetFixedPackets().GetDelay())
}
return flow.GetDuration().GetFixedPackets().GetPackets(), nil
}
var (
// timeFn is a function that returns a time.Time that can be overloaded in unit tests.
timeFn = time.Now
)
// rxPacket is called for each packet that is received. It takes arguments of the statistics
// tracking the flow, the set of headers that are expected, and the received packet.
func rxPacket(f *counters, p gopacket.Packet, match bool) error {
if !match {
return nil
}
f.updateRx(timeFn(), len(p.Data()))
return nil
}
// val is used to store a timestamped telemetry value.
type val struct {
// ts is the timestamp in nanoseconds since the unix epoch that the value was
// collected.
ts int64
// f is the value if it is of type float32.
f float32
// u is the value if it is of type uint64.
u uint64
// b is the value if it is of type bool.
b bool
// s is the value if it is of type string.
s string
}
// stats stores metrics that are tracked for an individual flow direction.
type stats struct {
// rate indicates the rate at which packets are being sent or received according
// to the specific context.
Rate *val
// octets indicates the total number of octets that have been sent.
Octets *val
// pkts indicates the total number of packets that have been sent.
Pkts *val
}
// OverrideHandleCreator sets a custom implementation of the handle creator.
func OverrideHandleCreator(pc HandleCreator) {
handleCreator = pc
}