-
Notifications
You must be signed in to change notification settings - Fork 0
/
nsq_stat.go
131 lines (108 loc) · 3.34 KB
/
nsq_stat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// This is a utility application that polls /stats for all the producers
// of the specified topic/channel and displays aggregate stats
package main
import (
"errors"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/bitly/nsq/util"
"github.com/bitly/nsq/util/lookupd"
)
var (
showVersion = flag.Bool("version", false, "print version")
topic = flag.String("topic", "", "NSQ topic")
channel = flag.String("channel", "", "NSQ channel")
statusEvery = flag.Duration("status-every", 2*time.Second, "duration of time between polling/printing output")
nsqdHTTPAddrs = util.StringArray{}
lookupdHTTPAddrs = util.StringArray{}
)
func init() {
flag.Var(&nsqdHTTPAddrs, "nsqd-http-address", "nsqd HTTP address (may be given multiple times)")
flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
}
func statLoop(interval time.Duration, topic string, channel string,
nsqdTCPAddrs []string, lookupdHTTPAddrs []string) {
i := 0
for {
var producers []string
var err error
log.SetOutput(ioutil.Discard)
if len(lookupdHTTPAddrs) != 0 {
producers, err = lookupd.GetLookupdTopicProducers(topic, lookupdHTTPAddrs)
} else {
producers, err = lookupd.GetNSQDTopicProducers(topic, nsqdHTTPAddrs)
}
log.SetOutput(os.Stdout)
if err != nil {
log.Fatalf("ERROR: failed to get topic producers - %s", err)
}
log.SetOutput(ioutil.Discard)
_, allChannelStats, err := lookupd.GetNSQDStats(producers, topic)
log.SetOutput(os.Stdout)
if err != nil {
log.Fatalf("ERROR: failed to get nsqd stats - %s", err)
}
c, ok := allChannelStats[channel]
if !ok {
log.Fatalf("ERROR: failed to find channel(%s) in stats metadata for topic(%s)", channel, topic)
}
if i%25 == 0 {
fmt.Printf("---------------depth---------------+--------------metadata---------------\n")
fmt.Printf("%7s %7s %7s %5s %5s | %7s %7s %12s %7s\n", "total", "mem", "disk", "inflt", "def", "req", "t-o", "msgs", "clients")
}
// TODO: paused
fmt.Printf("%7d %7d %7d %5d %5d | %7d %7d %12d %7d\n",
c.Depth,
c.MemoryDepth,
c.BackendDepth,
c.InFlightCount,
c.DeferredCount,
c.RequeueCount,
c.TimeoutCount,
c.MessageCount,
c.ClientCount)
time.Sleep(interval)
i++
}
}
func checkAddrs(addrs []string) error {
for _, a := range addrs {
if strings.HasPrefix(a, "http") {
return errors.New("address should not contain scheme")
}
}
return nil
}
func main() {
flag.Parse()
if *showVersion {
fmt.Printf("nsq_stat v%s\n", util.BINARY_VERSION)
return
}
if *topic == "" || *channel == "" {
log.Fatal("--topic and --channel are required")
}
if len(nsqdHTTPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 {
log.Fatal("--nsqd-http-address or --lookupd-http-address required")
}
if len(nsqdHTTPAddrs) > 0 && len(lookupdHTTPAddrs) > 0 {
log.Fatal("use --nsqd-http-address or --lookupd-http-address not both")
}
if err := checkAddrs(nsqdHTTPAddrs); err != nil {
log.Fatalf("--nsqd-http-address error - %s", err)
}
if err := checkAddrs(lookupdHTTPAddrs); err != nil {
log.Fatalf("--lookupd-http-address error - %s", err)
}
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
go statLoop(*statusEvery, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs)
<-termChan
}