Skip to content

Commit

Permalink
mempool: reactor test
Browse files Browse the repository at this point in the history
  • Loading branch information
ebuchman committed Sep 5, 2017
1 parent 5ad0da1 commit 88138c3
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 19 deletions.
10 changes: 6 additions & 4 deletions mempool/mempool.go
Expand Up @@ -179,7 +179,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
},
})
}
return nil
return nil // TODO: return an error (?)
}
mem.cache.Push(tx)
// END CACHE
Expand Down Expand Up @@ -216,21 +216,23 @@ func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
tx := req.GetCheckTx().Tx
if r.CheckTx.Code == abci.CodeType_OK {
mem.counter++
memTx := &mempoolTx{
counter: mem.counter,
height: int64(mem.height),
tx: req.GetCheckTx().Tx,
tx: tx,
}
mem.txs.PushBack(memTx)
mem.logger.Info("Added good transaction", "tx", tx, "res", r)
mem.notifyTxsAvailable()
} else {
// ignore bad transaction
mem.logger.Info("Bad Transaction", "res", r)
mem.logger.Info("Rejected bad transaction", "tx", tx, "res", r)

// remove from cache (it might be good later)
mem.cache.Remove(req.GetCheckTx().Tx)
mem.cache.Remove(tx)

// TODO: handle other retcodes
}
Expand Down
18 changes: 8 additions & 10 deletions mempool/mempool_test.go
Expand Up @@ -15,14 +15,12 @@ import (
"github.com/tendermint/tendermint/types"
)

func newMempoolWithApp(t *testing.T, cc proxy.ClientCreator) *Mempool {
func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
config := cfg.ResetTestRoot("mempool_test")

appConnMem, _ := cc.NewABCIClient()
appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
if _, err := appConnMem.Start(); err != nil {
t.Fatalf("Error starting ABCI client: %v", err.Error())
}
appConnMem.Start()
mempool := NewMempool(config.Mempool, appConnMem, 0)
mempool.SetLogger(log.TestingLogger())
return mempool
Expand All @@ -46,7 +44,7 @@ func ensureFire(t *testing.T, ch <-chan int, timeoutMS int) {
}
}

func sendTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
txs := make(types.Txs, count)
for i := 0; i < count; i++ {
txBytes := make([]byte, 20)
Expand All @@ -63,7 +61,7 @@ func sendTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
func TestTxsAvailable(t *testing.T) {
app := dummy.NewDummyApplication()
cc := proxy.NewLocalClientCreator(app)
mempool := newMempoolWithApp(t, cc)
mempool := newMempoolWithApp(cc)
mempool.EnableTxsAvailable()

timeoutMS := 500
Expand All @@ -72,7 +70,7 @@ func TestTxsAvailable(t *testing.T) {
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

// send a bunch of txs, it should only fire once
txs := sendTxs(t, mempool, 100)
txs := checkTxs(t, mempool, 100)
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

Expand All @@ -85,7 +83,7 @@ func TestTxsAvailable(t *testing.T) {
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

// send a bunch more txs. we already fired for this height so it shouldnt fire again
moreTxs := sendTxs(t, mempool, 50)
moreTxs := checkTxs(t, mempool, 50)
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

// now call update with all the txs. it should not fire as there are no txs left
Expand All @@ -94,7 +92,7 @@ func TestTxsAvailable(t *testing.T) {
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

// send a bunch more txs, it should only fire once
sendTxs(t, mempool, 100)
checkTxs(t, mempool, 100)
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
}
Expand All @@ -104,7 +102,7 @@ func TestSerialReap(t *testing.T) {
app.SetOption("serial", "on")
cc := proxy.NewLocalClientCreator(app)

mempool := newMempoolWithApp(t, cc)
mempool := newMempoolWithApp(cc)
appConnCon, _ := cc.NewABCIClient()
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
if _, err := appConnCon.Start(); err != nil {
Expand Down
13 changes: 8 additions & 5 deletions mempool/reactor.go
Expand Up @@ -9,6 +9,7 @@ import (
abci "github.com/tendermint/abci/types"
wire "github.com/tendermint/go-wire"
"github.com/tendermint/tmlibs/clist"
"github.com/tendermint/tmlibs/log"

cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p"
Expand Down Expand Up @@ -40,6 +41,12 @@ func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReac
return memR
}

// SetLogger sets the Logger on the reactor and the underlying Mempool.
func (memR *MempoolReactor) SetLogger(l log.Logger) {
memR.Logger = l
memR.Mempool.SetLogger(l)
}

// GetChannels implements Reactor.
// It returns the list of channels for this reactor.
func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
Expand Down Expand Up @@ -76,11 +83,7 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
case *TxMessage:
err := memR.Mempool.CheckTx(msg.Tx, nil)
if err != nil {
// Bad, seen, or conflicting tx.
memR.Logger.Info("Could not add tx", "tx", msg.Tx)
return
} else {
memR.Logger.Info("Added valid tx", "tx", msg.Tx)
memR.Logger.Info("Could not check tx", "tx", msg.Tx, "err", err)
}
// broadcasting happens from go routines per peer
default:
Expand Down
108 changes: 108 additions & 0 deletions mempool/reactor_test.go
@@ -0,0 +1,108 @@
package mempool

import (
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/go-kit/kit/log/term"

"github.com/tendermint/abci/example/dummy"
"github.com/tendermint/tmlibs/log"

cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)

// mempoolLogger is a TestingLogger which uses a different
// color for each validator ("validator" key must exist).
func mempoolLogger() log.Logger {
return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
for i := 0; i < len(keyvals)-1; i += 2 {
if keyvals[i] == "validator" {
return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
}
}
return term.FgBgColor{}
})
}

// connect N mempool reactors through N switches
func makeAndConnectMempoolReactors(config *cfg.Config, N int) []*MempoolReactor {
reactors := make([]*MempoolReactor, N)
logger := mempoolLogger()
for i := 0; i < N; i++ {
app := dummy.NewDummyApplication()
cc := proxy.NewLocalClientCreator(app)
mempool := newMempoolWithApp(cc)

reactors[i] = NewMempoolReactor(config.Mempool, mempool) // so we dont start the consensus states
reactors[i].SetLogger(logger.With("validator", i))
}

p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("MEMPOOL", reactors[i])
return s

}, p2p.Connect2Switches)
return reactors
}

// wait for all txs on all reactors
func waitForTxs(t *testing.T, txs types.Txs, reactors []*MempoolReactor) {
// wait for the txs in all mempools
wg := new(sync.WaitGroup)
for i := 0; i < len(reactors); i++ {
wg.Add(1)
go _waitForTxs(t, wg, txs, i, reactors)
}

done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

timer := time.After(TIMEOUT)
select {
case <-timer:
t.Fatal("Timed out waiting for txs")
case <-done:
}
}

// wait for all txs on a single mempool
func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int, reactors []*MempoolReactor) {

mempool := reactors[reactorIdx].Mempool
for mempool.Size() != len(txs) {
time.Sleep(time.Second)
}

reapedTxs := mempool.Reap(len(txs))
for i, tx := range txs {
assert.Equal(t, tx, reapedTxs[i], fmt.Sprintf("txs at index %d on reactor %d don't match: %v vs %v", i, reactorIdx, tx, reapedTxs[i]))
}
wg.Done()
}

var (
NUM_TXS = 1000
TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow
)

func TestReactorBroadcastTxMessage(t *testing.T) {
config := cfg.TestConfig()
N := 4
reactors := makeAndConnectMempoolReactors(config, N)

// send a bunch of txs to the first reactor's mempool
// and wait for them all to be received in the others
txs := checkTxs(t, reactors[0].Mempool, NUM_TXS)
waitForTxs(t, txs, reactors)
}
1 change: 1 addition & 0 deletions p2p/connection.go
Expand Up @@ -471,6 +471,7 @@ FOR_LOOP:
}
if msgBytes != nil {
c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
c.onReceive(pkt.ChannelID, msgBytes)
}
default:
Expand Down

0 comments on commit 88138c3

Please sign in to comment.