Skip to content

Commit

Permalink
Merge pull request #174 from nspcc-dev/fix-mempool-oom-handling
Browse files Browse the repository at this point in the history
cmd: make mempool OOM error handler more adaptive
  • Loading branch information
AnnaShaleva committed May 16, 2024
2 parents b0d351e + 283efbe commit 1e13456
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 22 deletions.
25 changes: 16 additions & 9 deletions cmd/bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,16 @@ func main() {
defer cancel()

var (
workers int
rate int
msPerBlock int
threshold time.Duration
dump *internal.Dump
desc = v.GetString("desc")
timeLimit = v.GetDuration("timeLimit")
mode = internal.BenchMode(v.GetString("mode"))
client *internal.RPCClient
workers int
rate int
msPerBlock int
mempoolOOMDelay time.Duration
threshold time.Duration
dump *internal.Dump
desc = v.GetString("desc")
timeLimit = v.GetDuration("timeLimit")
mode = internal.BenchMode(v.GetString("mode"))
client *internal.RPCClient
)

switch mode {
Expand All @@ -61,6 +62,11 @@ func main() {
log.Fatalf("could not receive RPC Node version: %v", err)
}
msPerBlock = version.Protocol.MillisecondsPerBlock
if msPerBlock > 1000 {
mempoolOOMDelay = time.Duration(msPerBlock) * time.Millisecond / 50
} else {
mempoolOOMDelay = time.Duration(msPerBlock) * time.Millisecond / 10
}

reg := regexp.MustCompile(`[^\w.-]+`)
versionStr := strings.Trim(reg.ReplaceAllString(version.UserAgent, "_"), "_")
Expand Down Expand Up @@ -133,6 +139,7 @@ func main() {
internal.WorkerTimeLimit(timeLimit),
internal.WorkerThreshold(threshold),
internal.WorkerBlockchainClient(client),
internal.WorkerMempoolOOMDelay(mempoolOOMDelay),
internal.WorkerRPSReporter(rep.UpdateRPS),
internal.WorkerTPSReporter(rep.UpdateTPS),
internal.WorkerErrReporter(rep.UpdateErr),
Expand Down
35 changes: 22 additions & 13 deletions cmd/internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,19 @@ type (
}

doerParams struct {
wrkCount int
cli *RPCClient
mode BenchMode
rate int
threshold time.Duration
timeLimit time.Duration
dump *Dump
cntReporter func(cnt int32)
errReporter func(cnt int32)
rpsReporter func(rps float64)
tpsReporter func(deltaTime uint64, txCount int, tps float64)
stop context.CancelFunc
wrkCount int
cli *RPCClient
mode BenchMode
rate int
threshold time.Duration
timeLimit time.Duration
mempoolOOMDelay time.Duration
dump *Dump
cntReporter func(cnt int32)
errReporter func(cnt int32)
rpsReporter func(rps float64)
tpsReporter func(deltaTime uint64, txCount int, tps float64)
stop context.CancelFunc
}

// WorkerOption is an option type to configure workers.
Expand Down Expand Up @@ -75,6 +76,14 @@ func WorkerBlockchainClient(cli *RPCClient) WorkerOption {
}
}

// WorkerMempoolOOMDelay sets the time interval to pause sender's work after
// mempool OOM error occurred on tx submission.
func WorkerMempoolOOMDelay(delay time.Duration) WorkerOption {
return func(p *doerParams) {
p.mempoolOOMDelay = delay
}
}

// WorkerTimeLimit sets time limit to send requests.
func WorkerTimeLimit(limit time.Duration) WorkerOption {
return func(p *doerParams) {
Expand Down Expand Up @@ -244,7 +253,7 @@ loop:
log.Printf("failed to re-enqueue transaction: %s\n", err)
d.countErr.Add(1)
}
time.Sleep(100 * time.Millisecond)
time.Sleep(d.mempoolOOMDelay)
} else {
d.countErr.Add(1)
}
Expand Down

0 comments on commit 1e13456

Please sign in to comment.