Skip to content

Commit

Permalink
'Complete' TxMonitor. (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelquigley committed Aug 23, 2021
1 parent 588bcb5 commit 417dc81
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 2 deletions.
92 changes: 90 additions & 2 deletions txmonitor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package dilithium

import (
"github.com/openziti/dilithium/util"
"github.com/sirupsen/logrus"
"sync"
"time"
)

// TxMonitor is responsible for managing in-flight payloads, retransmitting payloads when their timeout expires.
Expand All @@ -11,8 +14,8 @@ type TxMonitor struct {
ready *sync.Cond
alg TxAlgorithm
transport Transport
rttAvg []uint16
retxMs int
waitlist waitlist
closed bool
retxCallback func()
}

Expand All @@ -28,3 +31,88 @@ func newTxMonitor(lock *sync.Mutex, alg TxAlgorithm, transport Transport) *TxMon
func (txm *TxMonitor) setRetxCallback(c func()) {
txm.retxCallback = c
}

func (txm *TxMonitor) start() {
go txm.run()
}

func (txm *TxMonitor) add(wm *WireMessage) {
retxMs, deadline := txm.retxDeadline()
txm.waitlist.Add(wm, retxMs, deadline)
txm.ready.Broadcast()
}

func (txm *TxMonitor) remove(wm *WireMessage) {
txm.waitlist.Remove(wm)
}

func (txm *TxMonitor) close() {
txm.closed = true
txm.ready.Broadcast()
}

func (txm *TxMonitor) retxDeadline() (int, time.Time) {
retxMs := txm.alg.RetxMs()
deadline := time.Now().Add(time.Duration(retxMs) * time.Millisecond)
return retxMs, deadline
}

func (txm *TxMonitor) run() {
logrus.Info("started")
defer logrus.Warn("exited")

for {
var headline time.Time
var timeout time.Duration

txm.lock.Lock()
{
for txm.waitlist.Size() < 1 && !txm.closed {
txm.ready.Wait()
}

if txm.closed {
txm.lock.Unlock()
return
}

_, headline = txm.waitlist.Peek()
timeout = time.Until(headline)
}
txm.lock.Unlock()

time.Sleep(timeout)

txm.lock.Lock()
{
if txm.waitlist.Size() > 0 {
i := 0
x := txm.waitlist.Size()
for ; i < x; i ++ {
_, t := txm.waitlist.Peek()
delta := t.Sub(headline).Milliseconds()
if delta <= int64(txm.alg.RetxBatchMs()) {
wm, _ := txm.waitlist.Next()
if wm.hasFlag(RTT) {
util.WriteUint16(wm.buf.Data[dataStart:], uint16(time.Now().UnixNano()/int64(time.Millisecond)))
}

if err := writeWireMessage(wm, txm.transport); err != nil {
logrus.Errorf("retx (%v)", err)
}
if txm.retxCallback != nil {
txm.retxCallback()
}

retxMs, deadline := txm.retxDeadline()
txm.waitlist.Add(wm, retxMs, deadline)

} else {
break
}
}
}
}
txm.lock.Unlock()
}
}
3 changes: 3 additions & 0 deletions txportal.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ type TxAlgorithm interface {
Success(int)
DuplicateAck()
Retransmission(int)
UpdateRTT(rttMs int)
RetxMs() int
RetxBatchMs() int
}

// TxPortal manages the outgoing data transmitted by a communication instance. It is one half of a TxPortal->RxPortal
Expand Down
11 changes: 11 additions & 0 deletions westworld.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ func (self *WestworldAlgorithm) DuplicateAck() {
func (self *WestworldAlgorithm) Retransmission(size int) {
}

func (self *WestworldAlgorithm) UpdateRTT(rttMs int) {
}

func (self *WestworldAlgorithm) RetxMs() int {
return 200
}

func (self *WestworldAlgorithm) RetxBatchMs() int {
return 2
}

type WestworldProfile struct {
StartSize int
MinSize int
Expand Down

0 comments on commit 417dc81

Please sign in to comment.