Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.9.0</version>
<version>4.10.0-rc1</version>
</parent>
<artifactId>java-client</artifactId>
<packaging>jar</packaging>
Expand Down Expand Up @@ -149,7 +149,7 @@
<dependency>
<groupId>io.split.client</groupId>
<artifactId>pluggable-storage</artifactId>
<version>2.0.0</version>
<version>2.1.0-rc1</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
175 changes: 105 additions & 70 deletions client/src/main/java/io/split/client/SplitClientImpl.java

Large diffs are not rendered by default.

19 changes: 10 additions & 9 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.split.client.impressions.strategy.ProcessImpressionStrategy;
import io.split.client.interceptors.AuthorizationInterceptorFilter;
import io.split.client.interceptors.ClientKeyInterceptorFilter;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.client.interceptors.FlagSetsFilterImpl;
import io.split.client.interceptors.GzipDecoderResponseInterceptor;
import io.split.client.interceptors.GzipEncoderRequestInterceptor;
Expand Down Expand Up @@ -110,7 +111,6 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -190,7 +190,8 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn

// Cache Initialisations
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
SplitCache splitCache = new InMemoryCacheImp(config.getSetsFilter());
FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(config.getSetsFilter());
SplitCache splitCache = new InMemoryCacheImp(flagSetsFilter);
ImpressionsStorage impressionsStorage = new InMemoryImpressionsStorage(config.impressionsQueueSize());
_splitCache = splitCache;
_segmentCache = segmentCache;
Expand All @@ -202,7 +203,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn

SplitParser splitParser = new SplitParser();
// SplitFetcher
_splitFetcher = buildSplitFetcher(splitCache, splitParser, config.getSetsFilter());
_splitFetcher = buildSplitFetcher(splitCache, splitParser, flagSetsFilter);

// SplitSynchronizationTask
_splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher,
Expand Down Expand Up @@ -250,7 +251,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
SplitAPI splitAPI = SplitAPI.build(_httpclient, buildSSEdHttpClient(apiToken, config, _sdkMetadata));

_syncManager = SyncManagerImp.build(splitTasks, _splitFetcher, splitCache, splitAPI,
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser);
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser, flagSetsFilter);
_syncManager.start();

// DestroyOnShutDown
Expand Down Expand Up @@ -355,7 +356,8 @@ protected SplitFactoryImpl(SplitClientConfig config) {
_telemetryStorageProducer = new NoopTelemetryStorage();

SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
SplitCache splitCache = new InMemoryCacheImp(config.getSetsFilter());
FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(config.getSetsFilter());
SplitCache splitCache = new InMemoryCacheImp(flagSetsFilter);
_splitCache = splitCache;
_gates = new SDKReadinessGates();
_segmentCache = segmentCache;
Expand All @@ -379,7 +381,7 @@ protected SplitFactoryImpl(SplitClientConfig config) {
SplitChangeFetcher splitChangeFetcher = createSplitChangeFetcher(config);
SplitParser splitParser = new SplitParser();

_splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCache, _telemetryStorageProducer, config.getSetsFilter());
_splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCache, _telemetryStorageProducer, flagSetsFilter);

// SplitSynchronizationTask
_splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher, splitCache, config.featuresRefreshRate(), config.getThreadFactory());
Expand Down Expand Up @@ -561,11 +563,10 @@ private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config, Se
config.getThreadFactory());
}

private SplitFetcher buildSplitFetcher(SplitCacheProducer splitCacheProducer, SplitParser splitParser, HashSet<String> flagSets) throws
private SplitFetcher buildSplitFetcher(SplitCacheProducer splitCacheProducer, SplitParser splitParser, FlagSetsFilter flagSetsFilter) throws
URISyntaxException {
SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(_httpclient, _rootTarget, _telemetryStorageProducer);

return new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, _telemetryStorageProducer,flagSets);
return new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, _telemetryStorageProducer,flagSetsFilter);
}

