Skip to content

Commit

Permalink
drain channels before Unsubscribe(All)
Browse files Browse the repository at this point in the history
  • Loading branch information
melekes committed Nov 8, 2018
1 parent aec480c commit ee43715
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [p2p] \#2771 Fix `peer-id` label name in prometheus metrics
- [abci] unlock mutex in localClient so even when app panics (e.g. during CheckTx), consensus continue working
- [abci] fix DATA RACE in localClient
- [rpc] drain channel before calling Unsubscribe(All) in /broadcast_tx_commit
14 changes: 12 additions & 2 deletions consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error {
if err != nil {
return errors.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)
}
defer cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
defer func() {
for range newStepCh {
// drain newStepCh to make sure we don't block
}
cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
}()

// just open the file for reading, no need to use wal
fp, err := os.OpenFile(file, os.O_RDONLY, 0600)
Expand Down Expand Up @@ -221,7 +226,12 @@ func (pb *playback) replayConsoleLoop() int {
if err != nil {
cmn.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep))
}
defer pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
defer func() {
for range newStepCh {
// drain newStepCh to make sure we don't block
}
pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
}()

if len(tokens) == 1 {
if err := pb.replayReset(1, newStepCh); err != nil {
Expand Down
7 changes: 6 additions & 1 deletion rpc/client/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,12 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
}

// make sure to unregister after the test is over
defer c.UnsubscribeAll(ctx, subscriber)
defer func() {
for range evts {
// drain evts to make sure we don't block
}
c.UnsubscribeAll(ctx, subscriber)
}()

select {
case evt := <-evts:
Expand Down
7 changes: 6 additions & 1 deletion rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,12 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
logger.Error("Error on broadcastTxCommit", "err", err)
return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err)
}
defer eventBus.Unsubscribe(context.Background(), "mempool", q)
defer func() {
for range deliverTxResCh {
// drain deliverTxResCh to make sure we don't block
}
eventBus.Unsubscribe(context.Background(), "mempool", q)
}()

// broadcast the tx and register checktx callback
checkTxResCh := make(chan *abci.Response, 1)
Expand Down

0 comments on commit ee43715

Please sign in to comment.