Skip to content

Commit

Permalink
closer (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelquigley committed Aug 27, 2021
1 parent bf21a4b commit dae294b
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 6 deletions.
41 changes: 39 additions & 2 deletions closer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,43 @@
package dilithium

// Closer manages the state machine for the shutdown of a TxPortal and RxPortal pair (one side of a communication).
import (
"github.com/openziti/dilithium/util"
"github.com/sirupsen/logrus"
"time"
)

// closer manages the state machine for the shutdown of a TxPortal and RxPortal pair (one side of a communication).
//
type Closer struct {
type closer struct {
seq *util.Sequence
rxCloseSeq int32
rxCloseSeqIn chan int32
txCloseSeq int32
txCloseSeqIn chan int32
txp *TxPortal
rxp *RxPortal
lastEvent time.Time
closeHook func()
}

func newCloser(seq *util.Sequence, closeHook func()) *closer {
return &closer{
seq: seq,
rxCloseSeq: notClosed,
rxCloseSeqIn: make(chan int32, 1),
txCloseSeq: notClosed,
txCloseSeqIn: make(chan int32, 1),
closeHook: closeHook,
}
}

func (c *closer) emergencyStop() {
logrus.Info("broken glass")
c.txp.close()
c.rxp.Close()
if c.closeHook != nil {
c.closeHook()
}
}

const notClosed = int32(-99)
4 changes: 2 additions & 2 deletions rxportal.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type RxPortal struct {
ackPool *Pool
txp *TxPortal
seq *util.Sequence
closer *Closer
closer *closer
closed bool
}

Expand All @@ -35,7 +35,7 @@ type RxRead struct {
Eof bool
}

func NewRxPortal(transport Transport, txp *TxPortal, seq *util.Sequence, closer *Closer) *RxPortal {
func NewRxPortal(transport Transport, txp *TxPortal, seq *util.Sequence, closer *closer) *RxPortal {
rxp := &RxPortal{
transport: transport,
tree: btree.NewWith(txp.alg.Profile().MaxTreeSize, utils.Int32Comparator),
Expand Down
9 changes: 7 additions & 2 deletions txportal.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ type TxPortal struct {
transport Transport
alg TxAlgorithm
monitor *TxMonitor
closer *Closer
closer *closer
closed bool
pool *Pool
}

func newTxPortal(transport Transport, alg TxAlgorithm, closer *Closer, pool *Pool) *TxPortal {
func newTxPortal(transport Transport, alg TxAlgorithm, closer *closer, pool *Pool) *TxPortal {
txp := &TxPortal{
lock: new(sync.Mutex),
tree: btree.NewWith(alg.Profile().MaxTreeSize, utils.Int32Comparator),
Expand Down Expand Up @@ -127,6 +127,11 @@ func (txp *TxPortal) ack(acks []Ack) error {
return nil
}

func (txp *TxPortal) close() {
txp.closed = true
txp.monitor.close()
}

func (txp *TxPortal) keepaliveSender() {
logrus.Info("started")
defer logrus.Info("exited")
Expand Down

0 comments on commit dae294b

Please sign in to comment.