-
Notifications
You must be signed in to change notification settings - Fork 779
/
HttpZipkinSpanReporter.java
175 lines (158 loc) · 5.45 KB
/
HttpZipkinSpanReporter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package org.springframework.cloud.sleuth.zipkin;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.logging.Log;
import org.springframework.cloud.sleuth.metric.SpanMetricReporter;
import zipkin.Codec;
import zipkin.Span;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* Submits spans using Zipkin's {@code POST /spans} endpoint.
*
* <p>Reported spans are held in a bounded in-memory queue until {@link #flush()} encodes them as
* JSON and POSTs them to {@code <baseUrl>/api/v1/spans}. Spans that do not fit in the queue, that
* cannot be encoded or sent, or that are still queued when {@link #close()} is called are counted
* as dropped on the {@link SpanMetricReporter}.
*
* @author Adrian Cole
* @since 1.0.0
*/
public final class HttpZipkinSpanReporter
implements ZipkinSpanReporter, Flushable, Closeable {
private static final Log log = org.apache.commons.logging.LogFactory
.getLog(HttpZipkinSpanReporter.class);
// Charset used to render the JSON payload as text when a failed POST is logged at debug level.
private static final Charset UTF_8 = Charset.forName("UTF-8");
// Full endpoint URL that spans are POSTed to: the base URL followed by "api/v1/spans".
private final String url;
// Spans waiting to be flushed. Bounded to 1000 entries; report() drops spans once it is full.
private final BlockingQueue<Span> pending = new LinkedBlockingQueue<>(1000);
// Null when the flush interval is not positive (e.g. in tests); flush() is then called externally.
private final Flusher flusher; // Nullable for testing
// When true, the JSON payload is gzip-compressed before it is POSTed.
private final boolean compressionEnabled;
// Receives the accepted / dropped span counts.
private final SpanMetricReporter spanMetricReporter;
/**
 * Creates a reporter that POSTs spans to {@code <baseUrl>/api/v1/spans}.
 *
 * @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/
 *        A trailing slash is optional; one is added when missing.
 * @param flushInterval in seconds. 0 (or any non-positive value) implies spans are
 *        {@link #flush() flushed} externally and no background flusher is started.
 * @param compressionEnabled compress spans using gzip before posting to the zipkin server.
 * @param spanMetricReporter service to count number of accepted / dropped spans
 */
public HttpZipkinSpanReporter(String baseUrl, int flushInterval, boolean compressionEnabled,
        SpanMetricReporter spanMetricReporter) {
    this.url = baseUrl + (baseUrl.endsWith("/") ? "" : "/") + "api/v1/spans";
    this.compressionEnabled = compressionEnabled;
    this.spanMetricReporter = spanMetricReporter;
    // Start the flusher last: it schedules its first run with no initial delay on another
    // thread, so every field that flush() reads must already be assigned before 'this' is
    // handed to it.
    this.flusher = flushInterval > 0 ? new Flusher(this, flushInterval) : null;
}
/**
 * Offers the span to the pending queue without blocking. Every call is counted as an accepted
 * span; when the queue has no room the span is discarded and additionally counted as dropped.
 *
 * @param span Span, should not be <code>null</code>.
 */
@Override
public void report(Span span) {
    this.spanMetricReporter.incrementAcceptedSpans(1);
    boolean queued = this.pending.offer(span);
    if (queued) {
        return;
    }
    // Queue is at capacity: the span is lost, so record it.
    this.spanMetricReporter.incrementDroppedSpans(1);
}
/**
 * Calling this will flush any pending spans to the http transport on the current thread.
 *
 * <p>The queue is drained, the spans are JSON-encoded and POSTed in a single request. Nothing
 * is retried: spans that cannot be encoded or delivered are counted as dropped. Encoding and
 * transport failures, checked or unchecked, are handled here rather than propagated, so a
 * failed attempt neither loses track of the drained spans nor surfaces in the caller.
 */
@Override
public void flush() {
    if (this.pending.isEmpty()) {
        return;
    }
    List<Span> drained = new ArrayList<>(this.pending.size());
    this.pending.drainTo(drained);
    if (drained.isEmpty()) {
        return;
    }
    // json-encode the spans for transport
    byte[] json;
    try {
        json = Codec.JSON.writeSpans(drained);
    }
    catch (RuntimeException e) {
        // NOTE: https://github.com/openzipkin/zipkin-java/issues/66 will throw instead of
        // return null, so a thrown failure is treated exactly like a null result.
        if (log.isDebugEnabled()) {
            log.debug("failed to encode spans, dropping them: " + drained, e);
        }
        this.spanMetricReporter.incrementDroppedSpans(drained.size());
        return;
    }
    if (json == null) {
        if (log.isDebugEnabled()) {
            log.debug("failed to encode spans, dropping them: " + drained);
        }
        this.spanMetricReporter.incrementDroppedSpans(drained.size());
        return;
    }
    // Send the json to the zipkin endpoint
    try {
        postSpans(json);
    }
    catch (IOException | RuntimeException e) {
        // The spans were already removed from the queue, so any failure here means they are
        // gone; unchecked failures are caught too so they are still accounted for.
        if (log.isDebugEnabled()) { // don't pollute logs unless debug is on.
            // TODO: logger test
            log.debug(
                    "error POSTing spans to " + this.url + ": as json: " + new String(json,
                            UTF_8), e);
        }
        this.spanMetricReporter.incrementDroppedSpans(drained.size());
    }
}
/**
 * Calls flush on a fixed interval.
 *
 * <p>Construction immediately schedules this object on a dedicated single-threaded scheduler:
 * the first run has no initial delay and each following run starts {@code flushInterval}
 * seconds after the previous one finished. The owner stops it by shutting down
 * {@link #scheduler}.
 */
static final class Flusher implements Runnable {
    final Flushable flushable;
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    /**
     * @param flushable target whose {@code flush()} is invoked periodically
     * @param flushInterval delay between runs, in seconds; must be positive
     */
    Flusher(Flushable flushable, int flushInterval) {
        this.flushable = flushable;
        this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
    }

    @Override
    public void run() {
        try {
            this.flushable.flush();
        }
        catch (IOException | RuntimeException ignored) {
            // Deliberately ignored. The scheduler suppresses every subsequent execution once
            // a run throws, so letting any exception escape would silently stop periodic
            // flushing for good; the next run simply tries again.
        }
    }
}
// Without explicit timeouts HttpURLConnection waits forever, which would stall the single
// flusher thread on an unresponsive server.
private static final int CONNECT_TIMEOUT_MILLIS = 10 * 1000;
private static final int READ_TIMEOUT_MILLIS = 60 * 1000;

/**
 * POSTs the encoded spans to the configured endpoint as {@code application/json}, gzip
 * compressing the payload first when compression is enabled, then reads and discards the
 * response.
 *
 * @param json JSON-encoded list of spans (uncompressed)
 * @throws IOException if the request cannot be sent or the response cannot be read; when
 *         draining the error stream fails as well, that failure is attached as suppressed
 */
void postSpans(byte[] json) throws IOException {
    // intentionally not closing the connection, so as to use keep-alives
    HttpURLConnection connection = (HttpURLConnection) new URL(this.url).openConnection();
    connection.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
    connection.setReadTimeout(READ_TIMEOUT_MILLIS);
    connection.setRequestMethod("POST");
    connection.addRequestProperty("Content-Type", "application/json");
    byte[] body = json;
    if (this.compressionEnabled) {
        connection.addRequestProperty("Content-Encoding", "gzip");
        ByteArrayOutputStream gzipped = new ByteArrayOutputStream();
        try (GZIPOutputStream compressor = new GZIPOutputStream(gzipped)) {
            compressor.write(json);
        }
        body = gzipped.toByteArray();
    }
    connection.setDoOutput(true);
    connection.setFixedLengthStreamingMode(body.length);
    try (java.io.OutputStream out = connection.getOutputStream()) {
        out.write(body);
    }
    // Consume the whole response (or error body) so the underlying socket can be reused.
    try (InputStream in = connection.getInputStream()) {
        discardRemaining(in);
    }
    catch (IOException e) {
        try (InputStream err = connection.getErrorStream()) {
            if (err != null) { // possible, if the connection was dropped
                discardRemaining(err);
            }
        }
        catch (IOException secondary) {
            // Keep the original failure as the one reported to the caller.
            e.addSuppressed(secondary);
        }
        throw e;
    }
}

/** Reads the stream to its end, discarding the content. */
private static void discardRemaining(InputStream stream) throws IOException {
    byte[] scratch = new byte[1024];
    while (stream.read(scratch) != -1) {
        // skip
    }
}
/**
 * Asks delivery to stop: the periodic flusher, when there is one, is shut down without
 * interrupting a run already in progress, so at most one in-flight request may still be
 * processing after this call returns. Spans still waiting in the queue are discarded and
 * counted as dropped.
 */
@Override
public void close() {
    Flusher periodic = this.flusher;
    if (periodic != null) {
        periodic.scheduler.shutdown();
    }
    // Empty the queue into a throw-away list; only the number of discarded spans matters.
    List<Span> discarded = new ArrayList<>();
    int count = this.pending.drainTo(discarded);
    this.spanMetricReporter.incrementDroppedSpans(count);
}
}