Skip to content

Commit

Permalink
Merge pull request #201 from nats-io/add_qgroup_to_stan_bench
Browse files Browse the repository at this point in the history
[ADDED] Ability to set a queue group for stan-bench
  • Loading branch information
kozlovic committed Sep 21, 2018
2 parents e54893d + 5e00194 commit 12cecea
Showing 1 changed file with 39 additions and 9 deletions.
48 changes: 39 additions & 9 deletions examples/stan-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"log"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/nats-io/go-nats"
Expand All @@ -40,10 +41,14 @@ const (
)

func usage() {
log.Fatalf("Usage: stan-bench [-s server (%s)] [-tls] [-c CLUSTER_ID] [-id CLIENT_ID] [-np NUM_PUBLISHERS] [-ns NUM_SUBSCRIBERS] [-n NUM_MSGS] [-ms MESSAGE_SIZE] [-csv csvfile] [-mpa MAX_NUMBER_OF_PUBLISHED_ACKS_INFLIGHT] [-io] [-a] <subject>\n", nats.DefaultURL)
log.Fatalf("Usage: stan-bench [-s server (%s)] [-tls] [-c CLUSTER_ID] [-id CLIENT_ID] [-qgroup QUEUE_GROUP_NAME] [-np NUM_PUBLISHERS] [-ns NUM_SUBSCRIBERS] [-n NUM_MSGS] [-ms MESSAGE_SIZE] [-csv csvfile] [-mpa MAX_NUMBER_OF_PUBLISHED_ACKS_INFLIGHT] [-io] [-a] <subject>\n", nats.DefaultURL)
}

var benchmark *bench.Benchmark
var (
benchmark *bench.Benchmark
qTotalRecv int32
qSubsLeft int32
)

func main() {
var clusterID string
Expand All @@ -61,6 +66,7 @@ func main() {
var maxPubAcks = flag.Int("mpa", DefaultMaxPubAcksInflight, "Max number of published acks in flight")
var clientID = flag.String("id", DefaultClientID, "Benchmark process base client ID")
var csvFile = flag.String("csv", "", "Save bench data to csv file")
var queue = flag.String("qgroup", "", "Queue group name")

log.SetFlags(0)
flag.Usage = usage
Expand All @@ -87,11 +93,14 @@ func main() {

donewg.Add(*numPubs + *numSubs)

if *queue != "" {
qSubsLeft = int32(*numSubs)
}
// Run Subscribers first
startwg.Add(*numSubs)
for i := 0; i < *numSubs; i++ {
subID := fmt.Sprintf("%s-sub-%d", *clientID, i)
go runSubscriber(&startwg, &donewg, opts, clusterID, *numMsgs, *messageSize, *ignoreOld, subID)
go runSubscriber(&startwg, &donewg, opts, clusterID, subID, *queue, *numMsgs, *messageSize, *ignoreOld)
}
startwg.Wait()

Expand Down Expand Up @@ -177,7 +186,7 @@ func runPublisher(startwg, donewg *sync.WaitGroup, opts nats.Options, clusterID
donewg.Done()
}

func runSubscriber(startwg, donewg *sync.WaitGroup, opts nats.Options, clusterID string, numMsgs int, msgSize int, ignoreOld bool, subID string) {
func runSubscriber(startwg, donewg *sync.WaitGroup, opts nats.Options, clusterID, subID, queue string, numMsgs, msgSize int, ignoreOld bool) {
nc, err := opts.Connect()
if err != nil {
log.Fatalf("Subscriber %s can't connect: %v\n", subID, err)
Expand All @@ -194,27 +203,48 @@ func runSubscriber(startwg, donewg *sync.WaitGroup, opts nats.Options, clusterID
subj := args[0]
ch := make(chan time.Time, 2)

isQueue := queue != ""
received := 0
mcb := func(msg *stan.Msg) {
received++
if received == 1 {
ch <- time.Now()
}
if received >= numMsgs {
ch <- time.Now()
if isQueue {
if atomic.AddInt32(&qTotalRecv, 1) >= int32(numMsgs) {
ch <- time.Now()
}
} else {
if received >= numMsgs {
ch <- time.Now()
}
}
}

var sub stan.Subscription
if ignoreOld {
snc.Subscribe(subj, mcb)
sub, err = snc.QueueSubscribe(subj, queue, mcb)
} else {
snc.Subscribe(subj, mcb, stan.DeliverAllAvailable())
sub, err = snc.QueueSubscribe(subj, queue, mcb, stan.DeliverAllAvailable())
}
if err != nil {
log.Fatalf("Subscriber %s can't subscribe: %v", subID, err)
}
startwg.Done()

start := <-ch
end := <-ch
benchmark.AddSubSample(bench.NewSample(numMsgs, msgSize, start, end, snc.NatsConn()))
benchmark.AddSubSample(bench.NewSample(received, msgSize, start, end, snc.NatsConn()))
// For queues, since not each member receives the total number of messages,
// when a member is done, it needs to publish a message to unblock other member(s).
if isQueue {
if sr := atomic.AddInt32(&qSubsLeft, -1); sr > 0 {
// Close this queue member first so that there is no chance that the
// server sends the message we are going to publish back to this member.
sub.Close()
snc.Publish(subj, []byte("done"))
}
}
snc.Close()
nc.Close()
donewg.Done()
Expand Down

0 comments on commit 12cecea

Please sign in to comment.