-
Notifications
You must be signed in to change notification settings - Fork 0
/
nsqd.go
237 lines (197 loc) · 8.33 KB
/
nsqd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
package main
import (
"crypto/tls"
"flag"
"fmt"
"log"
"math/rand"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"syscall"
"time"
"github.com/judwhite/go-svc/svc"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/nsqd"
)
// tlsRequiredOption is a flag value for the TLS requirement level. It stores
// one of nsqd.TLSNotRequired, nsqd.TLSRequiredExceptHTTP or nsqd.TLSRequired
// as an int.
type tlsRequiredOption int

// Set parses s case-insensitively. "tcp-https" selects
// nsqd.TLSRequiredExceptHTTP; every other input must be a boolean understood
// by strconv.ParseBool, where true selects nsqd.TLSRequired and false selects
// nsqd.TLSNotRequired.
//
// On invalid input the parse error is returned and the stored value is left
// untouched.
func (t *tlsRequiredOption) Set(s string) error {
	s = strings.ToLower(s)
	if s == "tcp-https" {
		*t = nsqd.TLSRequiredExceptHTTP
		return nil
	}

	required, err := strconv.ParseBool(s)
	if err != nil {
		// Do not clobber the current setting with a value derived from
		// unparseable input.
		return err
	}

	if required {
		*t = nsqd.TLSRequired
	} else {
		*t = nsqd.TLSNotRequired
	}
	return nil
}

// Get returns the stored requirement level as a plain int.
func (t *tlsRequiredOption) Get() interface{} { return int(*t) }

// String renders the stored requirement level as a base-10 integer.
func (t *tlsRequiredOption) String() string {
	return strconv.FormatInt(int64(*t), 10)
}

// IsBoolFlag reports true, which lets the flag package accept the flag
// without an explicit value (treated as "true").
func (t *tlsRequiredOption) IsBoolFlag() bool { return true }
// tlsMinVersionOption is a flag value holding the minimum acceptable SSL/TLS
// protocol version as the numeric constant defined by crypto/tls.
type tlsMinVersionOption uint16

// Set maps a case-insensitive version name ("ssl3.0", "tls1.0", "tls1.1" or
// "tls1.2") onto the matching crypto/tls constant. An empty string is accepted
// and leaves the current value unchanged; any other name yields an error.
func (t *tlsMinVersionOption) Set(s string) error {
	name := strings.ToLower(s)
	if name == "" {
		return nil
	}

	known := map[string]tlsMinVersionOption{
		"ssl3.0": tls.VersionSSL30,
		"tls1.0": tls.VersionTLS10,
		"tls1.1": tls.VersionTLS11,
		"tls1.2": tls.VersionTLS12,
	}

	version, ok := known[name]
	if !ok {
		return fmt.Errorf("unknown tlsVersionOption %q", name)
	}
	*t = version
	return nil
}

// Get returns the selected version as a plain uint16.
func (t *tlsMinVersionOption) Get() interface{} {
	return uint16(*t)
}

