Skip to content

Commit

Permalink
Merge pull request tendermint#3 from BiJie/issue_bc218
Browse files Browse the repository at this point in the history
Add priority lock for CheckTx in mempool and ReCheck ABCI interface
  • Loading branch information
darren-liu committed Nov 8, 2018
2 parents 90eda9b + 1701b9c commit 3821815
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 19 deletions.
1 change: 1 addition & 0 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Client interface {
SetOptionAsync(types.RequestSetOption) *ReqRes
DeliverTxAsync(tx []byte) *ReqRes
CheckTxAsync(tx []byte) *ReqRes
ReCheckTxAsync(tx []byte) *ReqRes
QueryAsync(types.RequestQuery) *ReqRes
CommitAsync() *ReqRes
InitChainAsync(types.RequestInitChain) *ReqRes
Expand Down
9 changes: 9 additions & 0 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,15 @@ func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes {
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_CheckTx{res}})
}

func (cli *grpcClient) ReCheckTxAsync(tx []byte) *ReqRes {
req := types.ToRequestCheckTx(tx)
res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_CheckTx{res}})
}

func (cli *grpcClient) QueryAsync(params types.RequestQuery) *ReqRes {
req := types.ToRequestQuery(params)
res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true))
Expand Down
10 changes: 10 additions & 0 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ func (app *localClient) CheckTxAsync(tx []byte) *ReqRes {
)
}

func (app *localClient) ReCheckTxAsync(tx []byte) *ReqRes {
app.mtx.Lock()
res := app.Application.ReCheckTx(tx)
app.mtx.Unlock()
return app.callback(
types.ToRequestCheckTx(tx),
types.ToResponseCheckTx(res),
)
}

func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes {
app.mtx.Lock()
res := app.Application.Query(req)
Expand Down
4 changes: 4 additions & 0 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ func (cli *socketClient) CheckTxAsync(tx []byte) *ReqRes {
return cli.queueRequest(types.ToRequestCheckTx(tx))
}

func (cli *socketClient) ReCheckTxAsync(tx []byte) *ReqRes {
return cli.queueRequest(types.ToRequestCheckTx(tx))
}

func (cli *socketClient) QueryAsync(req types.RequestQuery) *ReqRes {
return cli.queueRequest(types.ToRequestQuery(req))
}
Expand Down
4 changes: 4 additions & 0 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (app *PersistentKVStoreApplication) CheckTx(tx []byte) types.ResponseCheckT
return app.app.CheckTx(tx)
}

func (app *PersistentKVStoreApplication) ReCheckTx(tx []byte) types.ResponseCheckTx {
return app.app.CheckTx(tx)
}

