This repository has been archived by the owner on Sep 8, 2023. It is now read-only.
/
telemetry.go
358 lines (317 loc) · 10.5 KB
/
telemetry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
// Copyright 2018-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and limitations under the License.
package telemetry
import (
"os"
"sync/atomic"
"time"
"unsafe"
"github.com/shogo82148/aws-xray-yadaemon/pkg/conn"
"github.com/shogo82148/aws-xray-yadaemon/pkg/util/timer"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/xray"
log "github.com/cihub/seelog"
)
const (
	// dataCutoffIntervalSecs is the number of seconds between two rotations
	// of the current telemetry record.
	dataCutoffIntervalSecs = 60
	// bufferSize is the capacity of the queue of records awaiting upload and
	// the upper bound on records collected for a single flush.
	bufferSize = 30
	// requestSize is the maximum number of records placed in one
	// PutTelemetryRecords request.
	requestSize = 10
)
// T is the package-level Telemetry instance. Init assigns it, and
// EvaluateConnectionError records connection failures on it.
var T *Telemetry
// Telemetry is used to record X-Ray daemon health. It accumulates counters in
// the current record, rotates that record when the timer fires, and uploads
// queued records through the X-Ray client.
type Telemetry struct {
// X-Ray client used to call PutTelemetryRecords.
client conn.XRay
// Timer used to schedule the next rotation of the current record.
timer timer.Timer
// Amazon Resource Name (ARN) of the AWS resource running the daemon.
resourceARN string
// Instance id of the EC2 instance running X-Ray daemon.
instanceID string
// Host name of the EC2 instance running X-Ray daemon.
hostname string
// Record whose counters are currently being incremented; pushData replaces
// it with a fresh empty record on every rotation.
currentRecord *xray.TelemetryRecord
// Timer channel; pushData waits on it and re-arms it after each flush.
timerChan <-chan time.Time
// Boolean channel; pushData sends true on it after the final flush that
// follows a receive on Quit.
Done chan bool
// Boolean channel; a receive on it makes pushData flush once more and stop.
// NOTE(review): presumably signalled when the daemon shuts down - confirm
// against the caller.
Quit chan bool
// Buffered channel of TelemetryRecord values waiting to be sent to the
// X-Ray service.
recordChan chan *xray.TelemetryRecord
// Set to true by SegmentReceived. While it is false, add does not queue
// records, so no telemetry is sent before the first segment arrives.
postTelemetry bool
}
// Init builds a Telemetry via newT from the given AWS configuration, session,
// resource ARN and metadata flag, and publishes it as the package-level T.
func Init(awsConfig *aws.Config, s *session.Session, resourceARN string, noMetadata bool) {
	instance := newT(awsConfig, s, resourceARN, noMetadata)
	T = instance
	log.Debug("Telemetry initiated")
}
// EvaluateConnectionError classifies err and increments the matching
// connection-error counter on T:
//
//   - awserr.RequestFailure: 5xx, 4xx or "other" depending on the status code;
//   - timeout (per conn.IsTimeoutError): timeout counter;
//   - awserr.Error with code "RequestError": unknown-host counter;
//   - any error that is not an awserr.Error: "other" counter.
//
// An awserr.Error carrying any other code is not counted.
func EvaluateConnectionError(err error) {
	if failure, isFailure := err.(awserr.RequestFailure); isFailure {
		switch code := failure.StatusCode(); {
		case code >= 500 && code < 600:
			T.Connection5xx(1)
		case code >= 400 && code < 500:
			T.Connection4xx(1)
		default:
			T.ConnectionOther(1)
		}
		return
	}

	if conn.IsTimeoutError(err) {
		T.ConnectionTimeout(1)
		return
	}

	awsErr, isAWSErr := err.(awserr.Error)
	if !isAWSErr {
		T.ConnectionOther(1)
		return
	}
	if awsErr.Code() == "RequestError" {
		T.ConnectionUnknownHost(1)
	}
}
// GetTestTelemetry returns a Telemetry whose only initialised field is an
// empty current record; client, timer and channels are left at their zero
// values.
func GetTestTelemetry() *Telemetry {
	testTelemetry := new(Telemetry)
	testTelemetry.currentRecord = getEmptyTelemetryRecord()
	return testTelemetry
}
// SegmentReceived atomically adds count to SegmentsReceivedCount of the
// current record and turns on postTelemetry, so that records start being
// queued for upload once at least one segment has been seen.
func (t *Telemetry) SegmentReceived(count int64) {
	received := t.currentRecord.SegmentsReceivedCount
	atomic.AddInt64(received, count)
	// Telemetry is skipped entirely until the first segment arrives.
	t.postTelemetry = true
}
// SegmentSent atomically adds count to SegmentsSentCount of the current
// record.
func (t *Telemetry) SegmentSent(count int64) {
	sent := t.currentRecord.SegmentsSentCount
	atomic.AddInt64(sent, count)
}
// SegmentSpillover atomically adds count to SegmentsSpilloverCount of the
// current record.
func (t *Telemetry) SegmentSpillover(count int64) {
	spilled := t.currentRecord.SegmentsSpilloverCount
	atomic.AddInt64(spilled, count)
}
// SegmentRejected atomically adds count to SegmentsRejectedCount of the
// current record.
func (t *Telemetry) SegmentRejected(count int64) {
	rejected := t.currentRecord.SegmentsRejectedCount
	atomic.AddInt64(rejected, count)
}
// ConnectionTimeout atomically adds count to the TimeoutCount of the current
// record's backend connection errors.
func (t *Telemetry) ConnectionTimeout(count int64) {
	connErrs := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrs.TimeoutCount, count)
}
// ConnectionRefusal atomically adds count to the ConnectionRefusedCount of
// the current record's backend connection errors.
func (t *Telemetry) ConnectionRefusal(count int64) {
	connErrs := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrs.ConnectionRefusedCount, count)
}
// Connection4xx atomically adds count to the HTTPCode4XXCount of the current
// record's backend connection errors.
func (t *Telemetry) Connection4xx(count int64) {
	connErrs := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrs.HTTPCode4XXCount, count)
}
// Connection5xx atomically adds count to the HTTPCode5XXCount of the current
// record's backend connection errors.
func (t *Telemetry) Connection5xx(count int64) {
	connErrs := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrs.HTTPCode5XXCount, count)
}
// ConnectionUnknownHost atomically adds count to the UnknownHostCount of the
// current record's backend connection errors.
func (t *Telemetry) ConnectionUnknownHost(count int64) {
	connErrs := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrs.UnknownHostCount, count)
}
// ConnectionOther atomically adds count to the OtherCount of the current
// record's backend connection errors.
func (t *Telemetry) ConnectionOther(count int64) {
	connErrs := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrs.OtherCount, count)
}
// newT builds a Telemetry and starts its pushData goroutine.
//
// The hostname and instance id are each resolved in the same order: the
// AWS_HOSTNAME / AWS_INSTANCE_ID environment variable if non-empty, otherwise
// the EC2 metadata service (only queried when noMetadata is false), otherwise
// they stay empty. A failed metadata lookup is logged and leaves the value
// empty.
func newT(awsConfig *aws.Config, s *session.Session, resourceARN string, noMetadata bool) *Telemetry {
	clock := &timer.Client{}

	// The metadata client stays nil when metadata lookups are disabled.
	var metadataClient *ec2metadata.EC2Metadata
	if !noMetadata {
		metadataClient = ec2metadata.New(s)
	}

	var hostname string
	switch envHostname := os.Getenv("AWS_HOSTNAME"); {
	case envHostname != "":
		hostname = envHostname
		log.Debugf("Fetch hostname %v from environment variables", envHostname)
	case metadataClient != nil:
		if fetched, err := metadataClient.GetMetadata("hostname"); err != nil {
			log.Debugf("Get hostname metadata failed: %s", err)
		} else {
			hostname = fetched
			log.Debugf("Using %v hostname for telemetry records", hostname)
		}
	default:
		log.Debug("No hostname set for telemetry records")
	}

	var instanceID string
	switch envInstanceID := os.Getenv("AWS_INSTANCE_ID"); {
	case envInstanceID != "":
		instanceID = envInstanceID
		log.Debugf("Fetch instance ID %v from environment variables", envInstanceID)
	case metadataClient != nil:
		if fetched, err := metadataClient.GetMetadata("instance-id"); err != nil {
			log.Errorf("Get instance id metadata failed: %s", err)
		} else {
			instanceID = fetched
			log.Debugf("Using %v Instance Id for Telemetry records", instanceID)
		}
	default:
		log.Debug("No Instance Id set for telemetry records")
	}

	// postTelemetry is left at its zero value (false) until a segment arrives.
	t := &Telemetry{
		timer:         clock,
		resourceARN:   resourceARN,
		instanceID:    instanceID,
		hostname:      hostname,
		currentRecord: getEmptyTelemetryRecord(),
		timerChan:     getDataCutoffDelay(clock),
		Done:          make(chan bool),
		Quit:          make(chan bool),
		recordChan:    make(chan *xray.TelemetryRecord, bufferSize),
	}
	t.client = conn.NewXRay(awsConfig, s)

	// The client must be set before the background goroutine starts using it.
	go t.pushData()
	return t
}
// getZeroInt64 returns a pointer to a freshly allocated int64 holding 0.
// Every call yields a distinct pointer.
func getZeroInt64() *int64 {
	return new(int64)
}
// getEmptyTelemetryRecord returns a TelemetryRecord in which every segment
// counter and every backend connection error counter points at its own
// zero-valued int64. Timestamp is left unset.
func getEmptyTelemetryRecord() *xray.TelemetryRecord {
	connErrs := &xray.BackendConnectionErrors{
		HTTPCode4XXCount:       new(int64),
		HTTPCode5XXCount:       new(int64),
		ConnectionRefusedCount: new(int64),
		OtherCount:             new(int64),
		TimeoutCount:           new(int64),
		UnknownHostCount:       new(int64),
	}
	record := &xray.TelemetryRecord{
		SegmentsReceivedCount:   new(int64),
		SegmentsRejectedCount:   new(int64),
		SegmentsSentCount:       new(int64),
		SegmentsSpilloverCount:  new(int64),
		BackendConnectionErrors: connErrs,
	}
	return record
}
func (t *Telemetry) pushData() {
for {
quit := false
select {
case <-t.Quit:
quit = true
break
case <-t.timerChan:
}
emptyRecord := getEmptyTelemetryRecord()
recordToReport := unsafe.Pointer(emptyRecord)
recordToPushPointer := unsafe.Pointer(t.currentRecord)
// Rotation Logic:
// Swap current record to record to report.
// Record to report is set to empty record which is set to current record
t.currentRecord = (*xray.TelemetryRecord)(atomic.SwapPointer(&recordToReport,
recordToPushPointer))
currentTime := time.Now()
record := (*xray.TelemetryRecord)(recordToReport)
record.Timestamp = ¤tTime
t.add(record)
t.sendAll()
if quit {
close(t.recordChan)
log.Debug("telemetry: done!")
t.Done <- true
break
} else {
t.timerChan = getDataCutoffDelay(t.timer)
}
}
}
// add queues record on recordChan without blocking.
//
// Nothing is queued while postTelemetry is false. If the channel cannot
// accept the record, one queued record is discarded and the send is retried;
// if no record can be taken off the channel either, the new record is
// dropped.
func (t *Telemetry) add(record *xray.TelemetryRecord) {
	for {
		// Telemetry is only reported once a segment has been received.
		if !t.postTelemetry {
			log.Debug("Skipped telemetry data as no segments found")
			return
		}

		select {
		case t.recordChan <- record:
			return
		default:
		}

		// The send would block: make room by discarding a queued record,
		// or give up if nothing can be taken off the channel.
		select {
		case <-t.recordChan:
			log.Debug("Telemetry Buffers truncated")
		default:
			log.Debug("Telemetry Buffers dequeued")
			return
		}
	}
}
// sendAll drains every queued record and uploads them. When the upload
// fails, the records returned as unsent by sendRecords are queued again via
// add.
func (t *Telemetry) sendAll() {
	pending := t.collectAllRecords()
	unsent, err := t.sendRecords(pending)
	if err == nil {
		return
	}

	log.Debugf("Failed to send telemetry %v record(s). Re-queue records. %v", len(pending), err)
	// pushData re-arms the rotation timer only after sendAll returns, so no
	// new record is produced while these are being re-queued.
	for i := range unsent {
		t.add(unsent[i])
	}
}
// collectAllRecords receives from recordChan until a receive would block and
// returns the records read, in order. At most bufferSize records are kept;
// any further records read from the channel are discarded.
func (t *Telemetry) collectAllRecords() []*xray.TelemetryRecord {
	drained := make([]*xray.TelemetryRecord, 0, bufferSize)
	for {
		select {
		case rec := <-t.recordChan:
			if len(drained) < bufferSize {
				drained = append(drained, rec)
			}
		default:
			return drained
		}
	}
}
// sendRecords uploads records in batches of at most requestSize, one
// PutTelemetryRecords call per batch.
//
// On the first failing call the error is passed to EvaluateConnectionError
// and returned together with the records from the failed batch onward.
// On success, or when records is empty, it returns (nil, nil).
func (t *Telemetry) sendRecords(records []*xray.TelemetryRecord) ([]*xray.TelemetryRecord, error) {
	total := len(records)
	if total == 0 {
		return nil, nil
	}

	for start := 0; start < total; start += requestSize {
		end := start + requestSize
		if end > total {
			end = total
		}

		input := &xray.PutTelemetryRecordsInput{
			EC2InstanceId:    &t.instanceID,
			Hostname:         &t.hostname,
			ResourceARN:      &t.resourceARN,
			TelemetryRecords: records[start:end],
		}
		if _, err := t.client.PutTelemetryRecords(input); err != nil {
			EvaluateConnectionError(err)
			return records[start:], err
		}
	}

	log.Debugf("Send %v telemetry record(s)", total)
	return nil, nil
}
// getDataCutoffDelay returns the channel produced by timer.After for a delay
// of dataCutoffIntervalSecs seconds.
func getDataCutoffDelay(timer timer.Timer) <-chan time.Time {
	const cutoff = dataCutoffIntervalSecs * time.Second
	return timer.After(cutoff)
}