-
Notifications
You must be signed in to change notification settings - Fork 10
/
clusterchecks.go
143 lines (122 loc) · 4.07 KB
/
clusterchecks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package providers
import (
"time"
"github.com/n9e/n9e-agentd/pkg/config"
"github.com/n9e/n9e-agentd/pkg/autodiscovery/integration"
"github.com/n9e/n9e-agentd/pkg/autodiscovery/providers/names"
"github.com/n9e/n9e-agentd/staging/datadog-agent/pkg/clusteragent/clusterchecks/types"
"github.com/n9e/n9e-agentd/staging/datadog-agent/pkg/util"
"github.com/n9e/n9e-agentd/staging/datadog-agent/pkg/util/clusteragent"
"k8s.io/klog/v2"
)
// defaultGraceDuration is the grace period a new ClusterChecksConfigProvider
// starts with. NewClusterChecksConfigProvider replaces it when the provider
// configuration carries a positive GraceTimeSeconds.
const defaultGraceDuration = 60 * time.Second
// ClusterChecksConfigProvider implements the ConfigProvider interface
// for the cluster check feature.
//
// It talks to the cluster-agent through dcaClient, reporting its status in
// IsUpToDate and fetching the dispatched configurations in Collect.
type ClusterChecksConfigProvider struct {
// dcaClient is the cluster-agent client. It stays nil until initClient
// succeeds; IsUpToDate and Collect call initClient while it is nil.
dcaClient clusteragent.DCAClientInterface
// graceDuration is how long after heartbeat a failed status call is
// still reported as "up to date" by IsUpToDate.
graceDuration time.Duration
// heartbeat is the time of the last successful status call in IsUpToDate.
heartbeat time.Time
// lastChange is the LastChange value of the last successful Collect
// reply; it is sent back to the cluster-agent in the node status.
lastChange int64
// identifier is the name this provider passes to the cluster-agent calls.
identifier string
// flushedConfigs is set once Collect has masked an error, so that only
// the first error of a failure streak is hidden from the caller.
flushedConfigs bool
}
// NewClusterChecksConfigProvider returns a new ConfigProvider collecting
// cluster check configurations from the cluster-agent.
//
// The identifier sent to the cluster-agent is config.C.ClcRunnerId when it is
// set. Otherwise it is the value returned by util.GetHostname, replaced by
// config.C.BoshID when config.C.CloudFoundry is enabled and BoshID is not
// empty. A hostname lookup failure is logged as a warning rather than
// silently discarded, since it can leave the identifier empty.
//
// cfg.GraceTimeSeconds overrides the default grace period when positive.
//
// Connectivity is not checked at this stage to allow for retries, Collect will
// do it: the early IsUpToDate call is best effort, its failure is only logged
// at debug verbosity and never returned, so the error result is always nil.
func NewClusterChecksConfigProvider(cfg config.ConfigurationProviders) (ConfigProvider, error) {
	c := &ClusterChecksConfigProvider{
		graceDuration: defaultGraceDuration,
		identifier:    config.C.ClcRunnerId,
	}

	if c.identifier == "" {
		hostname, err := util.GetHostname()
		if err != nil {
			klog.Warningf("unable to resolve the hostname used as cluster check runner identifier: %v", err)
		}
		// Keep whatever GetHostname returned, even on error, as before.
		c.identifier = hostname

		if config.C.CloudFoundry {
			if boshID := config.C.BoshID; boshID != "" {
				c.identifier = boshID
			} else {
				klog.Warning("configuration variable cloud_foundry is set to true, but bosh_id is empty, can't retrieve node name")
			}
		}
	}

	if cfg.GraceTimeSeconds > 0 {
		c.graceDuration = time.Duration(cfg.GraceTimeSeconds) * time.Second
	}

	// Register in the cluster agent as soon as possible. A failure here is
	// not fatal: the client is initialised again on the next call.
	if _, err := c.IsUpToDate(); err != nil {
		klog.V(5).Infof("Early registration with the cluster agent failed: %v", err)
	}

	return c, nil
}
// initClient fetches the cluster-agent client and stores it on the provider.
// On failure the provider is left untouched and the error is returned.
func (c *ClusterChecksConfigProvider) initClient() error {
	client, err := clusteragent.GetClusterAgentClient()
	if err != nil {
		return err
	}

	c.dcaClient = client
	return nil
}
// String returns a string representation of the ClusterChecksConfigProvider,
// which is the names.ClusterChecks constant.
func (c *ClusterChecksConfigProvider) String() string {
return names.ClusterChecks
}
// withinGracePeriod reports whether the current time is still strictly before
// the last heartbeat plus the grace duration.
func (c *ClusterChecksConfigProvider) withinGracePeriod() bool {
	graceEnd := c.heartbeat.Add(c.graceDuration)
	return time.Now().Before(graceEnd)
}
// IsUpToDate posts this runner's status to the cluster-agent and returns
// whether the reply flags the current configurations as up to date.
//
// The cluster-agent client is created on first use; a failure to create it is
// returned as is. When the status call fails while the grace period since the
// last successful call is still running, the error is hidden and true is
// returned so the configurations are kept. Once the grace period is over,
// false and the error are returned.
func (c *ClusterChecksConfigProvider) IsUpToDate() (bool, error) {
	if c.dcaClient == nil {
		if err := c.initClient(); err != nil {
			return false, err
		}
	}

	nodeStatus := types.NodeStatus{LastChange: c.lastChange}
	reply, err := c.dcaClient.PostClusterCheckStatus(c.identifier, nodeStatus)
	switch {
	case err == nil:
		// Successful exchange, handled after the switch.
	case c.withinGracePeriod():
		// Return true to keep the configs during the grace period
		klog.V(5).Infof("Catching error during grace period: %s", err)
		return true, nil
	default:
		// Return false, the next Collect will flush the configs
		return false, err
	}

	// Only a successful status call refreshes the heartbeat.
	c.heartbeat = time.Now()

	if !reply.IsUpToDate {
		klog.V(6).Infof("Not up to date with change %d", c.lastChange)
		return false, nil
	}
	klog.V(6).Infof("Up to date with change %d", c.lastChange)
	return true, nil
}
// Collect retrieves the configurations the cluster-agent dispatched to this
// agent and records the reply's LastChange for the next status report.
//
// The cluster-agent client is created on first use. The first error of a
// failure streak is masked: nil configurations and a nil error are returned
// once, and later consecutive errors are returned to the caller.
func (c *ClusterChecksConfigProvider) Collect() ([]integration.Config, error) {
	if c.dcaClient == nil {
		if err := c.initClient(); err != nil {
			return nil, err
		}
	}

	reply, err := c.dcaClient.GetClusterCheckConfigs(c.identifier)
	if err == nil {
		// A successful fetch re-arms the one-time error masking.
		c.flushedConfigs = false
		c.lastChange = reply.LastChange
		klog.V(6).Infof("Storing last change %d", c.lastChange)
		return reply.Configs, nil
	}

	if c.flushedConfigs {
		// The error was already masked once, surface it now.
		return nil, err
	}

	// On first error after grace period, mask the error once
	// to delete the configurations and de-schedule the checks
	c.flushedConfigs = true
	return nil, nil
}
// init passes NewClusterChecksConfigProvider to RegisterProvider under the
// "clusterchecks" name when the package is loaded.
func init() {
RegisterProvider("clusterchecks", NewClusterChecksConfigProvider)
}