Permalink
Browse files

examples: add benchmark reader/writer

  • Loading branch information...
1 parent 762dd84 commit 857aa94263126af35d38c71e340f4dab7c4b838b @mreiferson mreiferson committed Oct 31, 2012
Showing with 173 additions and 0 deletions.
  1. +2 −0 .gitignore
  2. +91 −0 examples/bench_reader/bench_reader.go
  3. +80 −0 examples/bench_writer/bench_writer.go
View
@@ -4,6 +4,8 @@ nsqlookupd/nsqlookupd
nsqreader/nsqreader
nsqstatsd/nsqstatsd
nsqadmin/nsqadmin
+examples/bench_reader/bench_reader
+examples/bench_writer/bench_writer
examples/nsq_to_file/nsq_to_file
examples/nsq_pubsub/nsq_pubsub
examples/nsq_to_http/nsq_to_http
@@ -0,0 +1,91 @@
+package main
+
+import (
+ "../../nsq"
+ "bufio"
+ "flag"
+ "log"
+ "math"
+ "net"
+ "runtime"
+ "sync"
+ "time"
+)
+
+var (
+ num = flag.Int("num", 1000000, "num messages")
+ tcpAddress = flag.String("nsqd-tcp-address", "127.0.0.1:4150", "<addr>:<port> to connect to nsqd")
+ topic = flag.String("topic", "sub_bench", "topic to receive messages on")
+ channel = flag.String("channel", "ch", "channel to receive messages on")
+)
+
+func main() {
+ flag.Parse()
+ var wg sync.WaitGroup
+
+ goChan := make(chan int)
+ rdyChan := make(chan int)
+ workers := runtime.GOMAXPROCS(0)
+ for j := 0; j < workers; j++ {
+ wg.Add(1)
+ go func(id int) {
+ subWorker(*num, workers, *tcpAddress, *topic, *channel, rdyChan, goChan, id)
+ wg.Done()
+ }(j)
+ <-rdyChan
+ }
+
+ start := time.Now()
+ close(goChan)
+ wg.Wait()
+ end := time.Now()
+ duration := end.Sub(start)
+ log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op",
+ duration,
+ float64(*num * 200) / duration.Seconds() / 1024 / 1024,
+ float64(*num) / duration.Seconds(),
+ float64(duration / time.Microsecond) / float64(*num))
+}
+
+func subWorker(n int, workers int, tcpAddr string, topic string, channel string, rdyChan chan int, goChan chan int, id int) {
+ conn, err := net.DialTimeout("tcp", tcpAddr, time.Second)
+ if err != nil {
+ panic(err.Error())
+ }
+ conn.Write(nsq.MagicV2)
+ rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
+ nsq.Subscribe(topic, channel, "test", "test").Write(rw)
+ rdyCount := int(math.Min(math.Max(float64(n/workers), 1), 2500))
+ rdyChan <- 1
+ <-goChan
+ nsq.Ready(rdyCount).Write(rw)
+ rw.Flush()
+ num := n / workers
+ numRdy := num/rdyCount - 1
+ rdy := rdyCount
+ for i := 0; i < num; i += 1 {
+ resp, err := nsq.ReadResponse(rw)
+ if err != nil {
+ panic(err.Error())
+ }
+ frameType, data, err := nsq.UnpackResponse(resp)
+ if err != nil {
+ panic(err.Error())
+ }
+ if frameType == nsq.FrameTypeError {
+ panic("got something else")
+ }
+ msg, err := nsq.DecodeMessage(data)
+ if err != nil {
+ panic(err.Error())
+ }
+ nsq.Finish(msg.Id).Write(rw)
+ rdy--
+ if rdy == 0 && numRdy > 0 {
+ nsq.Ready(rdyCount).Write(rw)
+ rdy = rdyCount
+ numRdy--
+ rw.Flush()
+ }
+ }
+}
@@ -0,0 +1,80 @@
+package main
+
+import (
+ "../../nsq"
+ "bufio"
+ "bytes"
+ "flag"
+ "log"
+ "net"
+ "runtime"
+ "sync"
+ "time"
+)
+
+var (
+ num = flag.Int("num", 1000000, "num messages")
+ tcpAddress = flag.String("nsqd-tcp-address", "127.0.0.1:4150", "<addr>:<port> to connect to nsqd")
+ topic = flag.String("topic", "sub_bench", "topic to receive messages on")
+ size = flag.Int("size", 200, "size of messages")
+ batchSize = flag.Int("batch-size", 200, "batch size of messages")
+)
+
+func main() {
+ flag.Parse()
+ var wg sync.WaitGroup
+
+ msg := make([]byte, *size)
+ batch := make([][]byte, 0)
+ for i := 0; i < *batchSize; i++ {
+ batch = append(batch, msg)
+ }
+
+ start := time.Now()
+ for j := 0; j < runtime.GOMAXPROCS(0); j++ {
+ wg.Add(1)
+ go func() {
+ pubWorker(*num, *tcpAddress, *batchSize, batch, *topic)
+ wg.Done()
+ }()
+ }
+
+ wg.Wait()
+ end := time.Now()
+ duration := end.Sub(start)
+ log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op",
+ duration,
+ float64(*num * 200) / duration.Seconds() / 1024 / 1024,
+ float64(*num) / duration.Seconds(),
+ float64(duration / time.Microsecond) / float64(*num))
+}
+
+func pubWorker(n int, tcpAddr string, batchSize int, batch [][]byte, topic string) {
+ conn, err := net.DialTimeout("tcp", tcpAddr, time.Second)
+ if err != nil {
+ panic(err.Error())
+ }
+ conn.Write(nsq.MagicV2)
+ rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
+
+ num := n / runtime.GOMAXPROCS(0) / batchSize
+ for i := 0; i < num; i += 1 {
+ cmd, _ := nsq.MultiPublish(topic, batch)
+ err := cmd.Write(rw)
+ if err != nil {
+ panic(err.Error())
+ }
+ err = rw.Flush()
+ if err != nil {
+ panic(err.Error())
+ }
+ resp, err := nsq.ReadResponse(rw)
+ if err != nil {
+ panic(err.Error())
+ }
+ _, data, _ := nsq.UnpackResponse(resp)
+ if !bytes.Equal(data, []byte("OK")) {
+ panic("invalid response")
+ }
+ }
+}

0 comments on commit 857aa94

Please sign in to comment.