Skip to content

Commit

Permalink
abci/grpc: return async responses in order (#5520)
Browse files Browse the repository at this point in the history
Fixes #5439. This is really a workaround for #5519 (unless we require async implementations to return ordered responses, but that kind of defeats the purpose of having an async API).
  • Loading branch information
erikgrinaker committed Oct 20, 2020
1 parent 6e16df8 commit 047267b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 22 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Expand Up @@ -26,4 +26,6 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi

### BUG FIXES

- [abci/grpc] \#5520 Return async responses in order, to avoid mempool panics. (@erikgrinaker)

- [types] \#5523 Change json naming of `PartSetHeader` within `BlockID` from `parts` to `part_set_header` (@marbar3778)
65 changes: 43 additions & 22 deletions abci/client/grpc_client.go
Expand Up @@ -22,8 +22,9 @@ type grpcClient struct {
service.BaseService
mustConnect bool

client types.ABCIApplicationClient
conn *grpc.ClientConn
client types.ABCIApplicationClient
conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool

mtx tmsync.Mutex
addr string
Expand All @@ -35,6 +36,13 @@ func NewGRPCClient(addr string, mustConnect bool) Client {
cli := &grpcClient{
addr: addr,
mustConnect: mustConnect,
// Buffering the channel is needed to make calls appear asynchronous,
// which is required when the caller makes multiple async calls before
// processing callbacks (e.g. due to holding locks). 64 means that a
// caller can make up to 64 async calls before a callback must be
// processed (otherwise it deadlocks). It also means that we can make 64
// gRPC calls while processing a slow callback at the channel head.
chReqRes: make(chan *ReqRes, 64),
}
cli.BaseService = *service.NewBaseService(nil, "grpcClient", cli)
return cli
Expand All @@ -48,6 +56,34 @@ func (cli *grpcClient) OnStart() error {
if err := cli.BaseService.OnStart(); err != nil {
return err
}

// This processes asynchronous request/response messages and dispatches
// them to callbacks.
go func() {
// Use a separate function to use defer for mutex unlocks (this handles panics)
callCb := func(reqres *ReqRes) {
cli.mtx.Lock()
defer cli.mtx.Unlock()

// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, reqres.Response)
}

// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(reqres.Response)
}
}
for reqres := range cli.chReqRes {
if reqres != nil {
callCb(reqres)
} else {
cli.Logger.Error("Received nil reqres")
}
}
}()

RETRY_LOOP:
for {
conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc))
Expand Down Expand Up @@ -85,6 +121,7 @@ func (cli *grpcClient) OnStop() {
if cli.conn != nil {
cli.conn.Close()
}
close(cli.chReqRes)
}

func (cli *grpcClient) StopForError(err error) {
Expand Down Expand Up @@ -254,26 +291,10 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshot

func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
reqres := NewReqRes(req)
reqres.Response = res // Set response
reqres.Done() // Release waiters
reqres.SetDone() // so reqRes.SetCallback will run the callback

// goroutine for callbacks
go func() {
cli.mtx.Lock()
defer cli.mtx.Unlock()

// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, res)
}

// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(res)
}
}()

reqres.Response = res // Set response
reqres.Done() // Release waiters
reqres.SetDone() // so reqRes.SetCallback will run the callback
cli.chReqRes <- reqres // use channel for async responses, since they must be ordered
return reqres
}

Expand Down

0 comments on commit 047267b

Please sign in to comment.