Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 126 additions & 93 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
import io.split.engine.SDKReadinessGates;
import io.split.engine.common.SyncManager;
import io.split.engine.common.SyncManagerImp;
import io.split.engine.experiments.SplitFetcherImp;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.experiments.SplitChangeFetcher;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitFetcherImp;
import io.split.engine.experiments.SplitParser;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentChangeFetcher;
import io.split.cache.SegmentCache;
import io.split.cache.SegmentCacheInMemoryImpl;
Expand Down Expand Up @@ -62,10 +63,30 @@ public class SplitFactoryImpl implements SplitFactory {

private static Random RANDOM = new Random();

private final URI _rootTarget;
private final URI _eventsRootTarget;
private final CloseableHttpClient _httpclient;
private final SDKReadinessGates _gates;
private final HttpMetrics _httpMetrics;
private final FireAndForgetMetrics _unCachedFireAndForget;
private final SegmentSynchronizationTaskImp _segmentSynchronizationTaskImp;
private final SplitFetcher _splitFetcher;
private final SplitSynchronizationTask _splitSynchronizationTask;
private final ImpressionsManagerImpl _impressionsManager;
private final FireAndForgetMetrics _cachedFireAndForgetMetrics;
private final EventClient _eventClient;
private final SyncManager _syncManager;
private final Evaluator _evaluator;
private final String _apiToken;

// Caches
private final SegmentCache _segmentCache;
private final SplitCache _splitCache;

// Client and Manager
private final SplitClient _client;
private final SplitManager _manager;
private final Runnable destroyer;
private final String _apiToken;

private boolean isTerminated = false;
private final ApiKeyCounter _apiKeyCounter;

Expand All @@ -81,108 +102,64 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn

}

final CloseableHttpClient httpclient = buildHttpClient(apiToken, config);
// SDKReadinessGates
_gates = new SDKReadinessGates();

URI rootTarget = URI.create(config.endpoint());
URI eventsRootTarget = URI.create(config.eventsEndpoint());

// Metrics
HttpMetrics httpMetrics = HttpMetrics.create(httpclient, eventsRootTarget);
final FireAndForgetMetrics uncachedFireAndForget = FireAndForgetMetrics.instance(httpMetrics, 2, 1000);
// HttpClient
_httpclient = buildHttpClient(apiToken, config);

SDKReadinessGates gates = new SDKReadinessGates();
// Roots
_rootTarget = URI.create(config.endpoint());
_eventsRootTarget = URI.create(config.eventsEndpoint());

// Segments
SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher.create(httpclient, rootTarget, uncachedFireAndForget);
//This segmentCache is for inMemory Storage (the only one supported by java-client for the moment
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
final SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher,
findPollingPeriod(RANDOM, config.segmentsRefreshRate()),
config.numThreadsForSegmentFetch(),
gates,
segmentCache);
// HttpMetrics
_httpMetrics = HttpMetrics.create(_httpclient, _eventsRootTarget);

SplitParser splitParser = new SplitParser(segmentSynchronizationTaskImp, segmentCache);
// Cache Initialisations
_segmentCache = new SegmentCacheInMemoryImpl();
_splitCache = new InMemoryCacheImp();

// Feature Changes
SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(httpclient, rootTarget, uncachedFireAndForget);
// Metrics
_unCachedFireAndForget = FireAndForgetMetrics.instance(_httpMetrics, 2, 1000);

final SplitCache splitCache = new InMemoryCacheImp();
final SplitFetcherImp splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, gates, splitCache);
final SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask(splitFetcher, splitCache, findPollingPeriod(RANDOM, config.featuresRefreshRate()));
// Segments
_segmentSynchronizationTaskImp = buildSegments(config);

List<ImpressionListener> impressionListeners = new ArrayList<>();
// Setup integrations
if (config.integrationsConfig() != null) {
config.integrationsConfig().getImpressionsListeners(IntegrationsConfig.Execution.ASYNC).stream()
.map(l -> AsynchronousImpressionListener.build(l.listener(), l.queueSize()))
.collect(Collectors.toCollection(() -> impressionListeners));
// SplitFetcher
_splitFetcher = buildSplitFetcher();

config.integrationsConfig().getImpressionsListeners(IntegrationsConfig.Execution.SYNC).stream()
.map(IntegrationsConfig.ImpressionListenerWithMeta::listener)
.collect(Collectors.toCollection(() -> impressionListeners));
}
// SplitSynchronizationTask
_splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher, _splitCache, findPollingPeriod(RANDOM, config.featuresRefreshRate()));

