Permalink
Browse files

nsqd: multi-topic v2 sub benchmarks

  • Loading branch information...
1 parent 2d05a1a commit fdf60c319bea38454437713d8bf212862f5dc6c1 @mreiferson mreiferson committed Jan 5, 2013
Showing with 53 additions and 4 deletions.
  1. +53 −4 nsqd/protocol_v2_test.go
View
@@ -387,10 +387,10 @@ func benchmarkProtocolV2Sub(b *testing.B, size int) {
workers := runtime.GOMAXPROCS(0)
for j := 0; j < workers; j++ {
wg.Add(1)
- go func(id int) {
- subWorker(b.N, workers, tcpAddr, topicName, rdyChan, goChan, id)
+ go func() {
+ subWorker(b.N, workers, tcpAddr, topicName, rdyChan, goChan)
wg.Done()
- }(j)
+ }()
<-rdyChan
}
b.StartTimer()
@@ -402,7 +402,7 @@ func benchmarkProtocolV2Sub(b *testing.B, size int) {
nsqd.Exit()
}
-func subWorker(n int, workers int, tcpAddr *net.TCPAddr, topicName string, rdyChan chan int, goChan chan int, id int) {
+func subWorker(n int, workers int, tcpAddr *net.TCPAddr, topicName string, rdyChan chan int, goChan chan int) {
conn, err := mustConnectNSQd(tcpAddr)
if err != nil {
panic(err.Error())
@@ -462,3 +462,52 @@ func BenchmarkProtocolV2Sub128k(b *testing.B) { benchmarkProtocolV2Sub(b, 128*10
func BenchmarkProtocolV2Sub256k(b *testing.B) { benchmarkProtocolV2Sub(b, 256*1024) }
func BenchmarkProtocolV2Sub512k(b *testing.B) { benchmarkProtocolV2Sub(b, 512*1024) }
func BenchmarkProtocolV2Sub1m(b *testing.B) { benchmarkProtocolV2Sub(b, 1024*1024) }
+
+func benchmarkProtocolV2MultiSub(b *testing.B, num int) {
+ var wg sync.WaitGroup
+ b.StopTimer()
+
+ log.SetOutput(ioutil.Discard)
+ log.SetOutput(os.Stdout)
+
+ options := NewNsqdOptions()
+ options.memQueueSize = int64(b.N)
+ tcpAddr, _ := mustStartNSQd(options)
+ msg := make([]byte, 256)
+ b.SetBytes(int64(len(msg) * num))
+
+ goChan := make(chan int)
+ rdyChan := make(chan int)
+ workers := runtime.GOMAXPROCS(0)
+ for i := 0; i < num; i++ {
+ topicName := "bench_v2" + strconv.Itoa(b.N) + "_" + strconv.Itoa(i) + "_" + strconv.Itoa(int(time.Now().Unix()))
+ topic := nsqd.GetTopic(topicName)
+ for i := 0; i < b.N; i++ {
+ msg := nsq.NewMessage(<-nsqd.idChan, msg)
+ topic.PutMessage(msg)
+ }
+ topic.GetChannel("ch")
+
+ for j := 0; j < workers; j++ {
+ wg.Add(1)
+ go func() {
+ subWorker(b.N, workers, tcpAddr, topicName, rdyChan, goChan)
+ wg.Done()
+ }()
+ <-rdyChan
+ }
+ }
+ b.StartTimer()
+
+ close(goChan)
+ wg.Wait()
+
+ b.StopTimer()
+ nsqd.Exit()
+}
+
+func BenchmarkProtocolV2MultiSub1(b *testing.B) { benchmarkProtocolV2MultiSub(b, 1) }
+func BenchmarkProtocolV2MultiSub2(b *testing.B) { benchmarkProtocolV2MultiSub(b, 2) }
+func BenchmarkProtocolV2MultiSub4(b *testing.B) { benchmarkProtocolV2MultiSub(b, 4) }
+func BenchmarkProtocolV2MultiSub8(b *testing.B) { benchmarkProtocolV2MultiSub(b, 8) }
+func BenchmarkProtocolV2MultiSub16(b *testing.B) { benchmarkProtocolV2MultiSub(b, 16) }

0 comments on commit fdf60c3

Please sign in to comment.