Skip to content

Commit

Permalink
mempool: ensure async requests are flushed to the server (#9010)
Browse files Browse the repository at this point in the history
In the v0.34 line, the socket and gRPC clients require explicit flushes to
ensure that the client and server have received an async request.  Add these
calls explicitly where required in the backport of the priority mempool.

In addition, the gRPC client's flush plumbing was not fully hooked up in the
v0.34 line, so this change includes that update as well.
  • Loading branch information
M. J. Fromberger committed Jul 14, 2022
1 parent ba1711e commit 223ece9
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
4 changes: 3 additions & 1 deletion abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,9 @@ func (cli *grpcClient) finishSyncCall(reqres *ReqRes) *types.Response {
//----------------------------------------

func (cli *grpcClient) FlushSync() error {
return nil
reqres := cli.FlushAsync()
cli.finishSyncCall(reqres).GetFlush()
return cli.Error()
}

func (cli *grpcClient) EchoSync(msg string) (*types.ResponseEcho, error) {
Expand Down
7 changes: 7 additions & 0 deletions mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp
// the callback deadlock trying to acquire the same lock. This isn't a
// problem with out-of-process calls, but this has to work for both.
reqRes := txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
if err := txmp.proxyAppConn.FlushSync(); err != nil {
return err
}
reqRes.SetCallback(func(res *abci.Response) {
wtx := &WrappedTx{
tx: tx,
Expand Down Expand Up @@ -722,6 +725,10 @@ func (txmp *TxMempool) recheckTransactions() {
Tx: wtx.tx,
Type: abci.CheckTxType_Recheck,
})
if err := txmp.proxyAppConn.FlushSync(); err != nil {
atomic.AddInt64(&txmp.txRecheck, -1)
txmp.logger.Error("mempool: error flushing re-CheckTx", "key", wtx.tx.Key(), "err", err)
}
}

txmp.proxyAppConn.FlushAsync()
Expand Down

0 comments on commit 223ece9

Please sign in to comment.