// Impressions
final ImpressionsManagerImpl impressionsManager = ImpressionsManagerImpl.instance(httpclient, config, impressionListeners);
_impressionsManager = buildImpressionsManager(config);

CachedMetrics cachedMetrics = new CachedMetrics(httpMetrics, TimeUnit.SECONDS.toMillis(config.metricsRefreshRate()));
final FireAndForgetMetrics cachedFireAndForgetMetrics = FireAndForgetMetrics.instance(cachedMetrics, 2, 1000);
// CachedFireAndForgetMetrics
_cachedFireAndForgetMetrics = buildCachedFireAndForgetMetrics(config);

final EventClient eventClient = EventClientImpl.create(httpclient, eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown());
// EventClient
_eventClient = EventClientImpl.create(_httpclient, _eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown());

// SyncManager
final SyncManager syncManager = SyncManagerImp.build(config.streamingEnabled(), splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, config.authServiceURL(), httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config), segmentCache);
syncManager.start();
_syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache, config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config), _segmentCache);
_syncManager.start();

// Evaluator
final Evaluator evaluator = new EvaluatorImp(splitCache);

destroyer = new Runnable() {
public void run() {
_log.info("Shutdown called for split");
try {
segmentSynchronizationTaskImp.close();
_log.info("Successful shutdown of segment fetchers");
splitSynchronizationTask.close();
_log.info("Successful shutdown of splits");
impressionsManager.close();
_log.info("Successful shutdown of impressions manager");
uncachedFireAndForget.close();
_log.info("Successful shutdown of metrics 1");
cachedFireAndForgetMetrics.close();
_log.info("Successful shutdown of metrics 2");
httpclient.close();
_log.info("Successful shutdown of httpclient");
eventClient.close();
_log.info("Successful shutdown of eventClient");
new Thread(syncManager::shutdown).start();
_log.info("Successful shutdown of syncManager");
} catch (IOException e) {
_log.error("We could not shutdown split", e);
}
}
};
_evaluator = new EvaluatorImp(_splitCache);

// SplitClient
_client = new SplitClientImpl(this, _splitCache, _impressionsManager, _cachedFireAndForgetMetrics, _eventClient, config, _gates, _evaluator);

// SplitManager
_manager = new SplitManagerImpl(_splitCache, config, _gates);

// DestroyOnShutDown
if (config.destroyOnShutDown()) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Using the full path to avoid conflicting with Thread.destroy()
SplitFactoryImpl.this.destroy();
}
});
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Using the full path to avoid conflicting with Thread.destroy()
SplitFactoryImpl.this.destroy();
}));
}

_client = new SplitClientImpl(this,
splitCache,
impressionsManager,
cachedFireAndForgetMetrics,
eventClient,
config,
gates,
evaluator);
_manager = new SplitManagerImpl(splitCache, config, gates);
}

@Override
Expand All @@ -196,13 +173,31 @@ public SplitManager manager() {
}

