This repository has been archived by the owner on Oct 23, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 183
/
tracker.go
174 lines (148 loc) · 5.15 KB
/
tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package tracetracker
import (
"container/list"
"context"
"sync"
"sync/atomic"
"time"
"github.com/signalfx/golib/datapoint"
"github.com/signalfx/golib/sfxclient"
"github.com/signalfx/golib/trace"
log "github.com/sirupsen/logrus"
)
// spanCorrelationMetricName is the metric name given to the per-service
// datapoints that dpForService builds.
const spanCorrelationMetricName = "sf.int.service.heartbeat"
// ActiveServiceTracker keeps track of which services are seen in the trace
// spans passed through AddSpans. It supports expiry of service names if
// they are not seen for a certain amount of time.
type ActiveServiceTracker struct {
	// Internal metrics. These counters are touched through sync/atomic and
	// are exposed by InternalMetrics without taking the mutex. They must stay
	// at the very start of the struct: on 32-bit platforms (386, ARM, MIPS)
	// only the first word of an allocated struct is guaranteed to be 64-bit
	// aligned, and 64-bit atomic operations on a misaligned word panic. Each
	// counter is 8 bytes, so the ones that follow the first stay aligned too.
	activeServiceCount int64
	purgedServiceCount int64
	spansProcessed     int64

	// Guards the list and the maps below.
	sync.Mutex

	// How long to keep sending metrics for a particular service name after it
	// is last seen
	timeout time.Duration

	// A linked list of serviceStatus entries ordered by time last seen: the
	// most recently seen service is at the front and the stalest one is at
	// the back.
	serviceNameByTime *list.List

	// Which service names are active currently. The value is an entry in the
	// serviceNameByTime linked list so that it can be quickly accessed and
	// moved to the front of the list when the service is seen again.
	serviceNamesActive map[string]*list.Element

	// Datapoints don't change over time for a given service, so make them once
	// and stick them in here.
	dpCache map[string]*datapoint.Datapoint

	// A callback that gets called with the correlation datapoint when a
	// service is first seen. May be nil, in which case nothing is called.
	newServiceCallback func(dp *datapoint.Datapoint)

	// Source of the current time; New sets it to time.Now.
	timeNow func() time.Time
}
// serviceStatus is the value stored in each element of the tracker's
// serviceNameByTime list.
type serviceStatus struct {
	// LastSeen is the time at which the service was most recently observed.
	LastSeen time.Time

	// ServiceName is the name of the tracked service; it is also the key
	// used in the tracker's maps.
	ServiceName string
}
// New returns a ready-to-use ActiveServiceTracker.
//
// timeout is how long a service stays tracked after it was last seen.
// newServiceCallback, if non-nil, is invoked with the correlation datapoint
// of every service the first time that service is seen. The tracker reads
// the clock through time.Now.
func New(timeout time.Duration, newServiceCallback func(dp *datapoint.Datapoint)) *ActiveServiceTracker {
	tracker := new(ActiveServiceTracker)

	tracker.timeout = timeout
	tracker.newServiceCallback = newServiceCallback
	tracker.timeNow = time.Now

	// Start with empty bookkeeping structures so they can be written to
	// right away.
	tracker.serviceNameByTime = list.New()
	tracker.serviceNamesActive = map[string]*list.Element{}
	tracker.dpCache = map[string]*datapoint.Datapoint{}

	return tracker
}
// CorrelationDatapoints returns the correlation datapoints of all services
// that are still tracked, based on the spans sent through AddSpans. Services
// that have timed out are purged before the result is built. The returned
// slice is freshly allocated, but its elements are the cached datapoint
// pointers themselves. Iteration follows map order, so the ordering of the
// result is unspecified.
func (a *ActiveServiceTracker) CorrelationDatapoints() []*datapoint.Datapoint {
	a.Lock()
	defer a.Unlock()

	// Expire stale services first so the cache only holds live ones when it
	// is sized and copied below.
	a.purgeOldServices()

	dps := make([]*datapoint.Datapoint, len(a.dpCache))
	next := 0
	for _, dp := range a.dpCache {
		dps[next] = dp
		next++
	}
	return dps
}
// AddSpans accepts a list of trace spans and uses them to update the
// current list of active services. This is thread-safe.
//
// ctx is currently unused. The returned error is always nil.
func (a *ActiveServiceTracker) AddSpans(ctx context.Context, spans []*trace.Span) error {
	// Take current time once since this is a system call.
	now := a.timeNow()

	a.Lock()
	defer a.Unlock()

	for i := range spans {
		a.processSpan(spans[i], now)
	}

	// InternalMetrics hands &a.spansProcessed to sfxclient.CumulativeP without
	// holding the mutex (CumulativeP is documented to load the value
	// atomically), so the mutex alone does not protect this counter. The
	// write has to be atomic as well, otherwise it races with that read.
	atomic.AddInt64(&a.spansProcessed, int64(len(spans)))
	return nil
}
// processSpan marks the local service of span as seen at time now. Spans
// that are nil, or that carry no local endpoint or no local service name,
// are ignored. It does no locking of its own; AddSpans calls it with the
// mutex held.
func (a *ActiveServiceTracker) processSpan(span *trace.Span, now time.Time) {
	// A nil entry in the span slice would otherwise panic on the field
	// access below.
	if span == nil {
		return
	}

	// Can't do anything if the span doesn't have a local service name.
	endpoint := span.LocalEndpoint
	if endpoint == nil || endpoint.ServiceName == nil {
		return
	}

	a.ensureServiceActive(*endpoint.ServiceName, now)
}
// ensureServiceActive records that service was seen at time now.
//
// A service that is already tracked gets its LastSeen refreshed and its list
// entry moved to the front. A service that is not tracked yet is added at the
// front of the list, gets a cached correlation datapoint, bumps the active
// service counter and is reported through newServiceCallback (when one is
// set). The method does no locking itself; in this file it is only reached
// from AddSpans, which holds the mutex, so the callback runs under that lock.
func (a *ActiveServiceTracker) ensureServiceActive(service string, now time.Time) {
	if existing, tracked := a.serviceNamesActive[service]; tracked {
		existing.Value.(*serviceStatus).LastSeen = now
		a.serviceNameByTime.MoveToFront(existing)
		return
	}

	// First sighting: register the service in the list and in both maps.
	status := &serviceStatus{LastSeen: now, ServiceName: service}
	a.serviceNamesActive[service] = a.serviceNameByTime.PushFront(status)

	dp := dpForService(service)
	a.dpCache[service] = dp
	atomic.AddInt64(&a.activeServiceCount, 1)

	if a.newServiceCallback != nil {
		a.newServiceCallback(dp)
	}

	log.WithFields(log.Fields{
		"service": service,
	}).Debug("Tracking service name from trace span")
}
// purgeOldServices drops every service whose last sighting is at least
// a.timeout ago. It repeatedly inspects the back of serviceNameByTime, which
// holds the stalest entry, and stops at the first entry that has not timed
// out. Each purged service is removed from the list, from serviceNamesActive
// and from dpCache, and the active/purged counters are adjusted. It does no
// locking of its own; CorrelationDatapoints calls it with the mutex held.
func (a *ActiveServiceTracker) purgeOldServices() {
	now := a.timeNow()

	for oldest := a.serviceNameByTime.Back(); oldest != nil; oldest = a.serviceNameByTime.Back() {
		entry := oldest.Value.(*serviceStatus)

		// If this one isn't timed out, nothing else in the list is either.
		if age := now.Sub(entry.LastSeen); age < a.timeout {
			return
		}

		a.serviceNameByTime.Remove(oldest)
		delete(a.serviceNamesActive, entry.ServiceName)
		delete(a.dpCache, entry.ServiceName)

		atomic.AddInt64(&a.activeServiceCount, -1)
		atomic.AddInt64(&a.purgedServiceCount, 1)

		log.WithFields(log.Fields{
			"serviceName": entry.ServiceName,
		}).Debug("No longer tracking service name from trace span")
	}
}
// InternalMetrics returns datapoints describing the state of the tracker:
// a gauge with the number of currently active services, and cumulative
// counters for the number of purged services and of processed spans. It does
// not take the tracker's mutex; the active count is read with an atomic load
// and the other two counters are handed to sfxclient by pointer.
func (a *ActiveServiceTracker) InternalMetrics() []*datapoint.Datapoint {
	active := sfxclient.Gauge("sfxagent.tracing_active_services", nil, atomic.LoadInt64(&a.activeServiceCount))
	purged := sfxclient.CumulativeP("sfxagent.tracing_purged_services", nil, &a.purgedServiceCount)
	processed := sfxclient.CumulativeP("sfxagent.tracing_spans_processed", nil, &a.spansProcessed)

	return []*datapoint.Datapoint{active, purged, processed}
}
// dpForService builds the correlation datapoint for service: a gauge named
// spanCorrelationMetricName with value 0 whose only dimension,
// "sf_hasService", carries the service name.
func dpForService(service string) *datapoint.Datapoint {
	// Host dimensions get added in the writer just like datapoints
	dims := map[string]string{"sf_hasService": service}
	return sfxclient.Gauge(spanCorrelationMetricName, dims, 0)
}