Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
<version>30.0-jre</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
6 changes: 3 additions & 3 deletions client/src/main/java/io/split/client/SplitFactoryBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class SplitFactoryBuilder {
* @throws IOException if the SDK was being started in 'localhost' mode, but
* there were problems reading the override file from disk.
*/
public static SplitFactory build(String apiToken) throws Exception {
public static SplitFactory build(String apiToken) throws IOException, URISyntaxException {
return build(apiToken, SplitClientConfig.builder().build());
}

Expand All @@ -36,7 +36,7 @@ public static SplitFactory build(String apiToken) throws Exception {
* @throws java.io.IOException if the SDK was being started in 'localhost' mode, but
* there were problems reading the override file from disk.
*/
public static synchronized SplitFactory build(String apiToken, SplitClientConfig config) throws Exception {
public static synchronized SplitFactory build(String apiToken, SplitClientConfig config) throws IOException, URISyntaxException {
ApiKeyValidator.validate(apiToken);

if (LocalhostSplitFactory.LOCALHOST.equals(apiToken)) {
Expand Down Expand Up @@ -66,7 +66,7 @@ public static SplitFactory local(SplitClientConfig config) throws IOException, U
return LocalhostSplitFactory.createLocalhostSplitFactory(config);
}

public static void main(String... args) throws Exception {
public static void main(String... args) throws IOException, URISyntaxException {
if (args.length != 1) {
System.out.println("Usage: <api_token>");
System.exit(1);
Expand Down
18 changes: 11 additions & 7 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.split.integrations.IntegrationsConfig;
import io.split.telemetry.storage.InMemoryTelemetryStorage;
import io.split.telemetry.storage.TelemetryStorage;
import io.split.telemetry.synchronizer.SynchronizerMemory;
import io.split.telemetry.synchronizer.TelemetrySubmitter;
import io.split.telemetry.synchronizer.TelemetrySyncTask;
import io.split.telemetry.synchronizer.TelemetrySynchronizer;
import org.apache.hc.client5.http.auth.AuthScope;
Expand Down Expand Up @@ -123,7 +123,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
// Cache Initialisations
_segmentCache = new SegmentCacheInMemoryImpl();
_splitCache = new InMemoryCacheImp();
_telemetrySynchronizer = new SynchronizerMemory(_httpclient, URI.create(config.get_telemetryURL()), _telemetryStorage, _splitCache, _segmentCache, _telemetryStorage, _startTime);
_telemetrySynchronizer = new TelemetrySubmitter(_httpclient, URI.create(config.get_telemetryURL()), _telemetryStorage, _splitCache, _segmentCache, _telemetryStorage, _startTime);


// Segments
Expand Down Expand Up @@ -182,18 +182,22 @@ public synchronized void destroy() {
if (!isTerminated) {
_log.info("Shutdown called for split");
try {
_segmentSynchronizationTaskImp.close();
_log.info("Successful shutdown of segment fetchers");
_splitSynchronizationTask.close();
_log.info("Successful shutdown of splits");
long splitCount = _splitCache.getAll().stream().count();
long segmentCount = _segmentCache.getAll().stream().count();
long segmentKeyCount = _segmentCache.getAllKeys().stream().count();
_impressionsManager.close();
_log.info("Successful shutdown of impressions manager");
_eventClient.close();
_log.info("Successful shutdown of eventClient");
_segmentSynchronizationTaskImp.close();
_log.info("Successful shutdown of segment fetchers");
_splitSynchronizationTask.close();
_log.info("Successful shutdown of splits");
_syncManager.shutdown();
_log.info("Successful shutdown of syncManager");
_telemetryStorage.recordSessionLength(System.currentTimeMillis() - _startTime);
_telemetrySyncTask.stopScheduledTask();
_telemetrySyncTask.stopScheduledTask(splitCount, segmentCount, segmentKeyCount);
_log.info("Successful shutdown of telemetry sync task");
_httpclient.close();
_log.info("Successful shutdown of httpclient");
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.split.telemetry.synchronizer;

import com.google.common.annotations.VisibleForTesting;
import io.split.cache.SegmentCache;
import io.split.cache.SplitCache;
import io.split.client.SplitClientConfig;
Expand All @@ -23,7 +24,7 @@
import java.util.List;
import java.util.Map;

public class SynchronizerMemory implements TelemetrySynchronizer{
public class TelemetrySubmitter implements TelemetrySynchronizer{

private static final int OPERATION_MODE = 0;
private static final String STORAGE = "memory";
Expand All @@ -34,7 +35,7 @@ public class SynchronizerMemory implements TelemetrySynchronizer{
private SegmentCache _segmentCache;
private final long _initStartTime;

public SynchronizerMemory(CloseableHttpClient client, URI telemetryRootEndpoint, TelemetryStorageConsumer telemetryStorageConsumer, SplitCache splitCache,
public TelemetrySubmitter(CloseableHttpClient client, URI telemetryRootEndpoint, TelemetryStorageConsumer telemetryStorageConsumer, SplitCache splitCache,
SegmentCache segmentCache, TelemetryRuntimeProducer telemetryRuntimeProducer, long initStartTime) throws URISyntaxException {
_httpHttpTelemetryMemorySender = HttpTelemetryMemorySender.create(client, telemetryRootEndpoint, telemetryRuntimeProducer);
_teleTelemetryStorageConsumer = telemetryStorageConsumer;
Expand All @@ -53,7 +54,17 @@ public void synchronizeStats() throws Exception {
_httpHttpTelemetryMemorySender.postStats(generateStats());
}

private Stats generateStats() throws Exception {
@Override
public void finalSynchronization(long splitCount, long segmentCount, long segmentKeyCount) throws Exception {
Stats stats = generateStats();
stats.set_splitCount(splitCount);
stats.set_segmentCount(segmentCount);
stats.set_segmentKeyCount(segmentKeyCount);
_httpHttpTelemetryMemorySender.postStats(stats);
}

@VisibleForTesting
Stats generateStats() throws Exception {
Stats stats = new Stats();
stats.set_lastSynchronization(_teleTelemetryStorageConsumer.getLastSynchronization());
stats.set_methodLatencies(_teleTelemetryStorageConsumer.popLatencies());
Expand All @@ -76,7 +87,8 @@ private Stats generateStats() throws Exception {
return stats;
}

private Config generateConfig(SplitClientConfig splitClientConfig, long readyTimestamp, Map<String, Long> factoryInstances, List<String> tags) {
@VisibleForTesting
Config generateConfig(SplitClientConfig splitClientConfig, long readyTimestamp, Map<String, Long> factoryInstances, List<String> tags) {
Config config = new Config();
Rates rates = new Rates();
URLOverrides urlOverrides = new URLOverrides();
Expand All @@ -93,11 +105,11 @@ private Config generateConfig(SplitClientConfig splitClientConfig, long readyTim
rates.set_segments(splitClientConfig.segmentsRefreshRate());
rates.set_splits(splitClientConfig.featuresRefreshRate());

urlOverrides.set_auth(SplitClientConfig.AUTH_ENDPOINT.equals(splitClientConfig.authServiceURL()));
urlOverrides.set_stream(SplitClientConfig.STREAMING_ENDPOINT.equals(splitClientConfig.streamingServiceURL()));
urlOverrides.set_sdk(SplitClientConfig.SDK_ENDPOINT.equals(splitClientConfig.endpoint()));
urlOverrides.set_events(SplitClientConfig.EVENTS_ENDPOINT.equals(splitClientConfig.eventsEndpoint()));
urlOverrides.set_telemetry(SplitClientConfig.TELEMETRY_ENDPOINT.equals(splitClientConfig.get_telemetryURL()));
urlOverrides.set_auth(!SplitClientConfig.AUTH_ENDPOINT.equals(splitClientConfig.authServiceURL()));
urlOverrides.set_stream(!SplitClientConfig.STREAMING_ENDPOINT.equals(splitClientConfig.streamingServiceURL()));
urlOverrides.set_sdk(!SplitClientConfig.SDK_ENDPOINT.equals(splitClientConfig.endpoint()));
urlOverrides.set_events(!SplitClientConfig.EVENTS_ENDPOINT.equals(splitClientConfig.eventsEndpoint()));
urlOverrides.set_telemetry(!SplitClientConfig.TELEMETRY_ENDPOINT.equals(splitClientConfig.get_telemetryURL()));

config.set_burTimeouts(_teleTelemetryStorageConsumer.getBURTimeouts());
config.set_nonReadyUsages(_teleTelemetryStorageConsumer.getNonReadyUsages());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.split.client.SplitManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -10,6 +13,7 @@

public class TelemetrySyncTask {

private static final Logger _log = LoggerFactory.getLogger(TelemetrySyncTask.class);
private final ScheduledExecutorService _telemetrySyncScheduledExecutorService;
private final TelemetrySynchronizer _telemetrySynchronizer;
private final int _telemetryRefreshRate;
Expand All @@ -30,21 +34,21 @@ public TelemetrySyncTask(int telemetryRefreshRate, TelemetrySynchronizer telemet
}

@VisibleForTesting
protected void startScheduledTask() throws Exception {
protected void startScheduledTask() {
_telemetrySyncScheduledExecutorService.scheduleWithFixedDelay(() -> {
try {
_telemetrySynchronizer.synchronizeStats();
} catch (Exception e) {
e.printStackTrace();
}
},0l, _telemetryRefreshRate, TimeUnit.SECONDS);
},_telemetryRefreshRate, _telemetryRefreshRate, TimeUnit.SECONDS);
}

public void stopScheduledTask() {
public void stopScheduledTask(long splitCount, long segmentCount, long segmentKeyCount) {
try {
_telemetrySynchronizer.synchronizeStats();
_telemetrySynchronizer.finalSynchronization(splitCount, segmentCount, segmentKeyCount);
} catch (Exception e) {
e.printStackTrace();
_log.warn("Error trying to send telemetry stats.");
}
_telemetrySyncScheduledExecutorService.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
public interface TelemetrySynchronizer {
void synchronizeConfig(SplitClientConfig config, long timeUntilReady, Map<String, Long> factoryInstances, List<String> tags);
void synchronizeStats() throws Exception;
void finalSynchronization(long splitCount, long segmentCount, long segmentKeyCount) throws Exception;
}
16 changes: 16 additions & 0 deletions client/src/test/java/io/split/client/SplitFactoryImplTest.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package io.split.client;

import io.split.client.impressions.ImpressionsManager;
import io.split.engine.segments.SegmentSynchronizationTaskImp;
import io.split.integrations.IntegrationsConfig;
import io.split.telemetry.storage.TelemetryStorage;
import junit.framework.TestCase;
import org.junit.Test;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.URISyntaxException;

public class SplitFactoryImplTest extends TestCase {
Expand Down Expand Up @@ -89,6 +94,7 @@ public void testFactoryInstantiationWithProxy() throws Exception {

@Test
public void testFactoryDestroy() throws Exception {
TelemetryStorage telemetryStorage = Mockito.mock(TelemetryStorage.class);
SplitClientConfig splitClientConfig = SplitClientConfig.builder()
.enableDebug()
.impressionsMode(ImpressionsManager.Mode.DEBUG)
Expand All @@ -98,10 +104,20 @@ public void testFactoryDestroy() throws Exception {
.authServiceURL(AUTH_SERVICE)
.setBlockUntilReadyTimeout(10000)
.build();

SplitFactoryImpl splitFactory = new SplitFactoryImpl(API_KEY, splitClientConfig);
//Before destroy we replace telemetryStorage via reflection.
Field factoryDestroy = SplitFactoryImpl.class.getDeclaredField("_telemetryStorage");
factoryDestroy.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(factoryDestroy, factoryDestroy.getModifiers() & ~Modifier.FINAL);

factoryDestroy.set(splitFactory, telemetryStorage);
splitFactory.destroy();

assertTrue(splitFactory.isDestroyed());
Mockito.verify(telemetryStorage, Mockito.times(1)).recordSessionLength(Mockito.anyLong());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import io.split.client.dtos.KeyImpression;
import io.split.client.dtos.TestImpressions;

import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum;
import io.split.telemetry.storage.InMemoryTelemetryStorage;
import io.split.telemetry.storage.TelemetryStorage;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -21,15 +23,19 @@

import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

/**
* Created by patricioe on 6/20/16.
*/
@RunWith(MockitoJUnitRunner.class)
public class ImpressionsManagerImplTest {
private static final TelemetryStorage TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);
private static TelemetryStorage TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);

@Before
public void setUp() {
TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);
}

@Captor
private ArgumentCaptor<List<TestImpressions>> impressionsCaptor;
Expand Down Expand Up @@ -102,6 +108,7 @@ public void worksButDropsImpressions() throws URISyntaxException {
List<TestImpressions> captured = impressionsCaptor.getValue();

assertThat(captured.size(), is(equalTo(3)));
Mockito.verify(TELEMETRY_STORAGE, times(1)).recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, 1);
}

@Test
Expand Down Expand Up @@ -138,6 +145,8 @@ public void works4ImpressionsInOneTest() throws URISyntaxException {
assertThat(captured.size(), is(equalTo(1)));
assertThat(captured.get(0).keyImpressions.size(), is(equalTo(4)));
assertThat(captured.get(0).keyImpressions.get(0), is(equalTo(ki1)));
Mockito.verify(TELEMETRY_STORAGE, times(2)).recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DEDUPED, 1);
Mockito.verify(TELEMETRY_STORAGE, times(4)).recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_QUEUED, 1);
}

@Test
Expand Down
12 changes: 11 additions & 1 deletion client/src/test/java/io/split/engine/sse/AuthApiClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,20 @@
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.core5.http.HttpStatus;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;

public class AuthApiClientTest {
private static final TelemetryStorage TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);
private static TelemetryStorage TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);

@Before
public void setUp() {
TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);
}
@Test
public void authenticateWithPushEnabledShouldReturnSuccess() throws IOException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
CloseableHttpClient httpClientMock = TestHelper.mockHttpClient("streaming-auth-push-enabled.json", HttpStatus.SC_OK);
Expand All @@ -28,6 +34,9 @@ public void authenticateWithPushEnabledShouldReturnSuccess() throws IOException,
Assert.assertFalse(result.isRetry());
Assert.assertFalse(StringUtils.isEmpty(result.getToken()));
Assert.assertTrue(result.getExpiration() > 0);
Mockito.verify(TELEMETRY_STORAGE, Mockito.times(1)).recordTokenRefreshes();
Mockito.verify(TELEMETRY_STORAGE, Mockito.times(1)).recordSyncLatency(Mockito.anyObject(), Mockito.anyLong());
Mockito.verify(TELEMETRY_STORAGE, Mockito.times(1)).recordSuccessfulSync(Mockito.anyObject(), Mockito.anyLong());

}

Expand Down Expand Up @@ -95,5 +104,6 @@ public void authenticateServerUnauthorizedShouldReturnErrorWithoutRetry() throws
Assert.assertTrue(StringUtils.isEmpty(result.getChannels()));
Assert.assertTrue(StringUtils.isEmpty(result.getToken()));
Assert.assertFalse(result.isRetry());
Mockito.verify(TELEMETRY_STORAGE, Mockito.times(1)).recordAuthRejections();
}
}

This file was deleted.

Loading