-
Notifications
You must be signed in to change notification settings - Fork 1
/
manager.go
85 lines (75 loc) · 2.27 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package cwl
import (
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/elastic/beats/libbeat/logp"
)
// GroupManager tracks the log groups that are currently being monitored and
// holds the shared Params used to discover and create them.
type GroupManager struct {
// Params supplies the configuration and AWS client read by the manager and
// is passed on to every Group the manager creates.
Params *Params
// groups holds every group added so far, indexed by the group's Name.
groups map[string]*Group
}
// NewGroupManager returns a GroupManager bound to params with an empty,
// ready-to-use set of tracked groups.
func NewGroupManager(params *Params) *GroupManager {
	manager := new(GroupManager)
	manager.Params = params
	// The map must be allocated up front: addNewGroup writes into it.
	manager.groups = make(map[string]*Group)
	return manager
}
// refreshGroups walks every prospector in the configuration and starts
// monitoring each log group that is not tracked yet.
//
// A configured name that does not end in "*" is treated as a literal group
// name. A name that ends in "*" is treated as a prefix: the log groups are
// listed through DescribeLogGroupsPages and every returned group that is not
// already in manager.groups is added.
//
// A bare "*" has an empty prefix. The CloudWatch Logs API requires
// logGroupNamePrefix to be at least one character long, so in that case the
// prefix filter is left out of the request and all log groups are listed.
//
// A failed lookup is logged as a warning; the remaining names are still
// processed.
func (manager *GroupManager) refreshGroups() {
	for _, prospector := range manager.Params.Config.Prospectors {
		// Per-iteration copy: its address is handed to addNewGroup, so it must
		// not be the shared loop variable.
		prospector := prospector
		for _, groupName := range prospector.GroupNames {
			// No trailing star: a plain group name.
			if !strings.HasSuffix(groupName, "*") {
				if _, tracked := manager.groups[groupName]; !tracked {
					manager.addNewGroup(groupName, &prospector)
				}
				continue
			}

			// Trailing star: everything before it is a name prefix. Only set
			// the filter when the prefix is non-empty.
			input := &cloudwatchlogs.DescribeLogGroupsInput{}
			if prefix := strings.TrimSuffix(groupName, "*"); prefix != "" {
				input.LogGroupNamePrefix = aws.String(prefix)
			}

			err := manager.Params.AWSClient.DescribeLogGroupsPages(
				input,
				func(page *cloudwatchlogs.DescribeLogGroupsOutput, _ bool) bool {
					for _, logGroup := range page.LogGroups {
						name := aws.StringValue(logGroup.LogGroupName)
						if _, tracked := manager.groups[name]; !tracked {
							manager.addNewGroup(name, &prospector)
						}
					}
					// Always ask for the next page so every match is seen.
					return true
				},
			)
			if err != nil {
				logp.Warn("manager: Failed to describe log group %s [%s]", groupName, err.Error())
			}
		}
	}
}
// addNewGroup builds a Group for name via NewGroup, records it in
// manager.groups under the created group's Name, and launches its Monitor
// method in a new goroutine.
func (manager *GroupManager) addNewGroup(name string, prospector *Prospector) {
	created := NewGroup(name, prospector, manager.Params)
	manager.groups[created.Name] = created
	go created.Monitor()
}
// Monitor runs the manager's loop: it calls refreshGroups every
// GroupRefreshFrequency and report every ReportFrequency. The loop has no
// exit condition, so this method blocks the calling goroutine indefinitely.
func (manager *GroupManager) Monitor() {
	refreshTick := time.NewTicker(manager.Params.Config.GroupRefreshFrequency)
	defer refreshTick.Stop()

	reportTick := time.NewTicker(manager.Params.Config.ReportFrequency)
	defer reportTick.Stop()

	for {
		select {
		case <-refreshTick.C:
			manager.refreshGroups()
		case <-reportTick.C:
			manager.report()
		}
	}
}
// report logs, at info level, how many prospectors are configured and how
// many groups the manager currently tracks.
func (manager *GroupManager) report() {
	prospectorCount := len(manager.Params.Config.Prospectors)
	groupCount := len(manager.groups)
	logp.Info("report[manager] number of prospectors: %d, number of groups: %d", prospectorCount, groupCount)
}