Skip to content

Commit

Permalink
Missing wiring (RxPortal.rxer) for reading from the Adapter. (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelquigley committed Sep 17, 2021
1 parent bed118d commit 67c61cd
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 19 deletions.
2 changes: 1 addition & 1 deletion algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func DefaultTxProfile() *TxProfile {
return &TxProfile{
MaxSegmentSize: 1450,
RetxBatchMs: 2,
SendKeepalive: true,
SendKeepalive: false,
ConnectionTimeout: 15000,
MaxTreeSize: 64 * 1024,
ReadsQueueSize: 1024,
Expand Down
106 changes: 88 additions & 18 deletions rxportal.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"io"
"math"
"sync"
"time"
)

type RxPortal struct {
Expand All @@ -21,7 +20,8 @@ type RxPortal struct {
reads chan *RxRead
readBuffer *bytes.Buffer
rxPortalSize int
readPool *sync.Pool
rawReadPool *sync.Pool
readPool *Pool
ackPool *Pool
txp *TxPortal
seq *util.Sequence
Expand All @@ -37,22 +37,24 @@ type RxRead struct {

func NewRxPortal(adapter Adapter, txp *TxPortal, seq *util.Sequence, closer *Closer) *RxPortal {
rxp := &RxPortal{
adapter: adapter,
tree: btree.NewWith(txp.alg.Profile().MaxTreeSize, utils.Int32Comparator),
accepted: -1,
rxs: make(chan *WireMessage),
reads: make(chan *RxRead, txp.alg.Profile().ReadsQueueSize),
readBuffer: new(bytes.Buffer),
readPool: new(sync.Pool),
ackPool: NewPool("ackPool", uint32(txp.alg.Profile().PoolBufferSize)),
txp: txp,
seq: seq,
closer: closer,
adapter: adapter,
tree: btree.NewWith(txp.alg.Profile().MaxTreeSize, utils.Int32Comparator),
accepted: -1,
rxs: make(chan *WireMessage),
reads: make(chan *RxRead, txp.alg.Profile().ReadsQueueSize),
readBuffer: new(bytes.Buffer),
rawReadPool: new(sync.Pool),
readPool: NewPool("readPool", uint32(txp.alg.Profile().PoolBufferSize)),
ackPool: NewPool("ackPool", uint32(txp.alg.Profile().PoolBufferSize)),
txp: txp,
seq: seq,
closer: closer,
}
rxp.readPool.New = func() interface{} {
rxp.rawReadPool.New = func() interface{} {
return make([]byte, txp.alg.Profile().PoolBufferSize)
}
go rxp.run()
go rxp.rxer()
return rxp
}

Expand Down Expand Up @@ -101,7 +103,7 @@ preread:
if n != read.Size {
return 0, errors.Wrap(err, "short buffer")
}
rxp.readPool.Put(read.Buf)
rxp.rawReadPool.Put(read.Buf)

return rxp.readBuffer.Read(p)

Expand Down Expand Up @@ -149,9 +151,9 @@ func (rxp *RxPortal) run() {
return
}

case <-time.After(time.Duration(rxp.txp.alg.Profile().ConnectionTimeout) * time.Millisecond):
//case <-time.After(time.Duration(rxp.txp.alg.Profile().ConnectionTimeout) * time.Millisecond):
// rxp.Closer.timeout()
return
//return
}

switch wm.messageType() {
Expand Down Expand Up @@ -201,7 +203,7 @@ func (rxp *RxPortal) run() {
if key.(int32) == next {
v, _ := rxp.tree.Get(key)
wm := v.(*WireMessage)
buf := rxp.readPool.Get().([]byte)
buf := rxp.rawReadPool.Get().([]byte)
if data, _, err := wm.asData(); err == nil {
n := copy(buf, data)
rxp.reads <- &RxRead{buf, n, false}
Expand Down Expand Up @@ -251,3 +253,71 @@ func (rxp *RxPortal) run() {
}
}
}

func (rxp *RxPortal) rxer() {
logrus.Info("started")
defer logrus.Warn("exited")

for {
wm, err := readWireMessage(rxp.adapter, rxp.readPool)
if err != nil {
logrus.Errorf("error reading (%v)", err)
rxp.closer.EmergencyStop()
return
}

switch wm.messageType() {
case DATA:
_, rttTs, err := wm.asData()
if err != nil {
logrus.Errorf("as data error (%v)", err)
continue
}
if rttTs != nil {
// self.txPortal.rtt(*rttTs)
}
if err := rxp.Rx(wm); err != nil {
logrus.Errorf("error rx-ing (%v)", err)
continue
}

case ACK:
acks, rxPortalSz, rttTs, err := wm.asAck()
if err != nil {
logrus.Errorf("as ack error (%v)", err)
continue
}
if rttTs != nil {
//self.txPortal.rtt(*rttTs)
}
rxp.txp.alg.UpdateRxPortalSize(int(rxPortalSz))
if err := rxp.txp.ack(acks); err != nil {
logrus.Errorf("error acking (%v)", err)
continue
}
wm.buf.Unref()

case KEEPALIVE:
rxPortalSz, err := wm.asKeepalive()
if err != nil {
logrus.Errorf("as keepalive error (%v)", err)
continue
}
rxp.txp.alg.UpdateRxPortalSize(rxPortalSz)
if err := rxp.Rx(wm); err != nil {
logrus.Errorf("error forwarding keepalive to rxPortal (%v)", err)
continue
}
wm.buf.Unref()

case CLOSE:
if err := rxp.Rx(wm); err != nil {
logrus.Errorf("error rx-ing close (%v)", err)
}

default:
logrus.Errorf("unexpected message type: %d", wm.messageType())
wm.buf.Unref()
}
}
}

0 comments on commit 67c61cd

Please sign in to comment.