Skip to content

Commit

Permalink
bench: add workers into rate mode
Browse files Browse the repository at this point in the history
We need to run multiple workers for the rate mode and share the load
between them.

Close #170

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
  • Loading branch information
AliceInHunterland committed May 16, 2024
1 parent 1e13456 commit eace8ed
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
14 changes: 4 additions & 10 deletions cmd/bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {
defer cancel()

var (
workers int
workers = v.GetInt("workers")
rate int
msPerBlock int
mempoolOOMDelay time.Duration
Expand All @@ -45,18 +45,12 @@ func main() {
client *internal.RPCClient
)

switch mode {
case internal.ModeWorker:
workers = v.GetInt("workers")
client = internal.NewRPCClient(v, workers)

case internal.ModeRate:
workers = 1
if mode == internal.ModeRate {
rate = v.GetInt("rateLimit")
threshold = time.Duration(time.Second.Nanoseconds() / int64(rate))
client = internal.NewRPCClient(v, 1)
threshold = time.Duration(time.Second.Nanoseconds() / int64(rate) * int64(workers))
}

client = internal.NewRPCClient(v, workers)
version, err := client.GetVersion(ctx)
if err != nil {
log.Fatalf("could not receive RPC Node version: %v", err)
Expand Down
12 changes: 7 additions & 5 deletions cmd/internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func NewWorkers(opts ...WorkerOption) (Worker, error) {

switch p.mode {
case ModeRate:
log.Printf("Init worker with %d QPS / %s time limit (%d txs will try to send)", p.rate, p.timeLimit, ln)
log.Printf("Init %d workers with %d QPS / %s time limit (%d txs will try to send)", p.wrkCount, p.rate, p.timeLimit, ln)
case ModeWorker:
log.Printf("Init %d workers / %s time limit (%d txs will try to send)", p.wrkCount, p.timeLimit, ln)
}
Expand All @@ -220,8 +220,9 @@ func NewWorkers(opts ...WorkerOption) (Worker, error) {
// idx defines the order of the transaction being sent and can be more than overall transactions count, because retransmission is supported.
func (d *doer) worker(ctx context.Context, idx *atomic.Int64, start time.Time) {
var (
done = ctx.Done()
timer = time.NewTimer(d.timeLimit)
done = ctx.Done()
timer = time.NewTimer(d.timeLimit)
localTxCounter int64
)

defer func() {
Expand All @@ -237,7 +238,7 @@ loop:
case <-timer.C:
return
default:
i := idx.Add(1)
idx.Add(1)
if d.dump.TransactionsQueue.Len() == 0 {
return
}
Expand All @@ -264,10 +265,11 @@ loop:

since := time.Since(start)
count := d.countTxs.Add(1)
localTxCounter++
d.rpsReporter(float64(count) / since.Seconds())

if d.threshold > 0 {
waitFor := time.Until(start.Add(time.Duration(d.threshold.Nanoseconds() * (i + 1))))
waitFor := time.Until(start.Add(time.Duration(d.threshold.Nanoseconds() * (localTxCounter + 1))))
if waitFor > 0 {
time.Sleep(waitFor)
}
Expand Down
10 changes: 8 additions & 2 deletions runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ ARGS=(-i "/dump.txs")
FILES=()
MODE=""
COUNT=""
WORKERS_COUNT="20"
IR_TYPE=go
RPC_TYPE=
RPC_ADDR=()
Expand Down Expand Up @@ -150,7 +151,7 @@ while test $# -gt 0; do
-w)
test $# -gt 0 || fatal "workers count should be specified"
ARGS+=(-w "$1")
COUNT="$1"
WORKERS_COUNT="$1"
shift
;;

Expand Down Expand Up @@ -261,7 +262,12 @@ else
fatal "Invalid validator count: $NEOBENCH_VALIDATOR_COUNT"
fi

OUTPUT="/out/${OUTPUT}_${MODE}_${COUNT}.log"
if [ "rate" = "$MODE" ]; then
OUTPUT="/out/${OUTPUT}_${MODE}_${COUNT}_workers_${WORKERS_COUNT}.log"
else
OUTPUT="/out/${OUTPUT}_${MODE}_${COUNT}.log"
fi

if [ ${#RPC_ADDR[@]} -eq 0 ]; then
ARGS+=("${DEFAULT_RPC_ADDR[@]}")
else
Expand Down

0 comments on commit eace8ed

Please sign in to comment.