forked from paychex/prometheus-emcecs-exporter
/
ecsclient.go
305 lines (261 loc) · 11.3 KB
/
ecsclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
package ecsclient
import (
"crypto/tls"
"encoding/xml"
"fmt"
"io/ioutil"
"net"
"net/http"
"time"
"strings"
"github.com/prometheus/common/log"
"github.com/tidwall/gjson"
)
// EcsClient is used to persist connection to an ECS Cluster.
//
// It carries the login credentials, the management-API auth token, the data
// node list and ECS version discovered from the cluster, and a running count
// of failed requests.
type EcsClient struct {
	// Credentials sent as HTTP basic auth by RetrieveAuthToken.
	UserName, Password string
	// AuthToken holds the X-Sds-Auth-Token obtained by RetrieveAuthToken and
	// is sent on every CallECSAPI request; Logout resets it to "".
	AuthToken string
	// ClusterAddress is the host name/IP of the cluster; request URLs are
	// built from it ("-mgt" is swapped for "-data" for data-interface calls).
	ClusterAddress string
	// nodeList holds the data node addresses stored by RetrieveNodeInfo.
	nodeList []string
	// EcsVersion is the VersionInfo value stored by RetrieveNodeInfo.
	EcsVersion string
	// ErrorCount is incremented on some request failures (see the individual
	// methods for which failures are counted).
	ErrorCount float64
}
// NodeState holds the statistics collected for a single ECS data node.
// The tagged fields are decoded from the node's DTInitStat XML reply.
type NodeState struct {
	// Total, unready and unknown directory-table counts from <entry>.
	TotalDTnum   float64 `xml:"entry>total_dt_num"`
	UnreadyDTnum float64 `xml:"entry>unready_dt_num"`
	UnknownDTnum float64 `xml:"entry>unknown_dt_num"`
	// NodeIP is the address that was queried; it is filled in by the
	// collector rather than parsed from a tagged XML element.
	NodeIP string
	// ActiveConnections is tagged entry>load_factor, but the collector
	// overwrites it with the S3 ping reply's value when that request succeeds.
	ActiveConnections float64 `xml:"entry>load_factor"`
}
// dataNodes is decoded from the XML reply of the S3 "?endpoint" request.
// Without tags, encoding/xml matches fields to elements of the same name:
// every <DataNodes> element is appended to DataNodes, and <VersionInfo>
// fills VersionInfo.
type dataNodes struct {
	DataNodes   []string
	VersionInfo string
}
// pingList is decoded from the XML reply of the S3 "?ping" request; its
// Value is reported as the node's active connection count.
//
// NOTE(review): Value is a scalar while Name/Status/Text are slices, so only
// one value survives if the reply carries several PingItems — confirm that a
// single item is expected.
type pingList struct {
	Xmlns  string   `xml:"xmlns,attr"`
	Name   []string `xml:"PingItem>Name"`
	Value  float64  `xml:"PingItem>Value"`
	Status []string `xml:"PingItem>Status"`
	Text   []string `xml:"PingItem>Text"`
}
// RetrieveAuthToken logs in to the ECS management API and stores the
// resulting token in c.AuthToken for use by later calls.
//
// It sends a GET to https://<ClusterAddress>:4443/login with UserName and
// Password as HTTP basic auth and returns the X-Sds-Auth-Token response
// header. A non-200 reply increments c.ErrorCount and returns an error that
// describes the response. If the request cannot be built or sent, the
// underlying error is returned and ErrorCount is left unchanged.
func (c *EcsClient) RetrieveAuthToken() (authToken string, err error) {
	reqLoginURL := "https://" + c.ClusterAddress + ":4443/login"
	log.Debugf("Using the following info to log into the ECS, username: %v, URL: %v", c.UserName, c.ClusterAddress)

	transport := &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		// Certificate verification is disabled, presumably because clusters
		// commonly present self-signed certificates.
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	// The transport is private to this call, so close its pooled keep-alive
	// connections on return instead of leaving them open until
	// IdleConnTimeout expires.
	defer transport.CloseIdleConnections()
	client := &http.Client{Transport: transport, Timeout: 15 * time.Second}

	// A malformed ClusterAddress makes NewRequest fail; without this check
	// req would be nil and SetBasicAuth would panic.
	req, err := http.NewRequest("GET", reqLoginURL, nil)
	if err != nil {
		log.Infof("\n - Error building login request for ECS: %s", err)
		return "", err
	}
	req.SetBasicAuth(c.UserName, c.Password)

	resp, err := client.Do(req)
	if err != nil {
		log.Infof("\n - Error connecting to ECS: %s", err)
		return "", err
	}
	defer resp.Body.Close()

	token := resp.Header.Get("X-Sds-Auth-Token")
	log.Debugf("Response Status Code: %v", resp.StatusCode)
	log.Debugf("Response Status: %v", resp.Status)
	log.Debugf("AuthToken is: %v", token)

	if resp.StatusCode != 200 {
		// we didnt get a good response code, so bailing out
		log.Infoln("Got a non 200 response code: ", resp.StatusCode)
		log.Debugln("response was: ", resp)
		c.ErrorCount++
		return "", fmt.Errorf("received non 200 error code: %v. the response was: %v", resp.Status, resp)
	}
	c.AuthToken = token
	return token, nil
}
// Logout closes out the session with ECS by calling the management API's
// /logout endpoint. On success, c.AuthToken is cleared.
//
// There is a maximum number of login tokens (100) per user, and this client
// does not cache tokens. Every session must therefore be logged out, or the
// token budget runs out. If the call fails, the error from CallECSAPI is
// returned and AuthToken is kept.
func (c *EcsClient) Logout() error {
	logoutURL := "https://" + c.ClusterAddress + ":4443/logout"
	log.Infof("Logging out of %s", c.ClusterAddress)

	// Only success matters here; the reply body carries nothing we use.
	if _, callErr := c.CallECSAPI(logoutURL); callErr != nil {
		log.Infof("Error logging out of ECS: %s", c.ClusterAddress)
		return callErr
	}

	c.AuthToken = ""
	return nil
}
// CallECSAPI performs an authenticated GET against the ECS management API
// and returns the response body as a string.
//
// request is the full URL to fetch. JSON is requested via the Accept header,
// and c.AuthToken is sent as X-SDS-AUTH-TOKEN. An error is returned in four
// cases: the request cannot be built, the server cannot be reached, the
// status is not 200, or the body cannot be read completely.
func (c *EcsClient) CallECSAPI(request string) (response string, err error) {
	transport := &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		// Certificate verification is disabled, presumably because clusters
		// commonly present self-signed certificates.
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	// A new transport is built for every call, so release its keep-alive
	// connections on return; otherwise each call leaves a socket open until
	// IdleConnTimeout expires.
	defer transport.CloseIdleConnections()
	client := &http.Client{Transport: transport, Timeout: 45 * time.Second}

	// Without this check a malformed URL leaves req nil and the header
	// calls below panic.
	req, err := http.NewRequest("GET", request, nil)
	if err != nil {
		return "", fmt.Errorf("error building request for : %v. the error was: %v", request, err)
	}
	req.Header.Add("Accept", "application/json")
	req.Header.Add("Content-Type", "application/json")
	req.Header.Add("X-SDS-AUTH-TOKEN", c.AuthToken)

	resp, err := client.Do(req)
	if err != nil {
		log.Infof("\n - Error connecting to ECS: %s", err)
		return "", fmt.Errorf("error connecting to : %v. the error was: %v", request, err)
	}
	defer resp.Body.Close()

	respText, err := ioutil.ReadAll(resp.Body)
	if resp.StatusCode != 200 {
		log.Infof("Got error code: %v when accessing URL: %s\n Body text is: %s\n", resp.StatusCode, request, respText)
		return "", fmt.Errorf("error connecting to : %v. the error was: %v", request, resp.StatusCode)
	}
	// A read failure on a 200 reply would otherwise hand back a truncated
	// body as if it were complete.
	if err != nil {
		return "", fmt.Errorf("error reading response from : %v. the error was: %v", request, err)
	}
	return string(respText), nil
}
// RetrieveReplState fetches replication statistics for the local zone from
// the dashboard endpoint /dashboard/zones/localzone/replicationgroups and maps
// them into an EcsReplState. An error from CallECSAPI is returned unchanged,
// together with a zero EcsReplState.
//
// NOTE(review): every field is read from the top level of the JSON reply;
// confirm this endpoint does not wrap replication groups in a list.
func (c *EcsClient) RetrieveReplState() (EcsReplState, error) {
	// Only the current stats are pulled, which is what this application wants.
	replURL := "https://" + c.ClusterAddress + ":4443/dashboard/zones/localzone/replicationgroups"
	body, callErr := c.CallECSAPI(replURL)
	if callErr != nil {
		return EcsReplState{}, callErr
	}

	num := func(path string) float64 { return gjson.Get(body, path).Float() }

	state := EcsReplState{
		RgName:                                   gjson.Get(body, "name").String(),
		ReplicationIngressTraffic:                num("replicationIngressTraffic"),
		ReplicationEgressTraffic:                 num("replicationEgressTraffic"),
		ChunksRepoPendingReplicationTotalSize:    num("chunksRepoPendingReplicationTotalSize"),
		ChunksJournalPendingReplicationTotalSize: num("chunksJournalPendingReplicationTotalSize"),
		ChunksPendingXorTotalSize:                num("chunksPendingXorTotalSize"),
		ReplicationRpoTimestamp:                  num("replicationRpoTimestamp"),
	}
	return state, nil
}
// RetrieveClusterState fetches the local zone's dashboard summary from
// /dashboard/zones/localzone and maps the fields of interest into an
// EcsClusterState. An error from CallECSAPI is returned unchanged, together
// with a zero EcsClusterState.
//
// Series values are read from element 0 of each "...Current" array, i.e.
// only the current sample is used.
func (c *EcsClient) RetrieveClusterState() (EcsClusterState, error) {
	dashboardURL := "https://" + c.ClusterAddress + ":4443/dashboard/zones/localzone"
	body, callErr := c.CallECSAPI(dashboardURL)
	if callErr != nil {
		return EcsClusterState{}, callErr
	}

	num := func(path string) float64 { return gjson.Get(body, path).Float() }

	state := EcsClusterState{
		VdcName:                               gjson.Get(body, "name").String(),
		NumBadDisks:                           num("numBadDisks"),
		NumBadNodes:                           num("numBadNodes"),
		NumGoodNodes:                          num("numGoodNodes"),
		NumGoodDisks:                          num("numGoodDisks"),
		AlertsNumUnackCritical:                num("alertsNumUnackCritical.0.Count"),
		AlertsNumUnackError:                   num("alertsNumUnackError.0.Count"),
		AlertsNumUnackInfo:                    num("alertsNumUnackInfo.0.Count"),
		AlertsNumUnackWarning:                 num("alertsNumUnackWarning.0.Count"),
		DiskSpaceFree:                         num("diskSpaceFreeCurrent.0.Space"),
		DiskSpaceTotal:                        num("diskSpaceTotalCurrent.0.Space"),
		DiskSpaceAllocated:                    num("diskSpaceAllocatedCurrent.0.Space"),
		TransactionErrorsTotal:                num("transactionErrorsCurrent.errorSuccessTotals.0.errorTotal"),
		TransactionSuccessTotal:               num("transactionErrorsCurrent.errorSuccessTotals.0.successTotal"),
		TransactionReadLatency:                num("transactionReadLatencyCurrent.0.Latency"),  // validated
		TransactionWriteLatency:               num("transactionWriteLatencyCurrent.0.Latency"), // validated
		TransactionReadTransactionsPerSecond:  num("transactionReadTransactionsPerSecCurrent.0.TPS"),
		TransactionWriteTransactionsPerSecond: num("transactionWriteTransactionsPerSecCurrent.0.TPS"),
		TransactionWriteBandwidthCurrent:      num("transactionWriteBandwidthCurrent.0.Bandwidth"),
		TransactionReadBandwidthCurrent:       num("transactionReadBandwidthCurrent.0.Bandwidth"),
	}
	return state, nil
}
// RetrieveNodeCount reports how many nodes are currently held in the
// client's node list (as stored by RetrieveNodeInfo). It exists because
// nodeList itself is unexported.
func (c *EcsClient) RetrieveNodeCount() int {
	nodes := c.nodeList
	return len(nodes)
}
// RetrieveNodeInfo will retrieve a list of individual nodes in the cluster
// this is used to pull DTstats later on
func (c *EcsClient) RetrieveNodeInfo() {
parsedOutput := &dataNodes{}
// ECS gives you a way to get the node IPs, BUT it wont do it without a namespace
// Interestingly you can give it ANY namespace, including ones that dont exist
reqStatusURL := "https://" + strings.Replace(c.ClusterAddress, "-mgt", "-data", -1) + ":9021/?endpoint"
log.Debug("node ip url is: " + reqStatusURL)
client := &http.Client{}
req, _ := http.NewRequest("GET", reqStatusURL, nil)
req.Header.Set("x-emc-namespace", "nodeips")
resp, err := client.Do(req)
if err != nil {
log.Info("Error connecting to ECS Cluster at: " + reqStatusURL)
c.nodeList = nil
c.EcsVersion = ""
c.ErrorCount++
return
}
defer resp.Body.Close()
bytes, _ := ioutil.ReadAll(resp.Body)
log.Debugf("Output from node poll is %s", bytes)
xml.Unmarshal(bytes, parsedOutput)
c.nodeList = parsedOutput.DataNodes
c.EcsVersion = parsedOutput.VersionInfo
}
// fetchNodeState queries one node for its DT statistics and active
// connection count.
//
// It makes two requests, replacing "-mgt" in the node name with "-data" for
// both:
//   - http://<node>:9101/stats/dt/DTInitStat for the DT statistics;
//   - the PingItem Value from https://<node>:9021/?ping for the connection
//     count.
//
// The ping request is skipped if the first request fails. The returned state
// always has NodeIP set. failed reports whether a request could not be made.
// fetchNodeState does not modify c, so several calls may run side by side
// without racing on the client's fields.
func (c *EcsClient) fetchNodeState(node string) (state NodeState, failed bool) {
	state.NodeIP = node
	dataNode := strings.Replace(node, "-mgt", "-data", -1)
	// http.Get uses http.DefaultClient, which has no timeout; one hung node
	// would then block RetrieveNodeStateParallel forever.
	client := &http.Client{Timeout: 15 * time.Second}

	log.Debug("this is the node I am querying ", dataNode)
	reqStatusURL := "http://" + dataNode + ":9101/stats/dt/DTInitStat"
	log.Debug("URL we are checking is ", reqStatusURL)
	resp, err := client.Get(reqStatusURL)
	if err != nil {
		log.Info("Error connecting to ECS Cluster at: " + reqStatusURL)
		return state, true
	}
	statusBody, _ := ioutil.ReadAll(resp.Body) // best effort: parse what arrived
	resp.Body.Close()
	if err := xml.Unmarshal(statusBody, &state); err != nil {
		log.Debugf("Could not parse DT stats from %s: %v", reqStatusURL, err)
	}

	// ECS supplies the current number of active connections per node, but
	// via the S3 API (port 9021) and in a different XML layout, so it is
	// fetched and parsed separately.
	reqConnectionsURL := "https://" + dataNode + ":9021/?ping"
	log.Debug("URL we are checking for connections is ", reqConnectionsURL)
	respConn, err := client.Get(reqConnectionsURL)
	if err != nil {
		log.Info("Error connecting to ECS Cluster at: " + reqConnectionsURL)
		return state, true
	}
	connBody, _ := ioutil.ReadAll(respConn.Body) // best effort: parse what arrived
	respConn.Body.Close()
	var ping pingList
	if err := xml.Unmarshal(connBody, &ping); err != nil {
		log.Debugf("Could not parse ping reply from %s: %v", reqConnectionsURL, err)
	}
	state.ActiveConnections = ping.Value
	return state, false
}

// retrieveNodeState collects the state of a single node and sends it on ch,
// incrementing c.ErrorCount if a request failed.
//
// It writes c.ErrorCount, so it must not run concurrently with other writers
// of that field. For that reason RetrieveNodeStateParallel calls
// fetchNodeState directly and does all counting in a single goroutine.
func (c *EcsClient) retrieveNodeState(node string, ch chan<- NodeState) {
	state, failed := c.fetchNodeState(node)
	if failed {
		c.ErrorCount++
	}
	ch <- state
}

// RetrieveNodeStateParallel pulls the DT state of every node in c.nodeList
// at once, with one goroutine per node. Results are returned in completion
// order, not nodeList order. It returns nil when the node list is empty.
//
// Failed requests are added to c.ErrorCount here, in the calling goroutine
// only. The per-node workers therefore never write the shared counter
// concurrently.
func (c *EcsClient) RetrieveNodeStateParallel() []NodeState {
	type nodeResult struct {
		state  NodeState
		failed bool
	}

	nodes := c.nodeList
	results := make(chan nodeResult)
	for _, node := range nodes {
		go func(n string) {
			state, failed := c.fetchNodeState(n)
			results <- nodeResult{state: state, failed: failed}
		}(node)
	}

	var NodeStates []NodeState
	for range nodes {
		r := <-results
		if r.failed {
			c.ErrorCount++
		}
		NodeStates = append(NodeStates, r.state)
	}
	return NodeStates
}