mempool no gossip back #2778

Merged
merged 41 commits into from
Mar 26, 2019
Changes from 8 commits
06d5ffd
Make the mempool not gossip txs back to peers its received it from
ValarDragon Nov 7, 2018
3904e81
Fix adversarial memleak
ValarDragon Nov 7, 2018
7842608
Don't break interface
ValarDragon Nov 7, 2018
167b720
Update changelog
ValarDragon Nov 7, 2018
fcc1a2b
Merge branch 'develop' into dev/mempool_no_gossip_back
ValarDragon Nov 7, 2018
aba2c93
Forgot to add a mtx
ValarDragon Nov 7, 2018
6da7a91
Merge branch 'dev/mempool_no_gossip_back' of github.com:tendermint/te…
ValarDragon Nov 7, 2018
3b10fe6
forgot a mutex
ValarDragon Nov 7, 2018
ca68847
Update mempool/reactor.go
melekes Nov 10, 2018
4fbed56
Update mempool/mempool.go
melekes Nov 10, 2018
cb1fca6
Merge branch 'develop' into dev/mempool_no_gossip_back
ebuchman Nov 11, 2018
6933dd1
Use unknown peer ID
melekes Nov 14, 2018
30d32dd
fix compilation
ValarDragon Nov 14, 2018
ef34f13
Merge branch 'develop' into dev/mempool_no_gossip_back
ValarDragon Nov 14, 2018
7d5415d
use next wait chan logic when skipping
ValarDragon Nov 14, 2018
5a296eb
Minor fixes
ValarDragon Nov 14, 2018
5174032
Add TxInfo
ValarDragon Nov 20, 2018
e086a3a
Add reverse map
ValarDragon Nov 21, 2018
4fd49d2
Make activeID's auto-reserve 0
ValarDragon Nov 21, 2018
8893dc1
0 -> UnknownPeerID
melekes Nov 26, 2018
ef84519
Switch to making the normal case set a callback on the reqres object
ValarDragon Nov 28, 2018
1c04fc6
Merge branch 'develop' into dev/mempool_no_gossip_back
ValarDragon Nov 29, 2018
5c4a5ce
fix merge conflict
ValarDragon Nov 29, 2018
df48b07
Addres comments
ValarDragon Dec 16, 2018
7953d8b
Add cache tests
ValarDragon Dec 16, 2018
e08c050
add cache tests
ValarDragon Dec 16, 2018
0fd4789
minor fixes
ebuchman Dec 17, 2018
681bde0
Merge branch 'develop' into dev/mempool_no_gossip_back
ebuchman Dec 21, 2018
43bfa6a
Merge branch 'develop' into dev/mempool_no_gossip_back
melekes Jan 24, 2019
e1cb280
Merge branch 'develop' into dev/mempool_no_gossip_back
melekes Feb 27, 2019
b32b199
update metrics in reqResCb and reformat code
melekes Feb 27, 2019
1f84487
goimport -w mempool/reactor.go
melekes Feb 27, 2019
bf3afc0
mempool: update memTx senders
melekes Feb 27, 2019
79acc63
change senders type from []uint16 to sync.Map
melekes Feb 27, 2019
53bb106
explain the choice of a map DS for senders
melekes Feb 27, 2019
28d9ef5
extract ids pool/mapper to a separate struct
melekes Feb 27, 2019
525a430
fix literal copies lock value from senders: sync.Map contains sync.Mutex
melekes Feb 27, 2019
6e6cbb0
use sync.Map#LoadOrStore instead of Load
melekes Feb 27, 2019
c414f54
Merge branch 'develop' into dev/mempool_no_gossip_back
melekes Mar 25, 2019
e3f376d
fixes after Ismail's review
melekes Mar 25, 2019
58d0cf7
rename resCbNormal to resCbFirstTime
melekes Mar 25, 2019
3 changes: 2 additions & 1 deletion CHANGELOG_PENDING.md
Expand Up @@ -28,4 +28,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi

- [crypto/merkle] [\#2756](https://github.com/tendermint/tendermint/issues/2756) Fix crypto/merkle ProofOperators.Verify to check bounds on keypath parts.
 - [mempool] fix a bug where we create a WAL despite `wal_dir` being empty
-- [p2p] \#2771 Fix `peer-id` label name in prometheus metrics
+- [mempool] \#2778 No longer send txs back to peers who sent it to you
+- [p2p] \#2771 Fix `peer-id` label name in prometheus metrics\
2 changes: 1 addition & 1 deletion abci/client/grpc_client.go
Expand Up @@ -111,8 +111,8 @@ func (cli *grpcClient) Error() error {
// NOTE: callback may get internally generated flush responses.
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
 	cli.mtx.Lock()
-	defer cli.mtx.Unlock()
 	cli.resCb = resCb
+	cli.mtx.Unlock()
}

//----------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion abci/client/local_client.go
Expand Up @@ -30,8 +30,8 @@ func NewLocalClient(mtx *sync.Mutex, app types.Application) *localClient {

func (app *localClient) SetResponseCallback(cb Callback) {
 	app.mtx.Lock()
-	defer app.mtx.Unlock()
 	app.Callback = cb
+	app.mtx.Unlock()
}

// TODO: change types.Application to include Error()?
Expand Down
2 changes: 2 additions & 0 deletions docs/spec/reactors/mempool/reactor.md
Expand Up @@ -12,3 +12,5 @@ for details.

Sending incorrectly encoded data or data exceeding `maxMsgSize` will result
in stopping the peer.

The mempool will not send a tx back to any peer from which it received it.
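The rule above can be sketched in isolation. A minimal, illustrative Go sketch (the type and function names here are assumptions for illustration, not the actual tendermint identifiers): each mempool entry records the IDs of peers that sent it, and the broadcast routine consults that list before forwarding.

```go
package main

import "fmt"

// mempoolTx mirrors the idea in this PR: each tracked tx carries the
// IDs of peers that sent it to us. Names are illustrative only.
type mempoolTx struct {
	tx      []byte
	senders []uint16
}

// shouldGossip reports whether memTx should be forwarded to peerID:
// a tx is never sent back to a peer that already sent it to us.
func shouldGossip(memTx *mempoolTx, peerID uint16) bool {
	for _, s := range memTx.senders {
		if s == peerID {
			return false
		}
	}
	return true
}

func main() {
	memTx := &mempoolTx{tx: []byte("tx"), senders: []uint16{3}}
	fmt.Println(shouldGossip(memTx, 3)) // peer 3 sent it to us: skip
	fmt.Println(shouldGossip(memTx, 7)) // peer 7 did not: forward
}
```

In the real reactor this check happens inside `broadcastTxRoutine`, using per-connection peer IDs assigned in `AddPeer`.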
16 changes: 14 additions & 2 deletions mempool/bench_test.go
Expand Up @@ -25,6 +25,18 @@ func BenchmarkReap(b *testing.B) {
}
}

func BenchmarkCheckTx(b *testing.B) {
app := kvstore.NewKVStoreApplication()
cc := proxy.NewLocalClientCreator(app)
mempool := newMempoolWithApp(cc)

for i := 0; i < b.N; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTx(tx, nil)
}
}

func BenchmarkCacheInsertTime(b *testing.B) {
cache := newMapTxCache(b.N)
txs := make([][]byte, b.N)
Expand All @@ -34,7 +46,7 @@ func BenchmarkCacheInsertTime(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
-		cache.Push(txs[i])
+		cache.Push(txs[i], 0)
}
}

Expand All @@ -46,7 +58,7 @@ func BenchmarkCacheRemoveTime(b *testing.B) {
for i := 0; i < b.N; i++ {
txs[i] = make([]byte, 8)
binary.BigEndian.PutUint64(txs[i], uint64(i))
-		cache.Push(txs[i])
+		cache.Push(txs[i], 0)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down
57 changes: 41 additions & 16 deletions mempool/mempool.go
Expand Up @@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"

"github.com/tendermint/tendermint/abci/client"

"github.com/pkg/errors"

amino "github.com/tendermint/go-amino"
Expand Down Expand Up @@ -177,7 +179,7 @@ func NewMempool(
} else {
mempool.cache = nopTxCache{}
}
-	proxyAppConn.SetResponseCallback(mempool.resCb)
+	proxyAppConn.SetResponseCallback(mempool.resCb(0))
for _, option := range options {
option(mempool)
}
Expand Down Expand Up @@ -296,6 +298,10 @@ func (mem *Mempool) TxsWaitChan() <-chan struct{} {
// It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
return mem.checkTxFromPeer(tx, cb, 0)
}

func (mem *Mempool) checkTxFromPeer(tx types.Tx, cb func(*abci.Response), peerID uint16) (err error) {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()

Expand All @@ -310,7 +316,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
}

// CACHE
-	if !mem.cache.Push(tx) {
+	if !mem.cache.Push(tx, peerID) {
return ErrTxInCache
}
// END CACHE
Expand All @@ -333,6 +339,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
if err = mem.proxyAppConn.Error(); err != nil {
return err
}
mem.proxyAppConn.SetResponseCallback(mem.resCb(peerID))
reqRes := mem.proxyAppConn.CheckTxAsync(tx)
if cb != nil {
reqRes.SetCallback(cb)
Expand All @@ -342,17 +349,19 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
}

// ABCI callback function
-func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
-	if mem.recheckCursor == nil {
-		mem.resCbNormal(req, res)
-	} else {
-		mem.metrics.RecheckTimes.Add(1)
-		mem.resCbRecheck(req, res)
-	}
-	mem.metrics.Size.Set(float64(mem.Size()))
-}
+func (mem *Mempool) resCb(peerID uint16) abcicli.Callback {
+	return func(req *abci.Request, res *abci.Response) {
+		if mem.recheckCursor == nil {
+			mem.resCbNormal(req, res, peerID)
+		} else {
+			mem.metrics.RecheckTimes.Add(1)
+			mem.resCbRecheck(req, res)
+		}
+		mem.metrics.Size.Set(float64(mem.Size()))
+	}
+}

-func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
+func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response, peerID uint16) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
tx := req.GetCheckTx().Tx
Expand All @@ -366,7 +375,9 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
counter: mem.counter,
height: mem.height,
gasWanted: r.CheckTx.GasWanted,
-			tx:        tx,
+			// senders: []uint16{mem.cache.GetPeerID(tx)},
+			senders:   []uint16{peerID},
+			tx:        tx,
}
mem.txs.PushBack(memTx)
mem.logger.Info("Added good transaction",
Expand Down Expand Up @@ -607,6 +618,7 @@ type mempoolTx struct {
counter int64 // a simple incrementing counter
height int64 // height that this tx had been validated in
gasWanted int64 // amount of gas this tx states it will require
senders []uint16 // ids of peers who've sent us this tx
tx types.Tx //
}

Expand All @@ -619,7 +631,7 @@ func (memTx *mempoolTx) Height() int64 {

type txCache interface {
Reset()
-	Push(tx types.Tx) bool
+	Push(tx types.Tx, peerID uint16) bool
Remove(tx types.Tx)
}

Expand Down Expand Up @@ -653,14 +665,27 @@ func (cache *mapTxCache) Reset() {

// Push adds the given tx to the cache and returns true. It returns false if tx
// is already in the cache.
-func (cache *mapTxCache) Push(tx types.Tx) bool {
+func (cache *mapTxCache) Push(tx types.Tx, peerID uint16) bool {
cache.mtx.Lock()
defer cache.mtx.Unlock()

// Use the tx hash in the cache
txHash := sha256.Sum256(tx)
if moved, exists := cache.map_[txHash]; exists {
cache.list.MoveToFront(moved)
memTx, succ := moved.Value.(*mempoolTx)
// succ would be false if tx was reaped
Review thread on this line:
- Reviewer (Contributor): why would the type assertion fail?
- ValarDragon: You're right, I'm not sure why I thought it would do that lol
- ValarDragon: I realized what happened. I mistakenly thought the hashmap was returning a pointer to the main mempool's node for it, not the linked list. So none of this logic works for adding subsequent peers; we have to update the cache to know about the main mempool's tx index in order for that to happen.
- ValarDragon: For now, would it be best to just comment out this section and merge the rest? (It currently works for not gossiping it back to the first peer who sent it to you, which is still a significant improvement imo)

if succ {
for i := 0; i < len(memTx.senders); i++ {
if peerID == memTx.senders[i] {
// TODO: consider punishing peer for dups,
// its non-trivial since invalid txs can become valid,
// but they can spam the same tx with little cost to them atm.
return false
}
}
memTx.senders = append(memTx.senders, peerID)
}
return false
}

Expand Down Expand Up @@ -694,6 +719,6 @@ type nopTxCache struct{}

var _ txCache = (*nopTxCache)(nil)

-func (nopTxCache) Reset() {}
-func (nopTxCache) Push(types.Tx) bool { return true }
-func (nopTxCache) Remove(types.Tx) {}
+func (nopTxCache) Reset() {}
+func (nopTxCache) Push(types.Tx, uint16) bool { return true }
+func (nopTxCache) Remove(types.Tx) {}
18 changes: 9 additions & 9 deletions mempool/mempool_test.go
Expand Up @@ -55,7 +55,7 @@ func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
}
}

-func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
+func checkTxs(t *testing.T, mempool *Mempool, count int, peerID uint16) types.Txs {
txs := make(types.Txs, count)
for i := 0; i < count; i++ {
txBytes := make([]byte, 20)
Expand All @@ -64,7 +64,7 @@ func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
if err != nil {
t.Error(err)
}
-		if err := mempool.CheckTx(txBytes, nil); err != nil {
+		if err := mempool.checkTxFromPeer(txBytes, nil, peerID); err != nil {
// Skip invalid txs.
// TestMempoolFilters will fail otherwise. It asserts a number of txs
// returned.
Expand All @@ -83,7 +83,7 @@ func TestReapMaxBytesMaxGas(t *testing.T) {
mempool := newMempoolWithApp(cc)

// Ensure gas calculation behaves as expected
-	checkTxs(t, mempool, 1)
+	checkTxs(t, mempool, 1, 0)
tx0 := mempool.TxsFront().Value.(*mempoolTx)
// assert that kv store has gas wanted = 1.
require.Equal(t, app.CheckTx(tx0.tx).GasWanted, int64(1), "KVStore had a gas value neq to 1")
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestReapMaxBytesMaxGas(t *testing.T) {
{20, 20000, 30, 20},
}
for tcIndex, tt := range tests {
-		checkTxs(t, mempool, tt.numTxsToCreate)
+		checkTxs(t, mempool, tt.numTxsToCreate, 0)
got := mempool.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas)
assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d",
len(got), tt.expectedNumTxs, tcIndex)
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestMempoolFilters(t *testing.T) {
}
for tcIndex, tt := range tests {
mempool.Update(1, emptyTxArr, tt.preFilter, tt.postFilter)
-		checkTxs(t, mempool, tt.numTxsToCreate)
+		checkTxs(t, mempool, tt.numTxsToCreate, 0)
require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
mempool.Flush()
}
Expand All @@ -175,7 +175,7 @@ func TestTxsAvailable(t *testing.T) {
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

// send a bunch of txs, it should only fire once
-	txs := checkTxs(t, mempool, 100)
+	txs := checkTxs(t, mempool, 100, 0)
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

Expand All @@ -190,7 +190,7 @@ func TestTxsAvailable(t *testing.T) {
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

// send a bunch more txs. we already fired for this height so it shouldnt fire again
-	moreTxs := checkTxs(t, mempool, 50)
+	moreTxs := checkTxs(t, mempool, 50, 0)
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

// now call update with all the txs. it should not fire as there are no txs left
Expand All @@ -201,7 +201,7 @@ func TestTxsAvailable(t *testing.T) {
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

// send a bunch more txs, it should only fire once
-	checkTxs(t, mempool, 100)
+	checkTxs(t, mempool, 100, 0)
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
}
Expand Down Expand Up @@ -324,7 +324,7 @@ func TestCacheRemove(t *testing.T) {
txBytes := make([]byte, 32)
rand.Read(txBytes)
txs[i] = txBytes
-		cache.Push(txBytes)
+		cache.Push(txBytes, 0)
// make sure its added to both the linked list and the map
require.Equal(t, i+1, len(cache.map_))
require.Equal(t, i+1, cache.list.Len())
Expand Down
38 changes: 37 additions & 1 deletion mempool/reactor.go
Expand Up @@ -3,6 +3,7 @@ package mempool
import (
"fmt"
"reflect"
"sync"
"time"

amino "github.com/tendermint/go-amino"
Expand All @@ -23,17 +24,27 @@ const (
)

// MempoolReactor handles mempool tx broadcasting amongst peers.
// It maintains a map from peer ID to counter, to prevent gossiping txs to the
// peers you received it from.
type MempoolReactor struct {
p2p.BaseReactor
config *cfg.MempoolConfig
Mempool *Mempool

idMtx *sync.RWMutex
nextID uint16 // assumes that a node will never have over 65536 peers
peerMap map[p2p.ID]uint16
}

// NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
memR := &MempoolReactor{
config: config,
Mempool: mempool,

idMtx: &sync.RWMutex{},
peerMap: make(map[p2p.ID]uint16),
nextID: 1, // reserve 0 for mempoolReactor.BroadcastTx
}
memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
return memR
Expand Down Expand Up @@ -67,11 +78,18 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor.
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (memR *MempoolReactor) AddPeer(peer p2p.Peer) {
memR.idMtx.Lock()
memR.peerMap[peer.ID()] = memR.nextID
memR.nextID++
memR.idMtx.Unlock()
go memR.broadcastTxRoutine(peer)
}

// RemovePeer implements Reactor.
func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
memR.idMtx.Lock()
delete(memR.peerMap, peer.ID())
memR.idMtx.Unlock()
// broadcast routine checks if peer is gone and returns
}

Expand All @@ -88,7 +106,10 @@ func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {

switch msg := msg.(type) {
case *TxMessage:
-		err := memR.Mempool.CheckTx(msg.Tx, nil)
+		memR.idMtx.RLock()
+		peerID := memR.peerMap[src.ID()]
+		memR.idMtx.RUnlock()
+		err := memR.Mempool.checkTxFromPeer(msg.Tx, nil, peerID)
if err != nil {
memR.Logger.Info("Could not check tx", "tx", TxID(msg.Tx), "err", err)
}
Expand All @@ -114,6 +135,9 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
return
}

memR.idMtx.RLock()
peerID := memR.peerMap[peer.ID()]
memR.idMtx.RUnlock()
var next *clist.CElement
for {
// This happens because the CElement we were looking at got garbage
Expand Down Expand Up @@ -143,6 +167,18 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
continue
}
}
// ensure peer hasn't already sent us this tx
skip := false
for i := 0; i < len(memTx.senders); i++ {
if peerID == memTx.senders[i] {
next = next.Next()
skip = true
break
}
}
if skip {
continue
}
// send memTx
msg := &TxMessage{Tx: memTx.tx}
success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg))
Expand Down
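The peer-ID bookkeeping added to the reactor above can be sketched standalone (a simplified, illustrative version under assumed names; the real code keys the map by `p2p.ID`, and a later commit in this PR extracts the pool into a separate ids struct):

```go
package main

import (
	"fmt"
	"sync"
)

// peerIDs sketches the reactor's peer-ID bookkeeping from this PR:
// each connected peer gets a small uint16 ID, with 0 reserved for txs
// submitted locally via BroadcastTx. Names are illustrative only.
type peerIDs struct {
	mtx    sync.RWMutex
	nextID uint16 // assumes a node never assigns more than 65535 IDs
	ids    map[string]uint16
}

func newPeerIDs() *peerIDs {
	return &peerIDs{
		nextID: 1, // reserve 0 for the local/unknown sender
		ids:    make(map[string]uint16),
	}
}

// add assigns the next ID to a newly connected peer (cf. AddPeer).
func (p *peerIDs) add(peer string) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.ids[peer] = p.nextID
	p.nextID++
}

// remove forgets a disconnected peer (cf. RemovePeer).
func (p *peerIDs) remove(peer string) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	delete(p.ids, peer)
}

// get returns the peer's ID, or 0 (the reserved "unknown" ID) if the
// peer is not registered; map reads take only the read lock, as in
// Receive and broadcastTxRoutine.
func (p *peerIDs) get(peer string) uint16 {
	p.mtx.RLock()
	defer p.mtx.RUnlock()
	return p.ids[peer]
}

func main() {
	p := newPeerIDs()
	p.add("peerA")
	p.add("peerB")
	fmt.Println(p.get("peerA")) // 1
	fmt.Println(p.get("peerB")) // 2
	p.remove("peerA")
	fmt.Println(p.get("peerA")) // 0 (unknown)
}
```

One consequence of this simple scheme, visible in the diff as well: IDs of removed peers are not reclaimed here, so `nextID` only grows (the later "extract ids pool/mapper" commit reworks this).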