diff --git a/mempool/mempool.go b/mempool/mempool.go index 9638baf26b1..fc6e40c1a32 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -179,7 +179,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { }, }) } - return nil + return nil // TODO: return an error (?) } mem.cache.Push(tx) // END CACHE @@ -216,21 +216,23 @@ func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) { func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { switch r := res.Value.(type) { case *abci.Response_CheckTx: + tx := req.GetCheckTx().Tx if r.CheckTx.Code == abci.CodeType_OK { mem.counter++ memTx := &mempoolTx{ counter: mem.counter, height: int64(mem.height), - tx: req.GetCheckTx().Tx, + tx: tx, } mem.txs.PushBack(memTx) + mem.logger.Info("Added good transaction", "tx", tx, "res", r) mem.notifyTxsAvailable() } else { // ignore bad transaction - mem.logger.Info("Bad Transaction", "res", r) + mem.logger.Info("Rejected bad transaction", "tx", tx, "res", r) // remove from cache (it might be good later) - mem.cache.Remove(req.GetCheckTx().Tx) + mem.cache.Remove(tx) // TODO: handle other retcodes } diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index ee61a468b56..46401e88bab 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -15,14 +15,12 @@ import ( "github.com/tendermint/tendermint/types" ) -func newMempoolWithApp(t *testing.T, cc proxy.ClientCreator) *Mempool { +func newMempoolWithApp(cc proxy.ClientCreator) *Mempool { config := cfg.ResetTestRoot("mempool_test") appConnMem, _ := cc.NewABCIClient() appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) - if _, err := appConnMem.Start(); err != nil { - t.Fatalf("Error starting ABCI client: %v", err.Error()) - } + appConnMem.Start() mempool := NewMempool(config.Mempool, appConnMem, 0) mempool.SetLogger(log.TestingLogger()) return mempool @@ -46,7 +44,7 @@ func ensureFire(t *testing.T, ch <-chan int, timeoutMS int) { } } -func sendTxs(t *testing.T, mempool *Mempool, count int) types.Txs { +func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs { txs := make(types.Txs, count) for i := 0; i < count; i++ { txBytes := make([]byte, 20) @@ -63,7 +61,7 @@ func sendTxs(t *testing.T, mempool *Mempool, count int) types.Txs { func TestTxsAvailable(t *testing.T) { app := dummy.NewDummyApplication() cc := proxy.NewLocalClientCreator(app) - mempool := newMempoolWithApp(t, cc) + mempool := newMempoolWithApp(cc) mempool.EnableTxsAvailable() timeoutMS := 500 @@ -72,7 +70,7 @@ func TestTxsAvailable(t *testing.T) { ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // send a bunch of txs, it should only fire once - txs := sendTxs(t, mempool, 100) + txs := checkTxs(t, mempool, 100) ensureFire(t, mempool.TxsAvailable(), timeoutMS) ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) @@ -85,7 +83,7 @@ func TestTxsAvailable(t *testing.T) { ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // send a bunch more txs. we already fired for this height so it shouldnt fire again - moreTxs := sendTxs(t, mempool, 50) + moreTxs := checkTxs(t, mempool, 50) ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // now call update with all the txs. it should not fire as there are no txs left @@ -94,7 +92,7 @@ func TestTxsAvailable(t *testing.T) { ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // send a bunch more txs, it should only fire once - sendTxs(t, mempool, 100) + checkTxs(t, mempool, 100) ensureFire(t, mempool.TxsAvailable(), timeoutMS) ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) } @@ -104,7 +102,7 @@ func TestSerialReap(t *testing.T) { app.SetOption("serial", "on") cc := proxy.NewLocalClientCreator(app) - mempool := newMempoolWithApp(t, cc) + mempool := newMempoolWithApp(cc) appConnCon, _ := cc.NewABCIClient() appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) if _, err := appConnCon.Start(); err != nil { diff --git a/mempool/reactor.go b/mempool/reactor.go index e4ee417fd90..7dbfa292443 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -9,6 +9,7 @@ import ( abci "github.com/tendermint/abci/types" wire "github.com/tendermint/go-wire" "github.com/tendermint/tmlibs/clist" + "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" @@ -40,6 +41,12 @@ func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReac return memR } +// SetLogger sets the Logger on the reactor and the underlying Mempool. +func (memR *MempoolReactor) SetLogger(l log.Logger) { + memR.Logger = l + memR.Mempool.SetLogger(l) +} + // GetChannels implements Reactor. // It returns the list of channels for this reactor. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { @@ -76,11 +83,7 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { case *TxMessage: err := memR.Mempool.CheckTx(msg.Tx, nil) if err != nil { - // Bad, seen, or conflicting tx. - memR.Logger.Info("Could not add tx", "tx", msg.Tx) - return - } else { - memR.Logger.Info("Added valid tx", "tx", msg.Tx) + memR.Logger.Info("Could not check tx", "tx", msg.Tx, "err", err) } // broadcasting happens from go routines per peer default: diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go new file mode 100644 index 00000000000..a2f0f272ef8 --- /dev/null +++ b/mempool/reactor_test.go @@ -0,0 +1,108 @@ +package mempool + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/go-kit/kit/log/term" + + "github.com/tendermint/abci/example/dummy" + "github.com/tendermint/tmlibs/log" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" +) + +// mempoolLogger is a TestingLogger which uses a different +// color for each validator ("validator" key must exist). +func mempoolLogger() log.Logger { + return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor { + for i := 0; i < len(keyvals)-1; i += 2 { + if keyvals[i] == "validator" { + return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))} + } + } + return term.FgBgColor{} + }) +} + +// connect N mempool reactors through N switches +func makeAndConnectMempoolReactors(config *cfg.Config, N int) []*MempoolReactor { + reactors := make([]*MempoolReactor, N) + logger := mempoolLogger() + for i := 0; i < N; i++ { + app := dummy.NewDummyApplication() + cc := proxy.NewLocalClientCreator(app) + mempool := newMempoolWithApp(cc) + + reactors[i] = NewMempoolReactor(config.Mempool, mempool) // so we dont start the consensus states + reactors[i].SetLogger(logger.With("validator", i)) + } + + p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("MEMPOOL", reactors[i]) + return s + + }, p2p.Connect2Switches) + return reactors +} + +// wait for all txs on all reactors +func waitForTxs(t *testing.T, txs types.Txs, reactors []*MempoolReactor) { + // wait for the txs in all mempools + wg := new(sync.WaitGroup) + for i := 0; i < len(reactors); i++ { + wg.Add(1) + go _waitForTxs(t, wg, txs, i, reactors) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + timer := time.After(TIMEOUT) + select { + case <-timer: + t.Fatal("Timed out waiting for txs") + case <-done: + } +} + +// wait for all txs on a single mempool +func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int, reactors []*MempoolReactor) { + + mempool := reactors[reactorIdx].Mempool + for mempool.Size() != len(txs) { + time.Sleep(time.Second) + } + + reapedTxs := mempool.Reap(len(txs)) + for i, tx := range txs { + assert.Equal(t, tx, reapedTxs[i], fmt.Sprintf("txs at index %d on reactor %d don't match: %v vs %v", i, reactorIdx, tx, reapedTxs[i])) + } + wg.Done() +} + +var ( + NUM_TXS = 1000 + TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow +) + +func TestReactorBroadcastTxMessage(t *testing.T) { + config := cfg.TestConfig() + N := 4 + reactors := makeAndConnectMempoolReactors(config, N) + + // send a bunch of txs to the first reactor's mempool + // and wait for them all to be received in the others + txs := checkTxs(t, reactors[0].Mempool, NUM_TXS) + waitForTxs(t, txs, reactors) +} diff --git a/p2p/connection.go b/p2p/connection.go index 7d99e1ed3d5..97d54635d3d 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -471,6 +471,7 @@ FOR_LOOP: } if msgBytes != nil { c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes) + // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine c.onReceive(pkt.ChannelID, msgBytes) } default: