forked from openzipkin/brave
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ZipkinSpanCollector.java
170 lines (147 loc) · 6.15 KB
/
ZipkinSpanCollector.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package com.github.kristofa.brave.zipkin;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.Validate;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.kristofa.brave.ClientTracer;
import com.github.kristofa.brave.ServerTracer;
import com.github.kristofa.brave.SpanCollector;
import com.twitter.zipkin.gen.AnnotationType;
import com.twitter.zipkin.gen.BinaryAnnotation;
import com.twitter.zipkin.gen.Span;
/**
* Sends spans to Zipkin collector or Scribe.
* <p/>
* Typically the {@link ZipkinSpanCollector} should be a singleton in your application that can be used by both
* {@link ClientTracer} and {@link ServerTracer}.
* <p/>
* This SpanCollector is implemented so it puts spans on a queue which are processed by a separate thread. In this way we are
* submitting spans asynchronously and we should have minimal overhead on application performance.
* <p/>
* The number of processing threads is configured through {@link ZipkinSpanCollectorParams#getNrOfThreads()}.
*
* @author kristof
*/
public class ZipkinSpanCollector implements SpanCollector {

    private static final String UTF_8 = "UTF-8";
    private static final Logger LOGGER = LoggerFactory.getLogger(ZipkinSpanCollector.class);

    /** Bounded queue on which {@link #collect(Span)} places spans for the processing threads. */
    private final BlockingQueue<Span> spanQueue;
    /** Pool that runs the submitted {@link SpanProcessingThread} instances. */
    private final ExecutorService executorService;
    /** Every worker that was created, kept so they can be stopped again. */
    private final List<SpanProcessingThread> spanProcessingThreads = new ArrayList<SpanProcessingThread>();
    /** One future per submitted worker; its result is logged by {@link #close()}. */
    private final List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
    /**
     * Annotations that are appended to every collected span. Copy-on-write so that {@link #collect(Span)} can
     * iterate without a {@link java.util.ConcurrentModificationException} while another thread is still
     * registering a default annotation.
     */
    private final Set<BinaryAnnotation> defaultAnnotations = new CopyOnWriteArraySet<BinaryAnnotation>();

    /**
     * Create a new instance with default queue size (= {@link ZipkinSpanCollectorParams#DEFAULT_QUEUE_SIZE}) and default
     * batch size (= {@link ZipkinSpanCollectorParams#DEFAULT_BATCH_SIZE}).
     *
     * @param zipkinCollectorHost Host for zipkin collector.
     * @param zipkinCollectorPort Port for zipkin collector.
     */
    public ZipkinSpanCollector(final String zipkinCollectorHost, final int zipkinCollectorPort) {
        this(zipkinCollectorHost, zipkinCollectorPort, new ZipkinSpanCollectorParams());
    }

