From 417dc81b87c7faca6097df49dca8cc43e54bcef4 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Mon, 23 Aug 2021 15:03:05 -0400 Subject: [PATCH] 'Complete' TxMonitor. (#132) --- txmonitor.go | 92 ++++++++++++++++++++++++++++++++++++++++++++++++++-- txportal.go | 3 ++ westworld.go | 11 +++++++ 3 files changed, 104 insertions(+), 2 deletions(-) diff --git a/txmonitor.go b/txmonitor.go index f4a4ada..662f18c 100644 --- a/txmonitor.go +++ b/txmonitor.go @@ -1,7 +1,10 @@ package dilithium import ( + "github.com/openziti/dilithium/util" + "github.com/sirupsen/logrus" "sync" + "time" ) // TxMonitor is responsible for managing in-flight payloads, retransmitting payloads when their timeout expires. @@ -11,8 +14,8 @@ type TxMonitor struct { ready *sync.Cond alg TxAlgorithm transport Transport - rttAvg []uint16 - retxMs int + waitlist waitlist + closed bool retxCallback func() } @@ -28,3 +31,88 @@ func newTxMonitor(lock *sync.Mutex, alg TxAlgorithm, transport Transport) *TxMon func (txm *TxMonitor) setRetxCallback(c func()) { txm.retxCallback = c } + +func (txm *TxMonitor) start() { + go txm.run() +} + +func (txm *TxMonitor) add(wm *WireMessage) { + retxMs, deadline := txm.retxDeadline() + txm.waitlist.Add(wm, retxMs, deadline) + txm.ready.Broadcast() +} + +func (txm *TxMonitor) remove(wm *WireMessage) { + txm.waitlist.Remove(wm) +} + +func (txm *TxMonitor) close() { + txm.closed = true + txm.ready.Broadcast() +} + +func (txm *TxMonitor) retxDeadline() (int, time.Time) { + retxMs := txm.alg.RetxMs() + deadline := time.Now().Add(time.Duration(retxMs) * time.Millisecond) + return retxMs, deadline +} + +func (txm *TxMonitor) run() { + logrus.Info("started") + defer logrus.Warn("exited") + + for { + var headline time.Time + var timeout time.Duration + + txm.lock.Lock() + { + for txm.waitlist.Size() < 1 && !txm.closed { + txm.ready.Wait() + } + + if txm.closed { + txm.lock.Unlock() + return + } + + _, headline = txm.waitlist.Peek() + timeout = time.Until(headline) + } + txm.lock.Unlock() + + time.Sleep(timeout) + + txm.lock.Lock() + { + if txm.waitlist.Size() > 0 { + i := 0 + x := txm.waitlist.Size() + for ; i < x; i ++ { + _, t := txm.waitlist.Peek() + delta := t.Sub(headline).Milliseconds() + if delta <= int64(txm.alg.RetxBatchMs()) { + wm, _ := txm.waitlist.Next() + if wm.hasFlag(RTT) { + util.WriteUint16(wm.buf.Data[dataStart:], uint16(time.Now().UnixNano()/int64(time.Millisecond))) + } + + if err := writeWireMessage(wm, txm.transport); err != nil { + logrus.Errorf("retx (%v)", err) + } + if txm.retxCallback != nil { + txm.retxCallback() + } + + retxMs, deadline := txm.retxDeadline() + txm.waitlist.Add(wm, retxMs, deadline) + + } else { + break + } + } + } + } + txm.lock.Unlock() + } +} diff --git a/txportal.go b/txportal.go index e0207e4..616d7c1 100644 --- a/txportal.go +++ b/txportal.go @@ -13,6 +13,9 @@ type TxAlgorithm interface { Success(int) DuplicateAck() Retransmission(int) + UpdateRTT(rttMs int) + RetxMs() int + RetxBatchMs() int } // TxPortal manages the outgoing data transmitted by a communication instance. It is one half of a TxPortal->RxPortal diff --git a/westworld.go b/westworld.go index 06fbbcf..4a1279f 100644 --- a/westworld.go +++ b/westworld.go @@ -26,6 +26,17 @@ func (self *WestworldAlgorithm) DuplicateAck() { func (self *WestworldAlgorithm) Retransmission(size int) { } +func (self *WestworldAlgorithm) UpdateRTT(rttMs int) { +} + +func (self *WestworldAlgorithm) RetxMs() int { + return 200 +} + +func (self *WestworldAlgorithm) RetxBatchMs() int { + return 2 +} + type WestworldProfile struct { StartSize int MinSize int