From 588bcb54ffee7e6c694ad612fc251f69b96feccb Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Mon, 23 Aug 2021 13:39:13 -0400 Subject: [PATCH] TxMonitor elaboration. (#132) --- txmonitor.go | 88 ++++++++++++---------------------------------------- txportal.go | 2 +- waitlist.go | 69 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 69 deletions(-) create mode 100644 waitlist.go diff --git a/txmonitor.go b/txmonitor.go index 45b5d74..f4a4ada 100644 --- a/txmonitor.go +++ b/txmonitor.go @@ -1,78 +1,30 @@ package dilithium -import "time" +import ( + "sync" +) // TxMonitor is responsible for managing in-flight payloads, retransmitting payloads when their timeout expires. // type TxMonitor struct { - alg TxAlgorithm - transport Transport - rttAvg []uint16 - retxMs int -} - -type waitlist interface { - Add(*WireMessage, int, time.Time) - Update(int) - Remove(*WireMessage) - Size() int - Peek() (*WireMessage, time.Time) - Next() (*WireMessage, time.Time) -} - -type arrayWaitlist struct { - waitlist []*waitlistSubject -} - -type waitlistSubject struct { - deadline time.Time - retxMs int - wm *WireMessage -} - -func newArrayWaitlist() waitlist { - return &arrayWaitlist{} -} - -func (self *arrayWaitlist) Add(wm *WireMessage, retxMs int, t time.Time) { - self.waitlist = append(self.waitlist, &waitlistSubject{t, retxMs, wm}) -} - -func (self *arrayWaitlist) Update(retxMs int) { - for _, waiter := range self.waitlist { - delta := retxMs - waiter.retxMs - waiter.deadline.Add(time.Duration(delta) * time.Millisecond) - } -} - -func (self *arrayWaitlist) Remove(wm *WireMessage) { - i := -1 - for i = 0; i < len(self.waitlist); i++ { - if self.waitlist[i].wm == wm { - break - } - } - if i > -1 { - self.waitlist = append(self.waitlist[:i], self.waitlist[i+1:]...) + lock *sync.Mutex + ready *sync.Cond + alg TxAlgorithm + transport Transport + rttAvg []uint16 + retxMs int + retxCallback func() +} + +func newTxMonitor(lock *sync.Mutex, alg TxAlgorithm, transport Transport) *TxMonitor { + return &TxMonitor{ + lock: lock, + ready: sync.NewCond(lock), + alg: alg, + transport: transport, } } -func (self *arrayWaitlist) Size() int { - return len(self.waitlist) +func (txm *TxMonitor) setRetxCallback(c func()) { + txm.retxCallback = c } - -func (self *arrayWaitlist) Peek() (*WireMessage, time.Time) { - if len(self.waitlist) < 1 { - return nil, time.Time{} - } - return self.waitlist[0].wm, self.waitlist[0].deadline -} - -func (self *arrayWaitlist) Next() (*WireMessage, time.Time) { - if len(self.waitlist) < 1 { - return nil, time.Time{} - } - next := self.waitlist[0] - self.waitlist = self.waitlist[1:] - return next.wm, next.deadline -} \ No newline at end of file diff --git a/txportal.go b/txportal.go index 409a6e5..e0207e4 100644 --- a/txportal.go +++ b/txportal.go @@ -35,6 +35,6 @@ func newTxPortal(transport Transport, alg TxAlgorithm, closer *Closer, pool *Poo alg: alg, closer: closer, } - // txp.monitor = + txp.monitor = newTxMonitor(txp.lock, txp.alg, txp.transport) return txp } diff --git a/waitlist.go b/waitlist.go new file mode 100644 index 0000000..0a13c6b --- /dev/null +++ b/waitlist.go @@ -0,0 +1,69 @@ +package dilithium + +import "time" + +type waitlist interface { + Add(*WireMessage, int, time.Time) + Update(int) + Remove(*WireMessage) + Size() int + Peek() (*WireMessage, time.Time) + Next() (*WireMessage, time.Time) +} + +type arrayWaitlist struct { + waitlist []*waitlistSubject +} + +type waitlistSubject struct { + deadline time.Time + retxMs int + wm *WireMessage +} + +func newArrayWaitlist() waitlist { + return &arrayWaitlist{} +} + +func (self *arrayWaitlist) Add(wm *WireMessage, retxMs int, t time.Time) { + self.waitlist = append(self.waitlist, &waitlistSubject{t, retxMs, wm}) +} + +func (self *arrayWaitlist) Update(retxMs int) { + for _, waiter := range self.waitlist { + delta := retxMs - waiter.retxMs + waiter.deadline.Add(time.Duration(delta) * time.Millisecond) + } +} + +func (self *arrayWaitlist) Remove(wm *WireMessage) { + i := -1 + for i = 0; i < len(self.waitlist); i++ { + if self.waitlist[i].wm == wm { + break + } + } + if i > -1 { + self.waitlist = append(self.waitlist[:i], self.waitlist[i+1:]...) + } +} + +func (self *arrayWaitlist) Size() int { + return len(self.waitlist) +} + +func (self *arrayWaitlist) Peek() (*WireMessage, time.Time) { + if len(self.waitlist) < 1 { + return nil, time.Time{} + } + return self.waitlist[0].wm, self.waitlist[0].deadline +} + +func (self *arrayWaitlist) Next() (*WireMessage, time.Time) { + if len(self.waitlist) < 1 { + return nil, time.Time{} + } + next := self.waitlist[0] + self.waitlist = self.waitlist[1:] + return next.wm, next.deadline +}