This repository has been archived by the owner on Apr 2, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 531
/
config.go
578 lines (518 loc) · 17.9 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
/***** BEGIN LICENSE BLOCK *****
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2012
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Rob Miller (rmiller@mozilla.com)
# Mike Trinkala (trink@mozilla.com)
#
# ***** END LICENSE BLOCK *****/
package pipeline
import (
"code.google.com/p/go-uuid/uuid"
"fmt"
"github.com/bbangert/toml"
. "github.com/mozilla-services/heka/message"
"log"
"os"
"regexp"
"sync"
"time"
)
// Largest Header_MessageEncoding value that regDecoderForHeader will accept
// when registering a decoder. Caps the size of our decoder set arrays.
const MAX_HEADER_MESSAGEENCODING Header_MessageEncoding = 256
var (
// Plugin factory functions keyed by plugin name; populated by
// RegisterPlugin and consulted when config sections are loaded.
AvailablePlugins = make(map[string]func() interface{})
// Name of the decoder plugin registered for each message encoding; written
// by regDecoderForHeader.
DecodersByEncoding = make(map[Header_MessageEncoding]string)
// Highest encoding value that has been registered via regDecoderForHeader.
topHeaderMessageEncoding Header_MessageEncoding
// Matches plugin type names ending in Decoder, Filter, Input, or Output; the
// single capture group yields that category suffix.
PluginTypeRegex = regexp.MustCompile("^.*(Decoder|Filter|Input|Output)$")
)
// RegisterPlugin makes a plugin usable from a Heka config file by storing its
// factory function in AvailablePlugins under the given name. Registering the
// same name again replaces the earlier factory.
func RegisterPlugin(name string, factory func() interface{}) {
	AvailablePlugins[name] = factory
}
// Generic plugin configuration type, used for plugins that don't implement
// the `HasConfigStruct` interface. Maps each config key to its still
// undecoded TOML value.
type PluginConfig map[string]toml.Primitive
// API made available to all plugins, providing Heka-wide utility functions.
type PluginHelper interface {
// Returns an `OutputRunner` for an output plugin registered using the
// specified name, or ok == false if no output by that name is registered.
Output(name string) (oRunner OutputRunner, ok bool)
// Returns a `FilterRunner` for a filter plugin registered using the
// specified name, or ok == false if no filter by that name is registered.
Filter(name string) (fRunner FilterRunner, ok bool)
// Returns the currently running Heka instance's unique PipelineConfig
// object.
PipelineConfig() *PipelineConfig
// Returns a single `DecoderSet` of running decoders for use by any plugin
// (usually inputs) that wants to decode binary data into a `Message`
// struct.
DecoderSet() DecoderSet
// Expects a loop count value from an existing message (or zero if there's
// no relevant existing message), returns an initialized `PipelinePack`
// pointer that can be populated w/ message data and inserted into the
// Heka pipeline. Returns `nil` if the loop count value provided is
// greater than the maximum allowed by the Heka instance.
PipelinePack(msgLoopCount uint) *PipelinePack
}
// Implemented by plugins that have their own config struct type. When a
// plugin provides this interface, LoadConfigStruct decodes the plugin's TOML
// section into the returned struct instead of a generic PluginConfig, and the
// result is what gets passed to the plugin's Init method.
type HasConfigStruct interface {
// Returns a default-value-populated configuration structure into which
// the plugin's TOML configuration will be deserialized.
ConfigStruct() interface{}
}
// Master config object encapsulating the entire heka/pipeline configuration.
// Instances are created with NewPipelineConfig.
type PipelineConfig struct {
// All running InputRunners, by name.
InputRunners map[string]InputRunner
// PluginWrappers that can create Decoder plugin objects, by name.
DecoderWrappers map[string]*PluginWrapper
// All available running DecoderSets.
DecoderSets []DecoderSet
// All running FilterRunners, by name.
FilterRunners map[string]FilterRunner
// All running OutputRunners, by name.
OutputRunners map[string]OutputRunner
// Heka message router instance.
router *messageRouter
// PipelinePack supply for Input plugins.
inputRecycleChan chan *PipelinePack
// PipelinePack supply for Filter plugins (separate pool prevents
// deadlocks).
injectRecycleChan chan *PipelinePack
// Stores log messages generated by plugin config errors.
logMsgs []string
// Lock protecting access to the set of running filters so dynamic filters
// can be safely added and removed while Heka is running.
filtersLock sync.Mutex
// Is freed when all FilterRunners have stopped.
filtersWg sync.WaitGroup
// Is freed when all DecoderRunners have stopped.
decodersWg sync.WaitGroup
// Channel providing round-robin access to the initialized DecoderSets.
decodersChan chan DecoderSet
// Name of host on which Heka is running.
hostname string
// Heka process id.
pid int32
}
// NewPipelineConfig builds a PipelineConfig with all of its maps, pools and
// channels allocated. Passing nil for `globals` selects DefaultGlobals().
//
// Side effect: the package-level `Globals` function is replaced with one that
// returns the settings used here.
func NewPipelineConfig(globals *GlobalConfigStruct) (config *PipelineConfig) {
	if globals == nil {
		globals = DefaultGlobals()
	}

	// Swap in the effective settings before constructing anything else, so
	// that code consulting Globals() during construction sees them.
	Globals = func() *GlobalConfigStruct {
		return globals
	}

	// The hostname lookup error is deliberately discarded; whatever name
	// os.Hostname hands back is stored as-is.
	host, _ := os.Hostname()

	return &PipelineConfig{
		InputRunners:      make(map[string]InputRunner),
		DecoderWrappers:   make(map[string]*PluginWrapper),
		DecoderSets:       make([]DecoderSet, globals.DecoderPoolSize),
		FilterRunners:     make(map[string]FilterRunner),
		OutputRunners:     make(map[string]OutputRunner),
		router:            NewMessageRouter(),
		inputRecycleChan:  make(chan *PipelinePack, globals.PoolSize),
		injectRecycleChan: make(chan *PipelinePack, globals.PoolSize),
		logMsgs:           make([]string, 0, 4),
		decodersChan:      make(chan DecoderSet, globals.DecoderPoolSize),
		hostname:          host,
		pid:               int32(os.Getpid()),
	}
}
// PipelinePack hands out a pack from the inject pool, ready to be filled in
// and injected into the Heka pipeline. Callers pass the msgLoopCount of any
// relevant Message they are holding (or zero). The count is incremented, and
// if the result exceeds Globals().MaxMsgLoops nil is returned.
//
// Receiving from the inject pool blocks until a pack is available. The
// returned pack has a fresh timestamp and UUID, this instance's hostname and
// pid, a RefCount of 1, and the incremented loop count.
func (self *PipelineConfig) PipelinePack(msgLoopCount uint) *PipelinePack {
	msgLoopCount++
	if msgLoopCount > Globals().MaxMsgLoops {
		return nil
	}

	pack := <-self.injectRecycleChan
	pack.RefCount = 1
	pack.MsgLoopCount = msgLoopCount

	pack.Message.SetTimestamp(time.Now().UnixNano())
	pack.Message.SetUuid(uuid.NewRandom())
	pack.Message.SetHostname(self.hostname)
	pack.Message.SetPid(self.pid)
	return pack
}
// Output looks up the OutputRunner registered under the given name. When no
// such name is registered, ok is false and oRunner is nil.
func (self *PipelineConfig) Output(name string) (oRunner OutputRunner, ok bool) {
	runner, found := self.OutputRunners[name]
	return runner, found
}
// PipelineConfig returns the receiver itself; it exists so that the
// underlying config object is reachable through the PluginHelper interface.
func (self *PipelineConfig) PipelineConfig() *PipelineConfig {
	return self
}
// DecoderSet returns a running DecoderSet, cycling round-robin through all
// created DecoderSets: the next set is taken off decodersChan and immediately
// put back on it before being returned.
func (self *PipelineConfig) DecoderSet() (ds DecoderSet) {
	ds = <-self.decodersChan
	self.decodersChan <- ds
	return ds
}
// Filter looks up the FilterRunner registered under the given name. When no
// such name is registered, ok is false and fRunner is nil. Note that this
// lookup does not acquire filtersLock.
func (self *PipelineConfig) Filter(name string) (fRunner FilterRunner, ok bool) {
	runner, found := self.FilterRunners[name]
	return runner, found
}
// AddFilterRunner starts the provided FilterRunner and adds it to the set of
// running filters. filtersLock is held for the duration of the call.
//
// The runner is registered under its name before Start is invoked. If Start
// fails, the filtersWg count is released and the registration is rolled back
// (restoring any runner previously stored under the same name), so a runner
// that never started is not left behind in FilterRunners; the error is
// returned. On success the runner's MatchRunner is handed to the router.
func (self *PipelineConfig) AddFilterRunner(fRunner FilterRunner) error {
	self.filtersLock.Lock()
	defer self.filtersLock.Unlock()

	name := fRunner.Name()
	previous, hadPrevious := self.FilterRunners[name]
	self.FilterRunners[name] = fRunner
	self.filtersWg.Add(1)

	if err := fRunner.Start(self, &self.filtersWg); err != nil {
		self.filtersWg.Done()
		// Undo the registration made above.
		if hadPrevious {
			self.FilterRunners[name] = previous
		} else {
			delete(self.FilterRunners, name)
		}
		return fmt.Errorf("AddFilterRunner '%s' failed to start: %s",
			name, err)
	}

	self.router.MrChan() <- fRunner.MatchRunner()
	return nil
}
// RemoveFilterRunner removes the named FilterRunner from the configuration.
// It returns false, without doing anything, when Heka is stopping or when no
// runner is registered under that name. Otherwise, while holding filtersLock,
// it sends the runner's MatchRunner to the router's MrChan, closes the
// runner's input channel, drops it from FilterRunners, and returns true.
func (self *PipelineConfig) RemoveFilterRunner(name string) bool {
	if Globals().Stopping {
		return false
	}

	self.filtersLock.Lock()
	defer self.filtersLock.Unlock()

	runner, registered := self.FilterRunners[name]
	if !registered {
		return false
	}

	self.router.MrChan() <- runner.MatchRunner()
	close(runner.InChan())
	delete(self.FilterRunners, name)
	return true
}
// A decoded TOML config file: each top-level section name mapped to that
// section's still undecoded contents.
type ConfigFile PluginConfig
// The TOML spec for plugin configuration options that will be pulled out by
// Heka itself for runner configuration before the config is passed to the
// Plugin.Init method.
type PluginGlobals struct {
// Registered plugin type to instantiate; when empty, the config section
// name is used as the type.
Typ string `toml:"type"`
// Ticker interval in seconds for filter and output runners; zero leaves the
// runner's tick length unset.
Ticker uint `toml:"ticker_interval"`
// For decoders: name of the Header_MessageEncoding to register the decoder
// against; empty means no registration.
Encoding string `toml:"encoding_name"`
// Message matcher specification for filters and outputs; empty means no
// matcher is created.
Matcher string `toml:"message_matcher"`
// Signer value handed to NewMatchRunner together with the matcher
// specification.
Signer string `toml:"message_signer"`
}
// Default Decoders configuration. LoadFromConfigFile loads these sections for
// JsonDecoder and ProtobufDecoder when the user's config did not produce a
// decoder wrapper under those names.
var defaultDecoderTOML = `
[JsonDecoder]
encoding_name = "JSON"
[ProtobufDecoder]
encoding_name = "PROTOCOL_BUFFER"
`
// A helper object to support delayed plugin creation.
type PluginWrapper struct {
// Name of the config section this wrapper was created for.
name string
// Returns the config object to pass to a newly created plugin's Init.
configCreator func() interface{}
// Factory returning a new, uninitialized plugin instance.
pluginCreator func() interface{}
}
// Create builds a new instance of the plugin and returns it, discarding any
// error reported along the way. Use CreateWithError when the error matters.
func (self *PluginWrapper) Create() (plugin interface{}) {
	created, _ := self.CreateWithError()
	return created
}
// CreateWithError builds a new plugin instance via pluginCreator and calls
// its Init method with the config object from configCreator.
//
// If anything in that sequence panics, the panic is recovered and reported as
// an error, with a nil plugin. If Init merely returns an error, that error is
// returned together with the created plugin instance.
func (self *PluginWrapper) CreateWithError() (plugin interface{}, err error) {
	defer func() {
		// Slight protection against the call into plugin code.
		r := recover()
		if r == nil {
			return
		}
		plugin, err = nil, fmt.Errorf("'%s' Init() panicked: %s", self.name, r)
	}()

	plugin = self.pluginCreator()
	err = plugin.(Plugin).Init(self.configCreator())
	return plugin, err
}
// LoadConfigStruct produces the config object for a plugin from its TOML
// section.
//
// If `configable` implements `HasConfigStruct`, its ConfigStruct() result is
// populated from `config` and returned; a decode failure is reported as
// "Can't unmarshal config: ...", and a panic in this path is recovered and
// reported as an error. Otherwise `config` is decoded into a generic
// PluginConfig, and a decode failure is returned unwrapped. On any error the
// returned configStruct is nil.
func LoadConfigStruct(config toml.Primitive, configable interface{}) (
	configStruct interface{}, err error) {

	provider, hasStruct := configable.(HasConfigStruct)
	if !hasStruct {
		// No plugin-specific struct: fall back to a PluginConfig.
		generic := PluginConfig{}
		if err = toml.PrimitiveDecode(config, generic); err != nil {
			return nil, err
		}
		return generic, nil
	}

	defer func() {
		// Slight protection against ConfigStruct call into plugin code.
		r := recover()
		if r == nil {
			return
		}
		configStruct, err = nil, fmt.Errorf("ConfigStruct() panicked: %s", r)
	}()

	configStruct = provider.ConfigStruct()
	if decodeErr := toml.PrimitiveDecode(config, configStruct); decodeErr != nil {
		return nil, fmt.Errorf("Can't unmarshal config: %s", decodeErr)
	}
	return configStruct, nil
}
// Registers a the specified decoder to be used for messages with the
// specified Heka protocol encoding header.
func regDecoderForHeader(decoderName string, encodingName string) (err error) {
var encoding Header_MessageEncoding
var ok bool
if encodingInt32, ok := Header_MessageEncoding_value[encodingName]; !ok {
err = fmt.Errorf("No Header_MessageEncoding named '%s'", encodingName)
return
} else {
encoding = Header_MessageEncoding(encodingInt32)
}
if encoding > MAX_HEADER_MESSAGEENCODING {
err = fmt.Errorf("Header_MessageEncoding '%s' value '%d' higher than max '%d'",
encodingName, encoding, MAX_HEADER_MESSAGEENCODING)
return
}
// Be nice to be able to verify that this is actually a decoder.
if _, ok = AvailablePlugins[decoderName]; !ok {
err = fmt.Errorf("No decoder named '%s' registered as a plugin", decoderName)
return
}
if encoding > topHeaderMessageEncoding {
topHeaderMessageEncoding = encoding
}
DecodersByEncoding[encoding] = decoderName
return
}
// log reports a plugin config loading problem: the message is written to the
// standard logger and also kept in logMsgs.
func (self *PipelineConfig) log(msg string) {
	log.Println(msg)
	self.logMsgs = append(self.logMsgs, msg)
}
// loadSection must be passed a plugin name and the config for that plugin. It
// will create a PluginWrapper (i.e. a factory). For decoders the
// PluginWrappers are stored and used later to create the DecoderSet pool. For
// the other plugin types, we create the plugin, configure it, then create the
// appropriate plugin runner.
//
// The plugin type is taken from the section's `type` setting, falling back to
// the section name. Every problem is recorded via self.log and counted in the
// returned errcnt; a section that fails to load registers nothing.
func (self *PipelineConfig) loadSection(sectionName string,
	configSection toml.Primitive) (errcnt uint) {
	var ok bool
	var err error
	var pluginGlobals PluginGlobals
	var pluginType string

	wrapper := new(PluginWrapper)
	wrapper.name = sectionName

	if err = toml.PrimitiveDecode(configSection, &pluginGlobals); err != nil {
		self.log(fmt.Sprintf("Unable to decode config for plugin: %s, error: %s",
			wrapper.name, err.Error()))
		errcnt++
		return
	}

	if pluginGlobals.Typ == "" {
		pluginType = sectionName
	} else {
		pluginType = pluginGlobals.Typ
	}

	if wrapper.pluginCreator, ok = AvailablePlugins[pluginType]; !ok {
		self.log(fmt.Sprintf("No such plugin: %s", wrapper.name))
		errcnt++
		return
	}

	// Create plugin, test config object generation.
	plugin := wrapper.pluginCreator()
	var config interface{}
	if config, err = LoadConfigStruct(configSection, plugin); err != nil {
		self.log(fmt.Sprintf("Can't load config for %s '%s': %s", sectionName,
			wrapper.name, err))
		errcnt++
		return
	}
	wrapper.configCreator = func() interface{} { return config }

	// Apply configuration to instantiated plugin.
	configPlugin := func() (err error) {
		defer func() {
			// Slight protection against Init call into plugin code. This also
			// catches the type assertion failing for a non-Plugin value.
			if r := recover(); r != nil {
				err = fmt.Errorf("Init() panicked: %s", r)
			}
		}()
		err = plugin.(Plugin).Init(config)
		return
	}
	if err = configPlugin(); err != nil {
		self.log(fmt.Sprintf("Initialization failed for '%s': %s",
			sectionName, err))
		errcnt++
		return
	}

	// Determine the plugin type
	pluginCats := PluginTypeRegex.FindStringSubmatch(pluginType)
	if len(pluginCats) < 2 {
		self.log(fmt.Sprintf("Type doesn't contain valid plugin name: %s", pluginType))
		errcnt++
		return
	}
	pluginCategory := pluginCats[1]

	// For decoders check to see if we need to register against a protocol
	// header, store the wrapper and continue.
	if pluginCategory == "Decoder" {
		if pluginGlobals.Encoding != "" {
			err = regDecoderForHeader(pluginType, pluginGlobals.Encoding)
			if err != nil {
				self.log(fmt.Sprintf(
					"Can't register decoder '%s' for encoding '%s': %s",
					wrapper.name, pluginGlobals.Encoding, err))
				errcnt++
				return
			}
		}
		self.DecoderWrappers[wrapper.name] = wrapper
		return
	}

	// For inputs we just store the InputRunner and we're done.
	if pluginCategory == "Input" {
		// The category comes from the type *name* only, so verify that the
		// plugin really is an Input rather than panicking on a mismatch.
		input, isInput := plugin.(Input)
		if !isInput {
			self.log(fmt.Sprintf("Plugin '%s' of type '%s' doesn't implement Input",
				wrapper.name, pluginType))
			errcnt++
			return
		}
		self.InputRunners[wrapper.name] = NewInputRunner(wrapper.name, input)
		return
	}

	// Filters and outputs have a few more config settings.
	runner := NewFORunner(wrapper.name, plugin.(Plugin))
	runner.name = wrapper.name

	if pluginGlobals.Ticker != 0 {
		runner.tickLength = time.Duration(pluginGlobals.Ticker) * time.Second
	}

	var matcher *MatchRunner
	if pluginGlobals.Matcher != "" {
		if matcher, err = NewMatchRunner(pluginGlobals.Matcher,
			pluginGlobals.Signer); err != nil {
			self.log(fmt.Sprintf("Can't create message matcher for '%s': %s",
				wrapper.name, err))
			errcnt++
			return
		}
		runner.matcher = matcher
	}

	switch pluginCategory {
	case "Filter":
		if matcher != nil {
			self.router.fMatchers = append(self.router.fMatchers, matcher)
		}
		self.FilterRunners[runner.name] = runner
	case "Output":
		if matcher != nil {
			self.router.oMatchers = append(self.router.oMatchers, matcher)
		}
		self.OutputRunners[runner.name] = runner
	}

	return
}
// LoadFromConfigFile decodes the TOML configuration file at `filename` and
// loads every plugin section it contains into the receiver. JsonDecoder and
// ProtobufDecoder sections are then loaded from the built-in defaults if the
// file did not provide decoders under those names. Finally the DecoderSet
// pool is created, each set's decoder runners are started, and the sets are
// queued on decodersChan.
//
// The receiver must already have been created with NewPipelineConfig.
//
// An error is returned if the file cannot be decoded, or if one or more
// plugin sections or DecoderSets failed to load; in the latter case the
// error only carries the failure count, the details having been logged.
func (self *PipelineConfig) LoadFromConfigFile(filename string) (err error) {
	var configFile ConfigFile
	if _, err = toml.DecodeFile(filename, &configFile); err != nil {
		return fmt.Errorf("Error decoding config file: %s", err)
	}

	// Load all the plugins
	var errcnt uint
	for name, conf := range configFile {
		log.Println("Loading: ", name)
		errcnt += self.loadSection(name, conf)
	}

	// Add JSON/PROTOCOL_BUFFER decoders if none were configured. The input
	// here is the constant defaultDecoderTOML, so the decode result is not
	// checked.
	var configDefault ConfigFile
	toml.Decode(defaultDecoderTOML, &configDefault)
	dWrappers := self.DecoderWrappers

	if _, ok := dWrappers["JsonDecoder"]; !ok {
		log.Println("Loading: JsonDecoder")
		errcnt += self.loadSection("JsonDecoder", configDefault["JsonDecoder"])
	}
	if _, ok := dWrappers["ProtobufDecoder"]; !ok {
		log.Println("Loading: ProtobufDecoder")
		errcnt += self.loadSection("ProtobufDecoder", configDefault["ProtobufDecoder"])
	}

	// Create / prep the DecoderSet pool
	var dRunner DecoderRunner
	for i := 0; i < Globals().DecoderPoolSize; i++ {
		if self.DecoderSets[i], err = newDecoderSet(dWrappers); err != nil {
			log.Println(err)
			errcnt++
			// A set that came back with an error must not be used: don't
			// call into it and don't hand it out via decodersChan.
			continue
		}
		for _, dRunner = range self.DecoderSets[i].AllByName() {
			dRunner.Start(self, &self.decodersWg)
		}
		self.decodersChan <- self.DecoderSets[i]
	}

	if errcnt != 0 {
		return fmt.Errorf("%d errors loading plugins", errcnt)
	}

	return nil
}
func init() {
RegisterPlugin("UdpInput", func() interface{} {
return new(UdpInput)
})
RegisterPlugin("TcpInput", func() interface{} {
return new(TcpInput)
})
RegisterPlugin("JsonDecoder", func() interface{} {
return new(JsonDecoder)
})
RegisterPlugin("ProtobufDecoder", func() interface{} {
return new(ProtobufDecoder)
})
RegisterPlugin("StatsdInput", func() interface{} {
return new(StatsdInput)
})
RegisterPlugin("LogOutput", func() interface{} {
return new(LogOutput)
})
RegisterPlugin("FileOutput", func() interface{} {
return new(FileOutput)
})
RegisterPlugin("WhisperOutput", func() interface{} {
return new(WhisperOutput)
})
RegisterPlugin("LogfileInput", func() interface{} {
return new(LogfileInput)
})
RegisterPlugin("TcpOutput", func() interface{} {
return new(TcpOutput)
})
RegisterPlugin("StatFilter", func() interface{} {
return new(StatFilter)
})
RegisterPlugin("SandboxFilter", func() interface{} {
return new(SandboxFilter)
})
RegisterPlugin("TransformFilter", func() interface{} {
return new(TransformFilter)
})
RegisterPlugin("CounterFilter", func() interface{} {
return new(CounterFilter)
})
RegisterPlugin("SandboxManagerFilter", func() interface{} {
return new(SandboxManagerFilter)
})
RegisterPlugin("DashboardOutput", func() interface{} {
return new(DashboardOutput)
})
}