    /**
     * Create a new instance.
     * <p/>
     * For each of the {@link ZipkinSpanCollectorParams#getNrOfThreads()} workers a dedicated client provider is set
     * up and a {@link SpanProcessingThread} is submitted to the executor. When construction fails part-way, the
     * workers that were already started are stopped and the executor is shut down before the exception propagates,
     * so a failed constructor does not leave running threads behind.
     *
     * @param zipkinCollectorHost Host for zipkin collector. Must not be empty.
     * @param zipkinCollectorPort Port for zipkin collector.
     * @param params Zipkin Span Collector parameters. Must not be <code>null</code>.
     * @throws IllegalStateException if the connection setup fails and {@link ZipkinSpanCollectorParams#failOnSetup()}
     *             is <code>true</code>.
     */
    public ZipkinSpanCollector(final String zipkinCollectorHost, final int zipkinCollectorPort,
        final ZipkinSpanCollectorParams params) {
        Validate.notEmpty(zipkinCollectorHost);
        Validate.notNull(params);

        spanQueue = new ArrayBlockingQueue<Span>(params.getQueueSize());
        final int nrOfThreads = params.getNrOfThreads();
        executorService = Executors.newFixedThreadPool(nrOfThreads);

        boolean started = false;
        try {
            for (int i = 1; i <= nrOfThreads; i++) {
                // Creating a client provider for every spanProcessingThread.
                final ZipkinCollectorClientProvider clientProvider =
                    new ZipkinCollectorClientProvider(zipkinCollectorHost, zipkinCollectorPort,
                        params.getSocketTimeout());
                try {
                    clientProvider.setup();
                } catch (final TException e) {
                    if (params.failOnSetup()) {
                        throw new IllegalStateException(e);
                    }
                    // Best effort: the worker is still started without an established connection.
                    LOGGER.warn("Connection could not be established during setup.", e);
                }
                final SpanProcessingThread spanProcessingThread =
                    new SpanProcessingThread(spanQueue, clientProvider, params.getBatchSize());
                spanProcessingThreads.add(spanProcessingThread);
                futures.add(executorService.submit(spanProcessingThread));
            }
            started = true;
        } finally {
            if (!started) {
                // Construction is being aborted: release what earlier iterations already started.
                abortStartup();
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p/>
     * All registered default annotations are added to the span, after which the span is offered to the queue
     * without blocking. If the queue does not accept the span, it is dropped and a warning is logged.
     */
    @Override
    public void collect(final Span span) {
        final long start = System.currentTimeMillis();

        for (final BinaryAnnotation defaultAnnotation : defaultAnnotations) {
            span.addToBinary_annotations(defaultAnnotation);
        }

        if (!spanQueue.offer(span)) {
            LOGGER.warn("Queue rejected Span, span not submitted: {}", span);
            return;
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Adding span to queue took {}ms.", System.currentTimeMillis() - start);
        }
    }

    /**
     * {@inheritDoc}
     * <p/>
     * The value is stored as a UTF-8 encoded binary annotation of type {@link AnnotationType#STRING}.
     *
     * @throws IllegalStateException if the UTF-8 encoding is not supported by the runtime.
     */
    @Override
    public void addDefaultAnnotation(final String key, final String value) {
        Validate.notEmpty(key);
        Validate.notNull(value);

        final ByteBuffer encodedValue;
        try {
            encodedValue = ByteBuffer.wrap(value.getBytes(UTF_8));
        } catch (final UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }

        final BinaryAnnotation binaryAnnotation = new BinaryAnnotation();
        binaryAnnotation.setKey(key);
        binaryAnnotation.setValue(encodedValue);
        binaryAnnotation.setAnnotation_type(AnnotationType.STRING);
        defaultAnnotations.add(binaryAnnotation);
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Stops every processing thread, waits for each of them to finish and logs the result it reports, then shuts
     * down the executor. The executor is shut down even when stopping or waiting fails. If the calling thread is
     * interrupted while waiting, its interrupt status is restored so the caller can still observe it.
     */
    @Override
    @PreDestroy
    public void close() {
        LOGGER.info("Stopping SpanProcessingThread.");
        try {
            for (final SpanProcessingThread thread : spanProcessingThreads) {
                thread.stop();
            }
            for (final Future<Integer> future : futures) {
                try {
                    final Integer spansProcessed = future.get();
                    LOGGER.info("SpanProcessingThread processed {} spans.", spansProcessed);
                } catch (final InterruptedException e) {
                    // Future.get() cleared the flag when it threw; put it back for the caller.
                    Thread.currentThread().interrupt();
                    LOGGER.warn("Exception when getting result of SpanProcessingThread.", e);
                } catch (final Exception e) {
                    LOGGER.warn("Exception when getting result of SpanProcessingThread.", e);
                }
            }
        } finally {
            executorService.shutdown();
        }
        LOGGER.info("ZipkinSpanCollector closed.");
    }

    /**
     * Stops the workers created so far and shuts down the executor. Used only when the constructor fails, so it
     * does not wait for the workers' results.
     */
    private void abortStartup() {
        try {
            for (final SpanProcessingThread thread : spanProcessingThreads) {
                thread.stop();
            }
        } finally {
            executorService.shutdown();
        }
    }
}