private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config, ImpressionsStorageConsumer impressionsStorageConsumer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
public class FeatureFlagProcessor {
private static final Logger _log = LoggerFactory.getLogger(FeatureFlagProcessor.class);

public static FeatureFlagsToUpdate processFeatureFlagChanges(SplitParser splitParser, List<Split> splits, HashSet<String> configSets) {
public static FeatureFlagsToUpdate processFeatureFlagChanges(SplitParser splitParser, List<Split> splits, FlagSetsFilter flagSetsFilter) {
List<ParsedSplit> toAdd = new ArrayList<>();
List<String> toRemove = new ArrayList<>();
Set<String> segments = new HashSet<>();
FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(configSets);
for (Split split : splits) {
if (split.status != Status.ACTIVE) {
// archive.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.split.engine.common;

import com.google.common.annotations.VisibleForTesting;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.engine.experiments.SplitParser;
import io.split.engine.sse.AuthApiClient;
import io.split.engine.sse.AuthApiClientImp;
Expand All @@ -23,7 +24,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -76,9 +76,9 @@ public static PushManagerImp build(Synchronizer synchronizer,
ThreadFactory threadFactory,
SplitParser splitParser,
SplitCacheProducer splitCacheProducer,
HashSet<String> flagSets) {
FlagSetsFilter flagSetsFilter) {
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer,
telemetryRuntimeProducer, flagSets);
telemetryRuntimeProducer, flagSetsFilter);
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer);
return new PushManagerImp(new AuthApiClientImp(authUrl, splitAPI.getHttpClient(), telemetryRuntimeProducer),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.annotations.VisibleForTesting;
import io.split.client.ApiKeyCounter;
import io.split.client.SplitClientConfig;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.engine.SDKReadinessGates;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitParser;
Expand Down Expand Up @@ -87,7 +88,8 @@ public static SyncManagerImp build(SplitTasks splitTasks,
TelemetryRuntimeProducer telemetryRuntimeProducer,
TelemetrySynchronizer telemetrySynchronizer,
SplitClientConfig config,
SplitParser splitParser) {
SplitParser splitParser,
FlagSetsFilter flagSetsFilter) {
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
Synchronizer synchronizer = new SynchronizerImp(splitTasks,
splitFetcher,
Expand All @@ -108,7 +110,7 @@ public static SyncManagerImp build(SplitTasks splitTasks,
config.getThreadFactory(),
splitParser,
splitCacheProducer,
config.getSetsFilter());
flagSetsFilter);

return new SyncManagerImp(splitTasks,
config.streamingEnabled(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ EvaluatorImp.TreatmentLabelAndChangeNumber evaluateFeature(String matchingKey, S
Map<String, Object> attributes);
Map<String, EvaluatorImp.TreatmentLabelAndChangeNumber> evaluateFeatures(String matchingKey, String bucketingKey,
List<String> featureFlags, Map<String, Object> attributes);
Map<String, EvaluatorImp.TreatmentLabelAndChangeNumber> evaluateFeaturesByFlagSets(String key, String bucketingKey, List<String> flagSets);
Map<String, EvaluatorImp.TreatmentLabelAndChangeNumber> evaluateFeaturesByFlagSets(String key, String bucketingKey,
List<String> flagSets, Map<String, Object> attributes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public Map<String, TreatmentLabelAndChangeNumber> evaluateFeatures(String matchi

@Override
public Map<String, EvaluatorImp.TreatmentLabelAndChangeNumber> evaluateFeaturesByFlagSets(String key, String bucketingKey,
List<String> flagSets) {
List<String> flagSets, Map<String, Object> attributes) {
List<String> flagSetsWithNames = getFeatureFlagNamesByFlagSets(flagSets);
Map<String, TreatmentLabelAndChangeNumber> evaluations = evaluateFeatures(key, bucketingKey, flagSetsWithNames, null);
Map<String, TreatmentLabelAndChangeNumber> evaluations = evaluateFeatures(key, bucketingKey, flagSetsWithNames, attributes);
return evaluations;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.split.client.dtos.SplitChange;
import io.split.client.exceptions.UriTooLongException;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.client.utils.FeatureFlagsToUpdate;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
Expand Down Expand Up @@ -30,7 +31,7 @@ public class SplitFetcherImp implements SplitFetcher {
private final SplitCacheProducer _splitCacheProducer;
private final Object _lock = new Object();
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
private final HashSet<String> _flagSets;
private final FlagSetsFilter _flagSetsFilter;

/**
* Contains all the traffic types that are currently being used by the splits and also the count
Expand All @@ -43,12 +44,12 @@ public class SplitFetcherImp implements SplitFetcher {
*/

public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser, SplitCacheProducer splitCacheProducer,
TelemetryRuntimeProducer telemetryRuntimeProducer, HashSet<String> sets) {
TelemetryRuntimeProducer telemetryRuntimeProducer, FlagSetsFilter flagSetsFilter) {
_splitChangeFetcher = checkNotNull(splitChangeFetcher);
_parser = checkNotNull(parser);
_splitCacheProducer = checkNotNull(splitCacheProducer);
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
_flagSets = sets;
_flagSetsFilter = flagSetsFilter;
}

@Override
Expand Down Expand Up @@ -119,7 +120,7 @@ private Set<String> runWithoutExceptionHandling(FetchOptions options) throws Int
// some other thread may have updated the shared state. exit
return segments;
}
FeatureFlagsToUpdate featureFlagsToUpdate = processFeatureFlagChanges(_parser, change.splits, _flagSets);
FeatureFlagsToUpdate featureFlagsToUpdate = processFeatureFlagChanges(_parser, change.splits, _flagSetsFilter);
segments = featureFlagsToUpdate.getSegments();
_splitCacheProducer.update(featureFlagsToUpdate.getToAdd(), featureFlagsToUpdate.getToRemove(), change.till);
_telemetryRuntimeProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.SPLITS, System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.split.engine.sse.workers;

import io.split.client.dtos.Split;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.client.utils.FeatureFlagsToUpdate;
import io.split.engine.common.Synchronizer;
import io.split.engine.experiments.SplitParser;
Expand All @@ -13,7 +14,6 @@
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -25,16 +25,16 @@ public class FeatureFlagWorkerImp extends Worker<FeatureFlagChangeNotification>
private final SplitParser _splitParser;
private final SplitCacheProducer _splitCacheProducer;
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
private final HashSet<String> _flagSets;
private final FlagSetsFilter _flagSetsFilter;

public FeatureFlagWorkerImp(Synchronizer synchronizer, SplitParser splitParser, SplitCacheProducer splitCacheProducer,
TelemetryRuntimeProducer telemetryRuntimeProducer, HashSet<String> flagSets) {
TelemetryRuntimeProducer telemetryRuntimeProducer, FlagSetsFilter flagSetsFilter) {
super("Feature flags");
_synchronizer = checkNotNull(synchronizer);
_splitParser = splitParser;
_splitCacheProducer = splitCacheProducer;
_telemetryRuntimeProducer = telemetryRuntimeProducer;
_flagSets = flagSets;
_flagSetsFilter = flagSetsFilter;
}

@Override
Expand Down Expand Up @@ -65,7 +65,7 @@ private boolean addOrUpdateFeatureFlag(FeatureFlagChangeNotification featureFlag
featureFlagChangeNotification.getPreviousChangeNumber() == _splitCacheProducer.getChangeNumber()) {
Split featureFlag = featureFlagChangeNotification.getFeatureFlagDefinition();
FeatureFlagsToUpdate featureFlagsToUpdate = processFeatureFlagChanges(_splitParser, Collections.singletonList(featureFlag),
_flagSets);
_flagSetsFilter);
_splitCacheProducer.update(featureFlagsToUpdate.getToAdd(), featureFlagsToUpdate.getToRemove(),
featureFlagChangeNotification.getChangeNumber());
Set<String> segments = featureFlagsToUpdate.getSegments();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.client.interceptors.FlagSetsFilterImpl;
import io.split.engine.experiments.ParsedSplit;
import io.split.storages.SplitCache;
import org.slf4j.Logger;
Expand Down Expand Up @@ -33,23 +32,23 @@ public class InMemoryCacheImp implements SplitCache {

private AtomicLong _changeNumber;

public InMemoryCacheImp(HashSet<String> flagSets) {
public InMemoryCacheImp(FlagSetsFilter flagSets) {
this(-1, flagSets);
}

public InMemoryCacheImp(long startingChangeNumber, HashSet<String> flagSets) {
public InMemoryCacheImp(long startingChangeNumber, FlagSetsFilter flagSets) {
_concurrentMap = Maps.newConcurrentMap();
_changeNumber = new AtomicLong(startingChangeNumber);
_concurrentTrafficTypeNameSet = ConcurrentHashMultiset.create();
_flagSets = Maps.newConcurrentMap();
_flagSetsFilter = new FlagSetsFilterImpl(flagSets);
_flagSetsFilter = flagSets;
}

@Override
public boolean remove(String name) {
ParsedSplit removed = _concurrentMap.remove(name);
if (removed != null) {
removeFromFlagSets(removed.feature(), removed.flagSets());
removeFromFlagSets(removed.feature());
if (removed.trafficTypeName() != null) {
this.decreaseTrafficType(removed.trafficTypeName());
}
Expand Down Expand Up @@ -149,10 +148,10 @@ public void clear() {
public void putMany(List<ParsedSplit> splits) {
for (ParsedSplit split : splits) {
_concurrentMap.put(split.feature(), split);

if (split.trafficTypeName() != null) {
this.increaseTrafficType(split.trafficTypeName());
}
removeFromFlagSets(split.feature());
addToFlagSets(split);
}
}
Expand Down Expand Up @@ -203,16 +202,9 @@ private void addToFlagSets(ParsedSplit featureFlag) {
}
}

private void removeFromFlagSets(String featureFlagName, HashSet<String> sets) {
if(sets == null) {
return;
}
for (String set : sets) {
HashSet<String> features = _flagSets.get(set);
if (features == null){
continue;
}
features.remove(featureFlagName);
private void removeFromFlagSets(String featureFlagName) {
for (String set: _flagSets.keySet()) {
_flagSets.get(set).remove(featureFlagName);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.split.client;

import io.split.client.interceptors.FlagSetsFilter;
import io.split.client.interceptors.FlagSetsFilterImpl;
import io.split.storages.memory.InMemoryCacheImp;
import io.split.storages.SplitCache;
import org.junit.Assert;
Expand All @@ -18,7 +20,8 @@ public class CacheUpdaterServiceTest {

@Test
public void testCacheUpdate() {
SplitCache splitCache = new InMemoryCacheImp(new HashSet<>());
FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(new HashSet<>());
SplitCache splitCache = new InMemoryCacheImp(flagSetsFilter);
CacheUpdaterService cacheUpdaterService = new CacheUpdaterService(splitCache);
cacheUpdaterService.updateCache(getMap());
Assert.assertNotNull(splitCache.get(MY_FEATURE));
Expand Down
Loading