-
Notifications
You must be signed in to change notification settings - Fork 0
/
nsqd.go
236 lines (200 loc) · 8.25 KB
/
nsqd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
package main
import (
"crypto/tls"
"flag"
"fmt"
"log"
"math/rand"
"os"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"
"github.com/BurntSushi/toml"
"github.com/judwhite/go-svc/svc"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/version"
"github.com/nsqio/nsq/nsqd"
)
// tlsRequiredOption is the flag value behind --tls-required. It holds one of
// nsqd.TLSNotRequired, nsqd.TLSRequired or nsqd.TLSRequiredExceptHTTP.
type tlsRequiredOption int

// Set parses s case-insensitively and stores the matching policy.
//
// "tcp-https" selects nsqd.TLSRequiredExceptHTTP. Any other input must be a
// boolean accepted by strconv.ParseBool: true selects nsqd.TLSRequired and
// false selects nsqd.TLSNotRequired. When the input is not a valid boolean the
// strconv error is returned and the receiver keeps its previous value.
func (t *tlsRequiredOption) Set(s string) error {
	s = strings.ToLower(s)
	if s == "tcp-https" {
		*t = nsqd.TLSRequiredExceptHTTP
		return nil
	}

	required, err := strconv.ParseBool(s)
	if err != nil {
		// Bail out before assigning: an unparsable value must not silently
		// reset the option to TLSNotRequired.
		return err
	}

	if required {
		*t = nsqd.TLSRequired
	} else {
		*t = nsqd.TLSNotRequired
	}
	return nil
}

// Get returns the stored policy as a plain int.
func (t *tlsRequiredOption) Get() interface{} { return int(*t) }

// String returns the stored policy formatted as a base-10 integer.
func (t *tlsRequiredOption) String() string {
	return strconv.FormatInt(int64(*t), 10)
}

// IsBoolFlag reports true so that the flag package accepts a bare
// --tls-required (no value) as shorthand for --tls-required=true.
func (t *tlsRequiredOption) IsBoolFlag() bool { return true }
// tlsMinVersionOption is the flag value behind --tls-min-version. It holds a
// crypto/tls protocol version number.
type tlsMinVersionOption uint16

// Set maps a case-insensitive version name ("ssl3.0", "tls1.0", "tls1.1" or
// "tls1.2") to the corresponding crypto/tls constant. An empty string is
// accepted and leaves the current value untouched; any other name yields an
// error and also leaves the value untouched.
func (t *tlsMinVersionOption) Set(s string) error {
	name := strings.ToLower(s)
	if name == "" {
		return nil
	}

	known := map[string]tlsMinVersionOption{
		"ssl3.0": tls.VersionSSL30,
		"tls1.0": tls.VersionTLS10,
		"tls1.1": tls.VersionTLS11,
		"tls1.2": tls.VersionTLS12,
	}
	version, ok := known[name]
	if !ok {
		return fmt.Errorf("unknown tlsVersionOption %q", name)
	}

	*t = version
	return nil
}

// Get returns the stored version number as a uint16.
func (t *tlsMinVersionOption) Get() interface{} {
	return uint16(*t)
}

