Skip to content

Commit

Permalink
invoke callbacks when set late in socket client (#8331)
Browse files Browse the repository at this point in the history
  • Loading branch information
williambanfield committed Apr 13, 2022
1 parent efddab0 commit da6ec8f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 19 deletions.
26 changes: 13 additions & 13 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,15 @@ type ReqRes struct {
*sync.WaitGroup
*types.Response // Not set atomically, so be sure to use WaitGroup.

mtx tmsync.Mutex
done bool // Gets set to true once *after* WaitGroup.Done().
cb func(*types.Response) // A single callback that may be set.
mtx tmsync.Mutex

// callbackInvoked as a variable to track if the callback was already
// invoked during the regular execution of the request. This variable
// allows clients to set the callback simultaneously without potentially
// invoking the callback twice by accident, once when 'SetCallback' is
// called and once during the normal request.
callbackInvoked bool
cb func(*types.Response) // A single callback that may be set.
}

func NewReqRes(req *types.Request) *ReqRes {
Expand All @@ -92,8 +98,8 @@ func NewReqRes(req *types.Request) *ReqRes {
WaitGroup: waitGroup1(),
Response: nil,

done: false,
cb: nil,
callbackInvoked: false,
cb: nil,
}
}

Expand All @@ -103,7 +109,7 @@ func NewReqRes(req *types.Request) *ReqRes {
func (r *ReqRes) SetCallback(cb func(res *types.Response)) {
r.mtx.Lock()

if r.done {
if r.callbackInvoked {
r.mtx.Unlock()
cb(r.Response)
return
Expand All @@ -122,6 +128,7 @@ func (r *ReqRes) InvokeCallback() {
if r.cb != nil {
r.cb(r.Response)
}
r.callbackInvoked = true
}

// GetCallback returns the configured callback of the ReqRes object which may be
Expand All @@ -136,13 +143,6 @@ func (r *ReqRes) GetCallback() func(*types.Response) {
return r.cb
}

// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetDone() {
r.mtx.Lock()
r.done = true
r.mtx.Unlock()
}

func waitGroup1() (wg *sync.WaitGroup) {
wg = &sync.WaitGroup{}
wg.Add(1)
Expand Down
5 changes: 1 addition & 4 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func (cli *grpcClient) OnStart() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()

reqres.SetDone()
reqres.Done()

// Notify client listener if set
Expand All @@ -75,9 +74,7 @@ func (cli *grpcClient) OnStart() error {
}

// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(reqres.Response)
}
reqres.InvokeCallback()
}
for reqres := range cli.chReqRes {
if reqres != nil {
Expand Down
5 changes: 3 additions & 2 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,13 @@ func (app *localClient) ApplySnapshotChunkSync(

func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.Callback(req, res)
return newLocalReqRes(req, res)
rr := newLocalReqRes(req, res)
rr.callbackInvoked = true
return rr
}

func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes {
reqRes := NewReqRes(req)
reqRes.Response = res
reqRes.SetDone()
return reqRes
}
69 changes: 69 additions & 0 deletions abci/client/socket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package abcicli_test

import (
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -118,3 +119,71 @@ func (slowApp) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock
time.Sleep(200 * time.Millisecond)
return types.ResponseBeginBlock{}
}

// TestCallbackInvokedWhenSetLaet ensures that the callback is invoked when
// set after the client completes the call into the app. Currently this
// test relies on the callback being allowed to be invoked twice if set multiple
// times, once when set early and once when set late.
func TestCallbackInvokedWhenSetLate(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
app := blockedABCIApplication{
wg: wg,
}
_, c := setupClientServer(t, app)
reqRes := c.CheckTxAsync(types.RequestCheckTx{})

done := make(chan struct{})
cb := func(_ *types.Response) {
close(done)
}
reqRes.SetCallback(cb)
app.wg.Done()
<-done

var called bool
cb = func(_ *types.Response) {
called = true
}
reqRes.SetCallback(cb)
require.True(t, called)
}

type blockedABCIApplication struct {
wg *sync.WaitGroup
types.BaseApplication
}

func (b blockedABCIApplication) CheckTx(r types.RequestCheckTx) types.ResponseCheckTx {
b.wg.Wait()
return b.BaseApplication.CheckTx(r)
}

// TestCallbackInvokedWhenSetEarly ensures that the callback is invoked when
// set before the client completes the call into the app.
func TestCallbackInvokedWhenSetEarly(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
app := blockedABCIApplication{
wg: wg,
}
_, c := setupClientServer(t, app)
reqRes := c.CheckTxAsync(types.RequestCheckTx{})

done := make(chan struct{})
cb := func(_ *types.Response) {
close(done)
}
reqRes.SetCallback(cb)
app.wg.Done()

called := func() bool {
select {
case <-done:
return true
default:
return false
}
}
require.Eventually(t, called, time.Second, time.Millisecond*25)
}

0 comments on commit da6ec8f

Please sign in to comment.