forked from nsqio/nsq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bench_channels.go
90 lines (83 loc) · 1.76 KB
/
bench_channels.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main
import (
"bufio"
"flag"
"fmt"
"net"
"sync"
"time"
"github.com/nsqio/go-nsq"
)
var (
num = flag.Int("num", 10000, "num channels")
tcpAddress = flag.String("nsqd-tcp-address", "127.0.0.1:4150", "<addr>:<port> to connect to nsqd")
)
func main() {
flag.Parse()
var wg sync.WaitGroup
goChan := make(chan int)
rdyChan := make(chan int)
for j := 0; j < *num; j++ {
wg.Add(1)
go func(id int) {
subWorker(*num, *tcpAddress, fmt.Sprintf("t%d", j), "ch", rdyChan, goChan, id)
wg.Done()
}(j)
<-rdyChan
time.Sleep(5 * time.Millisecond)
}
close(goChan)
wg.Wait()
}
func subWorker(n int, tcpAddr string,
topic string, channel string,
rdyChan chan int, goChan chan int, id int) {
conn, err := net.DialTimeout("tcp", tcpAddr, time.Second)
if err != nil {
panic(err.Error())
}
conn.Write(nsq.MagicV2)
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
ci := make(map[string]interface{})
ci["short_id"] = "test"
ci["long_id"] = "test"
cmd, _ := nsq.Identify(ci)
cmd.WriteTo(rw)
nsq.Subscribe(topic, channel).WriteTo(rw)
rdyCount := 1
rdy := rdyCount
rdyChan <- 1
<-goChan
nsq.Ready(rdyCount).WriteTo(rw)
rw.Flush()
nsq.ReadResponse(rw)
nsq.ReadResponse(rw)
for {
resp, err := nsq.ReadResponse(rw)
if err != nil {
panic(err.Error())
}
frameType, data, err := nsq.UnpackResponse(resp)
if err != nil {
panic(err.Error())
}
if frameType == nsq.FrameTypeError {
panic(string(data))
} else if frameType == nsq.FrameTypeResponse {
nsq.Nop().WriteTo(rw)
rw.Flush()
continue
}
msg, err := nsq.DecodeMessage(data)
if err != nil {
panic(err.Error())
}
nsq.Finish(msg.ID).WriteTo(rw)
rdy--
if rdy == 0 {
nsq.Ready(rdyCount).WriteTo(rw)
rdy = rdyCount
rw.Flush()
}
}
}