Skip to content

Commit

Permalink
rpc: avoid leaking threads during checktx (backport #8328) (#8333)
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] committed Apr 17, 2022
1 parent 226bc94 commit 04c1f76
Showing 1 changed file with 72 additions and 54 deletions.
126 changes: 72 additions & 54 deletions internal/rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,32 @@ func (env *Environment) BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*co
err := env.Mempool.CheckTx(
ctx.Context(),
tx,
func(res *abci.Response) { resCh <- res },
func(res *abci.Response) {
select {
case <-ctx.Context().Done():
case resCh <- res:
}
},
mempool.TxInfo{},
)
if err != nil {
return nil, err
}

res := <-resCh
r := res.GetCheckTx()

return &coretypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
Codespace: r.Codespace,
MempoolError: r.MempoolError,
Hash: tx.Hash(),
}, nil
select {
case <-ctx.Context().Done():
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err())
case res := <-resCh:
r := res.GetCheckTx()
return &coretypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
Codespace: r.Codespace,
MempoolError: r.MempoolError,
Hash: tx.Hash(),
}, nil
}
}

// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx.
Expand All @@ -64,61 +72,71 @@ func (env *Environment) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*
err := env.Mempool.CheckTx(
ctx.Context(),
tx,
func(res *abci.Response) { resCh <- res },
func(res *abci.Response) {
select {
case <-ctx.Context().Done():
case resCh <- res:
}
},
mempool.TxInfo{},
)
if err != nil {
return nil, err
}

r := (<-resCh).GetCheckTx()
if r.Code != abci.CodeTypeOK {
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
}, fmt.Errorf("transaction encountered error (%s)", r.MempoolError)
}

if !indexer.KVSinkEnabled(env.EventSinks) {
return &coretypes.ResultBroadcastTxCommit{
select {
case <-ctx.Context().Done():
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err())
case res := <-resCh:
r := res.GetCheckTx()
if r.Code != abci.CodeTypeOK {
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
},
errors.New("cannot wait for commit because kvEventSync is not enabled")
}
}, fmt.Errorf("transaction encountered error (%s)", r.MempoolError)
}

startAt := time.Now()
timer := time.NewTimer(0)
defer timer.Stop()

count := 0
for {
count++
select {
case <-ctx.Context().Done():
env.Logger.Error("Error on broadcastTxCommit",
"duration", time.Since(startAt),
"err", err)
if !indexer.KVSinkEnabled(env.EventSinks) {
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
}, fmt.Errorf("timeout waiting for commit of tx %s (%s)",
tx.Hash(), time.Since(startAt))
case <-timer.C:
txres, err := env.Tx(ctx, tx.Hash(), false)
if err != nil {
jitter := 100*time.Millisecond + time.Duration(rand.Int63n(int64(time.Second))) // nolint: gosec
backoff := 100 * time.Duration(count) * time.Millisecond
timer.Reset(jitter + backoff)
continue
}
},
errors.New("cannot confirm transaction because kvEventSink is not enabled")
}

return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
DeliverTx: txres.TxResult,
Hash: tx.Hash(),
Height: txres.Height,
}, nil
startAt := time.Now()
timer := time.NewTimer(0)
defer timer.Stop()

count := 0
for {
count++
select {
case <-ctx.Context().Done():
env.Logger.Error("error on broadcastTxCommit",
"duration", time.Since(startAt),
"err", err)
return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
Hash: tx.Hash(),
}, fmt.Errorf("timeout waiting for commit of tx %s (%s)",
tx.Hash(), time.Since(startAt))
case <-timer.C:
txres, err := env.Tx(ctx, tx.Hash(), false)
if err != nil {
jitter := 100*time.Millisecond + time.Duration(rand.Int63n(int64(time.Second))) // nolint: gosec
backoff := 100 * time.Duration(count) * time.Millisecond
timer.Reset(jitter + backoff)
continue
}

return &coretypes.ResultBroadcastTxCommit{
CheckTx: *r,
DeliverTx: txres.TxResult,
Hash: tx.Hash(),
Height: txres.Height,
}, nil
}
}
}
}
Expand Down

0 comments on commit 04c1f76

Please sign in to comment.