Skip to content

Commit

Permalink
drain channels before Unsubscribe(All)
Browse files Browse the repository at this point in the history
  • Loading branch information
melekes committed Nov 8, 2018
1 parent aec480c commit f4f3109
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [p2p] \#2771 Fix `peer-id` label name in prometheus metrics
- [abci] unlock mutex in localClient so even when app panics (e.g. during CheckTx), consensus continue working
- [abci] fix DATA RACE in localClient
- [rpc] drain channel before calling Unsubscribe(All) in /broadcast_tx_commit
26 changes: 24 additions & 2 deletions consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,18 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error {
if err != nil {
return errors.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)
}
defer cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
defer func() {
// drain newStepCh to make sure we don't block
LOOP:
for {
select {
case <-newStepCh:
default:
break LOOP
}
}
cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
}()

// just open the file for reading, no need to use wal
fp, err := os.OpenFile(file, os.O_RDONLY, 0600)
Expand Down Expand Up @@ -221,7 +232,18 @@ func (pb *playback) replayConsoleLoop() int {
if err != nil {
cmn.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep))
}
defer pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
defer func() {
// drain newStepCh to make sure we don't block
LOOP:
for {
select {
case <-newStepCh:
default:
break LOOP
}
}
pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
}()

if len(tokens) == 1 {
if err := pb.replayReset(1, newStepCh); err != nil {
Expand Down
12 changes: 9 additions & 3 deletions libs/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,15 @@
//
// s.Subscribe(ctx, sub, qry, out)
// defer func() {
// for range out {
// // drain out to make sure we don't block
// }
// // drain out to make sure we don't block
// LOOP:
// for {
// select {
// case <-out:
// default:
// break LOOP
// }
// }
// s.UnsubscribeAll(ctx, sub)
// }()
// for msg := range out {
Expand Down
13 changes: 12 additions & 1 deletion rpc/client/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,18 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
}

// make sure to unregister after the test is over
defer c.UnsubscribeAll(ctx, subscriber)
defer func() {
// drain evts to make sure we don't block
LOOP:
for {
select {
case <-evts:
default:
break LOOP
}
}
c.UnsubscribeAll(ctx, subscriber)
}()

select {
case evt := <-evts:
Expand Down
15 changes: 13 additions & 2 deletions rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,26 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// subscribe to tx being committed in block
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
deliverTxResCh := make(chan interface{})
deliverTxResCh := make(chan interface{}, 1)
q := types.EventQueryTxFor(tx)
err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh)
if err != nil {
err = errors.Wrap(err, "failed to subscribe to tx")
logger.Error("Error on broadcastTxCommit", "err", err)
return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err)
}
defer eventBus.Unsubscribe(context.Background(), "mempool", q)
defer func() {
// drain deliverTxResCh to make sure we don't block
LOOP:
for {
select {
case <-deliverTxResCh:
default:
break LOOP
}
}
eventBus.Unsubscribe(context.Background(), "mempool", q)
}()

// broadcast the tx and register checktx callback
checkTxResCh := make(chan *abci.Response, 1)
Expand Down

0 comments on commit f4f3109

Please sign in to comment.