forked from hashicorp/serf
/
command.go
279 lines (237 loc) · 8.17 KB
/
command.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
package agent
import (
"flag"
"fmt"
"github.com/hashicorp/logutils"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/cli"
"os"
"strings"
"sync"
"time"
)
// Command is a Command implementation that runs a Serf agent.
// The command will not end unless a shutdown message is sent on the
// ShutdownCh. If two messages are sent on the ShutdownCh it will forcibly
// exit.
type Command struct {
ShutdownCh <-chan struct{}
Ui cli.Ui
lock sync.Mutex
shuttingDown bool
}
func (c *Command) Run(args []string) int {
ui := &cli.PrefixedUi{
OutputPrefix: "==> ",
InfoPrefix: " ",
ErrorPrefix: "==> ",
Ui: c.Ui,
}
var cmdConfig Config
var configFiles []string
cmdFlags := flag.NewFlagSet("agent", flag.ContinueOnError)
cmdFlags.Usage = func() { ui.Output(c.Help()) }
cmdFlags.StringVar(&cmdConfig.BindAddr, "bind", "", "address to bind listeners to")
cmdFlags.Var((*AppendSliceValue)(&configFiles), "config-file",
"json file to read config from")
cmdFlags.Var((*AppendSliceValue)(&configFiles), "config-dir",
"directory of json files to read")
cmdFlags.StringVar(&cmdConfig.EncryptKey, "encrypt", "", "encryption key")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.EventHandlers), "event-handler",
"command to execute when events occur")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartJoin), "join",
"address of agent to join on startup")
cmdFlags.StringVar(&cmdConfig.LogLevel, "log-level", "", "log level")
cmdFlags.StringVar(&cmdConfig.NodeName, "node", "", "node name")
cmdFlags.IntVar(&cmdConfig.Protocol, "protocol", -1, "protocol version")
cmdFlags.StringVar(&cmdConfig.Role, "role", "", "role name")
cmdFlags.StringVar(&cmdConfig.RPCAddr, "rpc-addr", "",
"address to bind RPC listener to")
if err := cmdFlags.Parse(args); err != nil {
return 1
}
config := DefaultConfig
if len(configFiles) > 0 {
fileConfig, err := ReadConfigPaths(configFiles)
if err != nil {
c.Ui.Error(err.Error())
return 1
}
config = MergeConfig(config, fileConfig)
}
config = MergeConfig(config, &cmdConfig)
if config.NodeName == "" {
hostname, err := os.Hostname()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error determining hostname: %s", err))
return 1
}
config.NodeName = hostname
}
eventScripts, err := config.EventScripts()
if err != nil {
c.Ui.Error(err.Error())
return 1
}
for _, script := range eventScripts {
if !script.Valid() {
c.Ui.Error(fmt.Sprintf("Invalid event script: %s", script.String()))
return 1
}
}
bindIP, bindPort, err := config.BindAddrParts()
if err != nil {
c.Ui.Error(fmt.Sprintf("Invalid bind address: %s", err))
return 1
}
encryptKey, err := config.EncryptBytes()
if err != nil {
c.Ui.Error(fmt.Sprintf("Invalid encryption key: %s", err))
return 1
}
// Setup logging. First create the gated log writer, which will
// store logs until we're ready to show them. Then create the level
// filter, filtering logs of the specified level.
logGate := &GatedWriter{
Writer: &cli.UiWriter{Ui: c.Ui},
}
logLevelFilter := LevelFilter()
logLevelFilter.MinLevel = logutils.LogLevel(strings.ToUpper(config.LogLevel))
logLevelFilter.Writer = logGate
if !ValidateLevelFilter(logLevelFilter) {
ui.Error(fmt.Sprintf(
"Invalid log level: %s. Valid log levels are: %v",
logLevelFilter.MinLevel, logLevelFilter.Levels))
return 1
}
serfConfig := serf.DefaultConfig()
serfConfig.MemberlistConfig.BindAddr = bindIP
serfConfig.MemberlistConfig.TCPPort = bindPort
serfConfig.MemberlistConfig.UDPPort = bindPort
serfConfig.MemberlistConfig.SecretKey = encryptKey
serfConfig.NodeName = config.NodeName
serfConfig.Role = config.Role
serfConfig.ProtocolVersion = uint8(config.Protocol)
serfConfig.CoalescePeriod = 3 * time.Second
serfConfig.QuiescentPeriod = time.Second
serfConfig.UserCoalescePeriod = 3 * time.Second
serfConfig.UserQuiescentPeriod = time.Second
agent := &Agent{
EventHandler: &ScriptEventHandler{
Self: serf.Member{
Name: serfConfig.NodeName,
Role: serfConfig.Role,
},
Scripts: eventScripts,
},
LogOutput: logLevelFilter,
RPCAddr: config.RPCAddr,
SerfConfig: serfConfig,
}
ui.Output("Starting Serf agent...")
if err := agent.Start(); err != nil {
ui.Error(err.Error())
return 1
}
defer agent.Shutdown()
ui.Output("Serf agent running!")
ui.Info(fmt.Sprintf("Node name: '%s'", config.NodeName))
ui.Info(fmt.Sprintf("Bind addr: '%s:%d'", bindIP, bindPort))
ui.Info(fmt.Sprintf(" RPC addr: '%s'", config.RPCAddr))
ui.Info(fmt.Sprintf("Encrypted: %#v", config.EncryptKey != ""))
if len(config.StartJoin) > 0 {
ui.Output("Joining cluster...")
n, err := agent.Join(config.StartJoin, true)
if err != nil {
ui.Error(err.Error())
return 1
}
ui.Info(fmt.Sprintf("Join completed. Synced with %d initial agents", n))
}
ui.Info("")
ui.Output("Log data will now stream in as it occurs:\n")
logGate.Flush()
graceful, forceful := c.startShutdownWatcher(agent, ui)
select {
case <-graceful:
case <-forceful:
// Forcefully shut down, return a bad exit status.
return 1
}
return 0
}
func (c *Command) startShutdownWatcher(agent *Agent, ui cli.Ui) (graceful <-chan struct{}, forceful <-chan struct{}) {
g := make(chan struct{})
f := make(chan struct{})
graceful = g
forceful = f
go func() {
<-c.ShutdownCh
c.lock.Lock()
c.shuttingDown = true
c.lock.Unlock()
ui.Output("Gracefully shutting down agent...")
go func() {
if err := agent.Shutdown(); err != nil {
ui.Error(fmt.Sprintf("Error: %s", err))
return
}
close(g)
}()
select {
case <-g:
// Gracefully shut down properly
case <-c.ShutdownCh:
close(f)
}
}()
return
}
func (c *Command) Synopsis() string {
return "Runs a Serf agent"
}
func (c *Command) Help() string {
helpText := `
Usage: serf agent [options]
Starts the Serf agent and runs until an interrupt is received. The
agent represents a single node in a cluster.
Options:
-bind=0.0.0.0 Address to bind network listeners to
-config-file=foo Path to a JSON file to read configuration from.
This can be specified multiple times.
-config-dir=foo Path to a directory to read configuration files
from. This will read every file ending in ".json"
as configuration in this directory in alphabetical
order.
-encrypt=foo Key for encrypting network traffic within Serf.
Must be a base64-encoded 16-byte key.
-event-handler=foo Script to execute when events occur. This can
be specified multiple times. See the event scripts
section below for more info.
-join=addr An initial agent to join with. This flag can be
specified multiple times.
-log-level=info Log level of the agent.
-node=hostname Name of this node. Must be unique in the cluster
-protocol=n Serf protocol version to use. This defaults to
the latest version, but can be set back for upgrades.
-role=foo The role of this node, if any. This can be used
by event scripts to differentiate different types
of nodes that may be part of the same cluster.
-rpc-addr=127.0.0.1:7373 Address to bind the RPC listener.
Event handlers:
For more information on what event handlers are, please read the
Serf documentation. This section will document how to configure them
on the command-line. There are three methods of specifying an event
handler:
- The value can be a plain script, such as "event.sh". In this case,
Serf will send all events to this script, and you'll be responsible
for differentiating between them based on the SERF_EVENT.
- The value can be in the format of "TYPE=SCRIPT", such as
"member-join=join.sh". With this format, Serf will only send events
of that type to that script.
- The value can be in the format of "user:EVENT=SCRIPT", such as
"user:deploy=deploy.sh". This means that Serf will only invoke this
script in the case of user events named "deploy".
`
return strings.TrimSpace(helpText)
}