Skip to content

Commit

Permalink
TxMonitor elaboration. (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelquigley committed Aug 23, 2021
1 parent 482a533 commit 588bcb5
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 69 deletions.
88 changes: 20 additions & 68 deletions txmonitor.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,30 @@
package dilithium

import "time"
import (
"sync"
)

// TxMonitor is responsible for managing in-flight payloads, retransmitting payloads when their timeout expires.
//
type TxMonitor struct {
alg TxAlgorithm
transport Transport
rttAvg []uint16
retxMs int
}

type waitlist interface {
Add(*WireMessage, int, time.Time)
Update(int)
Remove(*WireMessage)
Size() int
Peek() (*WireMessage, time.Time)
Next() (*WireMessage, time.Time)
}

type arrayWaitlist struct {
waitlist []*waitlistSubject
}

type waitlistSubject struct {
deadline time.Time
retxMs int
wm *WireMessage
}

func newArrayWaitlist() waitlist {
return &arrayWaitlist{}
}

func (self *arrayWaitlist) Add(wm *WireMessage, retxMs int, t time.Time) {
self.waitlist = append(self.waitlist, &waitlistSubject{t, retxMs, wm})
}

func (self *arrayWaitlist) Update(retxMs int) {
for _, waiter := range self.waitlist {
delta := retxMs - waiter.retxMs
waiter.deadline.Add(time.Duration(delta) * time.Millisecond)
}
}

func (self *arrayWaitlist) Remove(wm *WireMessage) {
i := -1
for i = 0; i < len(self.waitlist); i++ {
if self.waitlist[i].wm == wm {
break
}
}
if i > -1 {
self.waitlist = append(self.waitlist[:i], self.waitlist[i+1:]...)
lock *sync.Mutex
ready *sync.Cond
alg TxAlgorithm
transport Transport
rttAvg []uint16
retxMs int
retxCallback func()
}

func newTxMonitor(lock *sync.Mutex, alg TxAlgorithm, transport Transport) *TxMonitor {
return &TxMonitor{
lock: lock,
ready: sync.NewCond(lock),
alg: alg,
transport: transport,
}
}

func (self *arrayWaitlist) Size() int {
return len(self.waitlist)
func (txm *TxMonitor) setRetxCallback(c func()) {
txm.retxCallback = c
}

func (self *arrayWaitlist) Peek() (*WireMessage, time.Time) {
if len(self.waitlist) < 1 {
return nil, time.Time{}
}
return self.waitlist[0].wm, self.waitlist[0].deadline
}

func (self *arrayWaitlist) Next() (*WireMessage, time.Time) {
if len(self.waitlist) < 1 {
return nil, time.Time{}
}
next := self.waitlist[0]
self.waitlist = self.waitlist[1:]
return next.wm, next.deadline
}
2 changes: 1 addition & 1 deletion txportal.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ func newTxPortal(transport Transport, alg TxAlgorithm, closer *Closer, pool *Poo
alg: alg,
closer: closer,
}
// txp.monitor =
txp.monitor = newTxMonitor(txp.lock, txp.alg, txp.transport)
return txp
}
69 changes: 69 additions & 0 deletions waitlist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package dilithium

import "time"

type waitlist interface {
Add(*WireMessage, int, time.Time)
Update(int)
Remove(*WireMessage)
Size() int
Peek() (*WireMessage, time.Time)
Next() (*WireMessage, time.Time)
}

type arrayWaitlist struct {
waitlist []*waitlistSubject
}

type waitlistSubject struct {
deadline time.Time
retxMs int
wm *WireMessage
}

func newArrayWaitlist() waitlist {
return &arrayWaitlist{}
}

func (self *arrayWaitlist) Add(wm *WireMessage, retxMs int, t time.Time) {
self.waitlist = append(self.waitlist, &waitlistSubject{t, retxMs, wm})
}

func (self *arrayWaitlist) Update(retxMs int) {
for _, waiter := range self.waitlist {
delta := retxMs - waiter.retxMs
waiter.deadline.Add(time.Duration(delta) * time.Millisecond)
}
}

func (self *arrayWaitlist) Remove(wm *WireMessage) {
i := -1
for i = 0; i < len(self.waitlist); i++ {
if self.waitlist[i].wm == wm {
break
}
}
if i > -1 {
self.waitlist = append(self.waitlist[:i], self.waitlist[i+1:]...)
}
}

func (self *arrayWaitlist) Size() int {
return len(self.waitlist)
}

func (self *arrayWaitlist) Peek() (*WireMessage, time.Time) {
if len(self.waitlist) < 1 {
return nil, time.Time{}
}
return self.waitlist[0].wm, self.waitlist[0].deadline
}

func (self *arrayWaitlist) Next() (*WireMessage, time.Time) {
if len(self.waitlist) < 1 {
return nil, time.Time{}
}
next := self.waitlist[0]
self.waitlist = self.waitlist[1:]
return next.wm, next.deadline
}

0 comments on commit 588bcb5

Please sign in to comment.