Skip to content

Commit

Permalink
start adapting other pkgs to new pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
melekes committed Jan 30, 2019
1 parent 8a04aa6 commit 0a1285a
Show file tree
Hide file tree
Showing 14 changed files with 280 additions and 255 deletions.
11 changes: 6 additions & 5 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/require"
cmn "github.com/tendermint/tendermint/libs/common"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
Expand Down Expand Up @@ -49,7 +50,7 @@ func TestByzantine(t *testing.T) {
switches[i].SetLogger(p2pLogger.With("validator", i))
}

eventChans := make([]chan interface{}, N)
eventSubs := make([]*tmpubsub.Subscription, N)
reactors := make([]p2p.Reactor, N)
for i := 0; i < N; i++ {
// make first val byzantine
Expand All @@ -68,8 +69,8 @@ func TestByzantine(t *testing.T) {
eventBus := css[i].eventBus
eventBus.SetLogger(logger.With("module", "events", "validator", i))

eventChans[i] = make(chan interface{}, 1)
err := eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, eventChans[i])
var err error
eventSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)

conR := NewConsensusReactor(css[i], true) // so we dont start the consensus states
Expand Down Expand Up @@ -135,7 +136,7 @@ func TestByzantine(t *testing.T) {
p2p.Connect2Switches(switches, ind1, ind2)

// wait for someone in the big partition (B) to make a block
<-eventChans[ind2]
<-eventSubs[ind2].Out()

t.Log("A block has been committed. Healing partition")
p2p.Connect2Switches(switches, ind0, ind1)
Expand All @@ -147,7 +148,7 @@ func TestByzantine(t *testing.T) {
wg.Add(2)
for i := 1; i < N-1; i++ {
go func(j int) {
<-eventChans[j]
<-eventSubs[j].Out()
wg.Done()
}(i)
}
Expand Down
108 changes: 54 additions & 54 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/privval"
Expand Down Expand Up @@ -220,18 +221,17 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo

// genesis
func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
voteCh0 := make(chan interface{})
err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote, voteCh0)
voteCh0Sub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote)
if err != nil {
panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote))
}
voteCh := make(chan interface{})
go func() {
for v := range voteCh0 {
vote := v.(types.EventDataVote)
for msgAndTags := range voteCh0Sub.Out() {
vote := msgAndTags.Msg.(types.EventDataVote)
// we only fire for our own votes
if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
voteCh <- v
voteCh <- msgAndTags.Msg
}
}
}()
Expand Down Expand Up @@ -311,7 +311,7 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {

//-------------------------------------------------------------------------------

func ensureNoNewEvent(ch <-chan interface{}, timeout time.Duration,
func ensureNoNewEvent(ch <-chan tmpubsub.MsgAndTags, timeout time.Duration,
errorMessage string) {
select {
case <-time.After(timeout):
Expand All @@ -321,28 +321,28 @@ func ensureNoNewEvent(ch <-chan interface{}, timeout time.Duration,
}
}

func ensureNoNewEventOnChannel(ch <-chan interface{}) {
func ensureNoNewEventOnChannel(ch <-chan tmpubsub.MsgAndTags) {
ensureNoNewEvent(
ch,
ensureTimeout,
"We should be stuck waiting, not receiving new event on the channel")
}

func ensureNoNewRoundStep(stepCh <-chan interface{}) {
func ensureNoNewRoundStep(stepCh <-chan tmpubsub.MsgAndTags) {
ensureNoNewEvent(
stepCh,
ensureTimeout,
"We should be stuck waiting, not receiving NewRoundStep event")
}

func ensureNoNewUnlock(unlockCh <-chan interface{}) {
func ensureNoNewUnlock(unlockCh <-chan tmpubsub.MsgAndTags) {
ensureNoNewEvent(
unlockCh,
ensureTimeout,
"We should be stuck waiting, not receiving Unlock event")
}

func ensureNoNewTimeout(stepCh <-chan interface{}, timeout int64) {
func ensureNoNewTimeout(stepCh <-chan tmpubsub.MsgAndTags, timeout int64) {
timeoutDuration := time.Duration(timeout*5) * time.Nanosecond
ensureNoNewEvent(
stepCh,
Expand All @@ -351,7 +351,7 @@ func ensureNoNewTimeout(stepCh <-chan interface{}, timeout int64) {
}

func ensureNewEvent(
ch <-chan interface{},
ch <-chan tmpubsub.MsgAndTags,
height int64,
round int,
timeout time.Duration,
Expand All @@ -361,7 +361,7 @@ func ensureNewEvent(
case <-time.After(timeout):
panic(errorMessage)
case ev := <-ch:
rs, ok := ev.(types.EventDataRoundState)
rs, ok := ev.Msg.(types.EventDataRoundState)
if !ok {
panic(
fmt.Sprintf(
Expand All @@ -378,7 +378,7 @@ func ensureNewEvent(
}
}

func ensureNewRoundStep(stepCh <-chan interface{}, height int64, round int) {
func ensureNewRoundStep(stepCh <-chan tmpubsub.MsgAndTags, height int64, round int) {
ensureNewEvent(
stepCh,
height,
Expand All @@ -387,12 +387,12 @@ func ensureNewRoundStep(stepCh <-chan interface{}, height int64, round int) {
"Timeout expired while waiting for NewStep event")
}

func ensureNewVote(voteCh <-chan interface{}, height int64, round int) {
func ensureNewVote(voteCh <-chan tmpubsub.MsgAndTags, height int64, round int) {
select {
case <-time.After(ensureTimeout):
break
case v := <-voteCh:
edv, ok := v.(types.EventDataVote)
edv, ok := v.Msg.(types.EventDataVote)
if !ok {
panic(fmt.Sprintf("expected a *types.Vote, "+
"got %v. wrong subscription channel?",
Expand All @@ -408,12 +408,12 @@ func ensureNewVote(voteCh <-chan interface{}, height int64, round int) {
}
}

func ensureNewRound(roundCh <-chan interface{}, height int64, round int) {
func ensureNewRound(roundCh <-chan tmpubsub.MsgAndTags, height int64, round int) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewRound event")
case ev := <-roundCh:
rs, ok := ev.(types.EventDataNewRound)
rs, ok := ev.Msg.(types.EventDataNewRound)
if !ok {
panic(
fmt.Sprintf(
Expand All @@ -429,18 +429,18 @@ func ensureNewRound(roundCh <-chan interface{}, height int64, round int) {
}
}

func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, timeout int64) {
func ensureNewTimeout(timeoutCh <-chan tmpubsub.MsgAndTags, height int64, round int, timeout int64) {
timeoutDuration := time.Duration(timeout*3) * time.Nanosecond
ensureNewEvent(timeoutCh, height, round, timeoutDuration,
"Timeout expired while waiting for NewTimeout event")
}

func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) {
func ensureNewProposal(proposalCh <-chan tmpubsub.MsgAndTags, height int64, round int) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewProposal event")
case ev := <-proposalCh:
rs, ok := ev.(types.EventDataCompleteProposal)
rs, ok := ev.Msg.(types.EventDataCompleteProposal)
if !ok {
panic(
fmt.Sprintf(
Expand All @@ -456,17 +456,17 @@ func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) {
}
}

func ensureNewValidBlock(validBlockCh <-chan interface{}, height int64, round int) {
func ensureNewValidBlock(validBlockCh <-chan tmpubsub.MsgAndTags, height int64, round int) {
ensureNewEvent(validBlockCh, height, round, ensureTimeout,
"Timeout expired while waiting for NewValidBlock event")
}

func ensureNewBlock(blockCh <-chan interface{}, height int64) {
func ensureNewBlock(blockCh <-chan tmpubsub.MsgAndTags, height int64) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewBlock event")
case ev := <-blockCh:
block, ok := ev.(types.EventDataNewBlock)
block, ok := ev.Msg.(types.EventDataNewBlock)
if !ok {
panic(fmt.Sprintf("expected a *types.EventDataNewBlock, "+
"got %v. wrong subscription channel?",
Expand All @@ -478,12 +478,12 @@ func ensureNewBlock(blockCh <-chan interface{}, height int64) {
}
}

func ensureNewBlockHeader(blockCh <-chan interface{}, height int64, blockHash cmn.HexBytes) {
func ensureNewBlockHeader(blockCh <-chan tmpubsub.MsgAndTags, height int64, blockHash cmn.HexBytes) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewBlockHeader event")
case ev := <-blockCh:
blockHeader, ok := ev.(types.EventDataNewBlockHeader)
blockHeader, ok := ev.Msg.(types.EventDataNewBlockHeader)
if !ok {
panic(fmt.Sprintf("expected a *types.EventDataNewBlockHeader, "+
"got %v. wrong subscription channel?",
Expand All @@ -498,42 +498,17 @@ func ensureNewBlockHeader(blockCh <-chan interface{}, height int64, blockHash cm
}
}

func ensureNewUnlock(unlockCh <-chan interface{}, height int64, round int) {
func ensureNewUnlock(unlockCh <-chan tmpubsub.MsgAndTags, height int64, round int) {
ensureNewEvent(unlockCh, height, round, ensureTimeout,
"Timeout expired while waiting for NewUnlock event")
}

func ensureVote(voteCh <-chan interface{}, height int64, round int,
voteType types.SignedMsgType) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewVote event")
case v := <-voteCh:
edv, ok := v.(types.EventDataVote)
if !ok {
panic(fmt.Sprintf("expected a *types.Vote, "+
"got %v. wrong subscription channel?",
reflect.TypeOf(v)))
}
vote := edv.Vote
if vote.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height))
}
if vote.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, vote.Round))
}
if vote.Type != voteType {
panic(fmt.Sprintf("expected type %v, got %v", voteType, vote.Type))
}
}
}

func ensureProposal(proposalCh <-chan interface{}, height int64, round int, propId types.BlockID) {
func ensureProposal(proposalCh <-chan tmpubsub.MsgAndTags, height int64, round int, propId types.BlockID) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewProposal event")
case ev := <-proposalCh:
rs, ok := ev.(types.EventDataCompleteProposal)
rs, ok := ev.Msg.(types.EventDataCompleteProposal)
if !ok {
panic(
fmt.Sprintf(
Expand All @@ -560,7 +535,32 @@ func ensurePrevote(voteCh <-chan interface{}, height int64, round int) {
ensureVote(voteCh, height, round, types.PrevoteType)
}

func ensureNewEventOnChannel(ch <-chan interface{}) {
func ensureVote(voteCh <-chan interface{}, height int64, round int,
voteType types.SignedMsgType) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewVote event")
case v := <-voteCh:
edv, ok := v.(types.EventDataVote)
if !ok {
panic(fmt.Sprintf("expected a *types.Vote, "+
"got %v. wrong subscription channel?",
reflect.TypeOf(v)))
}
vote := edv.Vote
if vote.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height))
}
if vote.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, vote.Round))
}
if vote.Type != voteType {
panic(fmt.Sprintf("expected type %v, got %v", voteType, vote.Type))
}
}
}

func ensureNewEventOnChannel(ch <-chan tmpubsub.MsgAndTags) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for new activity on the channel")
Expand Down
2 changes: 1 addition & 1 deletion consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
ticker := time.NewTicker(time.Second * 30)
select {
case b := <-newBlockCh:
evt := b.(types.EventDataNewBlock)
evt := b.Msg.(types.EventDataNewBlock)
nTxs += int(evt.Block.Header.NumTxs)
case <-ticker.C:
panic("Timed out waiting to commit blocks with transactions")
Expand Down

0 comments on commit 0a1285a

Please sign in to comment.