/
analytics_events.go
148 lines (122 loc) · 3.84 KB
/
analytics_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Copyright 2020 New Relic Corporation. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package internal
import (
"bytes"
"container/heap"
"github.com/newrelic/go-agent/internal/jsonx"
)
type analyticsEvent struct {
priority Priority
jsonWriter
}
type analyticsEventHeap []analyticsEvent
type analyticsEvents struct {
numSeen int
events analyticsEventHeap
failedHarvests int
}
func (events *analyticsEvents) NumSeen() float64 { return float64(events.numSeen) }
func (events *analyticsEvents) NumSaved() float64 { return float64(len(events.events)) }
func (h analyticsEventHeap) Len() int { return len(h) }
func (h analyticsEventHeap) Less(i, j int) bool { return h[i].priority.isLowerPriority(h[j].priority) }
func (h analyticsEventHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
// Push and Pop are unused: only heap.Init and heap.Fix are used.
func (h analyticsEventHeap) Push(x interface{}) {}
func (h analyticsEventHeap) Pop() interface{} { return nil }
func newAnalyticsEvents(max int) *analyticsEvents {
return &analyticsEvents{
numSeen: 0,
events: make(analyticsEventHeap, 0, max),
failedHarvests: 0,
}
}
func (events *analyticsEvents) capacity() int {
return cap(events.events)
}
func (events *analyticsEvents) addEvent(e analyticsEvent) {
events.numSeen++
if events.capacity() == 0 {
// Configurable event harvest limits may be zero.
return
}
if len(events.events) < cap(events.events) {
events.events = append(events.events, e)
if len(events.events) == cap(events.events) {
// Delay heap initialization so that we can have
// deterministic ordering for integration tests (the max
// is not being reached).
heap.Init(events.events)
}
return
}
if e.priority.isLowerPriority((events.events)[0].priority) {
return
}
events.events[0] = e
heap.Fix(events.events, 0)
}
func (events *analyticsEvents) mergeFailed(other *analyticsEvents) {
fails := other.failedHarvests + 1
if fails >= failedEventsAttemptsLimit {
return
}
events.failedHarvests = fails
events.Merge(other)
}
func (events *analyticsEvents) Merge(other *analyticsEvents) {
allSeen := events.numSeen + other.numSeen
for _, e := range other.events {
events.addEvent(e)
}
events.numSeen = allSeen
}
func (events *analyticsEvents) CollectorJSON(agentRunID string) ([]byte, error) {
if 0 == len(events.events) {
return nil, nil
}
estimate := 256 * len(events.events)
buf := bytes.NewBuffer(make([]byte, 0, estimate))
buf.WriteByte('[')
jsonx.AppendString(buf, agentRunID)
buf.WriteByte(',')
buf.WriteByte('{')
buf.WriteString(`"reservoir_size":`)
jsonx.AppendUint(buf, uint64(cap(events.events)))
buf.WriteByte(',')
buf.WriteString(`"events_seen":`)
jsonx.AppendUint(buf, uint64(events.numSeen))
buf.WriteByte('}')
buf.WriteByte(',')
buf.WriteByte('[')
for i, e := range events.events {
if i > 0 {
buf.WriteByte(',')
}
e.WriteJSON(buf)
}
buf.WriteByte(']')
buf.WriteByte(']')
return buf.Bytes(), nil
}
// split splits the events into two. NOTE! The two event pools are not valid
// priority queues, and should only be used to create JSON, not for adding any
// events.
func (events *analyticsEvents) split() (*analyticsEvents, *analyticsEvents) {
// numSeen is conserved: e1.numSeen + e2.numSeen == events.numSeen.
e1 := &analyticsEvents{
numSeen: len(events.events) / 2,
events: make([]analyticsEvent, len(events.events)/2),
failedHarvests: events.failedHarvests,
}
e2 := &analyticsEvents{
numSeen: events.numSeen - e1.numSeen,
events: make([]analyticsEvent, len(events.events)-len(e1.events)),
failedHarvests: events.failedHarvests,
}
// Note that slicing is not used to ensure that length == capacity for
// e1.events and e2.events.
copy(e1.events, events.events)
copy(e2.events, events.events[len(events.events)/2:])
return e1, e2
}