// String returns the stored version number formatted as a base-10 integer.
func (t *tlsMinVersionOption) String() string {
	return strconv.FormatUint(uint64(*t), 10)
}
// nsqdFlagSet builds the "nsqd" command-line flag set, using the values in
// opts as the default for every flag that has a matching option. The set is
// created with flag.ExitOnError. The returned set has not been parsed yet.
func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
	fs := flag.NewFlagSet("nsqd", flag.ExitOnError)

	// General daemon settings.
	fs.Bool("version", false, "print version string")
	fs.Bool("verbose", false, "enable verbose logging")
	fs.String("config", "", "path to config file")
	fs.Int64("worker-id", opts.ID, "unique seed for message ID generation (int) in range [0,4096) (will default to a hash of hostname)")

	// Listen and discovery addresses.
	fs.String("https-address", opts.HTTPSAddress, "<addr>:<port> to listen on for HTTPS clients")
	fs.String("http-address", opts.HTTPAddress, "<addr>:<port> to listen on for HTTP clients")
	fs.String("tcp-address", opts.TCPAddress, "<addr>:<port> to listen on for TCP clients")
	fs.Var(&app.StringArray{}, "auth-http-address", "<addr>:<port> to query auth server (may be given multiple times)")
	fs.String("broadcast-address", opts.BroadcastAddress, "address that will be registered with lookupd (defaults to the OS hostname)")
	fs.Var(&app.StringArray{}, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)")

	// Disk-backed queue settings.
	fs.String("data-path", opts.DataPath, "path to store disk-backed messages")
	fs.Int64("mem-queue-size", opts.MemQueueSize, "number of messages to keep in memory (per topic/channel)")
	fs.Int64("max-bytes-per-file", opts.MaxBytesPerFile, "number of bytes per diskqueue file before rolling")
	fs.Int64("sync-every", opts.SyncEvery, "number of messages per diskqueue fsync")
	fs.Duration("sync-timeout", opts.SyncTimeout, "duration of time per diskqueue fsync")

	// Message and command limits. msg-timeout is registered as a string
	// flag whose default is the formatted duration.
	fs.String("msg-timeout", opts.MsgTimeout.String(), "duration to wait before auto-requeing a message")
	fs.Duration("max-msg-timeout", opts.MaxMsgTimeout, "maximum duration before a message will timeout")
	fs.Int64("max-msg-size", opts.MaxMsgSize, "maximum size of a single message in bytes")
	fs.Duration("max-req-timeout", opts.MaxReqTimeout, "maximum requeuing timeout for a message")
	// Deprecated alias of --max-msg-size, slated for removal.
	fs.Int64("max-message-size", opts.MaxMsgSize, "(deprecated use --max-msg-size) maximum size of a single message in bytes")
	fs.Int64("max-body-size", opts.MaxBodySize, "maximum size of a single command body")

	// Upper bounds for settings a client may negotiate.
	fs.Duration("max-heartbeat-interval", opts.MaxHeartbeatInterval, "maximum client configurable duration of time between client heartbeats")
	fs.Int64("max-rdy-count", opts.MaxRdyCount, "maximum RDY count for a client")
	fs.Int64("max-output-buffer-size", opts.MaxOutputBufferSize, "maximum client configurable size (in bytes) for a client output buffer")
	fs.Duration("max-output-buffer-timeout", opts.MaxOutputBufferTimeout, "maximum client configurable duration of time between flushing to a client")

	// statsd reporting. statsd-interval is registered as a string flag whose
	// default is the formatted duration.
	fs.String("statsd-address", opts.StatsdAddress, "UDP <addr>:<port> of a statsd daemon for pushing stats")
	fs.String("statsd-interval", opts.StatsdInterval.String(), "duration between pushing to statsd")
	fs.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd")
	fs.String("statsd-prefix", opts.StatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement)")

	// End-to-end processing latency tracking.
	fs.Var(&app.FloatArray{}, "e2e-processing-latency-percentile", "message processing time percentiles (as float (0, 1.0]) to track (can be specified multiple times or comma separated '1.0,0.99,0.95', default none)")
	fs.Duration("e2e-processing-latency-window-time", opts.E2EProcessingLatencyWindowTime, "calculate end to end latency quantiles for this duration of time (ie: 60s would only show quantile calculations from the past 60 seconds)")

	// TLS settings. The two custom flag values start from the option defaults.
	fs.String("tls-cert", opts.TLSCert, "path to certificate file")
	fs.String("tls-key", opts.TLSKey, "path to key file")
	fs.String("tls-client-auth-policy", opts.TLSClientAuthPolicy, "client certificate auth policy ('require' or 'require-verify')")
	fs.String("tls-root-ca-file", opts.TLSRootCAFile, "path to certificate authority file")
	requiredFlag := tlsRequiredOption(opts.TLSRequired)
	fs.Var(&requiredFlag, "tls-required", "require TLS for client connections (true, false, tcp-https)")
	minVersionFlag := tlsMinVersionOption(opts.TLSMinVersion)
	fs.Var(&minVersionFlag, "tls-min-version", "minimum SSL/TLS version acceptable ('ssl3.0', 'tls1.0', 'tls1.1', or 'tls1.2')")

	// Client compression negotiation.
	fs.Bool("deflate", opts.DeflateEnabled, "enable deflate feature negotiation (client compression)")
	fs.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)")
	fs.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)")

	return fs
}
// config holds the key/value pairs decoded from the config file.
type config map[string]interface{}

