/
MutableSpanMap.java
153 lines (134 loc) · 5.29 KB
/
MutableSpanMap.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package brave.internal.recorder;
import brave.Clock;
import brave.propagation.TraceContext;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import zipkin.Endpoint;
import zipkin.internal.V2SpanConverter;
import zipkin.reporter.Reporter;
/**
* Similar to Finagle's deadline span map, except this is GC pressure as opposed to timeout driven.
* This means there's no bookkeeping thread required in order to flush orphaned spans.
*
* <p>Spans are weakly referenced by their owning context. When the keys are collected, they are
* transferred to a queue, waiting to be reported. A call to modify any span will implicitly flush
* orphans to Zipkin. Spans in this state will have a "brave.flush" annotation added to them.
*
* <p>The internal implementation is derived from WeakConcurrentMap by Rafael Winterhalter. See
* https://github.com/raphw/weak-lock-free/blob/master/src/main/java/com/blogspot/mydailyjava/weaklockfree/WeakConcurrentMap.java
*/
final class MutableSpanMap extends ReferenceQueue<TraceContext> {
  static final Logger logger = Logger.getLogger(MutableSpanMap.class.getName());

  // Even though we only put by RealKey, we allow get and remove by LookupKey
  final ConcurrentMap<Object, MutableSpan> delegate = new ConcurrentHashMap<>(64);
  final Endpoint localEndpoint;
  final Clock clock;
  final Reporter<zipkin.Span> reporter;
  /** When set, orphaned spans are discarded instead of being reported. */
  final AtomicBoolean noop;

  /**
   * @param localEndpoint endpoint passed to every span created by {@link #getOrCreate}
   * @param clock source of the timestamp used for the "brave.flush" annotation
   * @param reporter receives orphaned spans once their context was garbage collected
   * @param noop when true, orphaned spans are dropped rather than reported
   */
  MutableSpanMap(
      Endpoint localEndpoint,
      Clock clock,
      Reporter<zipkin.Span> reporter,
      AtomicBoolean noop
  ) {
    this.localEndpoint = localEndpoint;
    this.clock = clock;
    this.reporter = reporter;
    this.noop = noop;
  }

  /**
   * Returns the span associated with the context, or null if there is none. Orphaned spans are
   * flushed before the lookup.
   *
   * @throws NullPointerException if {@code context} is null
   */
  @Nullable MutableSpan get(TraceContext context) {
    if (context == null) throw new NullPointerException("context == null");
    reportOrphanedSpans();
    return delegate.get(new LookupKey(context));
  }

  /**
   * Returns the span associated with the context, creating and registering one if absent.
   *
   * @throws NullPointerException if {@code context} is null
   */
  MutableSpan getOrCreate(TraceContext context) {
    MutableSpan result = get(context);
    if (result != null) return result;

    // Only pay for a weak reference when the cheap lookup missed.
    MutableSpan newSpan = new MutableSpan(context, localEndpoint);
    MutableSpan previousSpan = delegate.putIfAbsent(new RealKey(context, this), newSpan);
    if (previousSpan != null) return previousSpan; // lost race
    return newSpan;
  }

  /**
   * Removes and returns the span associated with the context, or null if there was none.
   *
   * @throws NullPointerException if {@code context} is null
   */
  @Nullable MutableSpan remove(TraceContext context) {
    if (context == null) throw new NullPointerException("context == null");
    MutableSpan result = delegate.remove(new LookupKey(context));
    reportOrphanedSpans(); // also clears the reference relating to the recent remove
    return result;
  }

  /**
   * Reports spans orphaned by garbage collection.
   *
   * <p>Each key drained from this queue is removed from the map. Unless {@link #noop} is set, its
   * span is annotated with "brave.flush" and handed to the reporter. Runtime exceptions raised
   * while doing so are logged at FINE level and never propagate to the caller.
   */
  void reportOrphanedSpans() {
    Reference<? extends TraceContext> reference;
    while ((reference = poll()) != null) {
      MutableSpan value = delegate.remove(reference);
      if (value == null || noop.get()) continue;
      try {
        value.annotate(clock.currentTimeMicroseconds(), "brave.flush");
        reporter.report(V2SpanConverter.toSpan(value.toSpan()));
      } catch (RuntimeException e) {
        // don't crash the caller if there was a problem reporting an unrelated span.
        if (logger.isLoggable(Level.FINE)) {
          // References enqueued by the garbage collector are cleared before they reach the queue,
          // so the context is normally gone by now. Log anyway: guarding on a non-null context
          // would silently swallow every failure to report an orphan.
          TraceContext context = reference.get();
          String subject = context != null ? context.toString() : "orphaned span";
          logger.log(Level.FINE, "error flushing " + subject, e);
        }
      }
    }
  }

  /**
   * Real keys contain a reference to the real context associated with a span. This is a weak
   * reference, so that we get notified on GC pressure.
   *
   * <p>Since {@linkplain TraceContext}'s hash code is final, it is used directly both here and in
   * lookup keys.
   */
  static final class RealKey extends WeakReference<TraceContext> {
    // Captured eagerly so the key stays in the same hash bucket after the referent is cleared.
    final int hashCode;

    RealKey(TraceContext context, ReferenceQueue<TraceContext> queue) {
      super(context, queue);
      hashCode = context.hashCode();
    }

    @Override public String toString() {
      TraceContext context = get();
      return context != null ? "WeakReference(" + context + ")" : "ClearedReference()";
    }

    @Override public int hashCode() {
      return this.hashCode;
    }

    /**
     * Resolves hash code collisions.
     *
     * <p>Live keys are equal when their contexts are equal. A cleared key is only equal to
     * itself: once the referent is gone there is no evidence it referred to the same context as
     * another key, and treating all cleared keys as equal could remove a different orphan's entry
     * on a hash collision.
     */
    @Override public boolean equals(Object other) {
      if (other == this) return true;
      if (!(other instanceof RealKey)) return false; // also covers null
      TraceContext thisContext = get();
      if (thisContext == null) return false;
      return thisContext.equals(((RealKey) other).get());
    }
  }

  /**
   * Lookup keys are cheaper than real keys as reference tracking is not involved. We cannot use
   * {@linkplain TraceContext} directly as a lookup key, as even though it has the same hash code
   * as the real key, it would fail in equals comparison.
   */
  static final class LookupKey {
    final TraceContext context;

    LookupKey(TraceContext context) {
      this.context = context;
    }

    @Override public int hashCode() {
      return context.hashCode();
    }

    /**
     * Resolves hash code collisions. Matches a {@link RealKey} whose referent equals this
     * context; any unrelated type, or null, is simply not equal.
     */
    @Override public boolean equals(Object other) {
      if (other == this) return true;
      if (other instanceof RealKey) return context.equals(((RealKey) other).get());
      if (other instanceof LookupKey) return context.equals(((LookupKey) other).context);
      return false;
    }
  }

  @Override public String toString() {
    return "MutableSpanMap" + delegate.keySet();
  }
}