From fb0e9209fe35c370dbe1a759cca8d77e04347ec0 Mon Sep 17 00:00:00 2001 From: Peter Miron Date: Mon, 14 Nov 2016 17:05:12 -0500 Subject: [PATCH 1/5] updated stan-bench.go to use the -s url specified on the command line. --- examples/stan-bench.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/examples/stan-bench.go b/examples/stan-bench.go index 763cfcb..375c7b6 100644 --- a/examples/stan-bench.go +++ b/examples/stan-bench.go @@ -7,7 +7,6 @@ import ( "fmt" "io/ioutil" "log" - "strings" "sync" "time" @@ -35,7 +34,7 @@ func usage() { var benchmark *bench.Benchmark func main() { - var urls = flag.String("s", nats.DefaultURL, "The NATS server URLs (separated by comma)") + var url = flag.String("s", nats.DefaultURL, "The NATS server URL") var tls = flag.Bool("tls", false, "Use TLS secure sonnection") var numPubs = flag.Int("np", DefaultNumPubs, "Number of concurrent publishers") var numSubs = flag.Int("ns", DefaultNumSubs, "Number of concurrent subscribers") @@ -58,10 +57,7 @@ func main() { // Setup the option block opts := nats.DefaultOptions - opts.Servers = strings.Split(*urls, ",") - for i, s := range opts.Servers { - opts.Servers[i] = strings.Trim(s, " ") - } + opts.Url = *url opts.Secure = *tls benchmark = bench.NewBenchmark("NATS Streaming", *numSubs, *numPubs) @@ -104,7 +100,7 @@ func main() { func runPublisher(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs int, msgSize int, async bool, pubID string, maxPubAcksInflight int) { - snc, err := stan.Connect("test-cluster", pubID, stan.MaxPubAcksInflight(maxPubAcksInflight)) + snc, err := stan.Connect("test-cluster", pubID, stan.MaxPubAcksInflight(maxPubAcksInflight), stan.NatsURL(opts.Url)) if err != nil { log.Fatalf("Publisher %s can't connect: %v\n", pubID, err) } @@ -152,7 +148,7 @@ func runPublisher(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs in } func runSubscriber(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs int, msgSize int, ignoreOld bool, subID string) { - snc, err := stan.Connect("test-cluster", subID) + snc, err := stan.Connect("test-cluster", subID, stan.NatsURL(opts.Url)) if err != nil { log.Fatalf("Subscriber %s can't connect: %v\n", subID, err) } From ba798735d7ade08d3c505875f8d40f7243a552e1 Mon Sep 17 00:00:00 2001 From: Peter Miron Date: Mon, 14 Nov 2016 18:56:17 -0500 Subject: [PATCH 2/5] revised per Wally and Ivan's feedback. --- examples/stan-bench.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/examples/stan-bench.go b/examples/stan-bench.go index 375c7b6..6e63bcb 100644 --- a/examples/stan-bench.go +++ b/examples/stan-bench.go @@ -13,6 +13,7 @@ import ( "github.com/nats-io/go-nats-streaming" "github.com/nats-io/nats" "github.com/nats-io/nats/bench" + "strings" ) // Some sane defaults @@ -34,7 +35,7 @@ func usage() { var benchmark *bench.Benchmark func main() { - var url = flag.String("s", nats.DefaultURL, "The NATS server URL") + var urls = flag.String("s", nats.DefaultURL, "The NATS server URL") var tls = flag.Bool("tls", false, "Use TLS secure sonnection") var numPubs = flag.Int("np", DefaultNumPubs, "Number of concurrent publishers") var numSubs = flag.Int("ns", DefaultNumSubs, "Number of concurrent subscribers") @@ -57,7 +58,11 @@ func main() { // Setup the option block opts := nats.DefaultOptions - opts.Url = *url + opts.Servers = strings.Split(*urls, ",") + for i, s := range opts.Servers { + opts.Servers[i] = strings.Trim(s, " ") + } + opts.Secure = *tls benchmark = bench.NewBenchmark("NATS Streaming", *numSubs, *numPubs) @@ -99,8 +104,11 @@ func main() { } func runPublisher(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs int, msgSize int, async bool, pubID string, maxPubAcksInflight int) { - - snc, err := stan.Connect("test-cluster", pubID, stan.MaxPubAcksInflight(maxPubAcksInflight), stan.NatsURL(opts.Url)) + nc, err := opts.Connect() + if err != nil { + log.Fatalf("Subscriber %s can't connect: %v\n", pubID, err) + } + snc, err := stan.Connect("test-cluster", pubID, stan.MaxPubAcksInflight(maxPubAcksInflight), stan.NatsConn(nc)) if err != nil { log.Fatalf("Publisher %s can't connect: %v\n", pubID, err) } @@ -148,7 +156,11 @@ func runPublisher(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs in } func runSubscriber(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs int, msgSize int, ignoreOld bool, subID string) { - snc, err := stan.Connect("test-cluster", subID, stan.NatsURL(opts.Url)) + nc, err := opts.Connect() + if err != nil { + log.Fatalf("Subscriber %s can't connect: %v\n", subID, err) + } + snc, err := stan.Connect("test-cluster", subID, stan.NatsConn(nc)) if err != nil { log.Fatalf("Subscriber %s can't connect: %v\n", subID, err) } From 3d0ab0b9d61da8922ba5752a0c178ceec7eaf020 Mon Sep 17 00:00:00 2001 From: Peter Miron Date: Mon, 14 Nov 2016 19:05:00 -0500 Subject: [PATCH 3/5] fixed error text on publisher. --- examples/stan-bench.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/stan-bench.go b/examples/stan-bench.go index 6e63bcb..598e56f 100644 --- a/examples/stan-bench.go +++ b/examples/stan-bench.go @@ -106,7 +106,7 @@ func main() { func runPublisher(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs int, msgSize int, async bool, pubID string, maxPubAcksInflight int) { nc, err := opts.Connect() if err != nil { - log.Fatalf("Subscriber %s can't connect: %v\n", pubID, err) + log.Fatalf("Publisher %s can't connect: %v\n", pubID, err) } snc, err := stan.Connect("test-cluster", pubID, stan.MaxPubAcksInflight(maxPubAcksInflight), stan.NatsConn(nc)) if err != nil { From f0a14ece3e815f4105bc32e8c42192b95cf26142 Mon Sep 17 00:00:00 2001 From: Peter Miron Date: Tue, 15 Nov 2016 11:55:10 -0500 Subject: [PATCH 4/5] added nc.Close() per Ivan's recommendation. --- examples/stan-bench.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/stan-bench.go b/examples/stan-bench.go index 598e56f..5d4b2d7 100644 --- a/examples/stan-bench.go +++ b/examples/stan-bench.go @@ -152,6 +152,7 @@ func runPublisher(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs in benchmark.AddPubSample(bench.NewSample(numMsgs, msgSize, start, time.Now(), snc.NatsConn())) snc.Close() + nc.Close() donewg.Done() } @@ -188,5 +189,6 @@ func runSubscriber(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs i <-ch benchmark.AddSubSample(bench.NewSample(numMsgs, msgSize, start, time.Now(), snc.NatsConn())) snc.Close() + nc.Close() donewg.Done() } From 3cb02cfee8555c894a1711820ac96a9919958ac3 Mon Sep 17 00:00:00 2001 From: Peter Miron Date: Tue, 15 Nov 2016 12:00:05 -0500 Subject: [PATCH 5/5] moved strings and corrected help text on URLs to original. --- examples/stan-bench.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/stan-bench.go b/examples/stan-bench.go index 5d4b2d7..30bdc66 100644 --- a/examples/stan-bench.go +++ b/examples/stan-bench.go @@ -7,13 +7,14 @@ import ( "fmt" "io/ioutil" "log" + "strings" "sync" "time" "github.com/nats-io/go-nats-streaming" "github.com/nats-io/nats" "github.com/nats-io/nats/bench" - "strings" + ) // Some sane defaults @@ -35,7 +36,7 @@ func usage() { var benchmark *bench.Benchmark func main() { - var urls = flag.String("s", nats.DefaultURL, "The NATS server URL") + var urls = flag.String("s", nats.DefaultURL, "The NATS server URLs (separated by comma") var tls = flag.Bool("tls", false, "Use TLS secure sonnection") var numPubs = flag.Int("np", DefaultNumPubs, "Number of concurrent publishers") var numSubs = flag.Int("ns", DefaultNumSubs, "Number of concurrent subscribers")