Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mempool no gossip back #2778

Merged
merged 41 commits into from
Mar 26, 2019
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
06d5ffd
Make the mempool not gossip txs back to peers its received it from
ValarDragon Nov 7, 2018
3904e81
Fix adversarial memleak
ValarDragon Nov 7, 2018
7842608
Don't break interface
ValarDragon Nov 7, 2018
167b720
Update changelog
ValarDragon Nov 7, 2018
fcc1a2b
Merge branch 'develop' into dev/mempool_no_gossip_back
ValarDragon Nov 7, 2018
aba2c93
Forgot to add a mtx
ValarDragon Nov 7, 2018
6da7a91
Merge branch 'dev/mempool_no_gossip_back' of github.com:tendermint/te…
ValarDragon Nov 7, 2018
3b10fe6
forgot a mutex
ValarDragon Nov 7, 2018
ca68847
Update mempool/reactor.go
melekes Nov 10, 2018
4fbed56
Update mempool/mempool.go
melekes Nov 10, 2018
cb1fca6
Merge branch 'develop' into dev/mempool_no_gossip_back
ebuchman Nov 11, 2018
6933dd1
Use unknown peer ID
melekes Nov 14, 2018
30d32dd
fix compilation
ValarDragon Nov 14, 2018
ef34f13
Merge branch 'develop' into dev/mempool_no_gossip_back
ValarDragon Nov 14, 2018
7d5415d
use next wait chan logic when skipping
ValarDragon Nov 14, 2018
5a296eb
Minor fixes
ValarDragon Nov 14, 2018
5174032
Add TxInfo
ValarDragon Nov 20, 2018
e086a3a
Add reverse map
ValarDragon Nov 21, 2018
4fd49d2
Make activeID's auto-reserve 0
ValarDragon Nov 21, 2018
8893dc1
0 -> UnknownPeerID
melekes Nov 26, 2018
ef84519
Switch to making the normal case set a callback on the reqres object
ValarDragon Nov 28, 2018
1c04fc6
Merge branch 'develop' into dev/mempool_no_gossip_back
ValarDragon Nov 29, 2018
5c4a5ce
fix merge conflict
ValarDragon Nov 29, 2018
df48b07
Addres comments
ValarDragon Dec 16, 2018
7953d8b
Add cache tests
ValarDragon Dec 16, 2018
e08c050
add cache tests
ValarDragon Dec 16, 2018
0fd4789
minor fixes
ebuchman Dec 17, 2018
681bde0
Merge branch 'develop' into dev/mempool_no_gossip_back
ebuchman Dec 21, 2018
43bfa6a
Merge branch 'develop' into dev/mempool_no_gossip_back
melekes Jan 24, 2019
e1cb280
Merge branch 'develop' into dev/mempool_no_gossip_back
melekes Feb 27, 2019
b32b199
update metrics in reqResCb and reformat code
melekes Feb 27, 2019
1f84487
goimport -w mempool/reactor.go
melekes Feb 27, 2019
bf3afc0
mempool: update memTx senders
melekes Feb 27, 2019
79acc63
change senders type from []uint16 to sync.Map
melekes Feb 27, 2019
53bb106
explain the choice of a map DS for senders
melekes Feb 27, 2019
28d9ef5
extract ids pool/mapper to a separate struct
melekes Feb 27, 2019
525a430
fix literal copies lock value from senders: sync.Map contains sync.Mutex
melekes Feb 27, 2019
6e6cbb0
use sync.Map#LoadOrStore instead of Load
melekes Feb 27, 2019
c414f54
Merge branch 'develop' into dev/mempool_no_gossip_back
melekes Mar 25, 2019
e3f376d
fixes after Ismail's review
melekes Mar 25, 2019
58d0cf7
rename resCbNormal to resCbFirstTime
melekes Mar 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ Special thanks to external contributors on this release:
### IMPROVEMENTS:
- [libs/common] \#3238 exit with zero (0) code upon receiving SIGTERM/SIGINT

- [mempool] \#2778 No longer send txs back to peers who sent it to you

### BUG FIXES:

- [p2p/conn] \#3347 Reject all-zero shared secrets in the Diffie-Hellman step of secret-connection
Expand Down
2 changes: 2 additions & 0 deletions docs/spec/reactors/mempool/reactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ for details.

