diff --git a/README.md b/README.md
index 2771f2a1c..c361a6894 100644
--- a/README.md
+++ b/README.md
@@ -64,7 +64,7 @@ Split has built and maintains SDKs for:
* Java [Github](https://github.com/splitio/java-client) [Docs](https://help.split.io/hc/en-us/articles/360020405151-Java-SDK)
* Javascript [Github](https://github.com/splitio/javascript-client) [Docs](https://help.split.io/hc/en-us/articles/360020448791-JavaScript-SDK)
* Node [Github](https://github.com/splitio/javascript-client) [Docs](https://help.split.io/hc/en-us/articles/360020564931-Node-js-SDK)
-* .NET [Github](https://github.com/splitio/.net-core-client) [Docs](https://help.split.io/hc/en-us/articles/360020240172--NET-SDK)
+* .NET [Github](https://github.com/splitio/dotnet-client) [Docs](https://help.split.io/hc/en-us/articles/360020240172--NET-SDK)
* Ruby [Github](https://github.com/splitio/ruby-client) [Docs](https://help.split.io/hc/en-us/articles/360020673251-Ruby-SDK)
* PHP [Github](https://github.com/splitio/php-client) [Docs](https://help.split.io/hc/en-us/articles/360020350372-PHP-SDK)
* Python [Github](https://github.com/splitio/python-client) [Docs](https://help.split.io/hc/en-us/articles/360020359652-Python-SDK)
diff --git a/client/CHANGES.txt b/client/CHANGES.txt
index e02d816d8..2d413bf6e 100644
--- a/client/CHANGES.txt
+++ b/client/CHANGES.txt
@@ -1,10 +1,15 @@
CHANGES
+4.2.0 (Jun 7, 2021)
+- Updated SDK telemetry storage, metrics and updater to be more efficient and to send data less often.
+- Improved the synchronization flow so it stays reliable when an edge case delays cache purge propagation, keeping the SDK cache properly synced.
+- Removed a legacy check that validated no Split had more than 50 conditions.
+
4.1.6 (Apr 15, 2021)
--Updated log level and message in some messages.
+- Updated the log level and wording of some log messages.
4.1.5 (Apr 6, 2021)
--Updated: Streaming retry fix.
+- Updated streaming logic to use limited fetch retry attempts.
4.1.4 (Mar 19, 2021)
- Updated: Internal cache structure refactor.
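
The 4.1.5 entry above ("limited fetch retry attempts") corresponds to the streamingRetryDelay() / streamingFetchMaxRetries() settings added to SplitClientConfig later in this diff. A minimal sketch of the idea, assuming a hypothetical fetchOnce() helper; this is not the SDK's actual streaming code:

    // Illustrative only: cap how many times a fetch is retried instead of retrying forever.
    class BoundedFetchSketch {
        private final io.split.client.SplitClientConfig config;

        BoundedFetchSketch(io.split.client.SplitClientConfig config) {
            this.config = config;
        }

        void fetchWithBoundedRetries() throws InterruptedException {
            int maxRetries = config.streamingFetchMaxRetries(); // defaults to 10 in this diff
            long delayMs = config.streamingRetryDelay();        // defaults to 50 ms in this diff
            for (int attempt = 0; attempt < maxRetries; attempt++) {
                if (fetchOnce()) {                              // hypothetical single fetch attempt
                    return;
                }
                Thread.sleep(delayMs);                          // back off before the next attempt
            }
            // after maxRetries failures, give up and let the periodic sync try again later
        }

        private boolean fetchOnce() {
            return false;                                       // placeholder for a real fetch
        }
    }
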
diff --git a/client/pom.xml b/client/pom.xml
index 5336cf310..15019f068 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -5,7 +5,7 @@
io.split.client
java-client-parent
- 4.1.6
+ 4.2.0
java-client
jar
@@ -121,7 +121,7 @@
com.google.guava
guava
- 29.0-jre
+ 30.0-jre
org.slf4j
diff --git a/client/src/main/java/io/split/cache/SegmentCache.java b/client/src/main/java/io/split/cache/SegmentCache.java
index d75fe11d3..81349a5b9 100644
--- a/client/src/main/java/io/split/cache/SegmentCache.java
+++ b/client/src/main/java/io/split/cache/SegmentCache.java
@@ -1,6 +1,9 @@
package io.split.cache;
+import io.split.engine.segments.SegmentImp;
+
import java.util.List;
+import java.util.Set;
/**
* Memory for segments
@@ -42,4 +45,16 @@ public interface SegmentCache {
* clear all segments
*/
void clear();
+
+    /**
+     * Returns every segment currently stored in the cache.
+     * @return a list with every cached segment
+     */
+    List<SegmentImp> getAll();
+
+    /**
+     * Returns the total number of keys across all cached segments.
+     * @return the key count summed over every segment
+     */
+    long getKeyCount();
}
diff --git a/client/src/main/java/io/split/cache/SegmentCacheInMemoryImpl.java b/client/src/main/java/io/split/cache/SegmentCacheInMemoryImpl.java
index 0c705c016..774c8d20f 100644
--- a/client/src/main/java/io/split/cache/SegmentCacheInMemoryImpl.java
+++ b/client/src/main/java/io/split/cache/SegmentCacheInMemoryImpl.java
@@ -6,7 +6,9 @@
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
/**
* InMemoryCache Implementation
@@ -59,4 +61,14 @@ public long getChangeNumber(String segmentName) {
public void clear() {
_segments.clear();
}
+
+ @Override
+    public List<SegmentImp> getAll() {
+ return _segments.values().stream().collect(Collectors.toList());
+ }
+
+ @Override
+ public long getKeyCount() {
+ return _segments.values().stream().mapToLong(SegmentImp::getKeysSize).sum();
+ }
}
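
For reference, these two additions are what the shutdown telemetry introduced later in this diff (see SplitFactoryImpl#destroy) derives its counts from. A minimal usage sketch, assuming the cache has already been populated by the segment synchronization task:

    SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
    // ... populated by the segment synchronization task ...
    long segmentCount = segmentCache.getAll().size();      // number of cached segments
    long segmentKeyCount = segmentCache.getKeyCount();      // keys summed across every segment
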
diff --git a/client/src/main/java/io/split/client/ApiKeyCounter.java b/client/src/main/java/io/split/client/ApiKeyCounter.java
index 8c39394dd..426ade17b 100644
--- a/client/src/main/java/io/split/client/ApiKeyCounter.java
+++ b/client/src/main/java/io/split/client/ApiKeyCounter.java
@@ -6,6 +6,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
public class ApiKeyCounter {
private static final Logger _log = LoggerFactory.getLogger(ApiKeyCounter.class);
@@ -63,4 +66,18 @@ boolean isApiKeyPresent(String apiKey) {
int getCount(String apiKey) {
return USED_API_KEYS.count(apiKey);
}
+
+    public Map<String, Long> getFactoryInstances() {
+        Map<String, Long> factoryInstances = new HashMap<>();
+        for (String factory : USED_API_KEYS) {
+            factoryInstances.putIfAbsent(factory, (long) getCount(factory));
+ }
+
+ return factoryInstances;
+ }
+
+ @VisibleForTesting
+ void clearApiKeys() {
+ USED_API_KEYS.clear();
+ }
}
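
A note on getFactoryInstances(): the count() and clear() calls suggest USED_API_KEYS is a Guava Multiset<String> (its declaration is outside this diff). Iterating a Multiset directly yields each element once per occurrence, which is why putIfAbsent guards the map writes; iterating elementSet() would visit each distinct key exactly once. A small sketch of the counting semantics being relied on:

    import com.google.common.collect.HashMultiset;
    import com.google.common.collect.Multiset;

    class MultisetCountingSketch {
        static void demo() {
            Multiset<String> usedKeys = HashMultiset.create();
            usedKeys.add("apikey-A");
            usedKeys.add("apikey-A");                    // two factories built with the same key
            usedKeys.add("apikey-B");

            int countA = usedKeys.count("apikey-A");     // 2
            int distinct = usedKeys.elementSet().size(); // 2 distinct keys, while usedKeys.size() == 3
        }
    }
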
diff --git a/client/src/main/java/io/split/client/EventClientImpl.java b/client/src/main/java/io/split/client/EventClientImpl.java
index 5c3ae8c13..eb4da7703 100644
--- a/client/src/main/java/io/split/client/EventClientImpl.java
+++ b/client/src/main/java/io/split/client/EventClientImpl.java
@@ -4,6 +4,11 @@
import io.split.client.dtos.Event;
import io.split.client.utils.GenericClientUtil;
import io.split.client.utils.Utils;
+import io.split.telemetry.domain.enums.EventsDataRecordsEnum;
+import io.split.telemetry.domain.enums.HTTPLatenciesEnum;
+import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
+import io.split.telemetry.storage.TelemetryEvaluationProducer;
+import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.MIN_PRIORITY;
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* Responsible for sending events added via .track() to Split collection services
@@ -45,34 +51,28 @@ public class EventClientImpl implements EventClient {
private final CloseableHttpClient _httpclient;
private final URI _target;
private final int _waitBeforeShutdown;
+ private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
ThreadFactory eventClientThreadFactory(final String name) {
- return new ThreadFactory() {
- @Override
- public Thread newThread(final Runnable r) {
- return new Thread(new Runnable() {
- @Override
- public void run() {
- Thread.currentThread().setPriority(MIN_PRIORITY);
- r.run();
- }
- }, name);
- }
- };
+ return r -> new Thread(() -> {
+ Thread.currentThread().setPriority(MIN_PRIORITY);
+ r.run();
+ }, name);
}
- public static EventClientImpl create(CloseableHttpClient httpclient, URI eventsRootTarget, int maxQueueSize, long flushIntervalMillis, int waitBeforeShutdown) throws URISyntaxException {
- return new EventClientImpl(new LinkedBlockingQueue(),
+ public static EventClientImpl create(CloseableHttpClient httpclient, URI eventsRootTarget, int maxQueueSize, long flushIntervalMillis, int waitBeforeShutdown, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
+ return new EventClientImpl(new LinkedBlockingQueue<>(maxQueueSize),
httpclient,
Utils.appendPath(eventsRootTarget, "api/events/bulk"),
maxQueueSize,
flushIntervalMillis,
- waitBeforeShutdown);
+ waitBeforeShutdown,
+ telemetryRuntimeProducer);
}
EventClientImpl(BlockingQueue eventQueue, CloseableHttpClient httpclient, URI target, int maxQueueSize,
- long flushIntervalMillis, int waitBeforeShutdown) throws URISyntaxException {
+ long flushIntervalMillis, int waitBeforeShutdown, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
_httpclient = httpclient;
@@ -83,6 +83,7 @@ public static EventClientImpl create(CloseableHttpClient httpclient, URI eventsR
_maxQueueSize = maxQueueSize;
_flushIntervalMillis = flushIntervalMillis;
+ _telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
_senderExecutor = new ThreadPoolExecutor(
1,
@@ -122,9 +123,16 @@ public boolean track(Event event, int eventSize) {
if (event == null) {
return false;
}
- _eventQueue.put(new WrappedEvent(event, eventSize));
+ if(_eventQueue.offer(new WrappedEvent(event, eventSize))) {
+ _telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 1);
+ }
+ else {
+ _log.warn("Event dropped.");
+ _telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
+ }
- } catch (InterruptedException e) {
+ } catch (ClassCastException | NullPointerException | IllegalArgumentException e) {
+ _telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
_log.warn("Interruption when adding event withed while adding message %s.", event);
return false;
}
@@ -153,7 +161,7 @@ public void run() {
List events = new ArrayList<>();
long accumulated = 0;
try {
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
WrappedEvent data = _eventQueue.take();
Event event = data.event();
Long size = data.size();
@@ -169,7 +177,7 @@ public void run() {
continue;
}
-
+ long initTime = System.currentTimeMillis();
if (events.size() >= _maxQueueSize || accumulated >= MAX_SIZE_BYTES || event == CENTINEL) {
// Send over the network
@@ -183,6 +191,8 @@ public void run() {
// Clear the queue of events for the next batch.
events = new ArrayList<>();
accumulated = 0;
+ _telemetryRuntimeProducer.recordSyncLatency(HTTPLatenciesEnum.EVENTS, System.currentTimeMillis()-initTime);
+ _telemetryRuntimeProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.EVENTS, System.currentTimeMillis());
}
}
} catch (InterruptedException e) {
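
A side note on the put-to-offer change above: with the now-bounded LinkedBlockingQueue, offer() returns false instead of blocking when the queue is full, which is what lets the client count dropped events. A minimal JDK-only sketch of that behavior (capacity and payloads are arbitrary):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    class DropOnFullSketch {
        public static void main(String[] args) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>(2); // capacity of 2
            queue.offer("event-1");                      // true: queued
            queue.offer("event-2");                      // true: queued
            boolean accepted = queue.offer("event-3");   // false: queue is full
            if (!accepted) {
                System.out.println("event dropped");     // the SDK records EVENTS_DROPPED here
            }
        }
    }
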
diff --git a/client/src/main/java/io/split/client/HttpSegmentChangeFetcher.java b/client/src/main/java/io/split/client/HttpSegmentChangeFetcher.java
index 7d7d735f6..a9475320f 100644
--- a/client/src/main/java/io/split/client/HttpSegmentChangeFetcher.java
+++ b/client/src/main/java/io/split/client/HttpSegmentChangeFetcher.java
@@ -4,11 +4,18 @@
import io.split.client.dtos.SegmentChange;
import io.split.client.utils.Json;
import io.split.client.utils.Utils;
+import io.split.engine.common.FetchOptions;
import io.split.engine.metrics.Metrics;
import io.split.engine.segments.SegmentChangeFetcher;
+import io.split.telemetry.domain.enums.HTTPLatenciesEnum;
+import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
+import io.split.telemetry.domain.enums.ResourceEnum;
+import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.net.URIBuilder;
import org.slf4j.Logger;
@@ -17,6 +24,8 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -27,57 +36,72 @@ public final class HttpSegmentChangeFetcher implements SegmentChangeFetcher {
private static final Logger _log = LoggerFactory.getLogger(HttpSegmentChangeFetcher.class);
private static final String SINCE = "since";
+ private static final String TILL = "till";
private static final String PREFIX = "segmentChangeFetcher";
- private static final String NAME_CACHE = "Cache-Control";
- private static final String VALUE_CACHE = "no-cache";
+ private static final String CACHE_CONTROL_HEADER_NAME = "Cache-Control";
+ private static final String CACHE_CONTROL_HEADER_VALUE = "no-cache";
+
+ private static final String HEADER_FASTLY_DEBUG_NAME = "Fastly-Debug";
+ private static final String HEADER_FASTLY_DEBUG_VALUE = "1";
private final CloseableHttpClient _client;
private final URI _target;
- private final Metrics _metrics;
-
- public static HttpSegmentChangeFetcher create(CloseableHttpClient client, URI root) throws URISyntaxException {
- return create(client, root, new Metrics.NoopMetrics());
- }
+ private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
- public static HttpSegmentChangeFetcher create(CloseableHttpClient client, URI root, Metrics metrics) throws URISyntaxException {
- return new HttpSegmentChangeFetcher(client, Utils.appendPath(root, "api/segmentChanges"), metrics);
+ public static HttpSegmentChangeFetcher create(CloseableHttpClient client, URI root, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
+ return new HttpSegmentChangeFetcher(client, Utils.appendPath(root, "api/segmentChanges"), telemetryRuntimeProducer);
}
- private HttpSegmentChangeFetcher(CloseableHttpClient client, URI uri, Metrics metrics) {
+ private HttpSegmentChangeFetcher(CloseableHttpClient client, URI uri, TelemetryRuntimeProducer telemetryRuntimeProducer) {
_client = client;
_target = uri;
- _metrics = metrics;
checkNotNull(_target);
+ _telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
}
@Override
- public SegmentChange fetch(String segmentName, long since, boolean addCacheHeader) {
+ public SegmentChange fetch(String segmentName, long since, FetchOptions options) {
long start = System.currentTimeMillis();
CloseableHttpResponse response = null;
try {
String path = _target.getPath() + "/" + segmentName;
- URI uri = new URIBuilder(_target).setPath(path).addParameter(SINCE, "" + since).build();
+ URIBuilder uriBuilder = new URIBuilder(_target)
+ .setPath(path)
+ .addParameter(SINCE, "" + since);
+ if (options.hasCustomCN()) {
+ uriBuilder.addParameter(TILL, "" + options.targetCN());
+ }
+
+ URI uri = uriBuilder.build();
HttpGet request = new HttpGet(uri);
- if(addCacheHeader) {
- request.setHeader(NAME_CACHE, VALUE_CACHE);
+
+ if(options.cacheControlHeadersEnabled()) {
+ request.setHeader(CACHE_CONTROL_HEADER_NAME, CACHE_CONTROL_HEADER_VALUE);
+ }
+
+ if (options.fastlyDebugHeaderEnabled()) {
+ request.addHeader(HEADER_FASTLY_DEBUG_NAME, HEADER_FASTLY_DEBUG_VALUE);
}
+
response = _client.execute(request);
+ options.handleResponseHeaders(Arrays.stream(response.getHeaders())
+ .collect(Collectors.toMap(Header::getName, Header::getValue)));
int statusCode = response.getCode();
- if (statusCode < 200 || statusCode >= 300) {
+ if (statusCode < HttpStatus.SC_OK || statusCode >= HttpStatus.SC_MULTIPLE_CHOICES) {
+ _telemetryRuntimeProducer.recordSyncError(ResourceEnum.SEGMENT_SYNC, statusCode);
_log.error("Response status was: " + statusCode);
- if (statusCode == 403) {
+ if (statusCode == HttpStatus.SC_FORBIDDEN) {
_log.error("factory instantiation: you passed a browser type api_key, " +
"please grab an api key from the Split console that is of type sdk");
}
- _metrics.count(PREFIX + ".status." + statusCode, 1);
throw new IllegalStateException("Could not retrieve segment changes for " + segmentName + "; http return code " + statusCode);
}
-
+ _telemetryRuntimeProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.SEGMENTS, System.currentTimeMillis());
String json = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
if (_log.isDebugEnabled()) {
@@ -86,11 +110,10 @@ public SegmentChange fetch(String segmentName, long since, boolean addCacheHeade
return Json.fromJson(json, SegmentChange.class);
} catch (Throwable t) {
- _metrics.count(PREFIX + ".exception", 1);
throw new IllegalStateException("Problem fetching segmentChanges: " + t.getMessage(), t);
} finally {
+ _telemetryRuntimeProducer.recordSyncLatency(HTTPLatenciesEnum.SEGMENTS, System.currentTimeMillis()-start);
Utils.forceClose(response);
- _metrics.time(PREFIX + ".time", System.currentTimeMillis() - start);
}
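
One caveat about the header collection above (the same pattern appears in HttpSplitChangeFetcher below): Collectors.toMap without a merge function throws IllegalStateException if a response carries two headers with the same name. If duplicate headers are a possibility, a merge function that keeps the last value avoids that; this is a suggestion, not what the diff does:

    // Assumes the same imports as the fetcher above (java.util.Arrays, java.util.Map,
    // java.util.stream.Collectors, org.apache.hc.core5.http.Header).
    Map<String, String> headers = Arrays.stream(response.getHeaders())
            .collect(Collectors.toMap(Header::getName, Header::getValue, (first, second) -> second));
    options.handleResponseHeaders(headers);
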
diff --git a/client/src/main/java/io/split/client/HttpSplitChangeFetcher.java b/client/src/main/java/io/split/client/HttpSplitChangeFetcher.java
index 3c5f9b8fc..9d77654e5 100644
--- a/client/src/main/java/io/split/client/HttpSplitChangeFetcher.java
+++ b/client/src/main/java/io/split/client/HttpSplitChangeFetcher.java
@@ -4,11 +4,18 @@
import io.split.client.dtos.SplitChange;
import io.split.client.utils.Json;
import io.split.client.utils.Utils;
+import io.split.engine.common.FetchOptions;
import io.split.engine.experiments.SplitChangeFetcher;
import io.split.engine.metrics.Metrics;
+import io.split.telemetry.domain.enums.HTTPLatenciesEnum;
+import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
+import io.split.telemetry.domain.enums.ResourceEnum;
+import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.net.URIBuilder;
import org.slf4j.Logger;
@@ -17,6 +24,8 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -27,49 +36,66 @@ public final class HttpSplitChangeFetcher implements SplitChangeFetcher {
private static final Logger _log = LoggerFactory.getLogger(HttpSplitChangeFetcher.class);
private static final String SINCE = "since";
+ private static final String TILL = "till";
private static final String PREFIX = "splitChangeFetcher";
- private static final String NAME_CACHE = "Cache-Control";
- private static final String VALUE_CACHE = "no-cache";
+
+ private static final String HEADER_CACHE_CONTROL_NAME = "Cache-Control";
+ private static final String HEADER_CACHE_CONTROL_VALUE = "no-cache";
+
+ private static final String HEADER_FASTLY_DEBUG_NAME = "Fastly-Debug";
+ private static final String HEADER_FASTLY_DEBUG_VALUE = "1";
private final CloseableHttpClient _client;
private final URI _target;
- private final Metrics _metrics;
-
- public static HttpSplitChangeFetcher create(CloseableHttpClient client, URI root) throws URISyntaxException {
- return create(client, root, new Metrics.NoopMetrics());
- }
+ private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
- public static HttpSplitChangeFetcher create(CloseableHttpClient client, URI root, Metrics metrics) throws URISyntaxException {
- return new HttpSplitChangeFetcher(client, Utils.appendPath(root, "api/splitChanges"), metrics);
+ public static HttpSplitChangeFetcher create(CloseableHttpClient client, URI root, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
+ return new HttpSplitChangeFetcher(client, Utils.appendPath(root, "api/splitChanges"), telemetryRuntimeProducer);
}
- private HttpSplitChangeFetcher(CloseableHttpClient client, URI uri, Metrics metrics) {
+ private HttpSplitChangeFetcher(CloseableHttpClient client, URI uri, TelemetryRuntimeProducer telemetryRuntimeProducer) {
_client = client;
_target = uri;
- _metrics = metrics;
checkNotNull(_target);
+ _telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
+ }
+
+ long makeRandomTill() {
+ return (-1)*(long)Math.floor(Math.random()*(Math.pow(2, 63)));
}
@Override
- public SplitChange fetch(long since, boolean addCacheHeader) {
+ public SplitChange fetch(long since, FetchOptions options) {
long start = System.currentTimeMillis();
CloseableHttpResponse response = null;
try {
- URI uri = new URIBuilder(_target).addParameter(SINCE, "" + since).build();
+ URIBuilder uriBuilder = new URIBuilder(_target).addParameter(SINCE, "" + since);
+ if (options.hasCustomCN()) {
+ uriBuilder.addParameter(TILL, "" + options.targetCN());
+ }
+ URI uri = uriBuilder.build();
HttpGet request = new HttpGet(uri);
- if(addCacheHeader) {
- request.setHeader(NAME_CACHE, VALUE_CACHE);
+ if(options.cacheControlHeadersEnabled()) {
+ request.setHeader(HEADER_CACHE_CONTROL_NAME, HEADER_CACHE_CONTROL_VALUE);
}
+
+ if (options.fastlyDebugHeaderEnabled()) {
+ request.addHeader(HEADER_FASTLY_DEBUG_NAME, HEADER_FASTLY_DEBUG_VALUE);
+ }
+
response = _client.execute(request);
+ options.handleResponseHeaders(Arrays.stream(response.getHeaders())
+ .collect(Collectors.toMap(Header::getName, Header::getValue)));
int statusCode = response.getCode();
- if (statusCode < 200 || statusCode >= 300) {
- _metrics.count(PREFIX + ".status." + statusCode, 1);
+ if (statusCode < HttpStatus.SC_OK || statusCode >= HttpStatus.SC_MULTIPLE_CHOICES) {
+ _telemetryRuntimeProducer.recordSyncError(ResourceEnum.SPLIT_SYNC, statusCode);
throw new IllegalStateException("Could not retrieve splitChanges; http return code " + statusCode);
}
@@ -81,11 +107,10 @@ public SplitChange fetch(long since, boolean addCacheHeader) {
return Json.fromJson(json, SplitChange.class);
} catch (Throwable t) {
- _metrics.count(PREFIX + ".exception", 1);
throw new IllegalStateException("Problem fetching splitChanges: " + t.getMessage(), t);
} finally {
+ _telemetryRuntimeProducer.recordSyncLatency(HTTPLatenciesEnum.SPLITS, System.currentTimeMillis()-start);
Utils.forceClose(response);
- _metrics.time(PREFIX + ".time", System.currentTimeMillis() - start);
}
}
diff --git a/client/src/main/java/io/split/client/LocalhostSplitFactory.java b/client/src/main/java/io/split/client/LocalhostSplitFactory.java
index 0ec01f8c9..3a2f0a14b 100644
--- a/client/src/main/java/io/split/client/LocalhostSplitFactory.java
+++ b/client/src/main/java/io/split/client/LocalhostSplitFactory.java
@@ -6,6 +6,8 @@
import io.split.engine.SDKReadinessGates;
import io.split.engine.evaluator.EvaluatorImp;
import io.split.engine.metrics.Metrics;
+import io.split.telemetry.storage.InMemoryTelemetryStorage;
+import io.split.telemetry.storage.NoopTelemetryStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,12 +56,12 @@ public LocalhostSplitFactory(String directory, String file) throws IOException {
SplitCache splitCache = new InMemoryCacheImp();
SDKReadinessGates sdkReadinessGates = new SDKReadinessGates();
- sdkReadinessGates.splitsAreReady();
_cacheUpdaterService = new CacheUpdaterService(splitCache);
_cacheUpdaterService.updateCache(splitAndKeyToTreatment);
+ sdkReadinessGates.sdkInternalReady();
_client = new SplitClientImpl(this, splitCache,
- new ImpressionsManager.NoOpImpressionsManager(), new Metrics.NoopMetrics(), new NoopEventClient(),
- SplitClientConfig.builder().setBlockUntilReadyTimeout(1).build(), sdkReadinessGates, new EvaluatorImp(splitCache));
+ new ImpressionsManager.NoOpImpressionsManager(), new NoopEventClient(),
+ SplitClientConfig.builder().setBlockUntilReadyTimeout(1).build(), sdkReadinessGates, new EvaluatorImp(splitCache), new NoopTelemetryStorage(), new NoopTelemetryStorage());
_manager = LocalhostSplitManager.of(splitAndKeyToTreatment);
_splitFile.registerWatcher();
diff --git a/client/src/main/java/io/split/client/SplitClientConfig.java b/client/src/main/java/io/split/client/SplitClientConfig.java
index cafd52ebb..7b2ae0e5b 100644
--- a/client/src/main/java/io/split/client/SplitClientConfig.java
+++ b/client/src/main/java/io/split/client/SplitClientConfig.java
@@ -17,6 +17,11 @@
public class SplitClientConfig {
public static final String LOCALHOST_DEFAULT_FILE = "split.yaml";
+ public static final String SDK_ENDPOINT = "https://sdk.split.io";
+ public static final String EVENTS_ENDPOINT = "https://events.split.io";
+ public static final String AUTH_ENDPOINT = "https://auth.split.io/api/auth";
+ public static final String STREAMING_ENDPOINT = "https://streaming.split.io/sse";
+ public static final String TELEMETRY_ENDPOINT = "https://telemetry.split.io/api/v1";
private final String _endpoint;
private final String _eventsEndpoint;
@@ -46,6 +51,12 @@ public class SplitClientConfig {
private final int _streamingReconnectBackoffBase;
private final String _authServiceURL;
private final String _streamingServiceURL;
+ private final String _telemetryURL;
+ private final int _telemetryRefreshRate;
+ private final int _onDemandFetchRetryDelayMs;
+ private final int _onDemandFetchMaxRetries;
+ private final int _failedAttemptsBeforeLogging;
+ private final boolean _cdnDebugLogging;
// Proxy configs
private final HttpHost _proxy;
@@ -89,7 +100,13 @@ private SplitClientConfig(String endpoint,
int authRetryBackoffBase,
int streamingReconnectBackoffBase,
String authServiceURL,
- String streamingServiceURL) {
+ String streamingServiceURL,
+ String telemetryURL,
+ int telemetryRefreshRate,
+ int onDemandFetchRetryDelayMs,
+ int onDemandFetchMaxRetries,
+ int failedAttemptsBeforeLogging,
+ boolean cdnDebugLogging) {
_endpoint = endpoint;
_eventsEndpoint = eventsEndpoint;
_featuresRefreshRate = pollForFeatureChangesEveryNSeconds;
@@ -120,6 +137,12 @@ private SplitClientConfig(String endpoint,
_streamingReconnectBackoffBase = streamingReconnectBackoffBase;
_authServiceURL = authServiceURL;
_streamingServiceURL = streamingServiceURL;
+ _telemetryURL = telemetryURL;
+ _telemetryRefreshRate = telemetryRefreshRate;
+ _onDemandFetchRetryDelayMs = onDemandFetchRetryDelayMs;
+ _onDemandFetchMaxRetries = onDemandFetchMaxRetries;
+ _failedAttemptsBeforeLogging = failedAttemptsBeforeLogging;
+ _cdnDebugLogging = cdnDebugLogging;
Properties props = new Properties();
try {
@@ -248,11 +271,27 @@ public String streamingServiceURL() {
return _streamingServiceURL;
}
+ public String telemetryURL() {
+ return _telemetryURL;
+ }
+
+    public int telemetryRefreshRate() {
+        return _telemetryRefreshRate;
+    }
+
+    public int streamingRetryDelay() {
+        return _onDemandFetchRetryDelayMs;
+    }
+
+    public int streamingFetchMaxRetries() {
+        return _onDemandFetchMaxRetries;
+    }
+
+    public int failedAttemptsBeforeLogging() {
+        return _failedAttemptsBeforeLogging;
+    }
+
+    public boolean cdnDebugLogging() {
+        return _cdnDebugLogging;
+    }
+
public static final class Builder {
- private String _endpoint = "https://sdk.split.io";
+ private String _endpoint = SDK_ENDPOINT;
private boolean _endpointSet = false;
- private String _eventsEndpoint = "https://events.split.io";
+ private String _eventsEndpoint = EVENTS_ENDPOINT;
private boolean _eventsEndpointSet = false;
private int _featuresRefreshRate = 60;
private int _segmentsRefreshRate = 60;
@@ -281,8 +320,14 @@ public static final class Builder {
private boolean _streamingEnabled = true;
private int _authRetryBackoffBase = 1;
private int _streamingReconnectBackoffBase = 1;
- private String _authServiceURL = "https://auth.split.io/api/auth";
- private String _streamingServiceURL = "https://streaming.split.io/sse";
+ private String _authServiceURL = AUTH_ENDPOINT;
+ private String _streamingServiceURL = STREAMING_ENDPOINT;
+    private String _telemetryURL = TELEMETRY_ENDPOINT;
+ private int _telemetryRefreshRate = 60;
+ private int _onDemandFetchRetryDelayMs = 50;
+ private final int _onDemandFetchMaxRetries = 10;
+ private final int _failedAttemptsBeforeLogging = 10;
+ private final boolean _cdnDebugLogging = true;
public Builder() {
}
@@ -674,6 +719,27 @@ public Builder streamingServiceURL(String streamingServiceURL) {
return this;
}
+ /**
+ * Set telemetry service URL.
+     * @param telemetryURL the telemetry service URL
+     * @return this builder
+ */
+ public Builder telemetryURL(String telemetryURL) {
+        _telemetryURL = telemetryURL;
+ return this;
+ }
+
+ /**
+     * How often to send telemetry data, in seconds.
+     *
+     * @param telemetryRefreshRate the telemetry refresh rate, in seconds
+ * @return this builder
+ */
+ public Builder telemetryRefreshRate(int telemetryRefreshRate) {
+ _telemetryRefreshRate = telemetryRefreshRate;
+ return this;
+ }
+
public SplitClientConfig build() {
if (_featuresRefreshRate < 5 ) {
throw new IllegalArgumentException("featuresRefreshRate must be >= 5: " + _featuresRefreshRate);
@@ -744,6 +810,17 @@ public SplitClientConfig build() {
throw new IllegalArgumentException("streamingServiceURL must not be null");
}
+            if (_telemetryURL == null) {
+                throw new IllegalArgumentException("telemetryURL must not be null");
+ }
+
+            if (_onDemandFetchRetryDelayMs <= 0) {
+                throw new IllegalArgumentException("streamingRetryDelay must be > 0");
+            }
+            if (_onDemandFetchMaxRetries <= 0) {
+                throw new IllegalArgumentException("streamingFetchMaxRetries must be > 0");
+            }
+
return new SplitClientConfig(
_endpoint,
_eventsEndpoint,
@@ -774,7 +851,13 @@ public SplitClientConfig build() {
_authRetryBackoffBase,
_streamingReconnectBackoffBase,
_authServiceURL,
- _streamingServiceURL);
+ _streamingServiceURL,
+                _telemetryURL,
+ _telemetryRefreshRate,
+ _onDemandFetchRetryDelayMs,
+ _onDemandFetchMaxRetries,
+ _failedAttemptsBeforeLogging,
+ _cdnDebugLogging);
}
}
}
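
For reference, a sketch of wiring the new telemetry settings through the builder. Values are arbitrary, and any minimum that build() may enforce on the refresh rate is not visible in this hunk:

    // Assumes: import io.split.client.SplitClientConfig;
    SplitClientConfig config = SplitClientConfig.builder()
            .setBlockUntilReadyTimeout(10000)                    // milliseconds
            .telemetryURL(SplitClientConfig.TELEMETRY_ENDPOINT)  // same value as the default above
            .telemetryRefreshRate(120)                           // seconds between telemetry posts
            .build();
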
diff --git a/client/src/main/java/io/split/client/SplitClientImpl.java b/client/src/main/java/io/split/client/SplitClientImpl.java
index 7792f5a1f..7427165ab 100644
--- a/client/src/main/java/io/split/client/SplitClientImpl.java
+++ b/client/src/main/java/io/split/client/SplitClientImpl.java
@@ -10,12 +10,14 @@
import io.split.engine.evaluator.Evaluator;
import io.split.engine.evaluator.EvaluatorImp;
import io.split.engine.evaluator.Labels;
-import io.split.engine.metrics.Metrics;
import io.split.grammar.Treatments;
import io.split.inputValidation.EventsValidator;
import io.split.inputValidation.KeyValidator;
import io.split.inputValidation.SplitNameValidator;
import io.split.inputValidation.TrafficTypeValidator;
+import io.split.telemetry.domain.enums.MethodEnum;
+import io.split.telemetry.storage.TelemetryConfigProducer;
+import io.split.telemetry.storage.TelemetryEvaluationProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,36 +37,36 @@
public final class SplitClientImpl implements SplitClient {
public static final SplitResult SPLIT_RESULT_CONTROL = new SplitResult(Treatments.CONTROL, null);
- private static final String GET_TREATMENT = "getTreatment";
- private static final String GET_TREATMENT_WITH_CONFIG = "getTreatmentWithConfig";
-
private static final Logger _log = LoggerFactory.getLogger(SplitClientImpl.class);
private final SplitFactory _container;
private final SplitCache _splitCache;
private final ImpressionsManager _impressionManager;
- private final Metrics _metrics;
private final SplitClientConfig _config;
private final EventClient _eventClient;
private final SDKReadinessGates _gates;
private final Evaluator _evaluator;
+ private final TelemetryEvaluationProducer _telemetryEvaluationProducer;
+ private final TelemetryConfigProducer _telemetryConfigProducer;
public SplitClientImpl(SplitFactory container,
SplitCache splitCache,
ImpressionsManager impressionManager,
- Metrics metrics,
EventClient eventClient,
SplitClientConfig config,
SDKReadinessGates gates,
- Evaluator evaluator) {
+ Evaluator evaluator,
+ TelemetryEvaluationProducer telemetryEvaluationProducer,
+ TelemetryConfigProducer telemetryConfigProducer) {
_container = container;
_splitCache = checkNotNull(splitCache);
_impressionManager = checkNotNull(impressionManager);
- _metrics = metrics;
_eventClient = eventClient;
_config = config;
_gates = checkNotNull(gates);
_evaluator = checkNotNull(evaluator);
+ _telemetryEvaluationProducer = checkNotNull(telemetryEvaluationProducer);
+ _telemetryConfigProducer = checkNotNull(telemetryConfigProducer);
}
@Override
@@ -74,27 +76,27 @@ public String getTreatment(String key, String split) {
@Override
public String getTreatment(String key, String split, Map attributes) {
- return getTreatmentWithConfigInternal(GET_TREATMENT, key, null, split, attributes).treatment();
+ return getTreatmentWithConfigInternal(key, null, split, attributes, MethodEnum.TREATMENT).treatment();
}
@Override
public String getTreatment(Key key, String split, Map attributes) {
- return getTreatmentWithConfigInternal(GET_TREATMENT, key.matchingKey(), key.bucketingKey(), split, attributes).treatment();
+ return getTreatmentWithConfigInternal(key.matchingKey(), key.bucketingKey(), split, attributes, MethodEnum.TREATMENT).treatment();
}
@Override
public SplitResult getTreatmentWithConfig(String key, String split) {
- return getTreatmentWithConfigInternal(GET_TREATMENT_WITH_CONFIG, key, null, split, Collections.emptyMap());
+ return getTreatmentWithConfigInternal(key, null, split, Collections.emptyMap(), MethodEnum.TREATMENT_WITH_CONFIG);
}
@Override
public SplitResult getTreatmentWithConfig(String key, String split, Map attributes) {
- return getTreatmentWithConfigInternal(GET_TREATMENT_WITH_CONFIG, key, null, split, attributes);
+ return getTreatmentWithConfigInternal(key, null, split, attributes, MethodEnum.TREATMENT_WITH_CONFIG);
}
@Override
public SplitResult getTreatmentWithConfig(Key key, String split, Map attributes) {
- return getTreatmentWithConfigInternal(GET_TREATMENT_WITH_CONFIG, key.matchingKey(), key.bucketingKey(), split, attributes);
+ return getTreatmentWithConfigInternal(key.matchingKey(), key.bucketingKey(), split, attributes, MethodEnum.TREATMENT_WITH_CONFIG);
}
@Override
@@ -132,7 +134,7 @@ public void blockUntilReady() throws TimeoutException, InterruptedException {
if (_config.blockUntilReady() <= 0) {
throw new IllegalArgumentException("setBlockUntilReadyTimeout must be positive but in config was: " + _config.blockUntilReady());
}
- if (!_gates.isSDKReady(_config.blockUntilReady())) {
+ if (!_gates.waitUntilInternalReady(_config.blockUntilReady())) {
throw new TimeoutException("SDK was not ready in " + _config.blockUntilReady()+ " milliseconds");
}
_log.debug(String.format("Split SDK ready in %d ms", (System.currentTimeMillis() - startTime)));
@@ -144,6 +146,7 @@ public void destroy() {
}
private boolean track(Event event) {
+ long initTime = System.currentTimeMillis();
if (_container.isDestroyed()) {
_log.error("Client has already been destroyed - no calls possible");
return false;
@@ -173,26 +176,32 @@ private boolean track(Event event) {
}
event.properties = propertiesResult.getValue();
+ _telemetryEvaluationProducer.recordLatency(MethodEnum.TRACK, System.currentTimeMillis() - initTime);
return _eventClient.track(event, propertiesResult.getEventSize());
}
- private SplitResult getTreatmentWithConfigInternal(String method, String matchingKey, String bucketingKey, String split, Map attributes) {
+ private SplitResult getTreatmentWithConfigInternal(String matchingKey, String bucketingKey, String split, Map attributes, MethodEnum methodEnum) {
+ long initTime = System.currentTimeMillis();
try {
+ if(!_gates.isSDKReady()){
+ _log.warn(methodEnum.getMethod() + ": the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method");
+ _telemetryConfigProducer.recordNonReadyUsage();
+ }
if (_container.isDestroyed()) {
_log.error("Client has already been destroyed - no calls possible");
return SPLIT_RESULT_CONTROL;
}
- if (!KeyValidator.isValid(matchingKey, "matchingKey", _config.maxStringLength(), method)) {
+ if (!KeyValidator.isValid(matchingKey, "matchingKey", _config.maxStringLength(), methodEnum.getMethod())) {
return SPLIT_RESULT_CONTROL;
}
- if (!KeyValidator.bucketingKeyIsValid(bucketingKey, _config.maxStringLength(), method)) {
+ if (!KeyValidator.bucketingKeyIsValid(bucketingKey, _config.maxStringLength(), methodEnum.getMethod())) {
return SPLIT_RESULT_CONTROL;
}
- Optional splitNameResult = SplitNameValidator.isValid(split, method);
+ Optional splitNameResult = SplitNameValidator.isValid(split, methodEnum.getMethod());
if (!splitNameResult.isPresent()) {
return SPLIT_RESULT_CONTROL;
}
@@ -202,7 +211,7 @@ private SplitResult getTreatmentWithConfigInternal(String method, String matchin
EvaluatorImp.TreatmentLabelAndChangeNumber result = _evaluator.evaluateFeature(matchingKey, bucketingKey, split, attributes);
- if (result.treatment.equals(Treatments.CONTROL) && result.label.equals(Labels.DEFINITION_NOT_FOUND) && _gates.isSDKReadyNow()) {
+ if (result.treatment.equals(Treatments.CONTROL) && result.label.equals(Labels.DEFINITION_NOT_FOUND) && _gates.isSDKReady()) {
_log.warn(
"getTreatment: you passed \"" + split + "\" that does not exist in this environment, " +
"please double check what Splits exist in the web console.");
@@ -214,15 +223,16 @@ private SplitResult getTreatmentWithConfigInternal(String method, String matchin
split,
start,
result.treatment,
- String.format("sdk.%s", method),
+ String.format("sdk.%s", methodEnum.getMethod()),
_config.labelsEnabled() ? result.label : null,
result.changeNumber,
attributes
);
-
+ _telemetryEvaluationProducer.recordLatency(methodEnum, System.currentTimeMillis()-initTime);
return new SplitResult(result.treatment, result.configurations);
} catch (Exception e) {
try {
+ _telemetryEvaluationProducer.recordException(methodEnum);
_log.error("CatchAll Exception", e);
} catch (Exception e1) {
// ignore
@@ -235,7 +245,6 @@ private void recordStats(String matchingKey, String bucketingKey, String split,
String operation, String label, Long changeNumber, Map attributes) {
try {
_impressionManager.track(new Impression(matchingKey, bucketingKey, split, result, System.currentTimeMillis(), label, changeNumber, attributes));
- _metrics.time(operation, System.currentTimeMillis() - start);
} catch (Throwable t) {
_log.error("Exception", t);
}
diff --git a/client/src/main/java/io/split/client/SplitFactoryBuilder.java b/client/src/main/java/io/split/client/SplitFactoryBuilder.java
index f18032416..f7f1fea8b 100644
--- a/client/src/main/java/io/split/client/SplitFactoryBuilder.java
+++ b/client/src/main/java/io/split/client/SplitFactoryBuilder.java
@@ -66,7 +66,7 @@ public static SplitFactory local(SplitClientConfig config) throws IOException, U
return LocalhostSplitFactory.createLocalhostSplitFactory(config);
}
- public static void main(String... args) throws IOException, InterruptedException, TimeoutException, URISyntaxException {
+ public static void main(String... args) throws IOException, URISyntaxException {
if (args.length != 1) {
System.out.println("Usage: ");
System.exit(1);
diff --git a/client/src/main/java/io/split/client/SplitFactoryImpl.java b/client/src/main/java/io/split/client/SplitFactoryImpl.java
index 8b576d0dd..1489bafec 100644
--- a/client/src/main/java/io/split/client/SplitFactoryImpl.java
+++ b/client/src/main/java/io/split/client/SplitFactoryImpl.java
@@ -3,12 +3,11 @@
import io.split.client.impressions.AsynchronousImpressionListener;
import io.split.client.impressions.ImpressionListener;
import io.split.client.impressions.ImpressionsManagerImpl;
-import io.split.client.interceptors.AddSplitHeadersFilter;
+import io.split.client.interceptors.AuthorizationInterceptorFilter;
+import io.split.client.interceptors.ClientKeyInterceptorFilter;
import io.split.client.interceptors.GzipDecoderResponseInterceptor;
import io.split.client.interceptors.GzipEncoderRequestInterceptor;
-import io.split.client.metrics.CachedMetrics;
-import io.split.client.metrics.FireAndForgetMetrics;
-import io.split.client.metrics.HttpMetrics;
+import io.split.client.interceptors.SdkMetadataInterceptorFilter;
import io.split.cache.InMemoryCacheImp;
import io.split.cache.SplitCache;
import io.split.engine.evaluator.Evaluator;
@@ -26,6 +25,11 @@
import io.split.cache.SegmentCacheInMemoryImpl;
import io.split.engine.segments.SegmentSynchronizationTaskImp;
import io.split.integrations.IntegrationsConfig;
+import io.split.telemetry.storage.InMemoryTelemetryStorage;
+import io.split.telemetry.storage.TelemetryStorage;
+import io.split.telemetry.synchronizer.TelemetrySubmitter;
+import io.split.telemetry.synchronizer.TelemetrySyncTask;
+import io.split.telemetry.synchronizer.TelemetrySynchronizer;
import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.Credentials;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
@@ -50,10 +54,10 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class SplitFactoryImpl implements SplitFactory {
@@ -67,13 +71,10 @@ public class SplitFactoryImpl implements SplitFactory {
private final URI _eventsRootTarget;
private final CloseableHttpClient _httpclient;
private final SDKReadinessGates _gates;
- private final HttpMetrics _httpMetrics;
- private final FireAndForgetMetrics _unCachedFireAndForget;
private final SegmentSynchronizationTaskImp _segmentSynchronizationTaskImp;
private final SplitFetcher _splitFetcher;
private final SplitSynchronizationTask _splitSynchronizationTask;
private final ImpressionsManagerImpl _impressionsManager;
- private final FireAndForgetMetrics _cachedFireAndForgetMetrics;
private final EventClient _eventClient;
private final SyncManager _syncManager;
private final Evaluator _evaluator;
@@ -89,12 +90,19 @@ public class SplitFactoryImpl implements SplitFactory {
private boolean isTerminated = false;
private final ApiKeyCounter _apiKeyCounter;
+ private final TelemetryStorage _telemetryStorage;
+ private final TelemetrySynchronizer _telemetrySynchronizer;
+ private final TelemetrySyncTask _telemetrySyncTask;
+ private final long _startTime;
public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyntaxException {
+ _startTime = System.currentTimeMillis();
_apiToken = apiToken;
_apiKeyCounter = ApiKeyCounter.getApiKeyCounterInstance();
_apiKeyCounter.add(apiToken);
+ _telemetryStorage = new InMemoryTelemetryStorage();
+
if (config.blockUntilReady() == -1) {
//BlockUntilReady not been set
_log.warn("no setBlockUntilReadyTimeout parameter has been set - incorrect control treatments could be logged” " +
@@ -112,15 +120,11 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
_rootTarget = URI.create(config.endpoint());
_eventsRootTarget = URI.create(config.eventsEndpoint());
- // HttpMetrics
- _httpMetrics = HttpMetrics.create(_httpclient, _eventsRootTarget);
-
// Cache Initialisations
_segmentCache = new SegmentCacheInMemoryImpl();
_splitCache = new InMemoryCacheImp();
+ _telemetrySynchronizer = new TelemetrySubmitter(_httpclient, URI.create(config.telemetryURL()), _telemetryStorage, _splitCache, _segmentCache, _telemetryStorage, _startTime);
- // Metrics
- _unCachedFireAndForget = FireAndForgetMetrics.instance(_httpMetrics, 2, 1000);
// Segments
_segmentSynchronizationTaskImp = buildSegments(config);
@@ -129,36 +133,66 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
_splitFetcher = buildSplitFetcher();
// SplitSynchronizationTask
- _splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher, _splitCache, findPollingPeriod(RANDOM, config.featuresRefreshRate()));
+ _splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher,
+ _splitCache,
+ findPollingPeriod(RANDOM, config.featuresRefreshRate()));
// Impressions
_impressionsManager = buildImpressionsManager(config);
- // CachedFireAndForgetMetrics
- _cachedFireAndForgetMetrics = buildCachedFireAndForgetMetrics(config);
-
// EventClient
- _eventClient = EventClientImpl.create(_httpclient, _eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown());
+ _eventClient = EventClientImpl.create(_httpclient,
+ _eventsRootTarget,
+ config.eventsQueueSize(),
+ config.eventFlushIntervalInMillis(),
+ config.waitBeforeShutdown(),
+ _telemetryStorage);
- // SyncManager
- _syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache, config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config), _segmentCache);
- _syncManager.start();
+        _telemetrySyncTask = new TelemetrySyncTask(config.telemetryRefreshRate(), _telemetrySynchronizer);
// Evaluator
_evaluator = new EvaluatorImp(_splitCache);
// SplitClient
- _client = new SplitClientImpl(this, _splitCache, _impressionsManager, _cachedFireAndForgetMetrics, _eventClient, config, _gates, _evaluator);
+ _client = new SplitClientImpl(this,
+ _splitCache,
+ _impressionsManager,
+ _eventClient,
+ config,
+ _gates,
+ _evaluator,
+ _telemetryStorage,
+ _telemetryStorage);
// SplitManager
- _manager = new SplitManagerImpl(_splitCache, config, _gates);
+ _manager = new SplitManagerImpl(_splitCache, config, _gates, _telemetryStorage);
+
+ // SyncManager
+ _syncManager = SyncManagerImp.build(config.streamingEnabled(),
+ _splitSynchronizationTask,
+ _splitFetcher,
+ _segmentSynchronizationTaskImp,
+ _splitCache,
+ config.authServiceURL(),
+ _httpclient,
+ config.streamingServiceURL(),
+ config.authRetryBackoffBase(),
+ buildSSEdHttpClient(apiToken, config),
+ _segmentCache,
+ config.streamingRetryDelay(),
+ config.streamingFetchMaxRetries(),
+ config.failedAttemptsBeforeLogging(),
+                config.cdnDebugLogging(),
+                _gates,
+                _telemetryStorage,
+                _telemetrySynchronizer,
+                config);
+ _syncManager.start();
// DestroyOnShutDown
if (config.destroyOnShutDown()) {
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ Thread shutdown = new Thread(() -> {
// Using the full path to avoid conflicting with Thread.destroy()
SplitFactoryImpl.this.destroy();
- }));
+ });
+ shutdown.setName("split-destroy-worker");
+ Runtime.getRuntime().addShutdownHook(shutdown);
}
}
@@ -177,22 +211,24 @@ public synchronized void destroy() {
if (!isTerminated) {
_log.info("Shutdown called for split");
try {
- _segmentSynchronizationTaskImp.close();
- _log.info("Successful shutdown of segment fetchers");
- _splitSynchronizationTask.close();
- _log.info("Successful shutdown of splits");
+ long splitCount = _splitCache.getAll().stream().count();
+ long segmentCount = _segmentCache.getAll().stream().count();
+ long segmentKeyCount = _segmentCache.getKeyCount();
_impressionsManager.close();
_log.info("Successful shutdown of impressions manager");
- _unCachedFireAndForget.close();
- _log.info("Successful shutdown of metrics 1");
- _cachedFireAndForgetMetrics.close();
- _log.info("Successful shutdown of metrics 2");
- _httpclient.close();
- _log.info("Successful shutdown of httpclient");
_eventClient.close();
_log.info("Successful shutdown of eventClient");
+ _segmentSynchronizationTaskImp.close();
+ _log.info("Successful shutdown of segment fetchers");
+ _splitSynchronizationTask.close();
+ _log.info("Successful shutdown of splits");
_syncManager.shutdown();
_log.info("Successful shutdown of syncManager");
+ _telemetryStorage.recordSessionLength(System.currentTimeMillis() - _startTime);
+ _telemetrySyncTask.stopScheduledTask(splitCount, segmentCount, segmentKeyCount);
+ _log.info("Successful shutdown of telemetry sync task");
+ _httpclient.close();
+ _log.info("Successful shutdown of httpclient");
} catch (IOException e) {
_log.error("We could not shutdown split", e);
}
@@ -207,7 +243,6 @@ public boolean isDestroyed() {
}
private static CloseableHttpClient buildHttpClient(String apiToken, SplitClientConfig config) {
-
SSLConnectionSocketFactory sslSocketFactory = SSLConnectionSocketFactoryBuilder.create()
.setSslContext(SSLContexts.createSystemDefault())
.setTlsVersions(TLS.V_1_1, TLS.V_1_2)
@@ -230,7 +265,8 @@ private static CloseableHttpClient buildHttpClient(String apiToken, SplitClientC
HttpClientBuilder httpClientbuilder = HttpClients.custom()
.setConnectionManager(cm)
.setDefaultRequestConfig(requestConfig)
- .addRequestInterceptorLast(AddSplitHeadersFilter.instance(apiToken, config.ipAddressEnabled()))
+ .addRequestInterceptorLast(AuthorizationInterceptorFilter.instance(apiToken))
+ .addRequestInterceptorLast(SdkMetadataInterceptorFilter.instance(config.ipAddressEnabled(), SplitClientConfig.splitSdkVersion))
.addRequestInterceptorLast(new GzipEncoderRequestInterceptor())
.addResponseInterceptorLast((new GzipDecoderResponseInterceptor()));
@@ -242,7 +278,7 @@ private static CloseableHttpClient buildHttpClient(String apiToken, SplitClientC
return httpClientbuilder.build();
}
- private static CloseableHttpClient buildSSEdHttpClient(SplitClientConfig config) {
+ private static CloseableHttpClient buildSSEdHttpClient(String apiToken, SplitClientConfig config) {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(Timeout.ofMilliseconds(SSE_CONNECT_TIMEOUT))
.build();
@@ -263,7 +299,9 @@ private static CloseableHttpClient buildSSEdHttpClient(SplitClientConfig config)
HttpClientBuilder httpClientbuilder = HttpClients.custom()
.setConnectionManager(cm)
- .setDefaultRequestConfig(requestConfig);
+ .setDefaultRequestConfig(requestConfig)
+ .addRequestInterceptorLast(SdkMetadataInterceptorFilter.instance(config.ipAddressEnabled(), SplitClientConfig.splitSdkVersion))
+ .addRequestInterceptorLast(ClientKeyInterceptorFilter.instance(apiToken));
// Set up proxy is it exists
if (config.proxy() != null) {
@@ -296,20 +334,21 @@ private static int findPollingPeriod(Random rand, int max) {
}
private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config) throws URISyntaxException {
- SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher.create(_httpclient, _rootTarget, _unCachedFireAndForget);
+ SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher.create(_httpclient, _rootTarget, _telemetryStorage);
return new SegmentSynchronizationTaskImp(segmentChangeFetcher,
findPollingPeriod(RANDOM, config.segmentsRefreshRate()),
config.numThreadsForSegmentFetch(),
_gates,
- _segmentCache);
+ _segmentCache,
+ _telemetryStorage);
}
private SplitFetcher buildSplitFetcher() throws URISyntaxException {
- SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(_httpclient, _rootTarget, _unCachedFireAndForget);
+ SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(_httpclient, _rootTarget, _telemetryStorage);
SplitParser splitParser = new SplitParser(_segmentSynchronizationTaskImp, _segmentCache);
- return new SplitFetcherImp(splitChangeFetcher, splitParser, _gates, _splitCache);
+ return new SplitFetcherImp(splitChangeFetcher, splitParser, _splitCache, _telemetryStorage);
}
private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config) throws URISyntaxException {
@@ -324,12 +363,6 @@ private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config)
.collect(Collectors.toCollection(() -> impressionListeners));
}
- return ImpressionsManagerImpl.instance(_httpclient, config, impressionListeners);
- }
-
- private FireAndForgetMetrics buildCachedFireAndForgetMetrics(SplitClientConfig config) {
- CachedMetrics cachedMetrics = new CachedMetrics(_httpMetrics, TimeUnit.SECONDS.toMillis(config.metricsRefreshRate()));
-
- return FireAndForgetMetrics.instance(cachedMetrics, 2, 1000);
+ return ImpressionsManagerImpl.instance(_httpclient, config, impressionListeners, _telemetryStorage);
}
}
diff --git a/client/src/main/java/io/split/client/SplitManagerImpl.java b/client/src/main/java/io/split/client/SplitManagerImpl.java
index 5304b5911..0edd88aaa 100644
--- a/client/src/main/java/io/split/client/SplitManagerImpl.java
+++ b/client/src/main/java/io/split/client/SplitManagerImpl.java
@@ -6,6 +6,7 @@
import io.split.cache.SplitCache;
import io.split.engine.experiments.ParsedSplit;
import io.split.inputValidation.SplitNameValidator;
+import io.split.telemetry.storage.TelemetryConfigProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,18 +27,25 @@ public class SplitManagerImpl implements SplitManager {
private final SplitCache _splitCache;
private final SplitClientConfig _config;
private final SDKReadinessGates _gates;
+ private final TelemetryConfigProducer _telemetryConfigProducer;
public SplitManagerImpl(SplitCache splitCache,
SplitClientConfig config,
- SDKReadinessGates gates) {
+ SDKReadinessGates gates,
+ TelemetryConfigProducer telemetryConfigProducer) {
_config = Preconditions.checkNotNull(config);
_splitCache = Preconditions.checkNotNull(splitCache);
_gates = Preconditions.checkNotNull(gates);
+ _telemetryConfigProducer = telemetryConfigProducer;
}
@Override
public List splits() {
+        if (!_gates.isSDKReady()) {
+            _log.warn("splits: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method");
+            _telemetryConfigProducer.recordNonReadyUsage();
+        }
List result = new ArrayList<>();
Collection parsedSplits = _splitCache.getAll();
for (ParsedSplit split : parsedSplits) {
@@ -49,6 +57,10 @@ public List splits() {
@Override
public SplitView split(String featureName) {
+        if (!_gates.isSDKReady()) {
+            _log.warn("split: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method");
+            _telemetryConfigProducer.recordNonReadyUsage();
+        }
Optional result = SplitNameValidator.isValid(featureName, "split");
if (!result.isPresent()) {
return null;
@@ -57,7 +69,7 @@ public SplitView split(String featureName) {
ParsedSplit parsedSplit = _splitCache.get(featureName);
if (parsedSplit == null) {
- if (_gates.isSDKReadyNow()) {
+ if (_gates.isSDKReady()) {
_log.warn("split: you passed \"" + featureName + "\" that does not exist in this environment, " +
"please double check what Splits exist in the web console.");
}
@@ -69,6 +81,10 @@ public SplitView split(String featureName) {
@Override
public List splitNames() {
+        if (!_gates.isSDKReady()) {
+            _log.warn("splitNames: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method");
+            _telemetryConfigProducer.recordNonReadyUsage();
+        }
List result = new ArrayList<>();
Collection parsedSplits = _splitCache.getAll();
for (ParsedSplit split : parsedSplits) {
@@ -83,7 +99,8 @@ public void blockUntilReady() throws TimeoutException, InterruptedException {
if (_config.blockUntilReady() <= 0) {
throw new IllegalArgumentException("setBlockUntilReadyTimeout must be positive but in config was: " + _config.blockUntilReady());
}
- if (!_gates.isSDKReady(_config.blockUntilReady())) {
+ if (!_gates.waitUntilInternalReady(_config.blockUntilReady())) {
+ _telemetryConfigProducer.recordBURTimeout();
throw new TimeoutException("SDK was not ready in " + _config.blockUntilReady()+ " milliseconds");
}
}
diff --git a/client/src/main/java/io/split/client/YamlLocalhostSplitFile.java b/client/src/main/java/io/split/client/YamlLocalhostSplitFile.java
index 926bab165..b9ece01c5 100644
--- a/client/src/main/java/io/split/client/YamlLocalhostSplitFile.java
+++ b/client/src/main/java/io/split/client/YamlLocalhostSplitFile.java
@@ -13,7 +13,7 @@
public class YamlLocalhostSplitFile extends AbstractLocalhostSplitFile {
- private static final Logger _log = LoggerFactory.getLogger(LegacyLocalhostSplitFile.class);
+ private static final Logger _log = LoggerFactory.getLogger(YamlLocalhostSplitFile.class);
public YamlLocalhostSplitFile(LocalhostSplitFactory localhostSplitFactory, String directory, String filenameYaml) throws IOException {
super(localhostSplitFactory, directory, filenameYaml);
diff --git a/client/src/main/java/io/split/client/impressions/AsynchronousImpressionListener.java b/client/src/main/java/io/split/client/impressions/AsynchronousImpressionListener.java
index ed6a1e811..66f1d17d4 100644
--- a/client/src/main/java/io/split/client/impressions/AsynchronousImpressionListener.java
+++ b/client/src/main/java/io/split/client/impressions/AsynchronousImpressionListener.java
@@ -44,12 +44,7 @@ public AsynchronousImpressionListener(ImpressionListener delegate, ExecutorServi
@Override
public void log(final Impression impression) {
try {
- _executor.execute(new Runnable() {
- @Override
- public void run() {
- _delegate.log(impression);
- }
- });
+ _executor.execute(() -> _delegate.log(impression));
}
catch (Exception e) {
_log.warn("Unable to send impression to impression listener", e);
diff --git a/client/src/main/java/io/split/client/impressions/HttpImpressionsSender.java b/client/src/main/java/io/split/client/impressions/HttpImpressionsSender.java
index 336e7f2f5..017ef45a3 100644
--- a/client/src/main/java/io/split/client/impressions/HttpImpressionsSender.java
+++ b/client/src/main/java/io/split/client/impressions/HttpImpressionsSender.java
@@ -5,10 +5,15 @@
import io.split.client.dtos.TestImpressions;
import io.split.client.utils.Utils;
+import io.split.telemetry.domain.enums.HTTPLatenciesEnum;
+import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
+import io.split.telemetry.domain.enums.ResourceEnum;
+import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.core5.http.HttpEntity;
+import org.apache.hc.core5.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,6 +23,8 @@
import java.util.HashMap;
import java.util.List;
+import static com.google.common.base.Preconditions.checkNotNull;
+
/**
* Created by patricioe on 6/20/16.
*/
@@ -33,26 +40,29 @@ public class HttpImpressionsSender implements ImpressionsSender {
private final URI _impressionBulkTarget;
private final URI _impressionCountTarget;
private final ImpressionsManager.Mode _mode;
+ private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
- public static HttpImpressionsSender create(CloseableHttpClient client, URI eventsRootEndpoint, ImpressionsManager.Mode mode) throws URISyntaxException {
+ public static HttpImpressionsSender create(CloseableHttpClient client, URI eventsRootEndpoint, ImpressionsManager.Mode mode, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
return new HttpImpressionsSender(client,
Utils.appendPath(eventsRootEndpoint, BULK_ENDPOINT_PATH),
Utils.appendPath(eventsRootEndpoint, COUNT_ENDPOINT_PATH),
- mode);
+ mode,
+ telemetryRuntimeProducer);
}
- private HttpImpressionsSender(CloseableHttpClient client, URI impressionBulkTarget, URI impressionCountTarget, ImpressionsManager.Mode mode) {
+ private HttpImpressionsSender(CloseableHttpClient client, URI impressionBulkTarget, URI impressionCountTarget, ImpressionsManager.Mode mode, TelemetryRuntimeProducer telemetryRuntimeProducer) {
_client = client;
_mode = mode;
_impressionBulkTarget = impressionBulkTarget;
_impressionCountTarget = impressionCountTarget;
+ _telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
}
@Override
public void postImpressionsBulk(List<TestImpressions> impressions) {
CloseableHttpResponse response = null;
-
+ long initTime = System.currentTimeMillis();
try {
HttpEntity entity = Utils.toJsonEntity(impressions);
@@ -64,13 +74,16 @@ public void postImpressionsBulk(List impressions) {
int status = response.getCode();
- if (status < 200 || status >= 300) {
+ if (status < HttpStatus.SC_OK || status >= HttpStatus.SC_MULTIPLE_CHOICES) {
+ _telemetryRuntimeProducer.recordSyncError(ResourceEnum.IMPRESSION_SYNC, status);
_logger.warn("Response status was: " + status);
}
+ _telemetryRuntimeProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.IMPRESSIONS, System.currentTimeMillis());
} catch (Throwable t) {
_logger.warn("Exception when posting impressions" + impressions, t);
} finally {
+ _telemetryRuntimeProducer.recordSyncLatency(HTTPLatenciesEnum.IMPRESSIONS, System.currentTimeMillis() - initTime);
Utils.forceClose(response);
}
@@ -78,6 +91,7 @@ public void postImpressionsBulk(List impressions) {
@Override
public void postCounters(HashMap<ImpressionCounter.Key, Integer> raw) {
+ long initTime = System.currentTimeMillis();
if (_mode.equals(ImpressionsManager.Mode.DEBUG)) {
_logger.warn("Attempted to submit counters in impressions debugging mode. Ignoring");
return;
@@ -87,9 +101,12 @@ public void postCounters(HashMap raw) {
request.setEntity(Utils.toJsonEntity(ImpressionCount.fromImpressionCounterData(raw)));
try (CloseableHttpResponse response = _client.execute(request)) {
int status = response.getCode();
- if (status < 200 || status >= 300) {
+ if (status < HttpStatus.SC_OK || status >= HttpStatus.SC_MULTIPLE_CHOICES) {
+ _telemetryRuntimeProducer.recordSyncError(ResourceEnum.IMPRESSION_COUNT_SYNC, status);
_logger.warn("Response status was: " + status);
}
+ _telemetryRuntimeProducer.recordSyncLatency(HTTPLatenciesEnum.IMPRESSIONS_COUNT, System.currentTimeMillis() - initTime);
+ _telemetryRuntimeProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.IMPRESSIONS_COUNT, System.currentTimeMillis());
} catch (IOException exc) {
_logger.warn("Exception when posting impression counters: ", exc);
}
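The new telemetry calls follow one pattern in both methods: time the request, record a sync error on any non-2xx status, record the last-successful-sync timestamp when the call returns without throwing, and always record the latency. A self-contained sketch of that pattern, using a hypothetical minimal interface rather than the real TelemetryRuntimeProducer (which has a larger surface):

```java
import java.util.function.IntSupplier;

// Hypothetical stand-in for the three TelemetryRuntimeProducer calls used above.
interface SyncTelemetry {
    void recordSyncError(String resource, int status);
    void recordSuccessfulSync(String resource, long timestampMs);
    void recordSyncLatency(String resource, long elapsedMs);
}

final class InstrumentedPost {
    // Mirrors postImpressionsBulk: latency always lands in the finally block,
    // non-2xx statuses count as sync errors, and the success timestamp is taken
    // whenever the request completes without an exception.
    static void post(SyncTelemetry telemetry, IntSupplier httpCall) {
        long initTime = System.currentTimeMillis();
        try {
            int status = httpCall.getAsInt();
            if (status < 200 || status >= 300) { // SC_OK / SC_MULTIPLE_CHOICES bounds
                telemetry.recordSyncError("impressions", status);
            }
            telemetry.recordSuccessfulSync("impressions", System.currentTimeMillis());
        } finally {
            telemetry.recordSyncLatency("impressions", System.currentTimeMillis() - initTime);
        }
    }
}
```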
diff --git a/client/src/main/java/io/split/client/impressions/ImpressionsManagerImpl.java b/client/src/main/java/io/split/client/impressions/ImpressionsManagerImpl.java
index ef20caff6..e32455fdf 100644
--- a/client/src/main/java/io/split/client/impressions/ImpressionsManagerImpl.java
+++ b/client/src/main/java/io/split/client/impressions/ImpressionsManagerImpl.java
@@ -5,6 +5,8 @@
import io.split.client.SplitClientConfig;
import io.split.client.dtos.KeyImpression;
import io.split.client.dtos.TestImpressions;
+import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum;
+import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,36 +43,41 @@ public class ImpressionsManagerImpl implements ImpressionsManager, Closeable {
private final ImpressionCounter _counter;
private final ImpressionListener _listener;
private final ImpressionsManager.Mode _mode;
+ private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
public static ImpressionsManagerImpl instance(CloseableHttpClient client,
SplitClientConfig config,
- List<ImpressionListener> listeners) throws URISyntaxException {
- return new ImpressionsManagerImpl(client, config, null, listeners);
+ List<ImpressionListener> listeners,
+ TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
+ return new ImpressionsManagerImpl(client, config, null, listeners, telemetryRuntimeProducer);
}
public static ImpressionsManagerImpl instanceForTest(CloseableHttpClient client,
SplitClientConfig config,
ImpressionsSender impressionsSender,
- List<ImpressionListener> listeners) throws URISyntaxException {
- return new ImpressionsManagerImpl(client, config, impressionsSender, listeners);
+ List<ImpressionListener> listeners,
+ TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
+ return new ImpressionsManagerImpl(client, config, impressionsSender, listeners, telemetryRuntimeProducer);
}
private ImpressionsManagerImpl(CloseableHttpClient client,
SplitClientConfig config,
ImpressionsSender impressionsSender,
- List<ImpressionListener> listeners) throws URISyntaxException {
+ List<ImpressionListener> listeners,
+ TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
_config = checkNotNull(config);
_mode = checkNotNull(config.impressionsMode());
+ _telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
_storage = new InMemoryImpressionsStorage(config.impressionsQueueSize());
_impressionObserver = new ImpressionObserver(LAST_SEEN_CACHE_SIZE);
_counter = new ImpressionCounter();
_impressionsSender = (null != impressionsSender) ? impressionsSender
- : HttpImpressionsSender.create(client, URI.create(config.eventsEndpoint()), _mode);
+ : HttpImpressionsSender.create(client, URI.create(config.eventsEndpoint()), _mode, telemetryRuntimeProducer);
_scheduler = buildExecutor();
- _scheduler.scheduleAtFixedRate(this::sendImpressions, BULK_INITIAL_DELAY_SECONDS,config.impressionsRefreshRate(), TimeUnit.SECONDS);
+ _scheduler.scheduleAtFixedRate(this::sendImpressions, BULK_INITIAL_DELAY_SECONDS, config.impressionsRefreshRate(), TimeUnit.SECONDS);
if (Mode.OPTIMIZED.equals(_mode)) {
_scheduler.scheduleAtFixedRate(this::sendImpressionCounters, COUNT_INITIAL_DELAY_SECONDS, COUNT_REFRESH_RATE_SECONDS, TimeUnit.SECONDS);
}
@@ -97,9 +104,15 @@ public void track(Impression impression) {
_counter.inc(impression.split(), impression.time(), 1);
}
- if (Mode.DEBUG.equals(_mode) || shouldQueueImpression(impression)) {
- _storage.put(KeyImpression.fromImpression(impression));
+ if (Mode.OPTIMIZED.equals(_mode) && !shouldQueueImpression(impression)) {
+ _telemetryRuntimeProducer.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DEDUPED, 1);
+ return;
+ }
+ if (!_storage.put(KeyImpression.fromImpression(impression))) {
+ _telemetryRuntimeProducer.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, 1);
+ return;
}
+ _telemetryRuntimeProducer.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_QUEUED, 1);
}
@Override
@@ -129,14 +142,14 @@ public void close() {
}
_impressionsSender.postImpressionsBulk(TestImpressions.fromKeyImpressions(impressions));
- if(_config.debugEnabled()) {
+ if (_config.debugEnabled()) {
_log.info(String.format("Posting %d Split impressions took %d millis",
impressions.size(), (System.currentTimeMillis() - start)));
}
}
@VisibleForTesting
- /* package private */ void sendImpressionCounters() {
+ /* package private */ void sendImpressionCounters() {
if (!_counter.isEmpty()) {
_impressionsSender.postCounters(_counter.popAll());
}
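The reworked track() now attributes every impression to exactly one of three telemetry counters: deduped (OPTIMIZED mode decided it does not need to be queued again), dropped (the bounded in-memory queue was full), or queued. A standalone sketch of that accounting, using a plain ArrayBlockingQueue as a stand-in for the SDK's impressions storage:

```java
import java.util.concurrent.ArrayBlockingQueue;

final class TrackAccountingSketch {
    enum Mode { OPTIMIZED, DEBUG }

    private final Mode mode = Mode.OPTIMIZED;
    private final ArrayBlockingQueue<String> storage = new ArrayBlockingQueue<>(5_000);
    private long queued, deduped, dropped;

    void track(String impression, boolean shouldQueue) {
        if (mode == Mode.OPTIMIZED && !shouldQueue) {
            deduped++;   // IMPRESSIONS_DEDUPED
            return;
        }
        if (!storage.offer(impression)) {
            dropped++;   // IMPRESSIONS_DROPPED: queue full, impression discarded
            return;
        }
        queued++;        // IMPRESSIONS_QUEUED
    }
}
```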
diff --git a/client/src/main/java/io/split/client/interceptors/AddSplitHeadersFilter.java b/client/src/main/java/io/split/client/interceptors/AddSplitHeadersFilter.java
deleted file mode 100644
index 3367ac53d..000000000
--- a/client/src/main/java/io/split/client/interceptors/AddSplitHeadersFilter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package io.split.client.interceptors;
-
-import io.split.client.SplitClientConfig;
-import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.HttpException;
-import org.apache.hc.core5.http.HttpRequest;
-import org.apache.hc.core5.http.HttpRequestInterceptor;
-import org.apache.hc.core5.http.protocol.HttpContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Created by adilaijaz on 5/22/15.
- */
-public class AddSplitHeadersFilter implements HttpRequestInterceptor {
- private static final Logger _log = LoggerFactory.getLogger(AddSplitHeadersFilter.class);
-
- /* package private for testing purposes */
- static final String AUTHORIZATION_HEADER = "Authorization";
- static final String CLIENT_MACHINE_NAME_HEADER = "SplitSDKMachineName";
- static final String CLIENT_MACHINE_IP_HEADER = "SplitSDKMachineIP";
- static final String CLIENT_VERSION = "SplitSDKVersion";
-
- private final String _apiTokenBearer;
- private final String _hostname;
- private final String _ip;
-
- public static AddSplitHeadersFilter instance(String apiToken, boolean ipAddressEnabled) {
- if (!ipAddressEnabled) {
- return new AddSplitHeadersFilter(apiToken, null, null);
- }
-
- String hostname = null;
- String ip = null;
-
- try {
- InetAddress localHost = InetAddress.getLocalHost();
- hostname = localHost.getHostName();
- ip = localHost.getHostAddress();
- } catch (Exception e) {
- _log.error("Could not resolve InetAddress", e);
- }
-
- return new AddSplitHeadersFilter(apiToken, hostname, ip);
- }
-
- private AddSplitHeadersFilter(String apiToken, String hostname, String ip) {
- checkNotNull(apiToken);
-
- _apiTokenBearer = "Bearer " + apiToken;
- _hostname = hostname;
- _ip = ip;
- }
-
- @Override
- public void process(HttpRequest request, EntityDetails entity, HttpContext context) throws HttpException, IOException {
- request.addHeader(AUTHORIZATION_HEADER, _apiTokenBearer);
- request.addHeader(CLIENT_VERSION, SplitClientConfig.splitSdkVersion);
-
- if (_hostname != null) {
- request.addHeader(CLIENT_MACHINE_NAME_HEADER, _hostname);
- }
-
- if (_ip != null) {
- request.addHeader(CLIENT_MACHINE_IP_HEADER, _ip);
- }
- }
-}
diff --git a/client/src/main/java/io/split/client/interceptors/AuthorizationInterceptorFilter.java b/client/src/main/java/io/split/client/interceptors/AuthorizationInterceptorFilter.java
new file mode 100644
index 000000000..43b50949c
--- /dev/null
+++ b/client/src/main/java/io/split/client/interceptors/AuthorizationInterceptorFilter.java
@@ -0,0 +1,30 @@
+package io.split.client.interceptors;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpRequestInterceptor;
+import org.apache.hc.core5.http.protocol.HttpContext;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class AuthorizationInterceptorFilter implements HttpRequestInterceptor {
+ static final String AUTHORIZATION_HEADER = "Authorization";
+
+ private final String _apiTokenBearer;
+
+ public static AuthorizationInterceptorFilter instance(String apiToken) {
+ return new AuthorizationInterceptorFilter(apiToken);
+ }
+
+ private AuthorizationInterceptorFilter(String apiToken) {
+ _apiTokenBearer = "Bearer " + checkNotNull(apiToken);
+ }
+
+ @Override
+ public void process(HttpRequest httpRequest, EntityDetails entityDetails, HttpContext httpContext) throws HttpException, IOException {
+ httpRequest.addHeader(AUTHORIZATION_HEADER, _apiTokenBearer);
+ }
+}
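This filter now carries only the Authorization header; hostname/IP and version move to SdkMetadataInterceptorFilter below. A quick way to see its effect is to run a request through it directly (the URL is a placeholder, and the SDK normally registers the filter on its own HTTP client):

```java
import io.split.client.interceptors.AuthorizationInterceptorFilter;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.core5.http.protocol.BasicHttpContext;

public class AuthorizationFilterSketch {
    public static void main(String[] args) throws Exception {
        HttpGet request = new HttpGet("https://example.invalid/api"); // placeholder URL
        AuthorizationInterceptorFilter.instance("YOUR_SDK_KEY")
                .process(request, null, new BasicHttpContext());
        // Prints something like: Authorization: Bearer YOUR_SDK_KEY
        System.out.println(request.getFirstHeader("Authorization"));
    }
}
```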
diff --git a/client/src/main/java/io/split/client/interceptors/ClientKeyInterceptorFilter.java b/client/src/main/java/io/split/client/interceptors/ClientKeyInterceptorFilter.java
new file mode 100644
index 000000000..ad3c82594
--- /dev/null
+++ b/client/src/main/java/io/split/client/interceptors/ClientKeyInterceptorFilter.java
@@ -0,0 +1,32 @@
+package io.split.client.interceptors;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpRequestInterceptor;
+import org.apache.hc.core5.http.protocol.HttpContext;
+
+import java.io.IOException;
+
+public class ClientKeyInterceptorFilter implements HttpRequestInterceptor {
+ static final String CLIENT_KEY = "SplitSDKClientKey";
+
+ private final String _clientKey;
+
+ public static ClientKeyInterceptorFilter instance(String apiToken) {
+ return new ClientKeyInterceptorFilter(getKey(apiToken));
+ }
+
+ private ClientKeyInterceptorFilter(String clientKey) {
+ _clientKey = clientKey;
+ }
+
+ @Override
+ public void process(HttpRequest httpRequest, EntityDetails entityDetails, HttpContext httpContext) throws HttpException, IOException {
+ httpRequest.addHeader(CLIENT_KEY, _clientKey);
+ }
+
+ private static String getKey(String clientKey) {
+ return clientKey.length() > 4 ? clientKey.substring(clientKey.length() - 4) : clientKey;
+ }
+}
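The SplitSDKClientKey header never carries the full SDK key; getKey() above keeps only the last four characters (or the whole key when it is four characters or shorter). Restated as a standalone helper:

```java
final class ClientKeySketch {
    // Same rule as getKey() above.
    static String clientKeyOf(String sdkKey) {
        return sdkKey.length() > 4 ? sdkKey.substring(sdkKey.length() - 4) : sdkKey;
    }

    public static void main(String[] args) {
        System.out.println(clientKeyOf("abcdefgh1234")); // 1234
        System.out.println(clientKeyOf("abc"));          // abc
    }
}
```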
diff --git a/client/src/main/java/io/split/client/interceptors/SdkMetadataInterceptorFilter.java b/client/src/main/java/io/split/client/interceptors/SdkMetadataInterceptorFilter.java
new file mode 100644
index 000000000..6cc059579
--- /dev/null
+++ b/client/src/main/java/io/split/client/interceptors/SdkMetadataInterceptorFilter.java
@@ -0,0 +1,61 @@
+package io.split.client.interceptors;
+
+import io.split.client.SplitClientConfig;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpRequestInterceptor;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+public class SdkMetadataInterceptorFilter implements HttpRequestInterceptor {
+ private static final Logger _log = LoggerFactory.getLogger(SdkMetadataInterceptorFilter.class);
+
+ static final String CLIENT_MACHINE_NAME_HEADER = "SplitSDKMachineName";
+ static final String CLIENT_MACHINE_IP_HEADER = "SplitSDKMachineIP";
+ static final String CLIENT_VERSION = "SplitSDKVersion";
+
+ private final String _hostname;
+ private final String _ip;
+ private final String _sdkVersion;
+
+ public static SdkMetadataInterceptorFilter instance(boolean ipAddressEnabled, String sdkVersion) {
+ String hostName = null;
+ String ip = null;
+
+ if (ipAddressEnabled) {
+ try {
+ InetAddress localHost = InetAddress.getLocalHost();
+ hostName = localHost.getHostName();
+ ip = localHost.getHostAddress();
+ } catch (Exception e) {
+ _log.error("Could not resolve InetAddress", e);
+ }
+ }
+
+ return new SdkMetadataInterceptorFilter(hostName, ip, sdkVersion);
+ }
+
+ private SdkMetadataInterceptorFilter(String hostName, String ip, String sdkVersion) {
+ _sdkVersion = sdkVersion;
+ _hostname = hostName;
+ _ip = ip;
+ }
+
+ @Override
+ public void process(HttpRequest httpRequest, EntityDetails entityDetails, HttpContext httpContext) throws HttpException, IOException {
+ httpRequest.addHeader(CLIENT_VERSION, SplitClientConfig.splitSdkVersion);
+
+ if (_hostname != null) {
+ httpRequest.addHeader(CLIENT_MACHINE_NAME_HEADER, _hostname);
+ }
+
+ if (_ip != null) {
+ httpRequest.addHeader(CLIENT_MACHINE_IP_HEADER, _ip);
+ }
+ }
+}
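Together, the three new filters replace the deleted AddSplitHeadersFilter: authorization, the truncated client key, and machine/version metadata are now attached by separate interceptors. A hedged sketch of how they could be wired onto an Apache HttpClient 5 builder; the SDK does this internally, so the ordering, the version string, and whether every client gets all three filters are assumptions here:

```java
import io.split.client.interceptors.AuthorizationInterceptorFilter;
import io.split.client.interceptors.ClientKeyInterceptorFilter;
import io.split.client.interceptors.SdkMetadataInterceptorFilter;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;

public class InterceptorWiringSketch {
    static CloseableHttpClient build(String sdkKey) {
        return HttpClients.custom()
                .addRequestInterceptorLast(AuthorizationInterceptorFilter.instance(sdkKey))
                .addRequestInterceptorLast(SdkMetadataInterceptorFilter.instance(true, "java-4.2.0"))
                .addRequestInterceptorLast(ClientKeyInterceptorFilter.instance(sdkKey))
                .build();
    }
}
```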
diff --git a/client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java b/client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java
index e5d49e115..0fcbea305 100644
--- a/client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java
+++ b/client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java
@@ -3,6 +3,7 @@
import io.split.cache.SegmentCache;
import io.split.cache.SplitCache;
import io.split.client.SplitClient;
+import io.split.engine.common.FetchOptions;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.segments.SegmentFetcher;
import io.split.engine.segments.SegmentSynchronizationTask;
@@ -34,7 +35,7 @@ public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, Spl
@Override
public boolean forceSyncFeatures() {
- _featureFetcher.forceRefresh(true);
+ _featureFetcher.forceRefresh(new FetchOptions.Builder().cacheControlHeaders(true).build());
_log.info("Features successfully refreshed via JMX");
return true;
}
@@ -43,7 +44,7 @@ public boolean forceSyncFeatures() {
public boolean forceSyncSegment(String segmentName) {
SegmentFetcher fetcher = _segmentSynchronizationTask.getFetcher(segmentName);
try{
- fetcher.fetch(true);
+ fetcher.fetch(new FetchOptions.Builder().build());
}
//We are sure this will never happen because getFetcher first initiates the segment. This try/catch is only a safety net.
catch (NullPointerException np){
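forceRefresh() and fetch() now take a FetchOptions value instead of a boolean, so the JMX hooks build the options explicitly. Only builder calls that appear in this diff are used below; the cache-control flag is presumably what requests fresh, non-cached data:

```java
import io.split.engine.common.FetchOptions;

final class FetchOptionsSketch {
    // Equivalent of the old forceRefresh(true) path used by forceSyncFeatures().
    static final FetchOptions FORCED_REFRESH =
            new FetchOptions.Builder().cacheControlHeaders(true).build();

    // Default options, as used by forceSyncSegment().
    static final FetchOptions DEFAULTS = new FetchOptions.Builder().build();
}
```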
diff --git a/client/src/main/java/io/split/client/metrics/BinarySearchLatencyTracker.java b/client/src/main/java/io/split/client/metrics/BinarySearchLatencyTracker.java
deleted file mode 100644
index 35efff10f..000000000
--- a/client/src/main/java/io/split/client/metrics/BinarySearchLatencyTracker.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package io.split.client.metrics;
-
-import java.util.Arrays;
-
-/**
- * Tracks latencies pero bucket of time.
- * Each bucket represent a latency greater than the one before
- * and each number within each bucket is a number of calls in the range.
- *
- * (1) 1.00
- * (2) 1.50
- * (3) 2.25
- * (4) 3.38
- * (5) 5.06
- * (6) 7.59
- * (7) 11.39
- * (8) 17.09
- * (9) 25.63
- * (10) 38.44
- * (11) 57.67
- * (12) 86.50
- * (13) 129.75
- * (14) 194.62
- * (15) 291.93
- * (16) 437.89
- * (17) 656.84
- * (18) 985.26
- * (19) 1,477.89
- * (20) 2,216.84
- * (21) 3,325.26
- * (22) 4,987.89
- * (23) 7,481.83
- *
- * Thread-safety: This class is not thread safe.
- *
- * Created by patricioe on 2/10/16.
- */
-public class BinarySearchLatencyTracker implements ILatencyTracker {
-
- static final long[] BUCKETS = {
- 1000, 1500, 2250, 3375, 5063,
- 7594, 11391, 17086, 25629, 38443,
- 57665, 86498, 129746, 194620, 291929,
- 437894, 656841, 985261, 1477892, 2216838,
- 3325257, 4987885, 7481828
- };
-
- static final long MAX_LATENCY = 7481828;
-
- long[] latencies = new long[BUCKETS.length];
-
- /**
- * Increment the internal counter for the bucket this latency falls into.
- *
- * @param millis
- */
- public void addLatencyMillis(long millis) {
- int index = findIndex(millis * 1000);
- latencies[index]++;
- }
-
- /**
- * Increment the internal counter for the bucket this latency falls into.
- *
- * @param micros
- */
- public void addLatencyMicros(long micros) {
- int index = findIndex(micros);
- latencies[index]++;
- }
-
- /**
- * Returns the list of latencies buckets as an array.
- *
- * @return the list of latencies buckets as an array.
- */
- public long[] getLatencies() {
- return latencies;
- }
-
- @Override
- public long getLatency(int index) {
- return latencies[index];
- }
-
- public void clear() {
- latencies = new long[BUCKETS.length];
- }
-
- /**
- * Returns the counts in the bucket this latency falls into.
- * The latencies will no be updated.
- *
- * @param latency
- * @return the bucket content for the latency.
- */
- public long getBucketForLatencyMillis(long latency) {
- return latencies[findIndex(latency * 1000)];
- }
-
- /**
- * Returns the counts in the bucket this latency falls into.
- * The latencies will no be updated.
- *
- * @param latency
- * @return the bucket content for the latency.
- */
- public long getBucketForLatencyMicros(long latency) {
- return latencies[findIndex(latency)];
- }
-
-
- private int findIndex(long micros) {
- if (micros > MAX_LATENCY) {
- return BUCKETS.length - 1;
- }
-
- int index = Arrays.binarySearch(BUCKETS, micros);
-
- if (index < 0) {
-
- // Adjust the index based on Java Array javadocs. <0 means the value wasn't found and it's module value
- // is where it should be inserted (in this case, it means the counter it applies - unless it's equals to the
- // length of the array).
-
- index = -(index + 1);
- }
- return index;
- }
-
-}
diff --git a/client/src/main/java/io/split/client/metrics/CachedMetrics.java b/client/src/main/java/io/split/client/metrics/CachedMetrics.java
deleted file mode 100644
index 41f066a38..000000000
--- a/client/src/main/java/io/split/client/metrics/CachedMetrics.java
+++ /dev/null
@@ -1,142 +0,0 @@
-
-package io.split.client.metrics;
-
-import com.google.common.collect.Maps;
-import com.google.common.primitives.Longs;
-import io.split.client.dtos.Counter;
-import io.split.client.dtos.Latency;
-import io.split.engine.metrics.Metrics;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-
-/**
- * Created by adilaijaz on 9/4/15.
- */
-public class CachedMetrics implements Metrics {
-
- private final DTOMetrics _metrics;
-
- private final Map<String, ILatencyTracker> _latencyMap;
- private final Map<String, SumAndCount> _countMap;
-
-
- private final Object _latencyLock = new Object();
- private AtomicLong _latencyLastUpdateTimeMillis = new AtomicLong(System.currentTimeMillis());
-
- private final Object _counterLock = new Object();
- private AtomicLong _counterLastUpdateTimeMillis = new AtomicLong(System.currentTimeMillis());
-
- private long _refreshPeriodInMillis;
-
- private final int _queueForTheseManyCalls;
-
- /**
- * For unit testing only.
- *
- * @param httpMetrics
- * @param queueForTheseManyCalls
- */
- /*package private*/ CachedMetrics(DTOMetrics httpMetrics, int queueForTheseManyCalls) {
- this(httpMetrics, queueForTheseManyCalls, TimeUnit.MINUTES.toMillis(1));
- }
-
- public CachedMetrics(DTOMetrics httpMetrics, long refreshPeriodInMillis) {
- this(httpMetrics, 100, refreshPeriodInMillis);
- }
-
- private CachedMetrics(DTOMetrics metrics, int queueForTheseManyCalls, long refreshPeriodInMillis) {
- _metrics = metrics;
- _latencyMap = Maps.newHashMap();
- _countMap = Maps.newHashMap();
- checkArgument(queueForTheseManyCalls > 0, "queue for cache should be greater than zero");
- _queueForTheseManyCalls = queueForTheseManyCalls;
- _refreshPeriodInMillis = refreshPeriodInMillis;
- }
-
- @Override
- public void count(String counter, long delta) {
- if (delta <= 0) {
- return;
- }
-
- if (counter == null || counter.trim().isEmpty()) {
- return;
- }
-
- synchronized (_counterLock) {
- SumAndCount sumAndCount = _countMap.get(counter);
- if (sumAndCount == null) {
- sumAndCount = new SumAndCount();
- _countMap.put(counter, sumAndCount);
- }
-
- sumAndCount.addDelta(delta);
-
- if (sumAndCount._count >= _queueForTheseManyCalls || hasTimeElapsed(_counterLastUpdateTimeMillis)) {
- Counter dto = new Counter();
- dto.name = counter;
- dto.delta = sumAndCount._sum;
-
- sumAndCount.clear();
- _counterLastUpdateTimeMillis.set(System.currentTimeMillis());
- _metrics.count(dto);
- }
- }
- }
-
- private boolean hasTimeElapsed(AtomicLong lastRefreshTime) {
- return (System.currentTimeMillis() - lastRefreshTime.get()) > _refreshPeriodInMillis;
- }
-
- @Override
- public void time(String operation, long timeInMs) {
- if (operation == null || operation.trim().isEmpty() || timeInMs < 0L) {
- // error
- return;
- }
- synchronized (_latencyLock) {
- if (!_latencyMap.containsKey(operation)) {
- ILatencyTracker latencies = new BinarySearchLatencyTracker();
- _latencyMap.put(operation, latencies);
- }
-
- ILatencyTracker tracker = _latencyMap.get(operation);
- tracker.addLatencyMillis((int) timeInMs);
-
- if (hasTimeElapsed(_latencyLastUpdateTimeMillis)) {
-
- Latency dto = new Latency();
- dto.name = operation;
- dto.latencies = Longs.asList(tracker.getLatencies());
-
- tracker.clear();
- _latencyLastUpdateTimeMillis.set(System.currentTimeMillis());
- _metrics.time(dto);
-
- }
- }
- }
-
-
- private static final class SumAndCount {
- private int _count = 0;
- private long _sum = 0L;
-
- public void addDelta(long delta) {
- _count++;
- _sum += delta;
- }
-
- public void clear() {
- _count = 0;
- _sum = 0L;
- }
-
- }
-
-}
diff --git a/client/src/main/java/io/split/client/metrics/DTOMetrics.java b/client/src/main/java/io/split/client/metrics/DTOMetrics.java
deleted file mode 100644
index 86c793cb6..000000000
--- a/client/src/main/java/io/split/client/metrics/DTOMetrics.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package io.split.client.metrics;
-
-import io.split.client.dtos.Counter;
-import io.split.client.dtos.Latency;
-
-/**
- * Created by adilaijaz on 6/14/16.
- */
-public interface DTOMetrics {
- void time(Latency dto);
-
- void count(Counter dto);
-}
diff --git a/client/src/main/java/io/split/client/metrics/FireAndForgetMetrics.java b/client/src/main/java/io/split/client/metrics/FireAndForgetMetrics.java
deleted file mode 100644
index 43aa702de..000000000
--- a/client/src/main/java/io/split/client/metrics/FireAndForgetMetrics.java
+++ /dev/null
@@ -1,123 +0,0 @@
-package io.split.client.metrics;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import io.split.engine.metrics.Metrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Created by adilaijaz on 9/4/15.
- */
-public class FireAndForgetMetrics implements Metrics, Closeable {
-
- private static final Logger _log = LoggerFactory.getLogger(FireAndForgetMetrics.class);
-
- private final ExecutorService _executorService;
- private final Metrics _delegate;
-
- public static FireAndForgetMetrics instance(Metrics delegate, int numberOfThreads, int queueSize) {
- ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
- threadFactoryBuilder.setDaemon(true);
- threadFactoryBuilder.setNameFormat("split-fireAndForgetMetrics-%d");
- threadFactoryBuilder.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- _log.error("Error in thread: " + t.getName(), e);
- }
- });
-
- final ExecutorService executorService = new ThreadPoolExecutor(numberOfThreads,
- numberOfThreads,
- 0L,
- TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue(queueSize),
- threadFactoryBuilder.build(),
- new ThreadPoolExecutor.DiscardPolicy());
-
-
- return new FireAndForgetMetrics(delegate, executorService);
- }
-
- private FireAndForgetMetrics(Metrics delegate, ExecutorService executorService) {
- _delegate = delegate;
- _executorService = executorService;
- }
-
-
- @Override
- public void count(String counter, long delta) {
- try {
- _executorService.submit(new CountRunnable(_delegate, counter, delta));
- } catch (Throwable t) {
- _log.warn("CountRunnable failed", t);
- }
- }
-
- @Override
- public void time(String operation, long timeInMs) {
- try {
- _executorService.submit(new TimeRunnable(_delegate, operation, timeInMs));
- } catch (Throwable t) {
- _log.warn("TimeRunnable failed", t);
- }
- }
-
- public void close() {
- _executorService.shutdown();
- try {
- if (!_executorService.awaitTermination(10L, TimeUnit.SECONDS)) { //optional *
- _log.info("Executor did not terminate in the specified time.");
- List<Runnable> droppedTasks = _executorService.shutdownNow(); //optional **
- _log.info("Executor was abruptly shut down. These tasks will not be executed: " + droppedTasks);
- }
- } catch (InterruptedException e) {
- // reset the interrupt.
- Thread.currentThread().interrupt();
- }
- }
-
-
- private static final class CountRunnable implements Runnable {
-
- private final Metrics _delegate;
- private final String _name;
- private final long _delta;
-
- public CountRunnable(Metrics delegate, String name, long delta) {
- _delegate = delegate;
- _name = name;
- _delta = delta;
- }
-
- @Override
- public void run() {
- _delegate.count(_name, _delta);
- }
- }
-
- private static final class TimeRunnable implements Runnable {
-
- private final Metrics _delegate;
- private final String _name;
- private final long _timeInMs;
-
- public TimeRunnable(Metrics delegate, String name, long timeInMs) {
- _delegate = delegate;
- _name = name;
- _timeInMs = timeInMs;
- }
-
- @Override
- public void run() {
- _delegate.time(_name, _timeInMs);
- }
- }
-
-}
diff --git a/client/src/main/java/io/split/client/metrics/HttpMetrics.java b/client/src/main/java/io/split/client/metrics/HttpMetrics.java
deleted file mode 100644
index 14d63b0f4..000000000
--- a/client/src/main/java/io/split/client/metrics/HttpMetrics.java
+++ /dev/null
@@ -1,135 +0,0 @@
-package io.split.client.metrics;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import io.split.client.dtos.Counter;
-import io.split.client.dtos.Latency;
-import io.split.client.utils.Utils;
-import io.split.engine.metrics.Metrics;
-import org.apache.hc.client5.http.classic.methods.HttpPost;
-import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
-import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
-import org.apache.hc.core5.http.HttpEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-/**
- * Created by adilaijaz on 9/4/15.
- */
-public class HttpMetrics implements Metrics, DTOMetrics {
- private static final Logger _log = LoggerFactory.getLogger(HttpMetrics.class);
-
- private final CloseableHttpClient _client;
- private final URI _timeTarget;
- private final URI _counterTarget;
-
-
- public static HttpMetrics create(CloseableHttpClient client, URI root) throws URISyntaxException {
- return new HttpMetrics(client, root);
- }
-
-
- public HttpMetrics(CloseableHttpClient client, URI root) throws URISyntaxException {
- Preconditions.checkNotNull(root);
- _client = Preconditions.checkNotNull(client);
- _timeTarget = Utils.appendPath(root, "api/metrics/time");
- _counterTarget = Utils.appendPath(root, "api/metrics/counter");
- }
-
-
- @Override
- public void time(Latency dto) {
- if (dto.latencies.isEmpty()) {
- return;
- }
-
- try {
- post(_timeTarget, dto);
- } catch (Throwable t) {
- _log.warn("Exception when posting metric " + dto, t);
- }
- ;
-
- }
-
- @Override
- public void count(Counter dto) {
- try {
- post(_counterTarget, dto);
- } catch (Throwable t) {
- _log.warn("Exception when posting metric " + dto, t);
- }
-
- }
-
- private void post(URI uri, Object dto) {
-
- CloseableHttpResponse response = null;
-
- try {
- HttpEntity entity = Utils.toJsonEntity(dto);
-
- HttpPost request = new HttpPost(uri);
- request.setEntity(entity);
-
- response = _client.execute(request);
-
- int status = response.getCode();
-
- if (status < 200 || status >= 300) {
- _log.warn("Response status was: " + status);
- }
-
- } catch (Throwable t) {
- _log.warn("Exception when posting metrics: " + t.getMessage());
- if (_log.isDebugEnabled()) {
- _log.debug("Reason: ", t);
- }
- } finally {
- Utils.forceClose(response);
- }
-
- }
-
- @Override
- public void count(String counter, long delta) {
- try {
- Counter dto = new Counter();
- dto.name = counter;
- dto.delta = delta;
-
- count(dto);
- } catch (Throwable t) {
- _log.info("Could not count metric " + counter, t);
- }
-
- }
-
- @Override
- public void time(String operation, long timeInMs) {
- try {
- Latency dto = new Latency();
- dto.name = operation;
- dto.latencies = Lists.newArrayList(timeInMs);
-
- time(dto);
- } catch (Throwable t) {
- _log.info("Could not time metric " + operation, t);
- }
- }
-
- @VisibleForTesting
- URI getTimeTarget() {
- return _timeTarget;
- }
-
- @VisibleForTesting
- URI getCounterTarget() {
- return _counterTarget;
- }
-
-}
diff --git a/client/src/main/java/io/split/client/metrics/ILatencyTracker.java b/client/src/main/java/io/split/client/metrics/ILatencyTracker.java
deleted file mode 100644
index 282db06cb..000000000
--- a/client/src/main/java/io/split/client/metrics/ILatencyTracker.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package io.split.client.metrics;
-
-/**
- * Created by patricioe on 2/10/16.
- */
-public interface ILatencyTracker {
-
- void addLatencyMillis(long millis);
-
- void addLatencyMicros(long micros);
-
- long[] getLatencies();
-
- long getLatency(int index);
-
- void clear();
-
- long getBucketForLatencyMillis(long latency);
-
- long getBucketForLatencyMicros(long latency);
-
-}
diff --git a/client/src/main/java/io/split/client/metrics/LogarithmicSearchLatencyTracker.java b/client/src/main/java/io/split/client/metrics/LogarithmicSearchLatencyTracker.java
deleted file mode 100644
index 4034d8de6..000000000
--- a/client/src/main/java/io/split/client/metrics/LogarithmicSearchLatencyTracker.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package io.split.client.metrics;
-
-/**
- * Tracks latencies pero bucket of time.
- * Each bucket represent a latency greater than the one before
- * and each number within each bucket is a number of calls in the range.
- *
- * (1) 1.00
- * (2) 1.50
- * (3) 2.25
- * (4) 3.38
- * (5) 5.06
- * (6) 7.59
- * (7) 11.39
- * (8) 17.09
- * (9) 25.63
- * (10) 38.44
- * (11) 57.67
- * (12) 86.50
- * (13) 129.75
- * (14) 194.62
- * (15) 291.93
- * (16) 437.89
- * (17) 656.84
- * (18) 985.26
- * (19) 1,477.89
- * (20) 2,216.84
- * (21) 3,325.26
- * (22) 4,987.89
- * (23) 7,481.83
- *
- * Thread-safety: This class is not thread safe.
- *
- * Created by patricioe on 2/10/16.
- */
-public class LogarithmicSearchLatencyTracker implements ILatencyTracker {
-
- static final int BUCKETS = 23;
- private static final double LOG_10_1000_MICROS = Math.log10(1000);
- private static final double LOG_10_1_5_MICROS = Math.log10(Double.valueOf("1.5").doubleValue());
-
-
- long[] latencies = new long[BUCKETS];
-
- /**
- * Increment the internal counter for the bucket this latency falls into.
- *
- * @param millis
- */
- public void addLatencyMillis(long millis) {
- int index = findIndex(millis * 1000);
- latencies[index]++;
- }
-
- /**
- * Increment the internal counter for the bucket this latency falls into.
- *
- * @param micros
- */
- public void addLatencyMicros(long micros) {
- int index = findIndex(micros);
- latencies[index]++;
- }
-
- /**
- * Returns the list of latencies buckets as an array.
- *
- * @return the list of latencies buckets as an array.
- */
- public long[] getLatencies() {
- return latencies;
- }
-
- @Override
- public long getLatency(int index) {
- return latencies[index];
- }
-
- public void clear() {
- latencies = new long[BUCKETS];
- }
-
- /**
- * Returns the counts in the bucket this latency falls into.
- * The latencies will no be updated.
- *
- * @param latency
- * @return the bucket content for the latency.
- */
- public long getBucketForLatencyMillis(long latency) {
- return latencies[findIndex(latency * 1000)];
- }
-
- /**
- * Returns the counts in the bucket this latency falls into.
- * The latencies will no be updated.
- *
- * @param latency
- * @return the bucket content for the latency.
- */
- public long getBucketForLatencyMicros(long latency) {
- return latencies[findIndex(latency)];
- }
-
-
- private int findIndex(long micros) {
-
- if (micros <= 1000) return 0;
- if (micros > 4987885) return 22;
-
- double raw = (Math.log10(micros) - LOG_10_1000_MICROS) / LOG_10_1_5_MICROS;
- double rounded = Math.round(raw * 1000000d) / 1000000d;
- return (int) Math.ceil(rounded);
- }
-
-}
diff --git a/client/src/main/java/io/split/engine/SDKReadinessGates.java b/client/src/main/java/io/split/engine/SDKReadinessGates.java
index ae0fd8ad4..10a18fbba 100644
--- a/client/src/main/java/io/split/engine/SDKReadinessGates.java
+++ b/client/src/main/java/io/split/engine/SDKReadinessGates.java
@@ -15,9 +15,7 @@
public class SDKReadinessGates {
private static final Logger _log = LoggerFactory.getLogger(SDKReadinessGates.class);
- private final CountDownLatch _splitsAreReady = new CountDownLatch(1);
- private final ConcurrentMap<String, CountDownLatch> _segmentsAreReady = new ConcurrentHashMap<>();
-
+ private final CountDownLatch _internalReady = new CountDownLatch(1);
/**
* Returns true if the SDK is ready. The SDK is ready when:
@@ -34,136 +32,15 @@ public class SDKReadinessGates {
* @return true if the sdk is ready, false otherwise.
* @throws InterruptedException if this operation was interrupted.
*/
- public boolean isSDKReady(long milliseconds) throws InterruptedException {
- long end = System.currentTimeMillis() + milliseconds;
- long timeLeft = milliseconds;
-
- boolean splits = areSplitsReady(timeLeft);
- if (!splits) {
- return false;
- }
-
- timeLeft = end - System.currentTimeMillis();
-
- return areSegmentsReady(timeLeft);
- }
-
- public boolean isSDKReadyNow() {
- try {
- return isSDKReady(0);
- } catch (InterruptedException e) {
- return false;
- }
- }
-
- /**
- * Records that the SDK split initialization is done.
- * This operation is atomic and idempotent. Repeated invocations
- * will not have any impact on the state.
- */
- public void splitsAreReady() {
- long originalCount = _splitsAreReady.getCount();
- _splitsAreReady.countDown();
- if (originalCount > 0L) {
- _log.info("splits are ready");
- }
- }
-
- /**
- * Registers a segment that the SDK should download before it is ready.
- * This method should be called right after the first successful download
- * of split definitions.
- *
- * Note that if this method is called in subsequent fetches of splits,
- * it will return false; meaning any segments used in new splits
- * will not be able to block the SDK from being marked as complete.
- *
- * @param segmentName the segment to register
- * @return true if the segments were registered, false otherwise.
- * @throws InterruptedException
- */
- public boolean registerSegment(String segmentName) throws InterruptedException {
- if (segmentName == null || segmentName.isEmpty() || areSplitsReady(0L)) {
- return false;
- }
-
- _segmentsAreReady.putIfAbsent(segmentName, new CountDownLatch(1));
- _log.info("Registered segment: " + segmentName);
- return true;
- }
-
- /**
- * Records that the SDK segment initialization for this segment is done.
- * This operation is atomic and idempotent. Repeated invocations
- * will not have any impact on the state.
- */
- public void segmentIsReady(String segmentName) {
- CountDownLatch cdl = _segmentsAreReady.get(segmentName);
- if (cdl == null) {
- return;
- }
-
- long originalCount = cdl.getCount();
-
- cdl.countDown();
-
- if (originalCount > 0L) {
- _log.info(segmentName + " segment is ready");
- }
- }
-
- public boolean isSegmentRegistered(String segmentName) {
- return _segmentsAreReady.get(segmentName) != null;
+ public boolean waitUntilInternalReady(long milliseconds) throws InterruptedException {
+ return _internalReady.await(milliseconds, TimeUnit.MILLISECONDS);
}
- /**
- * Returns true if the SDK is ready w.r.t segments. In other words, this method returns true if:
- *
- * - The SDK has fetched segment definitions the first time.
- *
- *
- * This operation will block until the SDK is ready or 'milliseconds' have passed. If the milliseconds
- * are less than or equal to zero, the operation will not block and return immediately
- *
- * @param milliseconds time to wait for an answer. if the value is zero or negative, we will not
- * block for an answer.
- * @return true if the sdk is ready w.r.t splits, false otherwise.
- * @throws InterruptedException if this operation was interrupted.
- */
- public boolean areSegmentsReady(long milliseconds) throws InterruptedException {
- long end = System.currentTimeMillis() + milliseconds;
- long timeLeft = milliseconds;
-
- for (Map.Entry<String, CountDownLatch> entry : _segmentsAreReady.entrySet()) {
- String segmentName = entry.getKey();
- CountDownLatch cdl = entry.getValue();
-
- if (!cdl.await(timeLeft, TimeUnit.MILLISECONDS)) {
- _log.error(segmentName + " is not ready yet");
- return false;
- }
-
- timeLeft = end - System.currentTimeMillis();
- }
-
- return true;
+ public boolean isSDKReady() {
+ return _internalReady.getCount() == 0;
}
- /**
- * Returns true if the SDK is ready w.r.t splits. In other words, this method returns true if:
- *
- * - The SDK has fetched Split definitions the first time.
- *
- *
- * This operation will block until the SDK is ready or 'milliseconds' have passed. If the milliseconds
- * are less than or equal to zero, the operation will not block and return immediately
- *
- * @param milliseconds time to wait for an answer. if the value is zero or negative, we will not
- * block for an answer.
- * @return true if the sdk is ready w.r.t splits, false otherwise.
- * @throws InterruptedException if this operation was interrupted.
- */
- public boolean areSplitsReady(long milliseconds) throws InterruptedException {
- return _splitsAreReady.await(milliseconds, TimeUnit.MILLISECONDS);
+ public void sdkInternalReady() {
+ _internalReady.countDown();
}
}
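SDKReadinessGates collapses the per-segment latches into a single internal-ready latch with three operations: sdkInternalReady() (called by the synchronizer, presumably once the first fetch of splits and segments completes), the blocking waitUntilInternalReady(ms), and the non-blocking isSDKReady(). A minimal sketch of the new contract:

```java
import io.split.engine.SDKReadinessGates;

public class ReadinessGatesSketch {
    public static void main(String[] args) throws InterruptedException {
        SDKReadinessGates gates = new SDKReadinessGates();

        // The synchronization side flips the single latch exactly once.
        new Thread(gates::sdkInternalReady).start();

        boolean ready = gates.waitUntilInternalReady(10_000); // blocks up to 10 seconds
        System.out.println("ready=" + ready + " isSDKReady=" + gates.isSDKReady());
    }
}
```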
diff --git a/client/src/main/java/io/split/engine/common/Backoff.java b/client/src/main/java/io/split/engine/common/Backoff.java
index 13bcb5425..285e0c824 100644
--- a/client/src/main/java/io/split/engine/common/Backoff.java
+++ b/client/src/main/java/io/split/engine/common/Backoff.java
@@ -5,20 +5,26 @@
import static com.google.common.base.Preconditions.checkNotNull;
public class Backoff {
- private static final long BACKOFF_MAX_SECONDS_ALLOWED = 1800;
+ private static final long BACKOFF_MAX_ALLOWED = 1800;
private final long _backoffBase;
private AtomicInteger _attempt;
+ private final long _maxAllowed;
public Backoff(long backoffBase) {
+ this(backoffBase, BACKOFF_MAX_ALLOWED);
+ }
+
+ public Backoff(long backoffBase, long maxAllowed) {
_backoffBase = checkNotNull(backoffBase);
_attempt = new AtomicInteger(0);
+ _maxAllowed = maxAllowed;
}
public long interval() {
long interval = _backoffBase * (long) Math.pow(2, _attempt.getAndIncrement());
- return interval >= BACKOFF_MAX_SECONDS_ALLOWED ? BACKOFF_MAX_SECONDS_ALLOWED : interval;
+ return interval >= _maxAllowed ? _maxAllowed : interval;
}
public synchronized void reset() {
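With the new constructor the ceiling is per-instance rather than the fixed 1800: each interval() doubles the previous value starting from the base and is capped at maxAllowed, and reset() restarts the sequence. The original constant name suggests the streaming retry code treats these values as seconds. A small sketch:

```java
import io.split.engine.common.Backoff;

public class BackoffSketch {
    public static void main(String[] args) {
        Backoff backoff = new Backoff(1, 60);
        for (int i = 0; i < 8; i++) {
            // Prints 1, 2, 4, 8, 16, 32, then 60, 60 once the cap is reached.
            System.out.println(backoff.interval());
        }
        backoff.reset(); // next interval() starts from the base again
    }
}
```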
diff --git a/client/src/main/java/io/split/engine/common/FastlyHeadersCaptor.java b/client/src/main/java/io/split/engine/common/FastlyHeadersCaptor.java
new file mode 100644
index 000000000..143f9523b
--- /dev/null
+++ b/client/src/main/java/io/split/engine/common/FastlyHeadersCaptor.java
@@ -0,0 +1,35 @@
+package io.split.engine.common;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class FastlyHeadersCaptor {
+
+ private static final Set HEADERS_TO_CAPTURE = new HashSet<>(Arrays.asList(
+ "Fastly-Debug-Path",
+ "Fastly-Debug-TTL",
+ "Fastly-Debug-Digest",
+ "X-Served-By",
+ "X-Cache",
+ "X-Cache-Hits",
+ "X-Timer",
+ "Surrogate-Key",
+ "ETag",
+ "Cache-Control",
+ "X-Request-ID",
+ "Last-Modified"
+ ));
+
+ private final List