// Commit will panic if InitChain was not called
func (app *PersistentKVStoreApplication) Commit() types.ResponseCommit {
return app.app.Commit()
Expand Down
7 changes: 6 additions & 1 deletion abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ type Application interface {
Query(RequestQuery) ResponseQuery // Query for state

// Mempool Connection
CheckTx(tx []byte) ResponseCheckTx // Validate a tx for the mempool
CheckTx(tx []byte) ResponseCheckTx // Validate a tx for the mempool
ReCheckTx(tx []byte) ResponseCheckTx // Validate a tx for the mempool

// Consensus Connection
InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain with validators and other info from TendermintCore
Expand Down Expand Up @@ -53,6 +54,10 @@ func (BaseApplication) CheckTx(tx []byte) ResponseCheckTx {
return ResponseCheckTx{Code: CodeTypeOK}
}

func (BaseApplication) ReCheckTx(tx []byte) ResponseCheckTx {
return ResponseCheckTx{Code: CodeTypeOK}
}

func (BaseApplication) Commit() ResponseCommit {
return ResponseCommit{}
}
Expand Down
13 changes: 9 additions & 4 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ func deliverTxsRange(cs *ConsensusState, start, end int) {

func TestMempoolTxConcurrentWithCommit(t *testing.T) {
state, privVals := randGenesisState(1, false, 10)
cs := newConsensusState(state, privVals[0], NewCounterApplication())
// checkTx and block mutex are not purely FIFO, so we don't need to stick
// to the counter sequence
app := NewCounterApplication()
app.serial = false
cs := newConsensusState(state, privVals[0], app)
height, round := cs.Height, cs.Round
newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)

Expand Down Expand Up @@ -182,10 +186,11 @@ type CounterApplication struct {

txCount int
mempoolTxCount int
serial bool
}

func NewCounterApplication() *CounterApplication {
return &CounterApplication{}
return &CounterApplication{serial: true}
}

func (app *CounterApplication) Info(req abci.RequestInfo) abci.ResponseInfo {
Expand All @@ -194,7 +199,7 @@ func (app *CounterApplication) Info(req abci.RequestInfo) abci.ResponseInfo {

func (app *CounterApplication) DeliverTx(tx []byte) abci.ResponseDeliverTx {
txValue := txAsUint64(tx)
if txValue != uint64(app.txCount) {
if app.serial && txValue != uint64(app.txCount) {
return abci.ResponseDeliverTx{
Code: code.CodeTypeBadNonce,
Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.txCount, txValue)}
Expand All @@ -205,7 +210,7 @@ func (app *CounterApplication) DeliverTx(tx []byte) abci.ResponseDeliverTx {

func (app *CounterApplication) CheckTx(tx []byte) abci.ResponseCheckTx {
txValue := txAsUint64(tx)
if txValue != uint64(app.mempoolTxCount) {
if app.serial && txValue != uint64(app.mempoolTxCount) {
return abci.ResponseCheckTx{
Code: code.CodeTypeBadNonce,
Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.mempoolTxCount, txValue)}
Expand Down
45 changes: 31 additions & 14 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func TxID(tx []byte) string {
type Mempool struct {
config *cfg.MempoolConfig

proxyMtx sync.Mutex
proxyLowMtx sync.Mutex
proxyNextMtx sync.Mutex
proxyBlockingMtx sync.Mutex
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
counter int64 // simple incrementing counter
Expand Down Expand Up @@ -196,8 +198,8 @@ func (mem *Mempool) CloseWAL() bool {
return false
}

mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
mem.Lock()
defer mem.Unlock()

if mem.wal == nil {
return false
Expand Down Expand Up @@ -226,12 +228,27 @@ func (mem *Mempool) InitWAL() {

// Lock locks the mempool. The consensus must be able to hold lock to safely update.
func (mem *Mempool) Lock() {
mem.proxyMtx.Lock()
mem.proxyNextMtx.Lock()
mem.proxyBlockingMtx.Lock()
mem.proxyNextMtx.Unlock()
}

// Unlock unlocks the mempool.
func (mem *Mempool) Unlock() {
mem.proxyMtx.Unlock()
mem.proxyBlockingMtx.Unlock()
}

//LockLow uses triple mutex to low the priority of CheckTx()
func (mem *Mempool) LockLow() {
mem.proxyLowMtx.Lock()
mem.proxyNextMtx.Lock()
mem.proxyBlockingMtx.Lock()
mem.proxyNextMtx.Unlock()
}

func (mem *Mempool) UnlockLow() {
mem.proxyBlockingMtx.Unlock()
mem.proxyLowMtx.Unlock()
}

// Size returns the number of transactions in the mempool.
Expand All @@ -247,8 +264,8 @@ func (mem *Mempool) FlushAppConn() error {

// Flush removes all transactions from the mempool and cache
func (mem *Mempool) Flush() {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
mem.Lock()
defer mem.Unlock()

mem.cache.Reset()

Expand Down Expand Up @@ -278,8 +295,8 @@ func (mem *Mempool) TxsWaitChan() <-chan struct{} {
// It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
mem.LockLow()
defer mem.UnlockLow()

if mem.Size() >= mem.config.Size {
return ErrMempoolIsFull
Expand Down Expand Up @@ -428,8 +445,8 @@ func (mem *Mempool) notifyTxsAvailable() {
// If both maxes are negative, there is no cap on the size of all returned
// transactions (~ all available transactions).
func (mem *Mempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
mem.Lock()
defer mem.Unlock()

for atomic.LoadInt32(&mem.rechecking) > 0 {
// TODO: Something better?
Expand Down Expand Up @@ -464,8 +481,8 @@ func (mem *Mempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
// If max is negative, there is no cap on the size of all returned
// transactions (~ all available transactions).
func (mem *Mempool) ReapMaxTxs(max int) types.Txs {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
mem.Lock()
defer mem.Unlock()

if max < 0 {
max = mem.txs.Len()
Expand Down Expand Up @@ -560,7 +577,7 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) {
// Push txs to proxyAppConn
// NOTE: resCb() may be called concurrently.
for _, tx := range goodTxs {
mem.proxyAppConn.CheckTxAsync(tx)
mem.proxyAppConn.ReCheckTxAsync(tx)
}
mem.proxyAppConn.FlushAsync()
}
Expand Down
102 changes: 102 additions & 0 deletions mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -219,6 +220,107 @@ func TestTxsAvailable(t *testing.T) {
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
}

type SleepCounterApplication struct {
*counter.CounterApplication
wg *sync.WaitGroup
}

func NewSleepCounterApplication(f bool, i int) *SleepCounterApplication {
wg := &sync.WaitGroup{}
wg.Add(i)
return &SleepCounterApplication{counter.NewCounterApplication(f), wg}
}

func (app *SleepCounterApplication) CheckTx(tx []byte) abci.ResponseCheckTx {
res := app.CounterApplication.CheckTx(tx)
app.wg.Wait()
return res
}

func TestReapPriority(t *testing.T) {
TotalTx := 15
app := NewSleepCounterApplication(false, TotalTx)
cc := proxy.NewLocalClientCreator(app)

mempool := newMempoolWithApp(cc)
appConnCon, _ := cc.NewABCIClient()
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
err := appConnCon.Start()
require.Nil(t, err)

testResult := make(chan string, 100)
var wg sync.WaitGroup
j := 0
checkTxs := func(i int) {
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i))
mempool.CheckTx(txBytes, nil)
wg.Done()
}
//seqReap := make(chan int, 5)

reapCheck := func(threshold int) {
//for threshold := range seqReap {
txs := mempool.ReapMaxBytesMaxGas(-1, -1)

if len(txs) >= threshold {
str := fmt.Sprintf("Reap failed to have priority, %v > %v\n", len(txs), threshold)
fmt.Print(str)
testResult <- str
} else {
fmt.Printf("Priority reaping: %v < %v\n", len(txs), threshold)
}
j += len(txs)
if err := mempool.Update(0, txs, nil, nil); err != nil {
testResult <- err.Error()
}
for _, txBytes := range txs {

res, err := appConnCon.DeliverTxSync(txBytes)
if err != nil {
testResult <- fmt.Sprintf("Client error committing tx: %v", err)
}
if res.IsErr() {
testResult <- fmt.Sprintf("Error committing tx. Code:%v result:%X log:%v",
res.Code, res.Data, res.Log)
}
// fmt.Println("delivered")
}

res, err := appConnCon.CommitSync()
if err != nil {
testResult <- fmt.Sprintf("Client error committing: %v", err)
}
if len(res.Data) != 8 {
testResult <- fmt.Sprintf("Error committing. Hash:%X", res.Data)
}

//}

}

//go reapCheck()
wg.Add(TotalTx)
for i := 1; i <= TotalTx; i++ {
fmt.Printf("Insert checkTX:%v\n", i)
go checkTxs(i)
}
//close(seqReap)
time.Sleep(time.Millisecond)
for i := 1; i <= TotalTx; i++ {
app.wg.Done()
}
reapCheck(TotalTx)
wg.Wait()
close(testResult)
k := 0
for s := range testResult {
t.Log(s)
k++
}
require.Equal(t, 0, k)
}

func TestSerialReap(t *testing.T) {
app := counter.NewCounterApplication(true)
app.SetOption(abci.RequestSetOption{Key: "serial", Value: "on"})
Expand Down
5 changes: 5 additions & 0 deletions proxy/app_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type AppConnMempool interface {
Error() error

CheckTxAsync(tx []byte) *abcicli.ReqRes
ReCheckTxAsync(tx []byte) *abcicli.ReqRes

FlushAsync() *abcicli.ReqRes
FlushSync() error
Expand Down Expand Up @@ -114,6 +115,10 @@ func (app *appConnMempool) CheckTxAsync(tx []byte) *abcicli.ReqRes {
return app.appConn.CheckTxAsync(tx)
}

func (app *appConnMempool) ReCheckTxAsync(tx []byte) *abcicli.ReqRes {
return app.appConn.ReCheckTxAsync(tx)
}

//------------------------------------------------
// Implements AppConnQuery (subset of abcicli.Client)

Expand Down

0 comments on commit 3821815

Please sign in to comment.