Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

abci/grpc: return async responses in order #5520

Merged
merged 6 commits into from Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Expand Up @@ -25,3 +25,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [statesync] \#5516 Check that all heights necessary to rebuild state for a snapshot exist before adding the snapshot to the pool. (@erikgrinaker)

### BUG FIXES
- [abci/grpc] \#5520 Return async responses in order, to avoid mempool panics. (@erikgrinaker)
59 changes: 37 additions & 22 deletions abci/client/grpc_client.go
Expand Up @@ -22,8 +22,9 @@ type grpcClient struct {
service.BaseService
mustConnect bool

client types.ABCIApplicationClient
conn *grpc.ClientConn
client types.ABCIApplicationClient
conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool

mtx tmsync.Mutex
addr string
Expand All @@ -35,6 +36,7 @@ func NewGRPCClient(addr string, mustConnect bool) Client {
cli := &grpcClient{
addr: addr,
mustConnect: mustConnect,
chReqRes: make(chan *ReqRes, 64),
melekes marked this conversation as resolved.
Show resolved Hide resolved
}
cli.BaseService = *service.NewBaseService(nil, "grpcClient", cli)
return cli
Expand All @@ -48,6 +50,34 @@ func (cli *grpcClient) OnStart() error {
if err := cli.BaseService.OnStart(); err != nil {
return err
}

// This processes asynchronous request/response messages and dispatches
// them to callbacks.
go func() {
// Use a separate function to use defer for mutex unlocks (this handles panics)
callCb := func(reqres *ReqRes) {
cli.mtx.Lock()
defer cli.mtx.Unlock()

// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, reqres.Response)
}

// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(reqres.Response)
}
}
for reqres := range cli.chReqRes {
if reqres != nil {
callCb(reqres)
} else {
cli.Logger.Error("Received nil reqres")
}
}
}()

RETRY_LOOP:
for {
conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc))
Expand Down Expand Up @@ -85,6 +115,7 @@ func (cli *grpcClient) OnStop() {
if cli.conn != nil {
cli.conn.Close()
}
close(cli.chReqRes)
}

func (cli *grpcClient) StopForError(err error) {
Expand Down Expand Up @@ -254,26 +285,10 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshot

func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
reqres := NewReqRes(req)
reqres.Response = res // Set response
reqres.Done() // Release waiters
reqres.SetDone() // so reqRes.SetCallback will run the callback

// goroutine for callbacks
go func() {
cli.mtx.Lock()
defer cli.mtx.Unlock()

// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, res)
}

// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(res)
}
}()

reqres.Response = res // Set response
reqres.Done() // Release waiters
reqres.SetDone() // so reqRes.SetCallback will run the callback
cli.chReqRes <- reqres // use channel for async responses, since they must be ordered
return reqres
}

Expand Down