forked from DataDog/datadog-agent
/
launcher.go
94 lines (84 loc) · 2.69 KB
/
launcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-2019 Datadog, Inc.
package windowsevent
import (
"github.com/ninnemana/datadog-agent/pkg/util/log"
"github.com/ninnemana/datadog-agent/pkg/logs/config"
"github.com/ninnemana/datadog-agent/pkg/logs/pipeline"
"github.com/ninnemana/datadog-agent/pkg/logs/restart"
)
// Launcher starts a windows event log tailer for each source it receives and
// stops the tailers it started when asked to shut down.
type Launcher struct {
	// sources delivers the log sources for which tailers should be created.
	sources chan *config.LogSource
	// pipelineProvider supplies the pipeline channel handed to each new tailer.
	pipelineProvider pipeline.Provider
	// tailers holds the started tailers, keyed by the identifier derived from
	// the source's channel path and query.
	tailers map[string]*Tailer
	// stop signals the run loop to exit.
	stop chan struct{}
}
// NewLauncher builds a Launcher that receives the sources added for the
// windows event type and forwards to pipelines obtained from pipelineProvider.
// The returned Launcher has no tailers registered until Start is called and
// sources arrive.
func NewLauncher(sources *config.LogSources, pipelineProvider pipeline.Provider) *Launcher {
	launcher := new(Launcher)
	launcher.sources = sources.GetAddedForType(config.WindowsEventType)
	launcher.pipelineProvider = pipelineProvider
	launcher.tailers = map[string]*Tailer{}
	launcher.stop = make(chan struct{})
	return launcher
}
// Start logs the outcome of enumerating the windows event log channels at
// debug level, then launches the run loop in its own goroutine. A failure to
// enumerate the channels is only logged; it does not prevent the launcher
// from running.
func (l *Launcher) Start() {
	channels, enumErr := EnumerateChannels()
	if enumErr == nil {
		log.Debug("Found available windows event log channels: ", channels)
	} else {
		log.Debug("Could not list windows event log channels: ", enumErr)
	}
	go l.run()
}
// run is the launcher loop: it sets up one tailer per distinct source
// identifier until a stop signal is received.
func (l *Launcher) run() {
	for {
		select {
		case <-l.stop:
			return
		case src := <-l.sources:
			id := Identifier(src.Config.ChannelPath, src.Config.Query)
			if _, known := l.tailers[id]; known {
				// A tailer is already registered under this identifier.
				continue
			}
			newTailer, setupErr := l.setupTailer(src)
			if setupErr != nil {
				log.Info("Could not set up windows event log tailer: ", setupErr)
				continue
			}
			l.tailers[id] = newTailer
		}
	}
}
// Stop signals the run loop to exit, then hands every registered tailer to a
// restart stopper and empties the tailers map.
//
// The send on l.stop is unbuffered, so Stop does not proceed until run has
// received the signal; after that run no longer touches the tailers map.
// NOTE(review): as a consequence Stop blocks if run is not running (Start was
// never called, or Stop is called a second time) — confirm against callers.
func (l *Launcher) Stop() {
	l.stop <- struct{}{}
	stopper := restart.NewParallelStopper()
	// Delete each entry by the key it was stored under rather than by
	// tailer.Identifier(): run registers tailers under an identifier built from
	// the raw source config, whereas the tailer itself is created from the
	// sanitized config (an empty query becomes "*"), so the two may differ and
	// a delete by tailer.Identifier() could leave stale entries behind.
	for identifier, tailer := range l.tailers {
		stopper.Add(tailer)
		delete(l.tailers, identifier)
	}
	stopper.Stop()
}
// sanitizedConfig returns a new Config carrying the channel path and query of
// sourceConfig, with an empty query replaced by the default "*".
func (l *Launcher) sanitizedConfig(sourceConfig *config.LogsConfig) *Config {
	query := sourceConfig.Query
	if query == "" {
		// No query configured: fall back to the default.
		query = "*"
	}
	return &Config{sourceConfig.ChannelPath, query}
}
// setupTailer creates a tailer for source from its sanitized configuration,
// connects it to the next pipeline channel of the provider and starts it.
//
// It returns the started tailer. The error result is currently always nil; it
// is kept so callers' error handling keeps compiling unchanged.
func (l *Launcher) setupTailer(source *config.LogSource) (*Tailer, error) {
	// sanitizedConfig already returns a freshly allocated *Config, so it is
	// handed to the tailer directly instead of being copied field by field.
	tailerConfig := l.sanitizedConfig(source.Config)
	tailer := NewTailer(source, tailerConfig, l.pipelineProvider.NextPipelineChan())
	tailer.Start()
	return tailer, nil
}