// Validate checks the TLS-related entries of the config file and rewrites
// them in place; any value that cannot be parsed is fatal (log.Fatalf).
//
// "tls_required" is replaced by the decimal string of the parsed policy.
// "tls_min_version" is replaced by the decimal string of the parsed version,
// or removed altogether when that string is "0". Keys that are absent are
// left alone.
func (cfg config) Validate() {
	if raw, ok := cfg["tls_required"]; ok {
		var required tlsRequiredOption
		if err := required.Set(fmt.Sprintf("%v", raw)); err != nil {
			log.Fatalf("ERROR: failed parsing tls required %v", raw)
		}
		cfg["tls_required"] = required.String()
	}

	if raw, ok := cfg["tls_min_version"]; ok {
		var minVersion tlsMinVersionOption
		if err := minVersion.Set(fmt.Sprintf("%v", raw)); err != nil {
			log.Fatalf("ERROR: failed parsing tls min version %v", raw)
		}
		// A result of "0" means no version was selected; drop the key
		// rather than storing it.
		if normalized := fmt.Sprintf("%v", minVersion.Get()); normalized == "0" {
			delete(cfg, "tls_min_version")
		} else {
			cfg["tls_min_version"] = normalized
		}
	}
}
// program is the value main hands to svc.Run; it carries the Init, Start and
// Stop methods defined in this file.
type program struct {
// nsqd is the daemon instance. Start assigns it as its last step, and Stop
// checks it for nil before calling Exit.
nsqd *nsqd.NSQD
}
func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
log.Fatal(err)
}
}
// Init switches the working directory to the directory of the executable
// (taken from os.Args[0]) when running as a Windows service, and returns any
// error from that change. In every other environment it does nothing.
func (p *program) Init(env svc.Environment) error {
	if !env.IsWindowsService() {
		return nil
	}
	return os.Chdir(filepath.Dir(os.Args[0]))
}
// Start assembles the nsqd options from the built-in defaults, the command
// line and the optional TOML config file, then creates the daemon, loads and
// persists its metadata, and calls its Main method.
//
// With --version set it prints the version string and exits with status 0.
// A config file that cannot be decoded and metadata that cannot be persisted
// are both fatal (log.Fatalf). Otherwise the daemon is stored on p and nil is
// returned.
func (p *program) Start() error {
	opts := nsqd.NewOptions()

	fs := nsqdFlagSet(opts)
	fs.Parse(os.Args[1:])

	rand.Seed(time.Now().UTC().UnixNano())

	showVersion := fs.Lookup("version").Value.(flag.Getter).Get().(bool)
	if showVersion {
		fmt.Println(version.String("nsqd"))
		os.Exit(0)
	}

	// cfg stays nil when no config file was given.
	var cfg config
	if path := fs.Lookup("config").Value.String(); path != "" {
		if _, err := toml.DecodeFile(path, &cfg); err != nil {
			log.Fatalf("ERROR: failed to load config file %s - %s", path, err.Error())
		}
	}
	cfg.Validate()

	options.Resolve(opts, fs, cfg)

	daemon := nsqd.New(opts)
	daemon.LoadMetadata()
	if err := daemon.PersistMetadata(); err != nil {
		log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
	}
	daemon.Main()

	// Keep the daemon so Stop can call Exit on it later.
	p.nsqd = daemon
	return nil
}
// Stop calls Exit on the daemon if one has been stored on p. It does nothing
// when no daemon is set, and it always returns nil.
func (p *program) Stop() error {
	if daemon := p.nsqd; daemon != nil {
		daemon.Exit()
	}
	return nil
}