Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@
<scope>test</scope>
<version>4.3</version>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<scope>test</scope>
<version>2.30.1</version>
</dependency>
</dependencies>

<profiles>
Expand Down
121 changes: 92 additions & 29 deletions src/main/java/com/wavefront/sdk/common/clients/WavefrontClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
Expand All @@ -46,10 +47,10 @@
import static com.wavefront.sdk.common.Utils.eventToLineData;
import static com.wavefront.sdk.common.Utils.getSemVerGauge;
import static com.wavefront.sdk.common.Utils.histogramToLineData;
import static com.wavefront.sdk.common.Utils.logToLineData;
import static com.wavefront.sdk.common.Utils.metricToLineData;
import static com.wavefront.sdk.common.Utils.spanLogsToLineData;
import static com.wavefront.sdk.common.Utils.tracingSpanToLineData;
import static com.wavefront.sdk.common.Utils.logToLineData;

/**
* Wavefront client that sends data to Wavefront via Proxy or Directly to a Wavefront service
Expand Down Expand Up @@ -82,7 +83,8 @@ public class WavefrontClient implements WavefrontSender, Runnable {
private final LinkedBlockingQueue<String> spanLogsBuffer;
private final LinkedBlockingQueue<String> eventsBuffer;
private final LinkedBlockingQueue<String> logsBuffer;
private final ReportingService reportingService;
private final ReportingService metricsReportingService;
private final ReportingService tracesReportingService;
private final ScheduledExecutorService scheduler;
private final WavefrontSdkMetricsRegistry sdkMetricsRegistry;

Expand Down Expand Up @@ -139,6 +141,8 @@ public static class Builder {
private final String token;

// Optional parameters
private int metricsPort = -1;
private int tracesPort = -1;
private int maxQueueSize = 500000;
private int batchSize = 10000;
private long flushInterval = 1;
Expand All @@ -147,10 +151,14 @@ public static class Builder {
private boolean includeSdkMetrics = true;
private Map<String, String> tags = Maps.newHashMap();

private URI metricsUri;
private URI tracesUri;

/**
* Create a new WavefrontClient.Builder
*
* @param server A server URL of the form "https://clusterName.wavefront.com" or "http://internal.proxy.com:port"
* @param server A server URL of the form "https://clusterName.wavefront.com" or
* "http://internal.proxy.com:port"
* @param token A valid API token with direct ingestion permissions
*/
public Builder(String server, @Nullable String token) {
Expand All @@ -164,8 +172,7 @@ public Builder(String server, @Nullable String token) {
* @param proxyServer A server URL of the the form "http://internal.proxy.com:port"
*/
public Builder(String proxyServer) {
this.server = proxyServer;
this.token = null;
this(proxyServer, null);
}

/**
Expand Down Expand Up @@ -228,6 +235,18 @@ public Builder messageSizeBytes(int bytes) {
return this;
}

/**
* The port metrics will be sent to on the server or proxy, overriding any port set in the
* server URL.
*
* @param port Port number metrics will be sent to on the server or proxy.
* @return {@code this}
*/
public Builder metricsPort(int port) {
this.metricsPort = port;
return this;
}

/**
* Default is true, if false the internal metrics emitted from this sender will be disabled
*
Expand All @@ -249,13 +268,25 @@ public Builder sdkMetricsTags(Map<String, String> tags) {
return this;
}

/**
* For a given server endpoint, validate according to RFC 2396 and attempt
* to make a connection
*
* @return {@code this}
* @throws IllegalArgumentException
*/
/**
* The port traces will be sent to on the server or proxy, overriding any port set in the
* server URL.
*
* @param port Port number traces will be sent to on the server or proxy.
* @return {@code this}
*/
public Builder tracesPort(int port) {
this.tracesPort = port;
return this;
}

/**
* For a given server endpoint, validate according to RFC 2396 and attempt
* to make a connection
*
* @return {@code this}
* @throws IllegalArgumentException
*/
public Builder validateEndpoint() throws IllegalArgumentException {
URL url = null;

Expand All @@ -280,10 +311,28 @@ public Builder validateEndpoint() throws IllegalArgumentException {
* Creates a new client that flushes directly to a Proxy or Wavefront service.
*
* return {@link WavefrontClient}
* @throws IllegalStateException
*/
public WavefrontClient build() {
try {
this.metricsUri = buildUri(this.server, this.metricsPort);
this.tracesUri = buildUri(this.server, this.tracesPort);
} catch (URISyntaxException e) {
throw new IllegalStateException(e);
}

return new WavefrontClient(this);
}

private URI buildUri(String server, int port) throws URISyntaxException {
URI uri = new URI(server);
if (port <= 0) {
return uri;
}

return new URI(uri.getScheme(), null, uri.getHost(), port, uri.getPath(), uri.getQuery(),
uri.getFragment());
}
}

private WavefrontClient(Builder builder) {
Expand All @@ -304,7 +353,8 @@ private WavefrontClient(Builder builder) {
spanLogsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize);
eventsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize);
logsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize);
reportingService = new ReportingService(builder.server, builder.token);
metricsReportingService = new ReportingService(builder.metricsUri, builder.token);
tracesReportingService = new ReportingService(builder.tracesUri, builder.token);
scheduler = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("wavefrontClientSender").setDaemon(true));
scheduler.scheduleAtFixedRate(this, 1, builder.flushInterval, builder.flushIntervalTimeUnit);
Expand Down Expand Up @@ -402,7 +452,6 @@ public void sendMetric(String name, double value, @Nullable Long timestamp,
logger.log(LogMessageType.METRICS_BUFFER_FULL.toString(), Level.WARNING,
"Buffer full, dropping metric point: " + point + ". Consider increasing the batch " +
"size of your sender to increase throughput.");

}
}

