diff --git a/client/pom.xml b/client/pom.xml
index 1ece412a1..91771c389 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -121,7 +121,7 @@
com.google.guava
guava
- 29.0-jre
+ 30.0-jre
org.slf4j
diff --git a/client/src/main/java/io/split/client/SplitFactoryBuilder.java b/client/src/main/java/io/split/client/SplitFactoryBuilder.java
index 7f78f4bd8..f7f1fea8b 100644
--- a/client/src/main/java/io/split/client/SplitFactoryBuilder.java
+++ b/client/src/main/java/io/split/client/SplitFactoryBuilder.java
@@ -25,7 +25,7 @@ public class SplitFactoryBuilder {
* @throws IOException if the SDK was being started in 'localhost' mode, but
* there were problems reading the override file from disk.
*/
- public static SplitFactory build(String apiToken) throws Exception {
+ public static SplitFactory build(String apiToken) throws IOException, URISyntaxException {
return build(apiToken, SplitClientConfig.builder().build());
}
@@ -36,7 +36,7 @@ public static SplitFactory build(String apiToken) throws Exception {
* @throws java.io.IOException if the SDK was being started in 'localhost' mode, but
* there were problems reading the override file from disk.
*/
- public static synchronized SplitFactory build(String apiToken, SplitClientConfig config) throws Exception {
+ public static synchronized SplitFactory build(String apiToken, SplitClientConfig config) throws IOException, URISyntaxException {
ApiKeyValidator.validate(apiToken);
if (LocalhostSplitFactory.LOCALHOST.equals(apiToken)) {
@@ -66,7 +66,7 @@ public static SplitFactory local(SplitClientConfig config) throws IOException, U
return LocalhostSplitFactory.createLocalhostSplitFactory(config);
}
- public static void main(String... args) throws Exception {
+ public static void main(String... args) throws IOException, URISyntaxException {
if (args.length != 1) {
System.out.println("Usage: ");
System.exit(1);
diff --git a/client/src/main/java/io/split/client/SplitFactoryImpl.java b/client/src/main/java/io/split/client/SplitFactoryImpl.java
index 03aaedf3c..90a9e9c0f 100644
--- a/client/src/main/java/io/split/client/SplitFactoryImpl.java
+++ b/client/src/main/java/io/split/client/SplitFactoryImpl.java
@@ -27,7 +27,7 @@
import io.split.integrations.IntegrationsConfig;
import io.split.telemetry.storage.InMemoryTelemetryStorage;
import io.split.telemetry.storage.TelemetryStorage;
-import io.split.telemetry.synchronizer.SynchronizerMemory;
+import io.split.telemetry.synchronizer.TelemetrySubmitter;
import io.split.telemetry.synchronizer.TelemetrySyncTask;
import io.split.telemetry.synchronizer.TelemetrySynchronizer;
import org.apache.hc.client5.http.auth.AuthScope;
@@ -123,7 +123,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
// Cache Initialisations
_segmentCache = new SegmentCacheInMemoryImpl();
_splitCache = new InMemoryCacheImp();
- _telemetrySynchronizer = new SynchronizerMemory(_httpclient, URI.create(config.get_telemetryURL()), _telemetryStorage, _splitCache, _segmentCache, _telemetryStorage, _startTime);
+ _telemetrySynchronizer = new TelemetrySubmitter(_httpclient, URI.create(config.get_telemetryURL()), _telemetryStorage, _splitCache, _segmentCache, _telemetryStorage, _startTime);
// Segments
@@ -182,18 +182,22 @@ public synchronized void destroy() {
if (!isTerminated) {
_log.info("Shutdown called for split");
try {
- _segmentSynchronizationTaskImp.close();
- _log.info("Successful shutdown of segment fetchers");
- _splitSynchronizationTask.close();
- _log.info("Successful shutdown of splits");
+ long splitCount = _splitCache.getAll().stream().count();
+ long segmentCount = _segmentCache.getAll().stream().count();
+ long segmentKeyCount = _segmentCache.getAllKeys().stream().count();
_impressionsManager.close();
_log.info("Successful shutdown of impressions manager");
_eventClient.close();
_log.info("Successful shutdown of eventClient");
+ _segmentSynchronizationTaskImp.close();
+ _log.info("Successful shutdown of segment fetchers");
+ _splitSynchronizationTask.close();
+ _log.info("Successful shutdown of splits");
_syncManager.shutdown();
_log.info("Successful shutdown of syncManager");
_telemetryStorage.recordSessionLength(System.currentTimeMillis() - _startTime);
- _telemetrySyncTask.stopScheduledTask();
+ _telemetrySyncTask.stopScheduledTask(splitCount, segmentCount, segmentKeyCount);
+ _log.info("Successful shutdown of telemetry sync task");
_httpclient.close();
_log.info("Successful shutdown of httpclient");
} catch (IOException e) {
diff --git a/client/src/main/java/io/split/telemetry/synchronizer/SynchronizerMemory.java b/client/src/main/java/io/split/telemetry/synchronizer/TelemetrySubmitter.java
similarity index 84%
rename from client/src/main/java/io/split/telemetry/synchronizer/SynchronizerMemory.java
rename to client/src/main/java/io/split/telemetry/synchronizer/TelemetrySubmitter.java
index bf075c4ed..b0b600a50 100644
--- a/client/src/main/java/io/split/telemetry/synchronizer/SynchronizerMemory.java
+++ b/client/src/main/java/io/split/telemetry/synchronizer/TelemetrySubmitter.java
@@ -1,5 +1,6 @@
package io.split.telemetry.synchronizer;
+import com.google.common.annotations.VisibleForTesting;
import io.split.cache.SegmentCache;
import io.split.cache.SplitCache;
import io.split.client.SplitClientConfig;
@@ -23,7 +24,7 @@
import java.util.List;
import java.util.Map;
-public class SynchronizerMemory implements TelemetrySynchronizer{
+public class TelemetrySubmitter implements TelemetrySynchronizer{
private static final int OPERATION_MODE = 0;
private static final String STORAGE = "memory";
@@ -34,7 +35,7 @@ public class SynchronizerMemory implements TelemetrySynchronizer{
private SegmentCache _segmentCache;
private final long _initStartTime;
- public SynchronizerMemory(CloseableHttpClient client, URI telemetryRootEndpoint, TelemetryStorageConsumer telemetryStorageConsumer, SplitCache splitCache,
+ public TelemetrySubmitter(CloseableHttpClient client, URI telemetryRootEndpoint, TelemetryStorageConsumer telemetryStorageConsumer, SplitCache splitCache,
SegmentCache segmentCache, TelemetryRuntimeProducer telemetryRuntimeProducer, long initStartTime) throws URISyntaxException {
_httpHttpTelemetryMemorySender = HttpTelemetryMemorySender.create(client, telemetryRootEndpoint, telemetryRuntimeProducer);
_teleTelemetryStorageConsumer = telemetryStorageConsumer;
@@ -53,7 +54,17 @@ public void synchronizeStats() throws Exception {
_httpHttpTelemetryMemorySender.postStats(generateStats());
}
- private Stats generateStats() throws Exception {
+ @Override
+ public void finalSynchronization(long splitCount, long segmentCount, long segmentKeyCount) throws Exception {
+ Stats stats = generateStats();
+ stats.set_splitCount(splitCount);
+ stats.set_segmentCount(segmentCount);
+ stats.set_segmentKeyCount(segmentKeyCount);
+ _httpHttpTelemetryMemorySender.postStats(stats);
+ }
+
+ @VisibleForTesting
+ Stats generateStats() throws Exception {
Stats stats = new Stats();
stats.set_lastSynchronization(_teleTelemetryStorageConsumer.getLastSynchronization());
stats.set_methodLatencies(_teleTelemetryStorageConsumer.popLatencies());
@@ -76,7 +87,8 @@ private Stats generateStats() throws Exception {
return stats;
}
- private Config generateConfig(SplitClientConfig splitClientConfig, long readyTimestamp, Map factoryInstances, List tags) {
+ @VisibleForTesting
+ Config generateConfig(SplitClientConfig splitClientConfig, long readyTimestamp, Map factoryInstances, List tags) {
Config config = new Config();
Rates rates = new Rates();
URLOverrides urlOverrides = new URLOverrides();
@@ -93,11 +105,11 @@ private Config generateConfig(SplitClientConfig splitClientConfig, long readyTim
rates.set_segments(splitClientConfig.segmentsRefreshRate());
rates.set_splits(splitClientConfig.featuresRefreshRate());
- urlOverrides.set_auth(SplitClientConfig.AUTH_ENDPOINT.equals(splitClientConfig.authServiceURL()));
- urlOverrides.set_stream(SplitClientConfig.STREAMING_ENDPOINT.equals(splitClientConfig.streamingServiceURL()));
- urlOverrides.set_sdk(SplitClientConfig.SDK_ENDPOINT.equals(splitClientConfig.endpoint()));
- urlOverrides.set_events(SplitClientConfig.EVENTS_ENDPOINT.equals(splitClientConfig.eventsEndpoint()));
- urlOverrides.set_telemetry(SplitClientConfig.TELEMETRY_ENDPOINT.equals(splitClientConfig.get_telemetryURL()));
+ urlOverrides.set_auth(!SplitClientConfig.AUTH_ENDPOINT.equals(splitClientConfig.authServiceURL()));
+ urlOverrides.set_stream(!SplitClientConfig.STREAMING_ENDPOINT.equals(splitClientConfig.streamingServiceURL()));
+ urlOverrides.set_sdk(!SplitClientConfig.SDK_ENDPOINT.equals(splitClientConfig.endpoint()));
+ urlOverrides.set_events(!SplitClientConfig.EVENTS_ENDPOINT.equals(splitClientConfig.eventsEndpoint()));
+ urlOverrides.set_telemetry(!SplitClientConfig.TELEMETRY_ENDPOINT.equals(splitClientConfig.get_telemetryURL()));
config.set_burTimeouts(_teleTelemetryStorageConsumer.getBURTimeouts());
config.set_nonReadyUsages(_teleTelemetryStorageConsumer.getNonReadyUsages());
diff --git a/client/src/main/java/io/split/telemetry/synchronizer/TelemetrySyncTask.java b/client/src/main/java/io/split/telemetry/synchronizer/TelemetrySyncTask.java
index c5135b133..b766f7e05 100644
--- a/client/src/main/java/io/split/telemetry/synchronizer/TelemetrySyncTask.java
+++ b/client/src/main/java/io/split/telemetry/synchronizer/TelemetrySyncTask.java
@@ -2,6 +2,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.split.client.SplitManagerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -10,6 +13,7 @@
public class TelemetrySyncTask {
+ private static final Logger _log = LoggerFactory.getLogger(TelemetrySyncTask.class);
private final ScheduledExecutorService _telemetrySyncScheduledExecutorService;
private final TelemetrySynchronizer _telemetrySynchronizer;
private final int _telemetryRefreshRate;
@@ -30,21 +34,21 @@ public TelemetrySyncTask(int telemetryRefreshRate, TelemetrySynchronizer telemet
}
@VisibleForTesting
- protected void startScheduledTask() throws Exception {
+ protected void startScheduledTask() {
_telemetrySyncScheduledExecutorService.scheduleWithFixedDelay(() -> {
try {
_telemetrySynchronizer.synchronizeStats();
} catch (Exception e) {
e.printStackTrace();
}
- },0l, _telemetryRefreshRate, TimeUnit.SECONDS);
+ },_telemetryRefreshRate, _telemetryRefreshRate, TimeUnit.SECONDS);
}
- public void stopScheduledTask() {
+ public void stopScheduledTask(long splitCount, long segmentCount, long segmentKeyCount) {
try {
- _telemetrySynchronizer.synchronizeStats();
+ _telemetrySynchronizer.finalSynchronization(splitCount, segmentCount, segmentKeyCount);
} catch (Exception e) {
- e.printStackTrace();
+ _log.warn("Error trying to send telemetry stats.");
}
_telemetrySyncScheduledExecutorService.shutdown();
}
diff --git a/client/src/main/java/io/split/telemetry/synchronizer/TelemetrySynchronizer.java b/client/src/main/java/io/split/telemetry/synchronizer/TelemetrySynchronizer.java
index 859df44c8..7600a6334 100644
--- a/client/src/main/java/io/split/telemetry/synchronizer/TelemetrySynchronizer.java
+++ b/client/src/main/java/io/split/telemetry/synchronizer/TelemetrySynchronizer.java
@@ -8,4 +8,5 @@
public interface TelemetrySynchronizer {
void synchronizeConfig(SplitClientConfig config, long timeUntilReady, Map factoryInstances, List tags);
void synchronizeStats() throws Exception;
+ void finalSynchronization(long splitCount, long segmentCount, long segmentKeyCount) throws Exception;
}
diff --git a/client/src/test/java/io/split/client/SplitFactoryImplTest.java b/client/src/test/java/io/split/client/SplitFactoryImplTest.java
index 1256293b4..99123aa93 100644
--- a/client/src/test/java/io/split/client/SplitFactoryImplTest.java
+++ b/client/src/test/java/io/split/client/SplitFactoryImplTest.java
@@ -1,10 +1,15 @@
package io.split.client;
import io.split.client.impressions.ImpressionsManager;
+import io.split.engine.segments.SegmentSynchronizationTaskImp;
import io.split.integrations.IntegrationsConfig;
+import io.split.telemetry.storage.TelemetryStorage;
import junit.framework.TestCase;
import org.junit.Test;
+import org.mockito.Mockito;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.net.URISyntaxException;
public class SplitFactoryImplTest extends TestCase {
@@ -89,6 +94,7 @@ public void testFactoryInstantiationWithProxy() throws Exception {
@Test
public void testFactoryDestroy() throws Exception {
+ TelemetryStorage telemetryStorage = Mockito.mock(TelemetryStorage.class);
SplitClientConfig splitClientConfig = SplitClientConfig.builder()
.enableDebug()
.impressionsMode(ImpressionsManager.Mode.DEBUG)
@@ -98,10 +104,20 @@ public void testFactoryDestroy() throws Exception {
.authServiceURL(AUTH_SERVICE)
.setBlockUntilReadyTimeout(10000)
.build();
+
SplitFactoryImpl splitFactory = new SplitFactoryImpl(API_KEY, splitClientConfig);
+ //Before destroy we replace telemetryStorage via reflection.
+ Field factoryDestroy = SplitFactoryImpl.class.getDeclaredField("_telemetryStorage");
+ factoryDestroy.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(factoryDestroy, factoryDestroy.getModifiers() & ~Modifier.FINAL);
+
+ factoryDestroy.set(splitFactory, telemetryStorage);
splitFactory.destroy();
assertTrue(splitFactory.isDestroyed());
+ Mockito.verify(telemetryStorage, Mockito.times(1)).recordSessionLength(Mockito.anyLong());
}
}
\ No newline at end of file
diff --git a/client/src/test/java/io/split/client/impressions/ImpressionsManagerImplTest.java b/client/src/test/java/io/split/client/impressions/ImpressionsManagerImplTest.java
index aca79a6c5..c00caa5d4 100644
--- a/client/src/test/java/io/split/client/impressions/ImpressionsManagerImplTest.java
+++ b/client/src/test/java/io/split/client/impressions/ImpressionsManagerImplTest.java
@@ -4,8 +4,10 @@
import io.split.client.dtos.KeyImpression;
import io.split.client.dtos.TestImpressions;
+import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum;
import io.split.telemetry.storage.InMemoryTelemetryStorage;
import io.split.telemetry.storage.TelemetryStorage;
+import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -21,15 +23,19 @@
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
/**
* Created by patricioe on 6/20/16.
*/
@RunWith(MockitoJUnitRunner.class)
public class ImpressionsManagerImplTest {
- private static final TelemetryStorage TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);
+ private static TelemetryStorage TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);
+
+ @Before
+ public void setUp() {
+ TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);
+ }
@Captor
private ArgumentCaptor> impressionsCaptor;
@@ -102,6 +108,7 @@ public void worksButDropsImpressions() throws URISyntaxException {
List captured = impressionsCaptor.getValue();
assertThat(captured.size(), is(equalTo(3)));
+ Mockito.verify(TELEMETRY_STORAGE, times(1)).recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, 1);
}
@Test
@@ -138,6 +145,8 @@ public void works4ImpressionsInOneTest() throws URISyntaxException {
assertThat(captured.size(), is(equalTo(1)));
assertThat(captured.get(0).keyImpressions.size(), is(equalTo(4)));
assertThat(captured.get(0).keyImpressions.get(0), is(equalTo(ki1)));
+ Mockito.verify(TELEMETRY_STORAGE, times(2)).recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DEDUPED, 1);
+ Mockito.verify(TELEMETRY_STORAGE, times(4)).recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_QUEUED, 1);
}
@Test
diff --git a/client/src/test/java/io/split/engine/sse/AuthApiClientTest.java b/client/src/test/java/io/split/engine/sse/AuthApiClientTest.java
index f90a45d80..dab743602 100644
--- a/client/src/test/java/io/split/engine/sse/AuthApiClientTest.java
+++ b/client/src/test/java/io/split/engine/sse/AuthApiClientTest.java
@@ -8,6 +8,7 @@
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.core5.http.HttpStatus;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -15,7 +16,12 @@
import java.lang.reflect.InvocationTargetException;
public class AuthApiClientTest {
- private static final TelemetryStorage TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);
+ private static TelemetryStorage TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);
+
+ @Before
+ public void setUp() {
+ TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);
+ }
@Test
public void authenticateWithPushEnabledShouldReturnSuccess() throws IOException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
CloseableHttpClient httpClientMock = TestHelper.mockHttpClient("streaming-auth-push-enabled.json", HttpStatus.SC_OK);
@@ -28,6 +34,9 @@ public void authenticateWithPushEnabledShouldReturnSuccess() throws IOException,
Assert.assertFalse(result.isRetry());
Assert.assertFalse(StringUtils.isEmpty(result.getToken()));
Assert.assertTrue(result.getExpiration() > 0);
+ Mockito.verify(TELEMETRY_STORAGE, Mockito.times(1)).recordTokenRefreshes();
+ Mockito.verify(TELEMETRY_STORAGE, Mockito.times(1)).recordSyncLatency(Mockito.anyObject(), Mockito.anyLong());
+ Mockito.verify(TELEMETRY_STORAGE, Mockito.times(1)).recordSuccessfulSync(Mockito.anyObject(), Mockito.anyLong());
}
@@ -95,5 +104,6 @@ public void authenticateServerUnauthorizedShouldReturnErrorWithoutRetry() throws
Assert.assertTrue(StringUtils.isEmpty(result.getChannels()));
Assert.assertTrue(StringUtils.isEmpty(result.getToken()));
Assert.assertFalse(result.isRetry());
+ Mockito.verify(TELEMETRY_STORAGE, Mockito.times(1)).recordAuthRejections();
}
}
diff --git a/client/src/test/java/io/split/telemetry/synchronizer/SynchronizerMemoryTest.java b/client/src/test/java/io/split/telemetry/synchronizer/SynchronizerMemoryTest.java
deleted file mode 100644
index 0afab5ddf..000000000
--- a/client/src/test/java/io/split/telemetry/synchronizer/SynchronizerMemoryTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package io.split.telemetry.synchronizer;
-
-import io.split.TestHelper;
-import io.split.cache.SegmentCache;
-import io.split.cache.SegmentCacheInMemoryImpl;
-import io.split.cache.SplitCache;
-import io.split.client.SplitClientConfig;
-import io.split.telemetry.storage.InMemoryTelemetryStorage;
-import io.split.telemetry.storage.TelemetryRuntimeProducer;
-import io.split.telemetry.storage.TelemetryStorageConsumer;
-import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
-import org.apache.hc.core5.http.HttpStatus;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-public class SynchronizerMemoryTest {
-
- public static final String TELEMETRY_ENDPOINT = "https://telemetry.split.io/api/v1";
-
- @Test
- public void testSynchronizeConfig() throws URISyntaxException, NoSuchMethodException, IOException, IllegalAccessException, InvocationTargetException {
- CloseableHttpClient httpClient = TestHelper.mockHttpClient(TELEMETRY_ENDPOINT, HttpStatus.SC_OK);
- TelemetrySynchronizer telemetrySynchronizer = getTelemetrySynchronizer(httpClient);
- SplitClientConfig splitClientConfig = SplitClientConfig.builder().build();
-
- telemetrySynchronizer.synchronizeConfig(splitClientConfig, 100l, new HashMap(), new ArrayList());
- Mockito.verify(httpClient, Mockito.times(1)).execute(Mockito.any());
- }
-
-
- @Test
- public void testSynchronizeStats() throws Exception {
- CloseableHttpClient httpClient = TestHelper.mockHttpClient(TELEMETRY_ENDPOINT, HttpStatus.SC_OK);
- TelemetrySynchronizer telemetrySynchronizer = getTelemetrySynchronizer(httpClient);
-
- telemetrySynchronizer.synchronizeStats();
- Mockito.verify(httpClient, Mockito.times(1)).execute(Mockito.any());
- }
-
- private TelemetrySynchronizer getTelemetrySynchronizer(CloseableHttpClient httpClient) throws URISyntaxException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, IOException {
- TelemetryStorageConsumer consumer = Mockito.mock(InMemoryTelemetryStorage.class);
- TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class);
- SplitCache splitCache = Mockito.mock(SplitCache.class);
- SegmentCache segmentCache = Mockito.mock(SegmentCacheInMemoryImpl.class);
- SplitClientConfig config = Mockito.mock(SplitClientConfig.class);
- TelemetrySynchronizer telemetrySynchronizer = new SynchronizerMemory(httpClient, URI.create(TELEMETRY_ENDPOINT), consumer, splitCache, segmentCache, telemetryRuntimeProducer, 0l);
- return telemetrySynchronizer;
- }
-
-}
\ No newline at end of file
diff --git a/client/src/test/java/io/split/telemetry/synchronizer/TelemetrySubmitterTest.java b/client/src/test/java/io/split/telemetry/synchronizer/TelemetrySubmitterTest.java
new file mode 100644
index 000000000..d4bb42d06
--- /dev/null
+++ b/client/src/test/java/io/split/telemetry/synchronizer/TelemetrySubmitterTest.java
@@ -0,0 +1,224 @@
+package io.split.telemetry.synchronizer;
+
+import io.split.TestHelper;
+import io.split.cache.SegmentCache;
+import io.split.cache.SegmentCacheInMemoryImpl;
+import io.split.cache.SplitCache;
+import io.split.client.ApiKeyCounter;
+import io.split.client.SplitClientConfig;
+import io.split.telemetry.domain.Config;
+import io.split.telemetry.domain.Stats;
+import io.split.telemetry.domain.StreamingEvent;
+import io.split.telemetry.domain.enums.*;
+import io.split.telemetry.storage.InMemoryTelemetryStorage;
+import io.split.telemetry.storage.TelemetryRuntimeProducer;
+import io.split.telemetry.storage.TelemetryStorage;
+import io.split.telemetry.storage.TelemetryStorageConsumer;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.core5.http.HttpStatus;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class TelemetrySubmitterTest {
+ private static final String FIRST_KEY = "KEY_1";
+ private static final String SECOND_KEY = "KEY_2";
+ public static final String TELEMETRY_ENDPOINT = "https://telemetry.split.io/api/v1";
+
+ @Test
+ public void testSynchronizeConfig() throws URISyntaxException, NoSuchMethodException, IOException, IllegalAccessException, InvocationTargetException {
+ CloseableHttpClient httpClient = TestHelper.mockHttpClient(TELEMETRY_ENDPOINT, HttpStatus.SC_OK);
+ TelemetrySynchronizer telemetrySynchronizer = getTelemetrySynchronizer(httpClient);
+ SplitClientConfig splitClientConfig = SplitClientConfig.builder().build();
+
+ telemetrySynchronizer.synchronizeConfig(splitClientConfig, 100l, new HashMap(), new ArrayList());
+ Mockito.verify(httpClient, Mockito.times(1)).execute(Mockito.any());
+ }
+
+
+ @Test
+ public void testSynchronizeStats() throws Exception {
+ CloseableHttpClient httpClient = TestHelper.mockHttpClient(TELEMETRY_ENDPOINT, HttpStatus.SC_OK);
+ TelemetrySynchronizer telemetrySynchronizer = getTelemetrySynchronizer(httpClient);
+
+ telemetrySynchronizer.synchronizeStats();
+ Mockito.verify(httpClient, Mockito.times(1)).execute(Mockito.any());
+ }
+
+ @Test
+ public void testConfig() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, IOException, URISyntaxException, NoSuchFieldException, ClassNotFoundException {
+ ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY);
+ ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY);
+ ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY);
+ ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY);
+ ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY);
+ TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage();
+ CloseableHttpClient httpClient = TestHelper.mockHttpClient(TELEMETRY_ENDPOINT, HttpStatus.SC_OK);
+ TelemetrySubmitter telemetrySynchronizer = getTelemetrySynchronizer(httpClient);
+ SplitClientConfig splitClientConfig = SplitClientConfig.builder().build();
+ populateConfig(telemetryStorage);
+ Field teleTelemetryStorageConsumer = TelemetrySubmitter.class.getDeclaredField("_teleTelemetryStorageConsumer");
+ teleTelemetryStorageConsumer.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(teleTelemetryStorageConsumer, teleTelemetryStorageConsumer.getModifiers() & ~Modifier.FINAL);
+ teleTelemetryStorageConsumer.set(telemetrySynchronizer, telemetryStorage);
+ Config config = telemetrySynchronizer.generateConfig(splitClientConfig, 100l, ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(), new ArrayList<>());
+ Assert.assertEquals(3, config.get_redundantFactories());
+ Assert.assertEquals(2, config.get_burTimeouts());
+ Assert.assertEquals(3, config.get_nonReadyUsages());
+ }
+
+ @Test
+ public void testStats() throws Exception {
+ TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage();
+ CloseableHttpClient httpClient = TestHelper.mockHttpClient(TELEMETRY_ENDPOINT, HttpStatus.SC_OK);
+ TelemetrySubmitter telemetrySynchronizer = getTelemetrySynchronizer(httpClient);
+ populateStats(telemetryStorage);
+ Field teleTelemetryStorageConsumer = TelemetrySubmitter.class.getDeclaredField("_teleTelemetryStorageConsumer");
+ teleTelemetryStorageConsumer.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(teleTelemetryStorageConsumer, teleTelemetryStorageConsumer.getModifiers() & ~Modifier.FINAL);
+
+ teleTelemetryStorageConsumer.set(telemetrySynchronizer, telemetryStorage);
+ Stats stats = telemetrySynchronizer.generateStats();
+ Assert.assertEquals(2, stats.get_methodLatencies().get_treatment().stream().mapToInt(Long::intValue).sum());
+ Assert.assertEquals(2, stats.get_methodLatencies().get_treatments().stream().mapToInt(Long::intValue).sum());
+ Assert.assertEquals(1, stats.get_methodLatencies().get_treatmentsWithConfig().stream().mapToInt(Long::intValue).sum());
+ Assert.assertEquals(1, stats.get_methodLatencies().get_treatmentWithConfig().stream().mapToInt(Long::intValue).sum());
+ Assert.assertEquals(0, stats.get_methodLatencies().get_track().stream().mapToInt(Long::intValue).sum());
+ Assert.assertEquals(3, stats.get_httpLatencies().get_splits().stream().mapToInt(Long::intValue).sum());
+ Assert.assertEquals(2, stats.get_httpLatencies().get_telemetry().stream().mapToInt(Long::intValue).sum());
+ Assert.assertEquals(2, stats.get_httpLatencies().get_events().stream().mapToInt(Long::intValue).sum());
+ Assert.assertEquals(1, stats.get_httpLatencies().get_segments().stream().mapToInt(Long::intValue).sum());
+ Assert.assertEquals(1, stats.get_httpLatencies().get_impressions().stream().mapToInt(Long::intValue).sum());
+ Assert.assertEquals(1, stats.get_httpLatencies().get_impressionsCount().stream().mapToInt(Long::intValue).sum());
+ Assert.assertEquals(0, stats.get_httpLatencies().get_token().stream().mapToInt(Long::intValue).sum());
+ Assert.assertEquals(2, stats.get_methodExceptions().get_treatment());
+ Assert.assertEquals(2, stats.get_methodExceptions().get_treatments());
+ Assert.assertEquals(1, stats.get_methodExceptions().get_treatmentsWithConfig());
+ Assert.assertEquals(1, stats.get_methodExceptions().get_treatmentWithConfig());
+ Assert.assertEquals(0, stats.get_methodExceptions().get_track());
+ Assert.assertEquals(1, stats.get_authRejections());
+ Assert.assertEquals(2, stats.get_tokenRefreshes());
+ Assert.assertEquals(4, stats.get_impressionsDeduped());
+ Assert.assertEquals(12, stats.get_impressionsDropped());
+ Assert.assertEquals(0, stats.get_impressionsQueued());
+ Assert.assertEquals(10, stats.get_eventsDropped());
+ Assert.assertEquals(3, stats.get_eventsQueued());
+ Assert.assertEquals(800, stats.get_lastSynchronization().get_events());
+ Assert.assertEquals(129, stats.get_lastSynchronization().get_token());
+ Assert.assertEquals(1580, stats.get_lastSynchronization().get_segments());
+ Assert.assertEquals(0, stats.get_lastSynchronization().get_splits());
+ Assert.assertEquals(10500, stats.get_lastSynchronization().get_impressions());
+ Assert.assertEquals(1500, stats.get_lastSynchronization().get_impressionsCount());
+ Assert.assertEquals(265, stats.get_lastSynchronization().get_telemetry());
+ Assert.assertEquals(91218, stats.get_sessionLengthMs());
+ Assert.assertEquals(2, stats.get_httpErrors().get_telemetry().get(400l).intValue());
+ Assert.assertEquals(1, stats.get_httpErrors().get_segments().get(501l).intValue());
+ Assert.assertEquals(2, stats.get_httpErrors().get_impressions().get(403l).intValue());
+ Assert.assertEquals(1, stats.get_httpErrors().get_impressionsCount().get(403l).intValue());
+ Assert.assertEquals(1, stats.get_httpErrors().get_events().get(503l).intValue());
+ Assert.assertEquals(1, stats.get_httpErrors().get_splits().get(403l).intValue());
+ Assert.assertEquals(1, stats.get_httpErrors().get_token().get(403l).intValue());
+ List streamingEvents = stats.get_streamingEvents();
+ Assert.assertEquals(290, streamingEvents.get(0).get_data());
+ Assert.assertEquals(1, streamingEvents.get(0).get_type());
+ Assert.assertEquals(91218, streamingEvents.get(0).getTimestamp());
+ }
+
+ private TelemetrySubmitter getTelemetrySynchronizer(CloseableHttpClient httpClient) throws URISyntaxException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, IOException {
+ TelemetryStorageConsumer consumer = Mockito.mock(InMemoryTelemetryStorage.class);
+ TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class);
+ SplitCache splitCache = Mockito.mock(SplitCache.class);
+ SegmentCache segmentCache = Mockito.mock(SegmentCacheInMemoryImpl.class);
+ TelemetrySubmitter telemetrySynchronizer = new TelemetrySubmitter(httpClient, URI.create(TELEMETRY_ENDPOINT), consumer, splitCache, segmentCache, telemetryRuntimeProducer, 0l);
+ return telemetrySynchronizer;
+ }
+
+ private void populateStats(TelemetryStorage telemetryStorage) {
+ telemetryStorage.recordLatency(MethodEnum.TREATMENT, 1500l * 1000);
+ telemetryStorage.recordLatency(MethodEnum.TREATMENT, 2000l * 1000);
+ telemetryStorage.recordLatency(MethodEnum.TREATMENTS, 3000l * 1000);
+ telemetryStorage.recordLatency(MethodEnum.TREATMENTS, 500l * 1000);
+ telemetryStorage.recordLatency(MethodEnum.TREATMENT_WITH_CONFIG, 800l * 1000);
+ telemetryStorage.recordLatency(MethodEnum.TREATMENTS_WITH_CONFIG, 1000l * 1000);
+
+ telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.TELEMETRY, 1500l * 1000);
+ telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.TELEMETRY, 2000l * 1000);
+ telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.EVENTS, 1500l * 1000);
+ telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.EVENTS, 2000l * 1000);
+ telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.SEGMENTS, 1500l * 1000);
+ telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.SPLITS, 2000l * 1000);
+ telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.SPLITS, 1500l * 1000);
+ telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.SPLITS, 2000l * 1000);
+ telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.IMPRESSIONS, 1500l * 1000);
+ telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.IMPRESSIONS_COUNT, 2000l * 1000);
+
+ telemetryStorage.recordException(MethodEnum.TREATMENT);
+ telemetryStorage.recordException(MethodEnum.TREATMENTS);
+ telemetryStorage.recordException(MethodEnum.TREATMENT);
+ telemetryStorage.recordException(MethodEnum.TREATMENTS);
+ telemetryStorage.recordException(MethodEnum.TREATMENT_WITH_CONFIG);
+ telemetryStorage.recordException(MethodEnum.TREATMENTS_WITH_CONFIG);
+
+ telemetryStorage.recordAuthRejections();
+
+ telemetryStorage.recordTokenRefreshes();
+ telemetryStorage.recordTokenRefreshes();
+
+ telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DEDUPED, 3);
+ telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DEDUPED, 1);
+ telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, 4);
+ telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, 6);
+ telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, 2);
+
+ telemetryStorage.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 3);
+ telemetryStorage.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 7);
+ telemetryStorage.recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 3);
+
+ telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.EVENTS, 1500);
+ telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.EVENTS, 800);
+ telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.IMPRESSIONS, 2500);
+ telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.IMPRESSIONS, 10500);
+ telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.IMPRESSIONS_COUNT, 1500);
+ telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.SEGMENTS, 1580);
+ telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.TELEMETRY, 265);
+ telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.TOKEN, 129);
+
+ telemetryStorage.recordSessionLength(91218);
+
+ telemetryStorage.recordSyncError(ResourceEnum.TELEMETRY_SYNC, 400);
+ telemetryStorage.recordSyncError(ResourceEnum.TELEMETRY_SYNC, 400);
+ telemetryStorage.recordSyncError(ResourceEnum.SEGMENT_SYNC, 501);
+ telemetryStorage.recordSyncError(ResourceEnum.IMPRESSION_SYNC, 403);
+ telemetryStorage.recordSyncError(ResourceEnum.IMPRESSION_SYNC, 403);
+ telemetryStorage.recordSyncError(ResourceEnum.EVENT_SYNC, 503);
+ telemetryStorage.recordSyncError(ResourceEnum.SPLIT_SYNC, 403);
+ telemetryStorage.recordSyncError(ResourceEnum.IMPRESSION_COUNT_SYNC, 403);
+ telemetryStorage.recordSyncError(ResourceEnum.TOKEN_SYNC, 403);
+
+ StreamingEvent streamingEvent = new StreamingEvent(1, 290, 91218);
+ telemetryStorage.recordStreamingEvents(streamingEvent);
+ }
+
+ private void populateConfig(TelemetryStorage telemetryStorage) {
+ telemetryStorage.recordBURTimeout();
+ telemetryStorage.recordBURTimeout();
+ telemetryStorage.recordNonReadyUsage();
+ telemetryStorage.recordNonReadyUsage();
+ telemetryStorage.recordNonReadyUsage();
+ }
+
+}
\ No newline at end of file
diff --git a/client/src/test/java/io/split/telemetry/synchronizer/TelemetrySyncTaskTest.java b/client/src/test/java/io/split/telemetry/synchronizer/TelemetrySyncTaskTest.java
index 4113313b6..46339d71f 100644
--- a/client/src/test/java/io/split/telemetry/synchronizer/TelemetrySyncTaskTest.java
+++ b/client/src/test/java/io/split/telemetry/synchronizer/TelemetrySyncTaskTest.java
@@ -7,24 +7,24 @@ public class TelemetrySyncTaskTest {
@Test
public void testSynchronizationTask() throws Exception {
- TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(SynchronizerMemory.class);
+ TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySubmitter.class);
Mockito.doNothing().when(telemetrySynchronizer).synchronizeStats();
TelemetrySyncTask telemetrySyncTask = new TelemetrySyncTask(1, telemetrySynchronizer);
Thread.sleep(2900);
- Mockito.verify(telemetrySynchronizer, Mockito.times(3)).synchronizeStats();
+ Mockito.verify(telemetrySynchronizer, Mockito.times(2)).synchronizeStats();
}
@Test
public void testStopSynchronizationTask() throws Exception {
- TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(SynchronizerMemory.class);
+ TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySubmitter.class);
// Mockito.doNothing().when(telemetrySynchronizer).synchronizeStats();
TelemetrySyncTask telemetrySyncTask = new TelemetrySyncTask(1, telemetrySynchronizer);
Thread.sleep(3000);
- Mockito.verify(telemetrySynchronizer, Mockito.times(3)).synchronizeStats();
- telemetrySyncTask.stopScheduledTask();
+ Mockito.verify(telemetrySynchronizer, Mockito.times(2)).synchronizeStats();
+ telemetrySyncTask.stopScheduledTask(1l, 1l, 1l);
Thread.sleep(2000);
- Mockito.verify(telemetrySynchronizer, Mockito.times(4)).synchronizeStats();
-
+ Mockito.verify(telemetrySynchronizer, Mockito.times(2)).synchronizeStats();
+ Mockito.verify(telemetrySynchronizer, Mockito.times(1)).finalSynchronization(1l, 1l, 1l);
}
}
\ No newline at end of file