-
Notifications
You must be signed in to change notification settings - Fork 68
/
AsyncReporter.java
195 lines (167 loc) · 6.66 KB
/
AsyncReporter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package zipkin2.reporter;
import java.io.Closeable;
import java.io.Flushable;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* As spans are reported, they are encoded and added to a pending queue. The task of sending spans
* happens on a separate thread which calls {@link #flush()}. By doing so, callers are protected
* from latency or exceptions possible when exporting spans out of process.
*
* <p>Spans are bundled into messages based on size in bytes or a timeout, whichever happens first.
*
* <p>The thread that flushes spans to the {@linkplain BytesMessageSender} does so in a synchronous loop.
* This means that even asynchronous transports will wait for an ack before sending a next message.
* We do this so that a surge of spans doesn't overrun memory or bandwidth via hundreds or
* thousands of in-flight messages. The downside of this is that reporting is limited in speed to
* what a single thread can clear. When a thread cannot clear the backlog, new spans are dropped.
*
* @param <S> type of the span, usually {@link zipkin2.Span}
*/
// This is effectively, but not explicitly final as it was not final in version 2.x.
public class AsyncReporter<S> extends Component implements Reporter<S>, Closeable, Flushable {
  /** Internal reporter that every instance method of this type forwards to. */
  final zipkin2.reporter.internal.AsyncReporter<S> delegate;

  AsyncReporter(zipkin2.reporter.internal.AsyncReporter<S> delegate) {
    this.delegate = delegate;
  }

  /**
   * Overload taking a {@link Sender}: the argument is cast to {@link BytesMessageSender} and
   * handed to {@link #create(BytesMessageSender)}.
   *
   * @deprecated Since 3.2, use {@link #create(BytesMessageSender)}
   */
  @Deprecated public static AsyncReporter<zipkin2.Span> create(Sender sender) {
    BytesMessageSender bytesMessageSender = (BytesMessageSender) sender;
    return create(bytesMessageSender);
  }

  /**
   * Creates a json reporter for <a href="https://zipkin.io/zipkin-api/#/">Zipkin V2</a> using
   * default builder settings. When the transport is http, the sender's endpoint is usually
   * "http://zipkinhost:9411/api/v2/spans".
   *
   * <p>Once a certain threshold is passed, spans are drained and
   * {@link BytesMessageSender#send(List) sent} to Zipkin collectors.
   *
   * @since 3.2
   */
  public static AsyncReporter<zipkin2.Span> create(BytesMessageSender sender) {
    return builder(sender).build();
  }

  /**
   * Overload taking a {@link Sender}: the argument is cast to {@link BytesMessageSender} before
   * the builder is constructed.
   *
   * @deprecated Since 3.2, use {@link #builder(BytesMessageSender)}
   */
  @Deprecated public static Builder builder(Sender sender) {
    return new Builder((BytesMessageSender) sender);
  }

  /**
   * Same starting point as {@link #create(BytesMessageSender)}, but returns the builder so that
   * settings such as the timeout can be configured before building.
   */
  public static Builder builder(BytesMessageSender sender) {
    return new Builder(sender);
  }

  /** Hands the span to the underlying internal reporter. */
  @Override public void report(S span) {
    this.delegate.report(span);
  }

  /**
   * Flushes any pending spans to the transport on the calling thread.
   *
   * <p>Note: When the {@link Builder#messageTimeout(long, TimeUnit) message timeout} is zero,
   * this must be invoked externally; otherwise spans will never be sent.
   *
   * @throws IllegalStateException if closed
   */
  @Override public void flush() {
    this.delegate.flush();
  }

  /** Stops the sender thread, incrementing drop metrics when unsent spans remain. */
  @Override public void close() {
    this.delegate.close();
  }

  /** Returns the string form of the underlying internal reporter. */
  @Override public String toString() {
    String description = this.delegate.toString();
    return description;
  }

  /** Fluent configuration for {@link AsyncReporter}; each setter forwards to an internal builder. */
  public static final class Builder {
    final zipkin2.reporter.internal.AsyncReporter.Builder delegate;
    /** Encoding reported by the sender at construction; selects the encoder in {@link #build()}. */
    final Encoding encoding;

    Builder(BytesMessageSender sender) {
      this.delegate = zipkin2.reporter.internal.AsyncReporter.newBuilder(sender);
      this.encoding = sender.encoding();
    }

    /**
     * Factory for the flush thread, which is launched when {@link #messageTimeout} is greater
     * than zero.
     */
    public Builder threadFactory(ThreadFactory threadFactory) {
      delegate.threadFactory(threadFactory);
      return this;
    }

    /**
     * Sink that aggregates reporter metrics and reports them to a monitoring system. No-op by
     * default.
     */
    public Builder metrics(ReporterMetrics metrics) {
      delegate.metrics(metrics);
      return this;
    }

    /**
     * Upper bound on bytes sendable per message, overhead included. Defaults to, and is limited
     * by {@link BytesMessageSender#messageMaxBytes()}.
     */
    public Builder messageMaxBytes(int messageMaxBytes) {
      delegate.messageMaxBytes(messageMaxBytes);
      return this;
    }

    /**
     * Defaults to 1 second. A value of 0 implies spans are {@link #flush() flushed} externally.
     *
     * <p>Rather than sending one message at a time, spans are bundled into messages, up to
     * {@link BytesMessageSender#messageMaxBytes()}. This timeout keeps spans from being stuck in
     * an incomplete message.
     *
     * <p>Note: the timeout starts when the first unsent span is reported.
     */
    public Builder messageTimeout(long timeout, TimeUnit unit) {
      delegate.messageTimeout(timeout, unit);
      return this;
    }

    /** Time to block on close while in-flight spans are sent out-of-process. Default 1 second */
    public Builder closeTimeout(long timeout, TimeUnit unit) {
      delegate.closeTimeout(timeout, unit);
      return this;
    }

    /** Upper bound on the backlog of spans reported vs sent. Default 10000 */
    public Builder queuedMaxSpans(int queuedMaxSpans) {
      delegate.queuedMaxSpans(queuedMaxSpans);
      return this;
    }

    /**
     * Upper bound on the backlog of span bytes reported vs sent. Disabled by default
     *
     * @deprecated This will be removed in version 4.0. Use {@link #queuedMaxSpans(int)} instead.
     */
    @Deprecated
    public Builder queuedMaxBytes(int queuedMaxBytes) {
      delegate.queuedMaxBytes(queuedMaxBytes);
      return this;
    }

    /**
     * Builds an async reporter that encodes zipkin spans as they are reported, using the span
     * encoder that matches the sender's encoding.
     */
    public AsyncReporter<zipkin2.Span> build() {
      BytesEncoder<zipkin2.Span> spanEncoder = SpanBytesEncoder.forEncoding(encoding);
      return build(spanEncoder);
    }

    /**
     * Builds an async reporter that encodes arbitrary spans as they are reported.
     *
     * @param encoder converts reported spans to bytes; must not be null
     * @throws NullPointerException if {@code encoder} is null
     */
    public <S> AsyncReporter<S> build(BytesEncoder<S> encoder) {
      if (encoder == null) {
        throw new NullPointerException("encoder == null");
      }
      // The internal builder is given the adapter rather than the caller's encoder directly.
      BytesEncoderAdapter<S> adapter = new BytesEncoderAdapter<S>(encoder);
      return new AsyncReporter<S>(delegate.build(adapter));
    }
  }

  /** Pass-through {@link BytesEncoder} that forwards every call to the wrapped encoder. */
  static final class BytesEncoderAdapter<S> implements BytesEncoder<S> {
    final BytesEncoder<S> delegate;

    BytesEncoderAdapter(BytesEncoder<S> wrapped) {
      this.delegate = wrapped;
    }

    @Override public Encoding encoding() {
      return this.delegate.encoding();
    }

    @Override public int sizeInBytes(S input) {
      return this.delegate.sizeInBytes(input);
    }

    @Override public byte[] encode(S input) {
      return this.delegate.encode(input);
    }

    @Override public String toString() {
      return this.delegate.toString();
    }
  }
}