diff --git a/examples/stan-bench/main.go b/examples/stan-bench/main.go index 6e659eb..527f4f0 100644 --- a/examples/stan-bench/main.go +++ b/examples/stan-bench/main.go @@ -20,6 +20,7 @@ import ( "log" "strings" "sync" + "sync/atomic" "time" "github.com/nats-io/go-nats" @@ -40,10 +41,14 @@ const ( ) func usage() { - log.Fatalf("Usage: stan-bench [-s server (%s)] [-tls] [-c CLUSTER_ID] [-id CLIENT_ID] [-np NUM_PUBLISHERS] [-ns NUM_SUBSCRIBERS] [-n NUM_MSGS] [-ms MESSAGE_SIZE] [-csv csvfile] [-mpa MAX_NUMBER_OF_PUBLISHED_ACKS_INFLIGHT] [-io] [-a] \n", nats.DefaultURL) + log.Fatalf("Usage: stan-bench [-s server (%s)] [-tls] [-c CLUSTER_ID] [-id CLIENT_ID] [-qgroup QUEUE_GROUP_NAME] [-np NUM_PUBLISHERS] [-ns NUM_SUBSCRIBERS] [-n NUM_MSGS] [-ms MESSAGE_SIZE] [-csv csvfile] [-mpa MAX_NUMBER_OF_PUBLISHED_ACKS_INFLIGHT] [-io] [-a] \n", nats.DefaultURL) } -var benchmark *bench.Benchmark +var ( + benchmark *bench.Benchmark + qTotalRecv int32 + qSubsLeft int32 +) func main() { var clusterID string @@ -61,6 +66,7 @@ func main() { var maxPubAcks = flag.Int("mpa", DefaultMaxPubAcksInflight, "Max number of published acks in flight") var clientID = flag.String("id", DefaultClientID, "Benchmark process base client ID") var csvFile = flag.String("csv", "", "Save bench data to csv file") + var queue = flag.String("qgroup", "", "Queue group name") log.SetFlags(0) flag.Usage = usage @@ -87,11 +93,14 @@ func main() { donewg.Add(*numPubs + *numSubs) + if *queue != "" { + qSubsLeft = int32(*numSubs) + } // Run Subscribers first startwg.Add(*numSubs) for i := 0; i < *numSubs; i++ { subID := fmt.Sprintf("%s-sub-%d", *clientID, i) - go runSubscriber(&startwg, &donewg, opts, clusterID, *numMsgs, *messageSize, *ignoreOld, subID) + go runSubscriber(&startwg, &donewg, opts, clusterID, subID, *queue, *numMsgs, *messageSize, *ignoreOld) } startwg.Wait() @@ -177,7 +186,7 @@ func runPublisher(startwg, donewg *sync.WaitGroup, opts nats.Options, clusterID donewg.Done() } -func runSubscriber(startwg, donewg *sync.WaitGroup, opts nats.Options, clusterID string, numMsgs int, msgSize int, ignoreOld bool, subID string) { +func runSubscriber(startwg, donewg *sync.WaitGroup, opts nats.Options, clusterID, subID, queue string, numMsgs, msgSize int, ignoreOld bool) { nc, err := opts.Connect() if err != nil { log.Fatalf("Subscriber %s can't connect: %v\n", subID, err) @@ -194,27 +203,48 @@ func runSubscriber(startwg, donewg *sync.WaitGroup, opts nats.Options, clusterID subj := args[0] ch := make(chan time.Time, 2) + isQueue := queue != "" received := 0 mcb := func(msg *stan.Msg) { received++ if received == 1 { ch <- time.Now() } - if received >= numMsgs { - ch <- time.Now() + if isQueue { + if atomic.AddInt32(&qTotalRecv, 1) >= int32(numMsgs) { + ch <- time.Now() + } + } else { + if received >= numMsgs { + ch <- time.Now() + } } } + var sub stan.Subscription if ignoreOld { - snc.Subscribe(subj, mcb) + sub, err = snc.QueueSubscribe(subj, queue, mcb) } else { - snc.Subscribe(subj, mcb, stan.DeliverAllAvailable()) + sub, err = snc.QueueSubscribe(subj, queue, mcb, stan.DeliverAllAvailable()) + } + if err != nil { + log.Fatalf("Subscriber %s can't subscribe: %v", subID, err) } startwg.Done() start := <-ch end := <-ch - benchmark.AddSubSample(bench.NewSample(numMsgs, msgSize, start, end, snc.NatsConn())) + benchmark.AddSubSample(bench.NewSample(received, msgSize, start, end, snc.NatsConn())) + // For queues, since not each member receives the total number of messages, + // when a member is done, it needs to publish a message to unblock other member(s). + if isQueue { + if sr := atomic.AddInt32(&qSubsLeft, -1); sr > 0 { + // Close this queue member first so that there is no chance that the + // server sends the message we are going to publish back to this member. + sub.Close() + snc.Publish(subj, []byte("done")) + } + } snc.Close() nc.Close() donewg.Done()