Skip to content

Commit

Permalink
Updates to the benchmark
Browse files Browse the repository at this point in the history
- minor output fix
- Add ID parameter to run muliple instances simultaneously.
  • Loading branch information
Colin Sullivan committed Jul 1, 2016
1 parent 2da181f commit a7d4600
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions examples/stan-bench.go
Expand Up @@ -26,10 +26,11 @@ const (
DefaultMessageSize = -1
DefaultIgnoreOld = false
DefaultMaxPubAcksInflight = 1000
DefaultClientID = "benchmark"
)

func usage() {
log.Fatalf("Usage: nats-bench [-s server (%s)] [--tls] [-np NUM_PUBLISHERS] [-ns NUM_SUBSCRIBERS] [-n NUM_MSGS] [-ms MESSAGE_SIZE] [-io] [-a] <subject>n", nats.DefaultURL)
log.Fatalf("Usage: nats-bench [-s server (%s)] [--tls] [-id CLIENT_ID] [-np NUM_PUBLISHERS] [-ns NUM_SUBSCRIBERS] [-n NUM_MSGS] [-ms MESSAGE_SIZE] [-io] [-a] <subject>\n", nats.DefaultURL)
}

var subStatChan chan *stats
Expand All @@ -45,6 +46,7 @@ func main() {
var messageSize = flag.Int("ms", DefaultMessageSize, "Message Size in bytes.")
var ignoreOld = flag.Bool("io", DefaultIgnoreOld, "Subscribers Ignore Old Messages")
var maxPubAcks = flag.Int("mpa", DefaultMaxPubAcksInflight, "Max number of published acks in flight")
var clientID = flag.String("id", DefaultClientID, "Benchmark process base client ID.")

log.SetFlags(0)
flag.Usage = usage
Expand Down Expand Up @@ -74,14 +76,16 @@ func main() {
// Run Subscribers first
startwg.Add(*numSubs)
for i := 0; i < *numSubs; i++ {
go runSubscriber(&startwg, &donewg, opts, *numMsgs, *ignoreOld, i)
subID := fmt.Sprintf("%s-sub-%d", *clientID, i)
go runSubscriber(&startwg, &donewg, opts, *numMsgs, *ignoreOld, subID)
}
startwg.Wait()

// Now Publishers
startwg.Add(*numPubs)
for i := 0; i < *numPubs; i++ {
go runPublisher(&startwg, &donewg, opts, *numMsgs, *messageSize, *async, i, *maxPubAcks)
pubID := fmt.Sprintf("%s-pub-%d", *clientID, i)
go runPublisher(&startwg, &donewg, opts, *numMsgs, *messageSize, *async, pubID, *maxPubAcks)
}

log.Printf("Starting benchmark [msgs=%d, pubs=%d, subs=%d]\n", *numMsgs, *numPubs, *numSubs)
Expand Down Expand Up @@ -137,17 +141,15 @@ func setMaxPubAcksInflight(val int) stan.Option {
}
}

func runPublisher(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs int, messageSize int, async bool, pubID int, maxPubAcksInFlight int) {
func runPublisher(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs int, messageSize int, async bool, pubID string, maxPubAcksInFlight int) {

nc, err := opts.Connect()
if err != nil {
log.Fatalf("Can't connect: %v\n", err)
}

publisher := fmt.Sprintf("benchmark-client-publisher%d", pubID)
snc, err := stan.Connect("test-cluster", publisher, setMaxPubAcksInflight(maxPubAcksInFlight), stan.NatsConn(nc))
snc, err := stan.Connect("test-cluster", pubID, setMaxPubAcksInflight(maxPubAcksInFlight), stan.NatsConn(nc))
if err != nil {
log.Fatalf("Publisher %s can't connect: %v\n", publisher, err)
log.Fatalf("Publisher %s can't connect: %v\n", pubID, err)
}

startwg.Done()
Expand Down Expand Up @@ -194,16 +196,14 @@ func runPublisher(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs in
donewg.Done()
}

func runSubscriber(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs int, ignoreOld bool, subID int) {
func runSubscriber(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs int, ignoreOld bool, subID string) {
nc, err := opts.Connect()
if err != nil {
log.Fatalf("Can't connect: %v\n", err)
}

subscriber := fmt.Sprintf("benchmark-client-subscriber%d", subID)
snc, err := stan.Connect("test-cluster", subscriber, stan.NatsConn(nc))
snc, err := stan.Connect("test-cluster", subID, stan.NatsConn(nc))
if err != nil {
log.Fatalf("Subscriber %s can't connect: %v\n", subscriber, err)
log.Fatalf("Subscriber %s can't connect: %v\n", subID, err)
}

args := flag.Args()
Expand Down

0 comments on commit a7d4600

Please sign in to comment.