feat: introduce full forwarding algorithm (#9)
This algorithm is an evolution of the current link forwarding algorithm,
attempting to make TCP behave ~reasonably.

We will soon replace the current implementation with a more complex one
that selects the right forwarding algorithm.
bassosimone committed Mar 8, 2023
1 parent 5b12ed6 commit ed4f5e2
Showing 3 changed files with 354 additions and 0 deletions.
41 changes: 41 additions & 0 deletions linkfwdcore.go
@@ -5,10 +5,23 @@ package netem
//

import (
"math/rand"
"sort"
"sync"
"time"
)

// LinkFwdRNG is a [LinkFwdFunc] view of a [rand.Rand] abstracted for testability.
type LinkFwdRNG interface {
// Float64 is like [rand.Rand.Float64].
Float64() float64

// Int63n is like [rand.Rand.Int63n].
Int63n(n int64) int64
}

var _ LinkFwdRNG = &rand.Rand{}

// LinkFwdConfig contains config for frame forwarding algorithms. Make sure
// you initialize all the fields marked as MANDATORY.
type LinkFwdConfig struct {
@@ -18,6 +31,10 @@ type LinkFwdConfig struct {
// Logger is the MANDATORY logger.
Logger Logger

// NewLinkFwdRNG is an OPTIONAL factory that creates a new
// random number generator, used for writing tests.
NewLinkFwdRNG func() LinkFwdRNG

// OneWayDelay is the OPTIONAL link one-way delay.
OneWayDelay time.Duration

@@ -37,3 +54,27 @@ type LinkFwdConfig struct {

// LinkFwdFunc is the type of a link forwarding function.
type LinkFwdFunc func(cfg *LinkFwdConfig)

// newLinkFwdRNG creates a new [LinkFwdRNG].
func (cfg *LinkFwdConfig) newLinkFwdRNG() LinkFwdRNG {
if cfg.NewLinkFwdRNG != nil {
return cfg.NewLinkFwdRNG()
}
return rand.New(rand.NewSource(time.Now().UnixNano()))
}

// maybeInspectWithDPI inspects a packet with DPI if configured.
func (cfg *LinkFwdConfig) maybeInspectWithDPI(payload []byte) (*DPIPolicy, bool) {
if cfg.DPIEngine != nil {
return cfg.DPIEngine.inspect(payload)
}
return nil, false
}

// linkFwdSortFrameSliceInPlace is a convenience function to sort
// a slice of frames in place by their deadline.
func linkFwdSortFrameSliceInPlace(frames []*Frame) {
sort.SliceStable(frames, func(i, j int) bool {
return frames[i].Deadline.Before(frames[j].Deadline)
})
}
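
The NewLinkFwdRNG hook exists so that tests can replace the time-seeded generator with a deterministic one. As a minimal sketch of how a test might do that (assuming the code lives in package netem), here is a hypothetical fakeLinkFwdRNG type with fixed-value behavior; it is not part of this commit:

// fakeLinkFwdRNG is a hypothetical deterministic [LinkFwdRNG] for tests.
type fakeLinkFwdRNG struct {
	// fixedFloat64 is the value returned by every Float64 call.
	fixedFloat64 float64

	// fixedInt63 is the value returned by every Int63n call (modulo n).
	fixedInt63 int64
}

// Float64 implements [LinkFwdRNG].
func (r *fakeLinkFwdRNG) Float64() float64 {
	return r.fixedFloat64
}

// Int63n implements [LinkFwdRNG].
func (r *fakeLinkFwdRNG) Int63n(n int64) int64 {
	return r.fixedInt63 % n
}

var _ LinkFwdRNG = &fakeLinkFwdRNG{}

With fixedFloat64 set to 0.99, the forwarder's rng.Float64() < framePLR check never drops a frame as long as the total frame PLR stays below 0.99, and with fixedInt63 set to 0 the jitter is always zero, which keeps timing-sensitive tests reproducible.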
187 changes: 187 additions & 0 deletions linkfwdfull.go
@@ -0,0 +1,187 @@
package netem

//
// Link frame forwarding: full implementation
//

import (
"fmt"
"time"
)

// LinkFwdFull is a full implementation of link forwarding that
// deals with delays, packet losses, and DPI.
//
// The kind of half-duplex link modeled by this function will
// look much more like a shared geographical link than an
// ethernet link. For example, this link allows out-of-order
// delivery of packets.
func LinkFwdFull(cfg *LinkFwdConfig) {

//
// 🚨 This algorithm is a bit complex. Be careful to check that
// you still preserve packet-level properties after you have
// modified it. In particular, we care about:
//
// - jitter scattering packets to mitigate bursts;
//
// - packet pacing at the TX, also to mitigate bursts;
//
// - out-of-order delivery both at the TX and at the RX
// such that jitter actually works _and_ we can delay
// specific flows using DPI;
//
// - drop-tail, small-buffer TX queue discipline;
//
// - tcptrace sequence graphs generated by cmd/calibrate should
// generally show TCP sustaining losses and doing fast recovery.
//

// informative logging
linkName := fmt.Sprintf(
"linkFwdFull %s<->%s",
cfg.Reader.InterfaceName(),
cfg.Writer.InterfaceName(),
)
cfg.Logger.Infof("netem: %s up", linkName)
defer cfg.Logger.Infof("netem: %s down", linkName)

// synchronize with stop
defer cfg.Wg.Done()

// outgoing contains outgoing frames
var outgoing []*Frame

// accounting for queued bytes
var queuedBytes int

// inflight contains the frames currently in flight
var inflight []*Frame

// We assume that we can send 100 bit/µs (i.e., 100 Mbit/s). We also assume
// that a packet is 1500 bytes (i.e., 12000 bits). Sending one packet thus
// takes 120µs, and our code wakes up every 120µs to check for I/O.
const bitsPerMicrosecond = 100
const constantRate = 120 * time.Microsecond

// We assume the TX buffer cannot hold more than this amount of bytes
const maxQueuedBytes = 1 << 16
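// With the buffer full we have queued 65536 bytes (524288 bits), which
// take ~5243µs to serialize at 100 bit/µs: the drop-tail queue thus adds
// at most roughly 5.2 ms of queuing delay on top of OneWayDelay and jitter.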

// ticker to schedule I/O
ticker := time.NewTicker(constantRate)
defer ticker.Stop()

// random number generator for jitter and PLR
rng := cfg.newLinkFwdRNG()

for {
select {
case <-cfg.Reader.StackClosed():
return

// Userspace handler
//
// Whenever there is an IP packet, we enqueue it into a virtual
// interface, account for the queuing delay, and moderate the queue
// to avoid the most severe bufferbloat.
case <-cfg.Reader.FrameAvailable():
frame, err := cfg.Reader.ReadFrameNonblocking()
if err != nil {
cfg.Logger.Warnf("netem: ReadFrameNonblocking: %s", err.Error())
continue
}

// drop incoming packet if the buffer is full
if queuedBytes > maxQueuedBytes {
continue
}

// avoid potential data races
frame = frame.ShallowCopy()

// create frame TX deadline accounting for time to send all the
// previously queued frames in the outgoing buffer
d := time.Now().Add(time.Duration(queuedBytes*8) * time.Microsecond / bitsPerMicrosecond)
frame.Deadline = d

// add to queue and wait for the TX to wakeup
outgoing = append(outgoing, frame)
queuedBytes += len(frame.Payload)

// Ticker to emulate (slotted) sending and receiving over the channel
case <-ticker.C:
// wake up the transmitter first
if len(outgoing) > 0 {
// avoid head of line blocking that may be caused by adding jitter
linkFwdSortFrameSliceInPlace(outgoing)

// if the front frame is still pending, waste a cycle
frame := outgoing[0]
if d := time.Until(frame.Deadline); d > 0 {
continue
}

// dequeue the first frame in the buffer
queuedBytes -= len(frame.Payload)
outgoing = outgoing[1:]

// add random jitter to offset the effect of bursts
jitter := time.Duration(rng.Int63n(1000)) * time.Microsecond

// compute baseline frame PLR
framePLR := cfg.PLR

// run the DPI engine, if configured
policy, match := cfg.maybeInspectWithDPI(frame.Payload)
if match {
frame.Flags |= policy.Flags
framePLR += policy.PLR
}

// check whether we need to drop this frame (we will drop it
// at the RX so we simulate it being dropped in flight)
if rng.Float64() < framePLR {
frame.Flags |= FrameFlagDrop
}

// create frame RX deadline
d := time.Now().Add(cfg.OneWayDelay + jitter)
frame.Deadline = d

// congratulations, the frame is now in flight 🚀
inflight = append(inflight, frame)
}

// now wake up the receiver
if len(inflight) > 0 {
// avoid head of line blocking that may be caused by adding jitter
linkFwdSortFrameSliceInPlace(inflight)

// if the front frame is still pending, waste a cycle
frame := inflight[0]
if d := time.Until(frame.Deadline); d > 0 {
continue
}

// the frame is no longer in flight
inflight = inflight[1:]

// don't leak the deadline to the destination NIC
frame.Deadline = time.Time{}

// deliver or drop the frame
linkFwdDeliveryOrDrop(cfg.Writer, frame)
}
}
}
}

// linkFwdDeliveryOrDrop delivers or drops a frame depending
// on the configured frame flags.
func linkFwdDeliveryOrDrop(writer WriteableNIC, frame *Frame) {
if frame.Flags&FrameFlagDrop == 0 {
_ = writer.WriteFrame(frame)
}
}

var _ = LinkFwdFunc(LinkFwdFull)
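
As a usage illustration, here is a minimal sketch of driving LinkFwdFull directly. It assumes the code lives in package netem, reuses the helpers that appear in linkfwdfull_test.go below (NewStaticReadableNIC, NewStaticWriteableNIC, NullLogger) exactly as the test uses them, and injects the hypothetical fakeLinkFwdRNG sketched after linkfwdcore.go so that losses and jitter are deterministic; it is not part of this commit.

// sketchLinkFwdFullUsage forwards one frame across a 10 ms, jitter-free,
// effectively loss-free link (hypothetical example).
func sketchLinkFwdFullUsage() {
	reader := NewStaticReadableNIC("eth0", &Frame{Payload: []byte("abc")})
	writer := NewStaticWriteableNIC("eth1")
	cfg := &LinkFwdConfig{
		Logger:      &NullLogger{},
		OneWayDelay: 10 * time.Millisecond,
		PLR:         0.1, // nominal 10% loss...
		NewLinkFwdRNG: func() LinkFwdRNG {
			// ...but this fixed RNG never draws below 0.1, so no
			// frame is dropped and the jitter is always zero
			return &fakeLinkFwdRNG{fixedFloat64: 0.99, fixedInt63: 0}
		},
		Reader: reader,
		Writer: writer,
		Wg:     &sync.WaitGroup{},
	}
	cfg.Wg.Add(1)
	go LinkFwdFull(cfg)
	frame := <-writer.Frames() // delivered roughly OneWayDelay later
	_ = frame
	reader.CloseNetworkStack() // tell the forwarder to shut down
	cfg.Wg.Wait()
}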
126 changes: 126 additions & 0 deletions linkfwdfull_test.go
@@ -0,0 +1,126 @@
package netem

import (
"bytes"
"sort"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"
)

func TestLinkFwdFull(t *testing.T) {

// testcase describes a test case for [LinkFwdFull]
type testcase struct {
// name is the name of this test case
name string

// delay is the one-way delay to use for forwarding frames.
delay time.Duration

// emit contains the list of frames that we should emit
emit []*Frame

// expect contains the list of frames we expect
expect []*Frame

// expectRuntimeAtLeast is the minimum runtime we expect
// to see when running this test case
expectRuntimeAtLeast time.Duration
}

var testcases = []testcase{{
name: "when we send no frame",
delay: 0,
emit: []*Frame{},
expect: []*Frame{},
expectRuntimeAtLeast: 0,
}, {
name: "when we send some frames",
delay: time.Second,
emit: []*Frame{{
Deadline: time.Time{},
Flags: 0,
Payload: []byte("abcdef"),
}, {
Deadline: time.Time{},
Flags: 0,
Payload: []byte("ghi"),
}},
expect: []*Frame{{
Deadline: time.Time{},
Flags: 0,
Payload: []byte("abcdef"),
}, {
Deadline: time.Time{},
Flags: 0,
Payload: []byte("ghi"),
}},
expectRuntimeAtLeast: time.Second,
}}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
// create the NIC from which to read
reader := NewStaticReadableNIC("eth0", tc.emit...)

// create a NIC that will collect frames
writer := NewStaticWriteableNIC("eth1")

// create the link configuration
cfg := &LinkFwdConfig{
DPIEngine: nil,
Logger: &NullLogger{},
OneWayDelay: tc.delay,
PLR: 0,
Reader: reader,
Writer: writer,
Wg: &sync.WaitGroup{},
}

// save the time before starting the link
t0 := time.Now()

// run the link forwarding algorithm in the background
cfg.Wg.Add(1)
go LinkFwdFull(cfg)

// read the expected number of frames or time out after a minute.
got := []*Frame{}
timer := time.NewTimer(time.Minute)
defer timer.Stop()
for len(got) < len(tc.expect) {
select {
case frame := <-writer.Frames():
got = append(got, frame)
case <-timer.C:
t.Fatal("we have been reading frames for too much time")
}
}

// tell the network stack it can shut down now.
reader.CloseNetworkStack()

// wait for the algorithm to terminate.
cfg.Wg.Wait()

elapsed := time.Since(t0)
if elapsed < tc.expectRuntimeAtLeast {
t.Fatal("expected runtime to be at least", tc.expectRuntimeAtLeast, "got", elapsed)
}

// sort the frames we obtained by payload because this
// forwarder may deliver them out of order
sort.SliceStable(got, func(i, j int) bool {
return bytes.Compare(got[i].Payload, got[j].Payload) < 0
})

// compare the frames we obtained.
if diff := cmp.Diff(tc.expect, got); diff != "" {
t.Fatal(diff)
}
})
}
}