// String renders the numeric version constant in base 10.
func (t *tlsMinVersionOption) String() string {
	return strconv.FormatUint(uint64(*t), 10)
}
// nsqdFlagSet declares every nsqd command-line flag on a new FlagSet named
// "nsqd". Where a matching field exists, the flag's default is taken from
// opts. The set is created with flag.ExitOnError.
//
// Repeatable and custom-typed flags are backed by local values that are only
// reachable through the returned FlagSet.
func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
	fs := flag.NewFlagSet("nsqd", flag.ExitOnError)

	// logging and node identity
	fs.String("log-level", "info", "set log verbosity: debug, info, warn, error, or fatal")
	fs.String("log-prefix", "[nsqd] ", "log message prefix")
	fs.Bool("verbose", false, "deprecated in favor of log-level")
	fs.Int64("node-id", opts.ID, "unique part for message IDs, (int) in range [0,1024) (default is hash of hostname)")
	fs.Bool("worker-id", false, "do NOT use this, use --node-id")

	// listen, auth and lookupd addresses
	fs.String("https-address", opts.HTTPSAddress, "<addr>:<port> to listen on for HTTPS clients")
	fs.String("http-address", opts.HTTPAddress, "<addr>:<port> to listen on for HTTP clients")
	fs.String("tcp-address", opts.TCPAddress, "<addr>:<port> to listen on for TCP clients")
	authAddrs := make(StringArray, 0)
	fs.Var(&authAddrs, "auth-http-address", "<addr>:<port> to query auth server (may be given multiple times)")
	fs.String("broadcast-address", opts.BroadcastAddress, "address that will be registered with lookupd (defaults to the OS hostname)")
	lookupdAddrs := make(StringArray, 0)
	fs.Var(&lookupdAddrs, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)")

	// outbound HTTP client timeouts
	fs.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect")
	fs.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP request")

	// diskqueue options
	fs.String("data-path", opts.DataPath, "path to store disk-backed messages")
	fs.Int64("mem-queue-size", opts.MemQueueSize, "number of messages to keep in memory (per topic/channel)")
	fs.Int64("max-bytes-per-file", opts.MaxBytesPerFile, "number of bytes per diskqueue file before rolling")
	fs.Int64("sync-every", opts.SyncEvery, "number of messages per diskqueue fsync")
	fs.Duration("sync-timeout", opts.SyncTimeout, "duration of time per diskqueue fsync")

	// msg and command options
	fs.Duration("msg-timeout", opts.MsgTimeout, "default duration to wait before auto-requeing a message")
	fs.Duration("max-msg-timeout", opts.MaxMsgTimeout, "maximum duration before a message will timeout")
	fs.Int64("max-msg-size", opts.MaxMsgSize, "maximum size of a single message in bytes")
	fs.Duration("max-req-timeout", opts.MaxReqTimeout, "maximum requeuing timeout for a message")
	fs.Int64("max-body-size", opts.MaxBodySize, "maximum size of a single command body")

	// client overridable configuration options
	fs.Duration("max-heartbeat-interval", opts.MaxHeartbeatInterval, "maximum client configurable duration of time between client heartbeats")
	fs.Int64("max-rdy-count", opts.MaxRdyCount, "maximum RDY count for a client")
	fs.Int64("max-output-buffer-size", opts.MaxOutputBufferSize, "maximum client configurable size (in bytes) for a client output buffer")
	fs.Duration("max-output-buffer-timeout", opts.MaxOutputBufferTimeout, "maximum client configurable duration of time between flushing to a client")

	// statsd integration options
	fs.String("statsd-address", opts.StatsdAddress, "UDP <addr>:<port> of a statsd daemon for pushing stats")
	fs.Duration("statsd-interval", opts.StatsdInterval, "duration between pushing to statsd")
	fs.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd")
	fs.String("statsd-prefix", opts.StatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement)")
	fs.Int("statsd-udp-packet-size", opts.StatsdUDPPacketSize, "the size in bytes of statsd UDP packets")

	// end-to-end latency percentile flags
	percentiles := make(FloatArray, 0)
	fs.Var(&percentiles, "e2e-processing-latency-percentile", "message processing time percentiles (as float (0, 1.0]) to track (can be specified multiple times or comma separated '1.0,0.99,0.95', default none)")
	fs.Duration("e2e-processing-latency-window-time", opts.E2EProcessingLatencyWindowTime, "calculate end to end latency quantiles for this duration of time (ie: 60s would only show quantile calculations from the past 60 seconds)")

	// TLS config
	fs.String("tls-cert", opts.TLSCert, "path to certificate file")
	fs.String("tls-key", opts.TLSKey, "path to key file")
	fs.String("tls-client-auth-policy", opts.TLSClientAuthPolicy, "client certificate auth policy ('require' or 'require-verify')")
	fs.String("tls-root-ca-file", opts.TLSRootCAFile, "path to certificate authority file")
	var (
		requireTLS    = tlsRequiredOption(opts.TLSRequired)
		minTLSVersion = tlsMinVersionOption(opts.TLSMinVersion)
	)
	fs.Var(&requireTLS, "tls-required", "require TLS for client connections (true, false, tcp-https)")
	fs.Var(&minTLSVersion, "tls-min-version", "minimum SSL/TLS version acceptable ('ssl3.0', 'tls1.0', 'tls1.1', or 'tls1.2')")

	// compression
	fs.Bool("deflate", opts.DeflateEnabled, "enable deflate feature negotiation (client compression)")
	fs.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)")
	fs.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)")

	return fs
}
// program is the service value passed to svc.Run in main; its Init, Start and
// Stop methods drive the nsqd lifecycle.
type program struct {
// nsqd is assigned at the end of Start and is nil until then; Stop checks
// for nil before calling Exit on it.
nsqd *nsqd.NSQD
}
func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
log.Fatal(err)
}
}
// Init runs before Start. When the process is running as a Windows service it
// changes the working directory to the directory of the executable path in
// os.Args[0]; otherwise it does nothing.
func (p *program) Init(env svc.Environment) error {
	if !env.IsWindowsService() {
		return nil
	}
	exeDir := filepath.Dir(os.Args[0])
	return os.Chdir(exeDir)
}
// Start builds the nsqd options from defaults and command-line flags, creates
// the daemon, loads and then re-persists its metadata, and starts it via Main.
// On success the daemon is stored on p so that Stop can shut it down.
//
// Failures are returned to the caller instead of terminating the process from
// inside this callback, so the code that invoked Start decides how to report
// them.
func (p *program) Start() error {
	opts := nsqd.NewOptions()

	flagSet := nsqdFlagSet(opts)
	if err := flagSet.Parse(os.Args[1:]); err != nil {
		return err
	}

	rand.Seed(time.Now().UTC().UnixNano())

	// Overlay the parsed flags onto the default options (no config file map
	// is supplied here).
	options.Resolve(opts, flagSet, nil)

	// Named daemon so the local does not shadow the imported nsqd package.
	daemon := nsqd.New(opts)

	if err := daemon.LoadMetadata(); err != nil {
		return fmt.Errorf("failed to load metadata - %v", err)
	}
	if err := daemon.PersistMetadata(); err != nil {
		return fmt.Errorf("failed to persist metadata - %v", err)
	}

	daemon.Main()

	p.nsqd = daemon
	return nil
}
// Stop calls Exit on the daemon if Start got far enough to store one. It
// always returns nil.
func (p *program) Stop() error {
	if p.nsqd == nil {
		return nil
	}
	p.nsqd.Exit()
	return nil
}
// StringArray is a slice of strings with Set and String methods, so it can
// back a flag that may be given multiple times.
type StringArray []string