@Override
public void destroy() {
synchronized (SplitFactoryImpl.class) {
if (!isTerminated) {
destroyer.run();
_apiKeyCounter.remove(_apiToken);
isTerminated = true;
public synchronized void destroy() {
if (!isTerminated) {
_log.info("Shutdown called for split");
try {
_segmentSynchronizationTaskImp.close();
_log.info("Successful shutdown of segment fetchers");
_splitSynchronizationTask.close();
_log.info("Successful shutdown of splits");
_impressionsManager.close();
_log.info("Successful shutdown of impressions manager");
_unCachedFireAndForget.close();
_log.info("Successful shutdown of metrics 1");
_cachedFireAndForgetMetrics.close();
_log.info("Successful shutdown of metrics 2");
_httpclient.close();
_log.info("Successful shutdown of httpclient");
_eventClient.close();
_log.info("Successful shutdown of eventClient");
_syncManager.shutdown();
_log.info("Successful shutdown of syncManager");
} catch (IOException e) {
_log.error("We could not shutdown split", e);
}
_apiKeyCounter.remove(_apiToken);
isTerminated = true;
}
}

Expand Down Expand Up @@ -299,4 +294,42 @@ private static int findPollingPeriod(Random rand, int max) {
int min = max / 2;
return rand.nextInt((max - min) + 1) + min;
}

private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config) throws URISyntaxException {
SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher.create(_httpclient, _rootTarget, _unCachedFireAndForget);

return new SegmentSynchronizationTaskImp(segmentChangeFetcher,
findPollingPeriod(RANDOM, config.segmentsRefreshRate()),
config.numThreadsForSegmentFetch(),
_gates,
_segmentCache);
}

private SplitFetcher buildSplitFetcher() throws URISyntaxException {
SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(_httpclient, _rootTarget, _unCachedFireAndForget);
SplitParser splitParser = new SplitParser(_segmentSynchronizationTaskImp, _segmentCache);

return new SplitFetcherImp(splitChangeFetcher, splitParser, _gates, _splitCache);
}

private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config) throws URISyntaxException {
List<ImpressionListener> impressionListeners = new ArrayList<>();
if (config.integrationsConfig() != null) {
config.integrationsConfig().getImpressionsListeners(IntegrationsConfig.Execution.ASYNC).stream()
.map(l -> AsynchronousImpressionListener.build(l.listener(), l.queueSize()))
.collect(Collectors.toCollection(() -> impressionListeners));

config.integrationsConfig().getImpressionsListeners(IntegrationsConfig.Execution.SYNC).stream()
.map(IntegrationsConfig.ImpressionListenerWithMeta::listener)
.collect(Collectors.toCollection(() -> impressionListeners));
}

return ImpressionsManagerImpl.instance(_httpclient, config, impressionListeners);
}

private FireAndForgetMetrics buildCachedFireAndForgetMetrics(SplitClientConfig config) {
CachedMetrics cachedMetrics = new CachedMetrics(_httpMetrics, TimeUnit.SECONDS.toMillis(config.metricsRefreshRate()));

return FireAndForgetMetrics.instance(cachedMetrics, 2, 1000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.split.cache.SegmentCache;
import io.split.cache.SplitCache;
import io.split.engine.experiments.SplitFetcherImp;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentSynchronizationTaskImp;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
Expand Down Expand Up @@ -48,7 +48,7 @@ public class SyncManagerImp implements SyncManager {

public static SyncManagerImp build(boolean streamingEnabledConfig,
SplitSynchronizationTask splitSynchronizationTask,
SplitFetcherImp splitFetcher,
SplitFetcher splitFetcher,
SegmentSynchronizationTaskImp segmentSynchronizationTaskImp,
SplitCache splitCache,
String authUrl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.split.cache.SegmentCache;
import io.split.cache.SplitCache;
import io.split.engine.experiments.SplitFetcherImp;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentFetcher;
import io.split.engine.segments.SegmentSynchronizationTask;
Expand All @@ -21,14 +21,14 @@ public class SynchronizerImp implements Synchronizer {
private static final Logger _log = LoggerFactory.getLogger(Synchronizer.class);

private final SplitSynchronizationTask _splitSynchronizationTask;
private final SplitFetcherImp _splitFetcher;
private final SplitFetcher _splitFetcher;
private final SegmentSynchronizationTask _segmentSynchronizationTaskImp;
private final ScheduledExecutorService _syncAllScheduledExecutorService;
private final SplitCache _splitCache;
private final SegmentCache _segmentCache;

public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
SplitFetcherImp splitFetcher,
SplitFetcher splitFetcher,
SegmentSynchronizationTask segmentSynchronizationTaskImp,
SplitCache splitCache,
SegmentCache segmentCache) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/**
* Created by adilaijaz on 5/8/15.
*/
public interface SplitFetcher {
public interface SplitFetcher extends Runnable {
/**
* Forces a sync of splits, outside of any scheduled
* syncs. This method MUST NOT throw any exceptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
* @author adil
*/
public class SplitFetcherImp implements SplitFetcher, Runnable {
public class SplitFetcherImp implements SplitFetcher {

private static final Logger _log = LoggerFactory.getLogger(SplitFetcherImp.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public class SplitSynchronizationTask implements Closeable {
private static final Logger _log = LoggerFactory.getLogger(SplitSynchronizationTask.class);

private final AtomicReference<SplitFetcherImp> _splitFetcher = new AtomicReference<SplitFetcherImp>();
private final AtomicReference<SplitFetcher> _splitFetcher = new AtomicReference<>();
private final AtomicReference<SplitCache> _splitCache = new AtomicReference<SplitCache>();
private final AtomicReference<ScheduledExecutorService> _executorService = new AtomicReference<>();
private final AtomicLong _refreshEveryNSeconds;
Expand All @@ -36,7 +36,7 @@ public class SplitSynchronizationTask implements Closeable {

private ScheduledFuture<?> _scheduledFuture;

public SplitSynchronizationTask(SplitFetcherImp splitFetcher, SplitCache splitCache, long refreshEveryNSeconds) {
public SplitSynchronizationTask(SplitFetcher splitFetcher, SplitCache splitCache, long refreshEveryNSeconds) {
_splitFetcher.set(checkNotNull(splitFetcher));
_splitCache.set(checkNotNull(splitCache));
checkArgument(refreshEveryNSeconds >= 0L);
Expand Down