Skip to content

Commit

Permalink
Allow benchmark clients to make N calls at once
Browse files Browse the repository at this point in the history
This reduces the overhead of waiting for the relay to send "call"
commands, reading them etc.
  • Loading branch information
prashantv committed May 6, 2016
1 parent 0f1c8c4 commit 8c4b4f0
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 93 deletions.
76 changes: 24 additions & 52 deletions benchmark/benchclient/main.go
Expand Up @@ -23,19 +23,15 @@ package main

import (
"bufio"
"bytes"
"flag"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"

"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/benchmark"
"github.com/uber/tchannel-go/raw"
"github.com/uber/tchannel-go/testutils"
"github.com/uber/tchannel-go/thrift"
gen "github.com/uber/tchannel-go/thrift/gen-go/test"
)

var (
Expand All @@ -55,74 +51,50 @@ func main() {

rdr := bufio.NewScanner(os.Stdin)
for rdr.Scan() {
var (
d time.Duration
err error
)
line := rdr.Text()
parts := strings.Split(line, " ")
var n int
var err error
if len(parts) >= 2 {
n, err = strconv.Atoi(parts[1])
if err != nil {
log.Fatalf("unrecognized number %q: %v", parts[1], err)
}
}

switch line := rdr.Text(); line {
switch cmd := parts[0]; cmd {
case "warmup":
if err := client.Warmup(); err != nil {
log.Fatalf("warmup failed: %v", err)
}
fmt.Println("success")
continue
case "rcall":
d, err = client.RawCall()
makeCalls(n, client.RawCall)
case "tcall":
d, err = client.ThriftCall()
makeCalls(n, client.ThriftCall)
case "quit":
return
default:
log.Fatalf("unrecognized command: %v", line)
}

if err != nil {
log.Printf("Call failed: %v", err)
continue
}
fmt.Println(d)
}

if err := rdr.Err(); err != nil {
log.Fatalf("Reader failed: %v", err)
}
}

func makeRawCall(ch *tchannel.Channel) {
ctx, cancel := tchannel.NewContext(*timeout)
defer cancel()

arg := testutils.RandBytes(*requestSize)
started := time.Now()

sc := ch.GetSubChannel(*serviceName)
rArg2, rArg3, _, err := raw.CallSC(ctx, sc, "echo", arg, arg)
if err != nil {
fmt.Println("failed:", err)
return
}
duration := time.Since(started)
if !bytes.Equal(rArg2, arg) || !bytes.Equal(rArg3, arg) {
log.Fatalf("Echo gave different string!")
}
fmt.Println(duration)
}

func makeCall(client gen.TChanSecondService) {
ctx, cancel := thrift.NewContext(*timeout)
defer cancel()

arg := testutils.RandString(*requestSize)
started := time.Now()
res, err := client.Echo(ctx, arg)
func makeCalls(n int, f func(n int) ([]time.Duration, error)) {
durations, err := f(n)
if err != nil {
fmt.Println("failed:", err)
return
log.Fatalf("Call failed: %v", err)
}
duration := time.Since(started)
if res != arg {
log.Fatalf("Echo gave different string!")
for i, d := range durations {
if i > 0 {
fmt.Printf(" ")
}
fmt.Printf("%v", d)
}
fmt.Println(duration)
fmt.Println()
}
26 changes: 19 additions & 7 deletions benchmark/external_client.go
Expand Up @@ -60,18 +60,30 @@ func (c *externalClient) Warmup() error {
return nil
}

func (c *externalClient) callAndParse(cmd string) (time.Duration, error) {
func (c *externalClient) callAndParse(cmd string) ([]time.Duration, error) {
out, err := c.writeAndRead(cmd)
if err != nil {
return 0, err
return nil, err
}
return time.ParseDuration(out)

durationStrs := strings.Split(out, " ")
durations := make([]time.Duration, len(durationStrs))
for i, s := range durationStrs {
d, err := time.ParseDuration(s)
if err != nil {
return nil, fmt.Errorf("calls failed: %v", out)
}

durations[i] = d
}

return durations, nil
}

func (c *externalClient) RawCall() (time.Duration, error) {
return c.callAndParse("rcall")
func (c *externalClient) RawCall(n int) ([]time.Duration, error) {
return c.callAndParse(fmt.Sprintf("rcall %v", n))
}

func (c *externalClient) ThriftCall() (time.Duration, error) {
return c.callAndParse("tcall")
func (c *externalClient) ThriftCall(n int) ([]time.Duration, error) {
return c.callAndParse(fmt.Sprintf("tcall %v", n))
}
4 changes: 2 additions & 2 deletions benchmark/interfaces.go
Expand Up @@ -31,10 +31,10 @@ type Client interface {
Warmup() error

// RawCall makes an echo call using raw.
RawCall() (time.Duration, error)
RawCall(n int) ([]time.Duration, error)

// ThriftCall makes an echo call using thrift.
ThriftCall() (time.Duration, error)
ThriftCall(n int) ([]time.Duration, error)

// Close closes the benchmark client.
Close()
Expand Down
69 changes: 43 additions & 26 deletions benchmark/internal_client.go
Expand Up @@ -82,38 +82,55 @@ func (c *internalClient) Warmup() error {
return nil
}

func (c *internalClient) RawCall() (time.Duration, error) {
ctx, cancel := tchannel.NewContext(c.opts.timeout)
defer cancel()
func (c *internalClient) makeCalls(n int, f func() (time.Duration, error)) ([]time.Duration, error) {
latencies := make([]time.Duration, n)
for i := range latencies {
var err error
latencies[i], err = f()
if err != nil {
return nil, err
}
}
return latencies, nil
}

func (c *internalClient) RawCall(n int) ([]time.Duration, error) {
return c.makeCalls(n, func() (time.Duration, error) {
ctx, cancel := tchannel.NewContext(c.opts.timeout)
defer cancel()

started := time.Now()
rArg2, rArg3, _, err := raw.CallSC(ctx, c.sc, "echo", c.argBytes, c.argBytes)
duration := time.Since(started)
started := time.Now()
rArg2, rArg3, _, err := raw.CallSC(ctx, c.sc, "echo", c.argBytes, c.argBytes)
duration := time.Since(started)

if err != nil {
return 0, err
}
if !bytes.Equal(rArg2, c.argBytes) || !bytes.Equal(rArg3, c.argBytes) {
panic("echo call returned wrong results")
}
return duration, nil
if err != nil {
return 0, err
}
if !bytes.Equal(rArg2, c.argBytes) || !bytes.Equal(rArg3, c.argBytes) {
panic("echo call returned wrong results")
}
return duration, nil

})
}

func (c *internalClient) ThriftCall() (time.Duration, error) {
ctx, cancel := thrift.NewContext(c.opts.timeout)
defer cancel()
func (c *internalClient) ThriftCall(n int) ([]time.Duration, error) {
return c.makeCalls(n, func() (time.Duration, error) {
ctx, cancel := thrift.NewContext(c.opts.timeout)
defer cancel()

started := time.Now()
res, err := c.tClient.Echo(ctx, c.argStr)
duration := time.Since(started)
started := time.Now()
res, err := c.tClient.Echo(ctx, c.argStr)
duration := time.Since(started)

if err != nil {
return 0, err
}
if res != c.argStr {
panic("thrift Echo returned wrong result")
}
return duration, nil
if err != nil {
return 0, err
}
if res != c.argStr {
panic("thrift Echo returned wrong result")
}
return duration, nil
})
}

func (c *internalClient) Close() {
Expand Down
24 changes: 18 additions & 6 deletions thrift/thrift_bench_test.go
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/stretchr/testify/require"
)

const callBatch = 100

var (
useHyperbahn = flag.Bool("useHyperbahn", false, "Whether to advertise and route requests through Hyperbahn")
hyperbahnNodes = flag.String("hyperbahn-nodes", "127.0.0.1:21300,127.0.0.1:21301", "Comma-separated list of Hyperbahn nodes")
Expand All @@ -51,9 +53,14 @@ func BenchmarkBothSerial(b *testing.B) {
)

b.ResetTimer()
for i := 0; i < b.N; i++ {

if _, err := client.ThriftCall(); err != nil {
for i := 0; i < b.N; i += callBatch {
calls := callBatch
if i+calls > b.N {
calls = b.N - i
}

if _, err := client.ThriftCall(calls); err != nil {
b.Errorf("Call failed: %v", err)
}
}
Expand All @@ -71,8 +78,13 @@ func BenchmarkInboundSerial(b *testing.B) {
require.NoError(b, client.Warmup(), "Warmup failed")

b.ResetTimer()
for i := 0; i < b.N; i++ {
if _, err := client.ThriftCall(); err != nil {
for i := 0; i < b.N; i += callBatch {
calls := callBatch
if i+calls > b.N {
calls = b.N - i
}

if _, err := client.ThriftCall(calls); err != nil {
b.Errorf("Call failed: %v", err)
}
}
Expand All @@ -95,10 +107,10 @@ func BenchmarkInboundParallel(b *testing.B) {
require.NoError(b, client.Warmup(), "Warmup failed")

for pb.Next() {
if _, err := client.ThriftCall(); err != nil {
if _, err := client.ThriftCall(100); err != nil {
b.Errorf("Call failed: %v", err)
}
reqCounter.Inc()
reqCounter.Add(100)
}
})

Expand Down

0 comments on commit 8c4b4f0

Please sign in to comment.