Sending incorrectly encoded data or data exceeding `maxMsgSize` will result
in stopping the peer.

The mempool will not send a tx back to any peer which it received it from.
13 changes: 13 additions & 0 deletions mempool/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ func BenchmarkReap(b *testing.B) {
}
}

func BenchmarkCheckTx(b *testing.B) {
ValarDragon marked this conversation as resolved.
Show resolved Hide resolved
app := kvstore.NewKVStoreApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
defer cleanup()

for i := 0; i < b.N; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTx(tx, nil)
}
}

func BenchmarkCacheInsertTime(b *testing.B) {
cache := newMapTxCache(b.N)
txs := make([][]byte, b.N)
Expand Down
101 changes: 101 additions & 0 deletions mempool/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package mempool

import (
"crypto/rand"
"crypto/sha256"
"testing"

"github.com/stretchr/testify/require"

"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)

func TestCacheRemove(t *testing.T) {
cache := newMapTxCache(100)
numTxs := 10
txs := make([][]byte, numTxs)
for i := 0; i < numTxs; i++ {
// probability of collision is 2**-256
txBytes := make([]byte, 32)
rand.Read(txBytes)
txs[i] = txBytes
cache.Push(txBytes)
// make sure its added to both the linked list and the map
require.Equal(t, i+1, len(cache.map_))
require.Equal(t, i+1, cache.list.Len())
}
for i := 0; i < numTxs; i++ {
cache.Remove(txs[i])
// make sure its removed from both the map and the linked list
require.Equal(t, numTxs-(i+1), len(cache.map_))
require.Equal(t, numTxs-(i+1), cache.list.Len())
}
}

func TestCacheAfterUpdate(t *testing.T) {
app := kvstore.NewKVStoreApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
defer cleanup()

// reAddIndices & txsInCache can have elements > numTxsToCreate
// also assumes max index is 255 for convenience
// txs in cache also checks order of elements
tests := []struct {
numTxsToCreate int
updateIndices []int
reAddIndices []int
txsInCache []int
}{
{1, []int{}, []int{1}, []int{1, 0}}, // adding new txs works
{2, []int{1}, []int{}, []int{1, 0}}, // update doesn't remove tx from cache
{2, []int{2}, []int{}, []int{2, 1, 0}}, // update adds new tx to cache
{2, []int{1}, []int{1}, []int{1, 0}}, // re-adding after update doesn't make dupe
}
for tcIndex, tc := range tests {
for i := 0; i < tc.numTxsToCreate; i++ {
tx := types.Tx{byte(i)}
err := mempool.CheckTx(tx, nil)
require.NoError(t, err)
}

updateTxs := []types.Tx{}
for _, v := range tc.updateIndices {
tx := types.Tx{byte(v)}
updateTxs = append(updateTxs, tx)
}
mempool.Update(int64(tcIndex), updateTxs, nil, nil)

for _, v := range tc.reAddIndices {
tx := types.Tx{byte(v)}
_ = mempool.CheckTx(tx, nil)
}

cache := mempool.cache.(*mapTxCache)
node := cache.list.Front()
counter := 0
for node != nil {
require.NotEqual(t, len(tc.txsInCache), counter,
"cache larger than expected on testcase %d", tcIndex)

nodeVal := node.Value.([sha256.Size]byte)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we launched: #2187 🚀

expectedBz := sha256.Sum256([]byte{byte(tc.txsInCache[len(tc.txsInCache)-counter-1])})
// Reference for reading the errors:
// >>> sha256('\x00').hexdigest()
// '6e340b9cffb37a989ca544e6bb780a2c78901d3fb33738768511a30617afa01d'
// >>> sha256('\x01').hexdigest()
// '4bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a'
// >>> sha256('\x02').hexdigest()
// 'dbc1b4c900ffe48d575b5da5c638040125f65db0fe3e24494b76ea986457d986'

require.Equal(t, expectedBz, nodeVal, "Equality failed on index %d, tc %d", counter, tcIndex)
counter++
node = node.Next()
}
require.Equal(t, len(tc.txsInCache), counter,
"cache smaller than expected on testcase %d", tcIndex)
mempool.Flush()
}
}
116 changes: 89 additions & 27 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ type PreCheckFunc func(types.Tx) error
// transaction doesn't require more gas than available for the block.
type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error

// TxInfo are parameters that get passed when attempting to add a tx to the
// mempool.
type TxInfo struct {
PeerID uint16
melekes marked this conversation as resolved.
Show resolved Hide resolved
}

/*

The mempool pushes new txs onto the proxyAppConn.
Expand Down Expand Up @@ -150,7 +156,8 @@ type Mempool struct {

proxyMtx sync.Mutex
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
txs *clist.CList // concurrent linked-list of good txs
txsMap map[[sha256.Size]byte]*clist.CElement
melekes marked this conversation as resolved.
Show resolved Hide resolved
height int64 // the last block Update()'d to
rechecking int32 // for re-checking filtered txs on Update()
recheckCursor *clist.CElement // next expected response
Expand Down Expand Up @@ -189,6 +196,7 @@ func NewMempool(
config: config,
proxyAppConn: proxyAppConn,
txs: clist.New(),
txsMap: make(map[[sha256.Size]byte]*clist.CElement),
height: height,
rechecking: 0,
recheckCursor: nil,
Expand Down Expand Up @@ -304,6 +312,7 @@ func (mem *Mempool) Flush() {
e.DetachPrev()
}

mem.txsMap = make(map[[sha256.Size]byte]*clist.CElement)
_ = atomic.SwapInt64(&mem.txsBytes, 0)
}

Expand All @@ -327,6 +336,13 @@ func (mem *Mempool) TxsWaitChan() <-chan struct{} {
// It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
melekes marked this conversation as resolved.
Show resolved Hide resolved
return mem.CheckTxWithInfo(tx, cb, TxInfo{PeerID: UnknownPeerID})
}

// CheckTxWithInfo performs the same operation as CheckTx, but with extra meta data about the tx.
// Currently this metadata is the peer who sent it,
// used to prevent the tx from being gossiped back to them.
func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) {
ValarDragon marked this conversation as resolved.
Show resolved Hide resolved
mem.proxyMtx.Lock()
// use defer to unlock mutex because application (*local client*) might panic
defer mem.proxyMtx.Unlock()
Expand Down Expand Up @@ -357,6 +373,17 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {

// CACHE
if !mem.cache.Push(tx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we skip this cache check stuff entirely if the sender is the RPC, ie ID=0 ? Ref #3414 (comment)

// record the sender
e, ok := mem.txsMap[sha256.Sum256(tx)]
if ok { // tx may be in cache, but not in the mempool
memTx := e.Value.(*mempoolTx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this guaranteed to be non nil ?

Also how does the tx end up in the txsMap but not the mempool?

We shouldn't call this the cache in the comment, since mem.cache is something else ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the comment is confusing... if ok here means tx resides in the mempool (txsMap and txs hold transactions). But tx can be only in the cache and not present in the mempool (txsMap and txs). When we've committed a block, txs are removed from the mempool, but not from cache.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this guaranteed to be non nil ?

yes

if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded {
// TODO: consider punishing peer for dups,
// its non-trivial since invalid txs can become valid,
// but they can spam the same tx with little cost to them atm.
melekes marked this conversation as resolved.
Show resolved Hide resolved
}
}

return ErrTxInCache
}
// END CACHE
Expand All @@ -381,27 +408,58 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
}
reqRes := mem.proxyAppConn.CheckTxAsync(tx)
if cb != nil {
reqRes.SetCallback(cb)
composedCallback := func(res *abci.Response) {
mem.reqResCb(tx, txInfo.PeerID)(res)
ebuchman marked this conversation as resolved.
Show resolved Hide resolved
cb(res)
}
reqRes.SetCallback(composedCallback)
} else {
reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID))
}

return nil
}

// ABCI callback function
// ABCI callback function. This is used for handling rechecks, as the normal
// case is handled by the reqRes callback so it can incorporate local
// information.
func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
melekes marked this conversation as resolved.
Show resolved Hide resolved
if mem.recheckCursor == nil {
mem.resCbNormal(req, res)
} else {
mem.metrics.RecheckTimes.Add(1)
mem.resCbRecheck(req, res)
return
}

mem.metrics.RecheckTimes.Add(1)
mem.resCbRecheck(req, res)

// update metrics
mem.metrics.Size.Set(float64(mem.Size()))
}

func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
// ABCI request result callback function incorporates local information, like
// the peer that sent us this tx, so we can avoid sending it back to the same
// peer.
func (mem *Mempool) reqResCb(tx []byte, peerID uint16) func(res *abci.Response) {
return func(res *abci.Response) {
if mem.recheckCursor != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this possible? Seems like it shouldn't be, and if it is, like it'd be a race condition

return
}
mem.resCbNormal(tx, peerID, res)
liamsi marked this conversation as resolved.
Show resolved Hide resolved

// update metrics
mem.metrics.Size.Set(float64(mem.Size()))
}
}

func (mem *Mempool) addTx(memTx *mempoolTx) {
e := mem.txs.PushBack(memTx)
mem.txsMap[sha256.Sum256(memTx.tx)] = e
atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
}

func (mem *Mempool) resCbNormal(tx []byte, peerID uint16, res *abci.Response) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe rename the sthsthCb methods?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resCbFirstTime?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think that would be a bit better. I think it's OK to merge without renaming even (as these methods are not exported).

switch r := res.Value.(type) {
case *abci.Response_CheckTx:
tx := req.GetCheckTx().Tx
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
Expand All @@ -412,15 +470,14 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
gasWanted: r.CheckTx.GasWanted,
tx: tx,
}
mem.txs.PushBack(memTx)
atomic.AddInt64(&mem.txsBytes, int64(len(tx)))
memTx.senders.Store(peerID, true)
mem.addTx(memTx)
mem.logger.Info("Added good transaction",
"tx", TxID(tx),
"res", r,
"height", memTx.height,
"total", mem.Size(),
)
mem.metrics.TxSizeBytes.Observe(float64(len(tx)))
mem.notifyTxsAvailable()
} else {
// ignore bad transaction
Expand All @@ -434,6 +491,17 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
}
}

func (mem *Mempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) {
mem.txs.Remove(elem)
elem.DetachPrev()
delete(mem.txsMap, sha256.Sum256(tx))
atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))

if removeFromCache {
mem.cache.Remove(tx)
}
}

func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
Expand All @@ -454,12 +522,8 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
} else {
// Tx became invalidated due to newly committed block.
mem.logger.Info("Tx is no longer valid", "tx", TxID(tx), "res", r, "err", postCheckErr)
mem.txs.Remove(mem.recheckCursor)
atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))
mem.recheckCursor.DetachPrev()

// remove from cache (it might be good later)
mem.cache.Remove(tx)
// NOTE: we remove tx from the cache because it might be good later
mem.removeTx(tx, mem.recheckCursor, true)
}
if mem.recheckCursor == mem.recheckEnd {
mem.recheckCursor = nil
Expand Down Expand Up @@ -627,12 +691,9 @@ func (mem *Mempool) removeTxs(txs types.Txs) []types.Tx {
memTx := e.Value.(*mempoolTx)
// Remove the tx if it's already in a block.
if _, ok := txsMap[string(memTx.tx)]; ok {
// remove from clist
mem.txs.Remove(e)
atomic.AddInt64(&mem.txsBytes, int64(-len(memTx.tx)))
e.DetachPrev()

// NOTE: we don't remove committed txs from the cache.
mem.removeTx(memTx.tx, e, false)

continue
}
txsLeft = append(txsLeft, memTx.tx)
Expand Down Expand Up @@ -663,6 +724,7 @@ func (mem *Mempool) recheckTxs(txs []types.Tx) {
type mempoolTx struct {
height int64 // height that this tx had been validated in
gasWanted int64 // amount of gas this tx states it will require
senders sync.Map // ids of peers who've sent us this tx (as a map for quick lookups)
tx types.Tx //
}

Expand Down Expand Up @@ -707,8 +769,8 @@ func (cache *mapTxCache) Reset() {
cache.mtx.Unlock()
}

// Push adds the given tx to the cache and returns true. It returns false if tx
// is already in the cache.
// Push adds the given tx to the cache and returns true. It returns
// false if tx is already in the cache.
func (cache *mapTxCache) Push(tx types.Tx) bool {
cache.mtx.Lock()
defer cache.mtx.Unlock()
Expand All @@ -728,8 +790,8 @@ func (cache *mapTxCache) Push(tx types.Tx) bool {
cache.list.Remove(popped)
}
}
cache.list.PushBack(txHash)
cache.map_[txHash] = cache.list.Back()
e := cache.list.PushBack(txHash)
cache.map_[txHash] = e
return true
}

Expand Down