Skip to content

Commit

Permalink
TxAlgorithm refinements. Keepalive. (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelquigley committed Aug 24, 2021
1 parent ffdc817 commit c80469c
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 17 deletions.
27 changes: 27 additions & 0 deletions algorithm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package dilithium

import "time"

// TxAlgorithm is an abstraction of an extensible flow-control implementation, which can be plugged into a TxPortal
// instance.
//
type TxAlgorithm interface {
Ready(int)
Tx(int)
Success(int)
DuplicateAck()
Retransmission(int)
UpdateRTT(rttMs int)
RetxMs() int
RxPortalSize() int
Profile() *TxProfile
}

// TxProfile defines all of the configurable values that are requested by a flow control algorithm.
//
type TxProfile struct {
RetxBatchMs int
SendKeepalive bool
ConnectionTimeout time.Duration
MaxTreeSize int
}
2 changes: 1 addition & 1 deletion txmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (txm *TxMonitor) run() {
for ; i < x; i ++ {
_, t := txm.waitlist.Peek()
delta := t.Sub(headline).Milliseconds()
if delta <= int64(txm.alg.RetxBatchMs()) {
if delta <= int64(txm.alg.Profile().RetxBatchMs) {
wm, _ := txm.waitlist.Next()
if wm.hasFlag(RTT) {
util.WriteUint16(wm.buf.Data[dataStart:], uint16(time.Now().UnixNano()/int64(time.Millisecond)))
Expand Down
49 changes: 35 additions & 14 deletions txportal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,65 @@ package dilithium

import (
"github.com/emirpasic/gods/trees/btree"
"github.com/emirpasic/gods/utils"
"github.com/sirupsen/logrus"
"sync"
"time"
)

// TxAlgorithm is an abstraction of an extensible flow-control implementation, which can be plugged into a TxPortal
// instance.
//
type TxAlgorithm interface {
Ready(int)
Tx(int)
Success(int)
DuplicateAck()
Retransmission(int)
UpdateRTT(rttMs int)
RetxMs() int
RetxBatchMs() int
}

// TxPortal manages the outgoing data transmitted by a communication instance. It is one half of a TxPortal->RxPortal
// communication pair. TxPortal is primarily concerned with optimizing the transmission rate over lossy Transport
// implementations, while ensuring reliability.
//
type TxPortal struct {
lock *sync.Mutex
tree *btree.Tree
lastTx time.Time
transport Transport
alg TxAlgorithm
monitor *TxMonitor
closer *Closer
closed bool
pool *Pool
}

func newTxPortal(transport Transport, alg TxAlgorithm, closer *Closer, pool *Pool) *TxPortal {
txp := &TxPortal{
lock: new(sync.Mutex),
tree: btree.NewWith(alg.Profile().MaxTreeSize, utils.Int32Comparator),
transport: transport,
alg: alg,
closer: closer,
}
txp.monitor = newTxMonitor(txp.lock, txp.alg, txp.transport)
//txp.monitor.setRetxCallback()
return txp
}

func (txp *TxPortal) start() {
txp.monitor.start()
if txp.alg.Profile().SendKeepalive {
go txp.keepaliveSender()
}
}

func (txp *TxPortal) keepaliveSender() {
logrus.Info("started")
defer logrus.Info("exited")

for {
time.Sleep(1 * time.Second)
if txp.closed {
return
}
if time.Since(txp.lastTx).Milliseconds() >= txp.alg.Profile().ConnectionTimeout.Milliseconds()/2 {
if keepalive, err := newKeepalive(txp.alg.RxPortalSize(), txp.pool); err == nil {
if err := writeWireMessage(keepalive, txp.transport); err == nil {
txp.lastTx = time.Now()
} else {
logrus.Errorf("error sending keepalive (%v)", err)
}
}
}
}
}
8 changes: 6 additions & 2 deletions westworld.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ func (self *WestworldAlgorithm) RetxMs() int {
return 200
}

func (self *WestworldAlgorithm) RetxBatchMs() int {
return 2
func (self *WestworldAlgorithm) RxPortalSize() int {
return 0
}

func (self *WestworldAlgorithm) Profile() *TxProfile {
return nil
}

type WestworldProfile struct {
Expand Down

0 comments on commit c80469c

Please sign in to comment.