Expand Down Expand Up @@ -455,8 +504,8 @@ public void sendDistribution(String name, List<Pair<Double, Integer>> centroids,
}

@Override
public void sendLog(String name, double value, Long timestamp, String source, Map<String, String> tags)
throws IOException {
public void sendLog(String name, double value, Long timestamp, String source,
Map<String, String> tags) throws IOException {
if (closed.get()) {
throw new IOException("attempt to send using closed sender");
}
Expand All @@ -472,8 +521,8 @@ public void sendLog(String name, double value, Long timestamp, String source, Ma
if (!logsBuffer.offer(point)) {
logsDropped.inc();
logger.log(LogMessageType.LOGS_BUFFER_FULL.toString(), Level.WARNING,
"Buffer full, dropping log point: " + point + ". Consider increasing the batch " +
"size of your sender to increase throughput.");
"Buffer full, dropping log point: " + point + ". Consider increasing the batch " +
"size of your sender to increase throughput.");

}
}
Expand All @@ -491,10 +540,12 @@ public void sendEvent(String name, long startMillis, long endMillis, @Nullable S
try {
if (uri.getScheme().equals(Constants.HTTP_PROXY_SCHEME)) {
// If the path starts with http, the event is sent with proxy.
event = eventToLineData(name, startMillis, endMillis, source, tags, annotations, defaultSource, false);
event = eventToLineData(name, startMillis, endMillis, source, tags, annotations,
defaultSource, false);
} else {
// If the path starts with https, the event is sent with direct ingestion.
event = eventToLineData(name, startMillis, endMillis, source, tags, annotations, defaultSource, true);
event = eventToLineData(name, startMillis, endMillis, source, tags, annotations,
defaultSource, true);
}
eventsValid.inc();
} catch (IllegalArgumentException e) {
Expand Down Expand Up @@ -574,10 +625,10 @@ public void run() {

@Override
public void flush() throws IOException {
if (closed.get()) {
throw new IOException("attempt to flush closed sender");
}
this.flushNoCheck();
if (closed.get()) {
throw new IOException("attempt to flush closed sender");
}
this.flushNoCheck();
}

private void flushNoCheck() throws IOException {
Expand Down Expand Up @@ -608,17 +659,29 @@ private void flushNoCheck() throws IOException {

private void internalFlush(LinkedBlockingQueue<String> buffer, String format,
String entityPrefix, String entityType,
WavefrontSdkDeltaCounter dropped, WavefrontSdkDeltaCounter reportErrors,
WavefrontSdkDeltaCounter dropped,
WavefrontSdkDeltaCounter reportErrors,
AtomicInteger featureDisabledStatusCode,
LogMessageType errorMessageType,
LogMessageType permissionsMessageType,
LogMessageType bufferFullMessageType)
throws IOException {
ReportingService entityReportingService;
switch (format) {
case Constants.WAVEFRONT_SPAN_LOG_FORMAT:
case Constants.WAVEFRONT_TRACING_SPAN_FORMAT:
entityReportingService = this.tracesReportingService;
break;
default:
entityReportingService = this.metricsReportingService;
break;
}

List<List<String>> batch = null;
if(format.equals(Constants.WAVEFRONT_EVENT_FORMAT)){
if (format.equals(Constants.WAVEFRONT_EVENT_FORMAT)) {
// Event direct ingestion now does not support batching
batch = getBatch(buffer, 1, messageSizeBytes, dropped);
}else{
} else {
batch = getBatch(buffer, batchSize, messageSizeBytes, dropped);
}
for (int i = 0; i < batch.size(); i++) {
Expand Down Expand Up @@ -648,9 +711,9 @@ private void internalFlush(LinkedBlockingQueue<String> buffer, String format,
try (InputStream is = itemsToStream(items)) {
int statusCode;
if (format.equals(Constants.WAVEFRONT_EVENT_FORMAT)) {
statusCode = reportingService.sendEvent(is);
statusCode = entityReportingService.sendEvent(is);
} else {
statusCode = reportingService.send(format, is);
statusCode = entityReportingService.send(format, is);
}
sdkMetricsRegistry.newDeltaCounter(entityPrefix + ".report." + statusCode).inc();
if ((400 <= statusCode && statusCode <= 599) || statusCode == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
*/
public class ReportingService implements ReportAPI {

private static final MessageSuppressingLogger MESSAGE_SUPPRESSING_LOGGER = new MessageSuppressingLogger(
Logger.getLogger(ReportingService.class.getCanonicalName()), 5, TimeUnit.MINUTES);
private static final MessageSuppressingLogger MESSAGE_SUPPRESSING_LOGGER =
new MessageSuppressingLogger(Logger.getLogger(ReportingService.class.getCanonicalName()),
5, TimeUnit.MINUTES);

private final String token;
private final URI uri;
Expand All @@ -35,8 +36,8 @@ public class ReportingService implements ReportAPI {
private static final int BUFFER_SIZE = 4096;
private static final int NO_HTTP_RESPONSE = -1;

public ReportingService(String server, @Nullable String token) {
this.uri = URI.create(server);
public ReportingService(URI uri, @Nullable String token) {
this.uri = uri;
this.token = token;
}

Expand Down
Loading