Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.split.engine.cache;
package io.split.cache;

import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.Maps;
Expand All @@ -11,7 +11,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -100,6 +99,25 @@ public boolean trafficTypeExists(String trafficTypeName) {
return Sets.newHashSet(_concurrentTrafficTypeNameSet.elementSet()).contains(trafficTypeName);
}

@Override
public void kill(String splitName, String defaultTreatment, long changeNumber) {
ParsedSplit parsedSplit = _concurrentMap.get(splitName);

ParsedSplit updatedSplit = new ParsedSplit(parsedSplit.feature(),
parsedSplit.seed(),
true,
defaultTreatment,
parsedSplit.parsedConditions(),
parsedSplit.trafficTypeName(),
changeNumber,
parsedSplit.trafficAllocation(),
parsedSplit.trafficAllocationSeed(),
parsedSplit.algo(),
parsedSplit.configurations());

_concurrentMap.put(splitName, updatedSplit);
}

@Override
public void clear() {
_concurrentMap.clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package io.split.engine.cache;
package io.split.cache;

import io.split.engine.experiments.ParsedSplit;

import java.util.Collection;
import java.util.List;
import java.util.Set;

public interface SplitCache {
void put(ParsedSplit split);
Expand All @@ -15,5 +14,6 @@ public interface SplitCache {
long getChangeNumber();
void setChangeNumber(long changeNumber);
boolean trafficTypeExists(String trafficTypeName);
void kill(String splitName, String defaultTreatment, long changeNumber);
void clear();
}
17 changes: 12 additions & 5 deletions client/src/main/java/io/split/client/SplitClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import io.split.client.dtos.Event;
import io.split.client.impressions.Impression;
import io.split.client.impressions.ImpressionsManager;
import io.split.cache.SplitCache;
import io.split.engine.evaluator.Evaluator;
import io.split.engine.SDKReadinessGates;
import io.split.engine.evaluator.EvaluatorImp;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.metrics.Metrics;
import io.split.grammar.Treatments;
import org.slf4j.Logger;
Expand All @@ -32,11 +32,12 @@ public final class SplitClientImpl implements SplitClient {
public static final SplitResult SPLIT_RESULT_CONTROL = new SplitResult(Treatments.CONTROL, null);

private static final String GET_TREATMENT_LABEL = "sdk.getTreatment";
private static final String DEFINITION_NOT_FOUND = "definition not found";

private static final Logger _log = LoggerFactory.getLogger(SplitClientImpl.class);

private final SplitFactory _container;
private final SplitFetcher _splitFetcher;
private final SplitCache _splitCache;
private final ImpressionsManager _impressionManager;
private final Metrics _metrics;
private final SplitClientConfig _config;
Expand All @@ -45,15 +46,15 @@ public final class SplitClientImpl implements SplitClient {
private final Evaluator _evaluator;

public SplitClientImpl(SplitFactory container,
SplitFetcher splitFetcher,
SplitCache splitCache,
ImpressionsManager impressionManager,
Metrics metrics,
EventClient eventClient,
SplitClientConfig config,
SDKReadinessGates gates,
Evaluator evaluator) {
_container = container;
_splitFetcher = checkNotNull(splitFetcher);
_splitCache = checkNotNull(splitCache);
_impressionManager = checkNotNull(impressionManager);
_metrics = metrics;
_eventClient = eventClient;
Expand Down Expand Up @@ -192,7 +193,7 @@ private boolean track(Event event) {
event.trafficTypeName = event.trafficTypeName.toLowerCase();
}

if (!_splitFetcher.trafficTypeExists(event.trafficTypeName)) {
if (!_splitCache.trafficTypeExists(event.trafficTypeName)) {
_log.warn("track: Traffic Type " + event.trafficTypeName + " does not have any corresponding Splits in this environment, " +
"make sure you’re tracking your events to a valid traffic type defined in the Split console.");
}
Expand Down Expand Up @@ -314,6 +315,12 @@ private SplitResult getTreatmentWithConfigInternal(String label, String matching

EvaluatorImp.TreatmentLabelAndChangeNumber result = _evaluator.evaluateFeature(matchingKey, bucketingKey, split, attributes);

if (result.treatment.equals(Treatments.CONTROL) && result.label.equals(DEFINITION_NOT_FOUND) && _gates.isSDKReadyNow()) {
_log.warn(
"getTreatment: you passed \"" + split + "\" that does not exist in this environment, " +
"please double check what Splits exist in the web console.");
}

recordStats(
matchingKey,
bucketingKey,
Expand Down
30 changes: 14 additions & 16 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@
import io.split.client.metrics.CachedMetrics;
import io.split.client.metrics.FireAndForgetMetrics;
import io.split.client.metrics.HttpMetrics;
import io.split.engine.cache.InMemoryCacheImp;
import io.split.engine.cache.SplitCache;
import io.split.cache.InMemoryCacheImp;
import io.split.cache.SplitCache;
import io.split.engine.evaluator.Evaluator;
import io.split.engine.evaluator.EvaluatorImp;
import io.split.engine.SDKReadinessGates;
import io.split.engine.common.SyncManager;
import io.split.engine.common.SyncManagerImp;
import io.split.engine.experiments.RefreshableSplitFetcherProvider;
import io.split.engine.experiments.SplitFetcherImp;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.experiments.SplitChangeFetcher;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitParser;
import io.split.engine.segments.RefreshableSegmentFetcher;
import io.split.engine.segments.SegmentChangeFetcher;
Expand Down Expand Up @@ -177,7 +177,6 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn

}


final CloseableHttpClient httpclient = buildHttpClient(apiToken, config);

URI rootTarget = URI.create(config.endpoint());
Expand All @@ -202,9 +201,9 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
// Feature Changes
SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(httpclient, rootTarget, uncachedFireAndForget);

final SplitCache cache = new InMemoryCacheImp();
final RefreshableSplitFetcherProvider splitFetcherProvider = new RefreshableSplitFetcherProvider(splitChangeFetcher, splitParser, findPollingPeriod(RANDOM, config.featuresRefreshRate()), gates, cache);

final SplitCache splitCache = new InMemoryCacheImp();
final SplitFetcherImp splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, gates, splitCache);
final SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask(splitFetcher, splitCache, findPollingPeriod(RANDOM, config.featuresRefreshRate()));

List<ImpressionListener> impressionListeners = new ArrayList<>();
// Setup integrations
Expand All @@ -227,16 +226,19 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
final EventClient eventClient = EventClientImpl.create(httpclient, eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown());

// SyncManager
final SyncManager syncManager = SyncManagerImp.build(config.streamingEnabled(), splitFetcherProvider, segmentFetcher, config.authServiceURL(), httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config));
final SyncManager syncManager = SyncManagerImp.build(config.streamingEnabled(), splitSynchronizationTask, splitFetcher, segmentFetcher, splitCache, config.authServiceURL(), httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config));
syncManager.start();

// Evaluator
final Evaluator evaluator = new EvaluatorImp(splitCache);

destroyer = new Runnable() {
public void run() {
_log.info("Shutdown called for split");
try {
segmentFetcher.close();
_log.info("Successful shutdown of segment fetchers");
splitFetcherProvider.close();
splitSynchronizationTask.close();
_log.info("Successful shutdown of splits");
impressionsManager.close();
_log.info("Successful shutdown of impressions manager");
Expand Down Expand Up @@ -266,19 +268,15 @@ public void run() {
});
}


SplitFetcher splitFetcher = splitFetcherProvider.getFetcher();
Evaluator evaluator = new EvaluatorImp(gates, splitFetcher);

_client = new SplitClientImpl(this,
splitFetcher,
splitCache,
impressionsManager,
cachedFireAndForgetMetrics,
eventClient,
config,
gates,
evaluator);
_manager = new SplitManagerImpl(splitFetcherProvider.getFetcher(), config, gates);
_manager = new SplitManagerImpl(splitCache, config, gates);
}

private static int findPollingPeriod(Random rand, int max) {
Expand Down
16 changes: 9 additions & 7 deletions client/src/main/java/io/split/client/SplitManagerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@
import io.split.client.api.SplitView;
import io.split.client.dtos.Partition;
import io.split.engine.SDKReadinessGates;
import io.split.cache.SplitCache;
import io.split.engine.experiments.ParsedCondition;
import io.split.engine.experiments.ParsedSplit;
import io.split.engine.experiments.SplitFetcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import java.util.concurrent.TimeoutException;

/**
Expand All @@ -24,23 +26,23 @@ public class SplitManagerImpl implements SplitManager {

private static final Logger _log = LoggerFactory.getLogger(SplitManagerImpl.class);

private final SplitFetcher _splitFetcher;
private final SplitCache _splitCache;
private final SplitClientConfig _config;
private final SDKReadinessGates _gates;


public SplitManagerImpl(SplitFetcher splitFetcher,
public SplitManagerImpl(SplitCache splitCache,
SplitClientConfig config,
SDKReadinessGates gates) {
_config = Preconditions.checkNotNull(config);
_splitFetcher = Preconditions.checkNotNull(splitFetcher);
_splitCache = Preconditions.checkNotNull(splitCache);
_gates = Preconditions.checkNotNull(gates);
}

@Override
public List<SplitView> splits() {
List<SplitView> result = new ArrayList<>();
List<ParsedSplit> parsedSplits = _splitFetcher.fetchAll();
Collection<ParsedSplit> parsedSplits = _splitCache.getAll();
for (ParsedSplit split : parsedSplits) {
result.add(toSplitView(split));
}
Expand All @@ -57,7 +59,7 @@ public SplitView split(String featureName) {
_log.error("split: you passed an empty split name, split name must be a non-empty string");
return null;
}
ParsedSplit parsedSplit = _splitFetcher.fetch(featureName);
ParsedSplit parsedSplit = _splitCache.get(featureName);
if (parsedSplit == null) {
if (_gates.isSDKReadyNow()) {
_log.warn("split: you passed \"" + featureName + "\" that does not exist in this environment, " +
Expand All @@ -71,7 +73,7 @@ public SplitView split(String featureName) {
@Override
public List<String> splitNames() {
List<String> result = new ArrayList<>();
List<ParsedSplit> parsedSplits = _splitFetcher.fetchAll();
Collection<ParsedSplit> parsedSplits = _splitCache.getAll();
for (ParsedSplit split : parsedSplits) {
result.add(split.feature());
}
Expand Down
15 changes: 10 additions & 5 deletions client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package io.split.client.jmx;

import io.split.client.SplitClient;
import io.split.cache.SplitCache;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.segments.SegmentFetcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Created by patricioe on 1/18/16.
*/
Expand All @@ -15,12 +18,14 @@ public class SplitJmxMonitor implements SplitJmxMonitorMBean {

private final SplitClient _client;
private final SplitFetcher _featureFetcher;
private final SplitCache _splitCache;
private final SegmentFetcher _segmentFetcher;

public SplitJmxMonitor(SplitClient splitClient, SplitFetcher fetcher, SegmentFetcher segmentFetcher) {
_client = splitClient;
_featureFetcher = fetcher;
_segmentFetcher = segmentFetcher;
public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, SplitCache splitCache, SegmentFetcher segmentFetcher) {
_client = checkNotNull(splitClient);
_featureFetcher = checkNotNull(featureFetcher);
_splitCache = checkNotNull(splitCache);
_segmentFetcher = checkNotNull(segmentFetcher);
}

@Override
Expand All @@ -44,7 +49,7 @@ public String getTreatment(String key, String featureName) {

@Override
public String fetchDefinition(String featureName) {
return _featureFetcher.fetch(featureName).toString();
return _splitCache.get(featureName).toString();
}

@Override
Expand Down
10 changes: 7 additions & 3 deletions client/src/main/java/io/split/engine/common/SyncManagerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.split.engine.experiments.RefreshableSplitFetcherProvider;
import io.split.cache.SplitCache;
import io.split.engine.experiments.SplitFetcherImp;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.RefreshableSegmentFetcher;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.slf4j.Logger;
Expand Down Expand Up @@ -44,15 +46,17 @@ public class SyncManagerImp implements SyncManager {
}

public static SyncManagerImp build(boolean streamingEnabledConfig,
RefreshableSplitFetcherProvider refreshableSplitFetcherProvider,
SplitSynchronizationTask splitSynchronizationTask,
SplitFetcherImp splitFetcher,
RefreshableSegmentFetcher segmentFetcher,
SplitCache splitCache,
String authUrl,
CloseableHttpClient httpClient,
String streamingServiceUrl,
int authRetryBackOffBase,
CloseableHttpClient sseHttpClient) {
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
Synchronizer synchronizer = new SynchronizerImp(refreshableSplitFetcherProvider, segmentFetcher);
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentFetcher, splitCache);
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, authRetryBackOffBase, pushMessages, sseHttpClient);
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages);
}
Expand Down
Loading