// Set appends s to the collected values. It never fails.
func (a *StringArray) Set(s string) error {
	values := append(*a, s)
	*a = values
	return nil
}

// String returns the collected values joined with commas.
func (a *StringArray) String() string {
	return strings.Join([]string(*a), ",")
}
// FloatArray is a slice of float64 with Set and String methods, so it can back
// a flag that may be given multiple times or as a comma-separated list. It
// also implements sort.Interface, ordering values from largest to smallest.
type FloatArray []float64

// Set parses param as a comma-separated list of floats, appends them to the
// collected values and re-sorts the whole slice in descending order.
// Whitespace around each element is ignored.
//
// If any element fails to parse, an error is returned and the slice is left
// exactly as it was before the call.
func (a *FloatArray) Set(param string) error {
	fields := strings.Split(param, ",")

	// Parse everything first so a bad element cannot leave a partial update.
	parsed := make([]float64, 0, len(fields))
	for _, s := range fields {
		v, err := strconv.ParseFloat(strings.TrimSpace(s), 64)
		if err != nil {
			return fmt.Errorf("could not parse %q: %v", s, err)
		}
		parsed = append(parsed, v)
	}

	*a = append(*a, parsed...)
	sort.Sort(*a)
	return nil
}

// Swap exchanges the elements at i and j (sort.Interface).
func (a FloatArray) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less reports whether element i is greater than element j, which makes
// sort.Sort order the slice from largest to smallest (sort.Interface).
func (a FloatArray) Less(i, j int) bool { return a[i] > a[j] }

// Len returns the number of collected values (sort.Interface).
func (a FloatArray) Len() int { return len(a) }

// String returns the collected values formatted with %f and joined with
// commas.
func (a *FloatArray) String() string {
	parts := make([]string, 0, len(*a))
	for _, v := range *a {
		parts = append(parts, fmt.Sprintf("%f", v))
	}
	return strings.Join(parts, ",")
}