diff --git a/client/src/main/java/io/split/client/SplitFactoryImpl.java b/client/src/main/java/io/split/client/SplitFactoryImpl.java index c0c2c0dbb..cce80eefc 100644 --- a/client/src/main/java/io/split/client/SplitFactoryImpl.java +++ b/client/src/main/java/io/split/client/SplitFactoryImpl.java @@ -152,7 +152,6 @@ public class SplitFactoryImpl implements SplitFactory { private final URI _eventsRootTarget; private final UniqueKeysTracker _uniqueKeysTracker; - //Constructor for standalone mode public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyntaxException { _userStorageWrapper = null; @@ -188,13 +187,14 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn ImpressionsStorage impressionsStorage = new InMemoryImpressionsStorage(config.impressionsQueueSize()); _splitCache = splitCache; _segmentCache = segmentCache; - _telemetrySynchronizer = new TelemetryInMemorySubmitter(_httpclient, URI.create(config.telemetryURL()), telemetryStorage, splitCache, segmentCache, telemetryStorage, _startTime); + _telemetrySynchronizer = new TelemetryInMemorySubmitter(_httpclient, URI.create(config.telemetryURL()), telemetryStorage, splitCache, _segmentCache, telemetryStorage, _startTime); // Segments _segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache); + SplitParser splitParser = new SplitParser(); // SplitFetcher - _splitFetcher = buildSplitFetcher(splitCache, splitCache); + _splitFetcher = buildSplitFetcher(splitCache, splitCache, splitParser); // SplitSynchronizationTask _splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher, @@ -241,7 +241,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn SplitAPI splitAPI = SplitAPI.build(_httpclient, buildSSEdHttpClient(apiToken, config, _sdkMetadata)); _syncManager = SyncManagerImp.build(splitTasks, _splitFetcher, splitCache, splitAPI, - segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config); + segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser); _syncManager.start(); // DestroyOnShutDown @@ -255,7 +255,6 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn } } - //Constructor for consumer mode protected SplitFactoryImpl(String apiToken, SplitClientConfig config, CustomStorageWrapper customStorageWrapper) throws URISyntaxException { //Variables that are not used in Consumer mode. @@ -559,9 +558,8 @@ private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config, Se config.getThreadFactory()); } - private SplitFetcher buildSplitFetcher(SplitCacheConsumer splitCacheConsumer, SplitCacheProducer splitCacheProducer) throws URISyntaxException { + private SplitFetcher buildSplitFetcher(SplitCacheConsumer splitCacheConsumer, SplitCacheProducer splitCacheProducer, SplitParser splitParser) throws URISyntaxException { SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(_httpclient, _rootTarget, _telemetryStorageProducer); - SplitParser splitParser = new SplitParser(); return new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheConsumer, splitCacheProducer, _telemetryStorageProducer); } diff --git a/client/src/main/java/io/split/engine/common/ConsumerSynchronizer.java b/client/src/main/java/io/split/engine/common/ConsumerSynchronizer.java index 6b4e6b9f3..89aa8267f 100644 --- a/client/src/main/java/io/split/engine/common/ConsumerSynchronizer.java +++ b/client/src/main/java/io/split/engine/common/ConsumerSynchronizer.java @@ -2,6 +2,7 @@ import io.split.client.impressions.ImpressionsManager; import io.split.client.impressions.UniqueKeysTracker; +import io.split.engine.sse.dtos.SplitKillNotification; import io.split.telemetry.synchronizer.TelemetrySyncTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +40,7 @@ public void refreshSplits(Long targetChangeNumber) { } @Override - public void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber) { + public void localKillSplit(SplitKillNotification splitKillNotification) { //No-Op } diff --git a/client/src/main/java/io/split/engine/common/LocalhostSynchronizer.java b/client/src/main/java/io/split/engine/common/LocalhostSynchronizer.java index a8b4cf312..64c97b35d 100644 --- a/client/src/main/java/io/split/engine/common/LocalhostSynchronizer.java +++ b/client/src/main/java/io/split/engine/common/LocalhostSynchronizer.java @@ -5,6 +5,7 @@ import io.split.engine.experiments.SplitSynchronizationTask; import io.split.engine.segments.SegmentFetcher; import io.split.engine.segments.SegmentSynchronizationTask; +import io.split.engine.sse.dtos.SplitKillNotification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +67,7 @@ public void refreshSplits(Long targetChangeNumber) { } @Override - public void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber) { + public void localKillSplit(SplitKillNotification splitKillNotification) { //No-Op } diff --git a/client/src/main/java/io/split/engine/common/PushManagerImp.java b/client/src/main/java/io/split/engine/common/PushManagerImp.java index 61fff9a1e..d4fc8dadf 100644 --- a/client/src/main/java/io/split/engine/common/PushManagerImp.java +++ b/client/src/main/java/io/split/engine/common/PushManagerImp.java @@ -1,6 +1,7 @@ package io.split.engine.common; import com.google.common.annotations.VisibleForTesting; +import io.split.engine.experiments.SplitParser; import io.split.engine.sse.AuthApiClient; import io.split.engine.sse.AuthApiClientImp; import io.split.engine.sse.EventSourceClient; @@ -11,10 +12,11 @@ import io.split.engine.sse.dtos.AuthenticationResponse; import io.split.engine.sse.dtos.SegmentQueueDto; import io.split.engine.sse.workers.SegmentsWorkerImp; -import io.split.engine.sse.workers.SplitsWorker; -import io.split.engine.sse.workers.SplitsWorkerImp; +import io.split.engine.sse.workers.FeatureFlagsWorker; +import io.split.engine.sse.workers.FeatureFlagWorkerImp; import io.split.engine.sse.workers.Worker; +import io.split.storages.SplitCacheProducer; import io.split.telemetry.domain.StreamingEvent; import io.split.telemetry.domain.enums.StreamEventsEnum; import io.split.telemetry.storage.TelemetryRuntimeProducer; @@ -36,7 +38,7 @@ public class PushManagerImp implements PushManager { private final AuthApiClient _authApiClient; private final EventSourceClient _eventSourceClient; - private final SplitsWorker _splitsWorker; + private final FeatureFlagsWorker _featureFlagsWorker; private final Worker _segmentWorker; private final PushStatusTracker _pushStatusTracker; @@ -48,7 +50,7 @@ public class PushManagerImp implements PushManager { @VisibleForTesting /* package private */ PushManagerImp(AuthApiClient authApiClient, EventSourceClient eventSourceClient, - SplitsWorker splitsWorker, + FeatureFlagsWorker featureFlagsWorker, Worker segmentWorker, PushStatusTracker pushStatusTracker, TelemetryRuntimeProducer telemetryRuntimeProducer, @@ -56,7 +58,7 @@ public class PushManagerImp implements PushManager { _authApiClient = checkNotNull(authApiClient); _eventSourceClient = checkNotNull(eventSourceClient); - _splitsWorker = splitsWorker; + _featureFlagsWorker = featureFlagsWorker; _segmentWorker = segmentWorker; _pushStatusTracker = pushStatusTracker; _expirationTime = new AtomicLong(); @@ -70,13 +72,15 @@ public static PushManagerImp build(Synchronizer synchronizer, SplitAPI splitAPI, LinkedBlockingQueue statusMessages, TelemetryRuntimeProducer telemetryRuntimeProducer, - ThreadFactory threadFactory) { - SplitsWorker splitsWorker = new SplitsWorkerImp(synchronizer); + ThreadFactory threadFactory, + SplitParser splitParser, + SplitCacheProducer splitCacheProducer) { + FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer); Worker segmentWorker = new SegmentsWorkerImp(synchronizer); PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer); return new PushManagerImp(new AuthApiClientImp(authUrl, splitAPI.getHttpClient(), telemetryRuntimeProducer), - EventSourceClientImp.build(streamingUrl, splitsWorker, segmentWorker, pushStatusTracker, splitAPI.getSseHttpClient(), telemetryRuntimeProducer, threadFactory), - splitsWorker, + EventSourceClientImp.build(streamingUrl, featureFlagsWorker, segmentWorker, pushStatusTracker, splitAPI.getSseHttpClient(), telemetryRuntimeProducer, threadFactory), + featureFlagsWorker, segmentWorker, pushStatusTracker, telemetryRuntimeProducer, @@ -134,13 +138,13 @@ private boolean startSse(String token, String channels) { @Override public synchronized void startWorkers() { - _splitsWorker.start(); + _featureFlagsWorker.start(); _segmentWorker.start(); } @Override public synchronized void stopWorkers() { - _splitsWorker.stop(); + _featureFlagsWorker.stop(); _segmentWorker.stop(); } } \ No newline at end of file diff --git a/client/src/main/java/io/split/engine/common/SyncManagerImp.java b/client/src/main/java/io/split/engine/common/SyncManagerImp.java index d739d40ab..bc23c96b1 100644 --- a/client/src/main/java/io/split/engine/common/SyncManagerImp.java +++ b/client/src/main/java/io/split/engine/common/SyncManagerImp.java @@ -5,6 +5,7 @@ import io.split.client.SplitClientConfig; import io.split.engine.SDKReadinessGates; import io.split.engine.experiments.SplitFetcher; +import io.split.engine.experiments.SplitParser; import io.split.engine.experiments.SplitSynchronizationTask; import io.split.engine.segments.SegmentSynchronizationTask; import io.split.storages.SegmentCacheProducer; @@ -85,7 +86,8 @@ public static SyncManagerImp build(SplitTasks splitTasks, SDKReadinessGates gates, TelemetryRuntimeProducer telemetryRuntimeProducer, TelemetrySynchronizer telemetrySynchronizer, - SplitClientConfig config) { + SplitClientConfig config, + SplitParser splitParser) { LinkedBlockingQueue pushMessages = new LinkedBlockingQueue<>(); Synchronizer synchronizer = new SynchronizerImp(splitTasks, splitFetcher, @@ -102,7 +104,9 @@ public static SyncManagerImp build(SplitTasks splitTasks, splitAPI, pushMessages, telemetryRuntimeProducer, - config.getThreadFactory()); + config.getThreadFactory(), + splitParser, + splitCacheProducer); return new SyncManagerImp(splitTasks, config.streamingEnabled(), diff --git a/client/src/main/java/io/split/engine/common/Synchronizer.java b/client/src/main/java/io/split/engine/common/Synchronizer.java index 20dcae138..529b96582 100644 --- a/client/src/main/java/io/split/engine/common/Synchronizer.java +++ b/client/src/main/java/io/split/engine/common/Synchronizer.java @@ -1,11 +1,13 @@ package io.split.engine.common; +import io.split.engine.sse.dtos.SplitKillNotification; + public interface Synchronizer { boolean syncAll(); void startPeriodicFetching(); void stopPeriodicFetching(); void refreshSplits(Long targetChangeNumber); - void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber); + void localKillSplit(SplitKillNotification splitKillNotification); void refreshSegment(String segmentName, Long targetChangeNumber); void startPeriodicDataRecording(); void stopPeriodicDataRecording(); diff --git a/client/src/main/java/io/split/engine/common/SynchronizerImp.java b/client/src/main/java/io/split/engine/common/SynchronizerImp.java index 71cad55f1..5b268a6ac 100644 --- a/client/src/main/java/io/split/engine/common/SynchronizerImp.java +++ b/client/src/main/java/io/split/engine/common/SynchronizerImp.java @@ -9,6 +9,7 @@ import io.split.engine.experiments.SplitSynchronizationTask; import io.split.engine.segments.SegmentFetcher; import io.split.engine.segments.SegmentSynchronizationTask; +import io.split.engine.sse.dtos.SplitKillNotification; import io.split.storages.SegmentCacheProducer; import io.split.storages.SplitCacheProducer; import io.split.telemetry.synchronizer.TelemetrySyncTask; @@ -183,10 +184,10 @@ public void refreshSplits(Long targetChangeNumber) { } @Override - public void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber) { - if (newChangeNumber > _splitCacheProducer.getChangeNumber()) { - _splitCacheProducer.kill(featureFlagName, defaultTreatment, newChangeNumber); - refreshSplits(newChangeNumber); + public void localKillSplit(SplitKillNotification splitKillNotification) { + if (splitKillNotification.getChangeNumber() > _splitCacheProducer.getChangeNumber()) { + _splitCacheProducer.kill(splitKillNotification.getSplitName(), splitKillNotification.getDefaultTreatment(), splitKillNotification.getChangeNumber()); + refreshSplits(splitKillNotification.getChangeNumber()); } } @@ -307,4 +308,4 @@ private void forceRefreshSegment(String segmentName){ SegmentFetcher segmentFetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName); segmentFetcher.fetch(new FetchOptions.Builder().build()); } -} +} \ No newline at end of file diff --git a/client/src/main/java/io/split/engine/experiments/SplitParser.java b/client/src/main/java/io/split/engine/experiments/SplitParser.java index 7b521175e..31aa90980 100644 --- a/client/src/main/java/io/split/engine/experiments/SplitParser.java +++ b/client/src/main/java/io/split/engine/experiments/SplitParser.java @@ -176,6 +176,4 @@ private AttributeMatcher toMatcher(Matcher matcher) { return new AttributeMatcher(attribute, delegate, negate); } - - -} +} \ No newline at end of file diff --git a/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java b/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java index 3791334bb..35d1c05d7 100644 --- a/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java +++ b/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java @@ -6,7 +6,7 @@ import io.split.engine.sse.client.SSEClient; import io.split.engine.sse.dtos.SegmentQueueDto; import io.split.engine.sse.exceptions.EventParsingException; -import io.split.engine.sse.workers.SplitsWorker; +import io.split.engine.sse.workers.FeatureFlagsWorker; import io.split.engine.sse.workers.Worker; import io.split.telemetry.storage.TelemetryRuntimeProducer; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; @@ -56,7 +56,7 @@ public class EventSourceClientImp implements EventSourceClient { } public static EventSourceClientImp build(String baseStreamingUrl, - SplitsWorker splitsWorker, + FeatureFlagsWorker featureFlagsWorker, Worker segmentWorker, PushStatusTracker pushStatusTracker, CloseableHttpClient sseHttpClient, @@ -64,7 +64,7 @@ public static EventSourceClientImp build(String baseStreamingUrl, ThreadFactory threadFactory) { return new EventSourceClientImp(baseStreamingUrl, new NotificationParserImp(), - NotificationProcessorImp.build(splitsWorker, segmentWorker, pushStatusTracker), + NotificationProcessorImp.build(featureFlagsWorker, segmentWorker, pushStatusTracker), pushStatusTracker, sseHttpClient, telemetryRuntimeProducer, diff --git a/client/src/main/java/io/split/engine/sse/NotificationProcessor.java b/client/src/main/java/io/split/engine/sse/NotificationProcessor.java index 20f8af7fa..bdd842455 100644 --- a/client/src/main/java/io/split/engine/sse/NotificationProcessor.java +++ b/client/src/main/java/io/split/engine/sse/NotificationProcessor.java @@ -1,12 +1,14 @@ package io.split.engine.sse; +import io.split.engine.sse.dtos.FeatureFlagChangeNotification; import io.split.engine.sse.dtos.IncomingNotification; +import io.split.engine.sse.dtos.SplitKillNotification; import io.split.engine.sse.dtos.StatusNotification; public interface NotificationProcessor { void process(IncomingNotification notification); - void processSplitUpdate(long changeNumber); - void processSplitKill(long changeNumber, String splitName, String defaultTreatment); + void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification); + void processSplitKill(SplitKillNotification splitKillNotification); void processSegmentUpdate(long changeNumber, String segmentName); void processStatus(StatusNotification statusNotification); -} +} \ No newline at end of file diff --git a/client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java b/client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java index c8271c9ec..5b8e705b3 100644 --- a/client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java +++ b/client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java @@ -1,30 +1,33 @@ package io.split.engine.sse; import com.google.common.annotations.VisibleForTesting; +import io.split.engine.sse.dtos.FeatureFlagChangeNotification; +import io.split.engine.sse.dtos.GenericNotificationData; import io.split.engine.sse.dtos.IncomingNotification; +import io.split.engine.sse.dtos.SplitKillNotification; import io.split.engine.sse.dtos.StatusNotification; import io.split.engine.sse.dtos.SegmentQueueDto; -import io.split.engine.sse.workers.SplitsWorker; +import io.split.engine.sse.workers.FeatureFlagsWorker; import io.split.engine.sse.workers.Worker; import static com.google.common.base.Preconditions.checkNotNull; public class NotificationProcessorImp implements NotificationProcessor { - private final SplitsWorker _splitsWorker; + private final FeatureFlagsWorker _featureFlagsWorker; private final Worker _segmentWorker; private final PushStatusTracker _pushStatusTracker; @VisibleForTesting - /* package private */ NotificationProcessorImp(SplitsWorker splitsWorker, + /* package private */ NotificationProcessorImp(FeatureFlagsWorker featureFlagsWorker, Worker segmentWorker, PushStatusTracker pushStatusTracker) { - _splitsWorker = checkNotNull(splitsWorker); + _featureFlagsWorker = checkNotNull(featureFlagsWorker); _segmentWorker = checkNotNull(segmentWorker); _pushStatusTracker = checkNotNull(pushStatusTracker); } - public static NotificationProcessorImp build(SplitsWorker splitsWorker, Worker segmentWorker, PushStatusTracker pushStatusTracker) { - return new NotificationProcessorImp(splitsWorker, segmentWorker, pushStatusTracker); + public static NotificationProcessorImp build(FeatureFlagsWorker featureFlagsWorker, Worker segmentWorker, PushStatusTracker pushStatusTracker) { + return new NotificationProcessorImp(featureFlagsWorker, segmentWorker, pushStatusTracker); } @Override @@ -33,14 +36,17 @@ public void process(IncomingNotification notification) { } @Override - public void processSplitUpdate(long changeNumber) { - _splitsWorker.addToQueue(changeNumber); + public void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification) { + _featureFlagsWorker.addToQueue(featureFlagChangeNotification); } @Override - public void processSplitKill(long changeNumber, String splitName, String defaultTreatment) { - _splitsWorker.killSplit(changeNumber, splitName, defaultTreatment); - _splitsWorker.addToQueue(changeNumber); + public void processSplitKill(SplitKillNotification splitKillNotification) { + _featureFlagsWorker.kill(splitKillNotification); + _featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + .changeNumber(splitKillNotification.getChangeNumber()) + .channel(splitKillNotification.getChannel()) + .build())); } @Override diff --git a/client/src/main/java/io/split/engine/sse/dtos/FeatureFlagChangeNotification.java b/client/src/main/java/io/split/engine/sse/dtos/FeatureFlagChangeNotification.java index 108fbc016..05f79abec 100644 --- a/client/src/main/java/io/split/engine/sse/dtos/FeatureFlagChangeNotification.java +++ b/client/src/main/java/io/split/engine/sse/dtos/FeatureFlagChangeNotification.java @@ -70,7 +70,7 @@ public CompressType getCompressType() { @Override public void handler(NotificationProcessor notificationProcessor) { - notificationProcessor.processSplitUpdate(getChangeNumber()); + notificationProcessor.processSplitUpdate(this); } @Override diff --git a/client/src/main/java/io/split/engine/sse/dtos/GenericNotificationData.java b/client/src/main/java/io/split/engine/sse/dtos/GenericNotificationData.java index 774df5003..7fd3dc1bd 100644 --- a/client/src/main/java/io/split/engine/sse/dtos/GenericNotificationData.java +++ b/client/src/main/java/io/split/engine/sse/dtos/GenericNotificationData.java @@ -18,7 +18,7 @@ public class GenericNotificationData { @SerializedName("c") private Integer compressType; - public GenericNotificationData (Long changeNumber, + private GenericNotificationData (Long changeNumber, String defaultTreatment, String splitName, ControlType controlType, @@ -86,4 +86,85 @@ public Integer getCompressType() { public void setChannel(String channel) { this.channel = channel; } + + public static GenericNotificationData.Builder builder() { + return new GenericNotificationData.Builder(); + } + + public static final class Builder { + private Long changeNumber; + private String defaultTreatment; + private String featureFlagName; + private ControlType controlType; + private OccupancyMetrics metrics; + private String segmentName; + private IncomingNotification.Type type; + private String channel; + private Long previousChangeNumber; + private String featureFlagDefinition; + private Integer compressType; + + public Builder() { + } + + public Builder changeNumber(Long changeNumber) { + this.changeNumber = changeNumber; + return this; + } + + public Builder defaultTreatment(String defaultTreatment) { + this.defaultTreatment = defaultTreatment; + return this; + } + + public Builder featureFlagName(String featureFlagName) { + this.featureFlagName = featureFlagName; + return this; + } + + public Builder controlType(ControlType controlType) { + this.controlType = controlType; + return this; + } + + public Builder metrics(OccupancyMetrics occupancyMetrics) { + this.metrics = occupancyMetrics; + return this; + } + + public Builder segmentName(String segmentName) { + this.segmentName = segmentName; + return this; + } + + public Builder type(IncomingNotification.Type type) { + this.type = type; + return this; + } + + public Builder channel(String channel) { + this.channel = channel; + return this; + } + + public Builder previousChangeNumber(Long previousChangeNumber) { + this.previousChangeNumber = previousChangeNumber; + return this; + } + + public Builder featureFlagDefinition(String featureFlagDefinition) { + this.featureFlagDefinition = featureFlagDefinition; + return this; + } + + public Builder compressType(Integer compressType) { + this.compressType = compressType; + return this; + } + + public GenericNotificationData build() { + return new GenericNotificationData(changeNumber, defaultTreatment, featureFlagName, controlType, metrics, + segmentName, type, channel, previousChangeNumber, featureFlagDefinition, compressType); + } + } } \ No newline at end of file diff --git a/client/src/main/java/io/split/engine/sse/dtos/SplitKillNotification.java b/client/src/main/java/io/split/engine/sse/dtos/SplitKillNotification.java index ed4700352..4d47e758b 100644 --- a/client/src/main/java/io/split/engine/sse/dtos/SplitKillNotification.java +++ b/client/src/main/java/io/split/engine/sse/dtos/SplitKillNotification.java @@ -28,11 +28,11 @@ public String getSplitName() { @Override public void handler(NotificationProcessor notificationProcessor) { - notificationProcessor.processSplitKill(getChangeNumber(), getSplitName(), getDefaultTreatment()); + notificationProcessor.processSplitKill(this); } @Override public String toString() { return String.format("Type: %s; Channel: %s; ChangeNumber: %s; DefaultTreatment: %s; SplitName: %s", getType(), getChannel(), getChangeNumber(), getDefaultTreatment(), getSplitName()); } -} +} \ No newline at end of file diff --git a/client/src/main/java/io/split/engine/sse/workers/FeatureFlagWorkerImp.java b/client/src/main/java/io/split/engine/sse/workers/FeatureFlagWorkerImp.java new file mode 100644 index 000000000..8328b08e0 --- /dev/null +++ b/client/src/main/java/io/split/engine/sse/workers/FeatureFlagWorkerImp.java @@ -0,0 +1,61 @@ +package io.split.engine.sse.workers; + +import io.split.engine.common.Synchronizer; +import io.split.engine.experiments.SplitParser; +import io.split.engine.sse.dtos.FeatureFlagChangeNotification; +import io.split.engine.sse.dtos.SplitKillNotification; +import io.split.storages.SplitCacheProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class FeatureFlagWorkerImp extends Worker implements FeatureFlagsWorker { + private static final Logger _log = LoggerFactory.getLogger(FeatureFlagWorkerImp.class); + private final Synchronizer _synchronizer; + private final SplitParser _splitParser; + private final SplitCacheProducer _splitCacheProducer; + + public FeatureFlagWorkerImp(Synchronizer synchronizer, SplitParser splitParser, SplitCacheProducer splitCacheProducer) { + super("Feature flags"); + _synchronizer = checkNotNull(synchronizer); + _splitParser = splitParser; + _splitCacheProducer = splitCacheProducer; + } + + @Override + public void kill(SplitKillNotification splitKillNotification) { + try { + _synchronizer.localKillSplit(splitKillNotification); + _log.debug(String.format("Kill feature flag: %s, changeNumber: %s, defaultTreatment: %s", splitKillNotification.getSplitName(), splitKillNotification.getChangeNumber(), + splitKillNotification.getDefaultTreatment())); + } catch (Exception ex) { + _log.warn(String.format("Exception on FeatureFlagWorker kill: %s", ex.getMessage())); + } + } + + @Override + protected void executeRefresh(FeatureFlagChangeNotification featureFlagChangeNotification) { + boolean success = addOrUpdateFeatureFlag(featureFlagChangeNotification); + + if (!success) + _synchronizer.refreshSplits(featureFlagChangeNotification.getChangeNumber()); + } + + private boolean addOrUpdateFeatureFlag(FeatureFlagChangeNotification featureFlagChangeNotification) { + if (featureFlagChangeNotification.getChangeNumber() <= _splitCacheProducer.getChangeNumber()) { + return true; + } + try { + if (featureFlagChangeNotification.getFeatureFlagDefinition() != null && + featureFlagChangeNotification.getPreviousChangeNumber() == _splitCacheProducer.getChangeNumber()){ + _splitCacheProducer.updateFeatureFlag(_splitParser.parse(featureFlagChangeNotification.getFeatureFlagDefinition())); + _splitCacheProducer.setChangeNumber(featureFlagChangeNotification.getChangeNumber()); + return true; + } + } catch (Exception e) { + _log.warn("Something went wrong processing a Feature Flag notification", e); + } + return false; + } +} \ No newline at end of file diff --git a/client/src/main/java/io/split/engine/sse/workers/FeatureFlagsWorker.java b/client/src/main/java/io/split/engine/sse/workers/FeatureFlagsWorker.java new file mode 100644 index 000000000..354dbd7e1 --- /dev/null +++ b/client/src/main/java/io/split/engine/sse/workers/FeatureFlagsWorker.java @@ -0,0 +1,11 @@ +package io.split.engine.sse.workers; + +import io.split.engine.sse.dtos.FeatureFlagChangeNotification; +import io.split.engine.sse.dtos.SplitKillNotification; + +public interface FeatureFlagsWorker { + void addToQueue(FeatureFlagChangeNotification featureFlagChangeNotification); + void start(); + void stop(); + void kill(SplitKillNotification splitKillNotification); +} \ No newline at end of file diff --git a/client/src/main/java/io/split/engine/sse/workers/SplitsWorker.java b/client/src/main/java/io/split/engine/sse/workers/SplitsWorker.java deleted file mode 100644 index 3664b7cd4..000000000 --- a/client/src/main/java/io/split/engine/sse/workers/SplitsWorker.java +++ /dev/null @@ -1,8 +0,0 @@ -package io.split.engine.sse.workers; - -public interface SplitsWorker { - void addToQueue(Long element); - void start(); - void stop(); - void killSplit(long changeNumber, String splitName, String defaultTreatment); -} diff --git a/client/src/main/java/io/split/engine/sse/workers/SplitsWorkerImp.java b/client/src/main/java/io/split/engine/sse/workers/SplitsWorkerImp.java deleted file mode 100644 index 16e155ed7..000000000 --- a/client/src/main/java/io/split/engine/sse/workers/SplitsWorkerImp.java +++ /dev/null @@ -1,29 +0,0 @@ -package io.split.engine.sse.workers; - -import io.split.engine.common.Synchronizer; - -import static com.google.common.base.Preconditions.checkNotNull; - -public class SplitsWorkerImp extends Worker implements SplitsWorker { - private final Synchronizer _synchronizer; - - public SplitsWorkerImp(Synchronizer synchronizer) { - super("Splits"); - _synchronizer = checkNotNull(synchronizer); - } - - @Override - public void killSplit(long changeNumber, String splitName, String defaultTreatment) { - try { - _synchronizer.localKillSplit(splitName, defaultTreatment, changeNumber); - _log.debug(String.format("Kill split: %s, changeNumber: %s, defaultTreatment: %s", splitName, changeNumber, defaultTreatment)); - } catch (Exception ex) { - _log.warn(String.format("Exception on SplitWorker killSplit: %s", ex.getMessage())); - } - } - - @Override - protected void executeRefresh(Long changeNumber) { - _synchronizer.refreshSplits(changeNumber); - } -} diff --git a/client/src/main/java/io/split/storages/SplitCacheProducer.java b/client/src/main/java/io/split/storages/SplitCacheProducer.java index a237b06f0..70968b6dc 100644 --- a/client/src/main/java/io/split/storages/SplitCacheProducer.java +++ b/client/src/main/java/io/split/storages/SplitCacheProducer.java @@ -12,4 +12,5 @@ public interface SplitCacheProducer extends SplitCacheCommons{ void putMany(List splits); void increaseTrafficType(String trafficType); void decreaseTrafficType(String trafficType); + void updateFeatureFlag(ParsedSplit parsedSplit); } diff --git a/client/src/main/java/io/split/storages/memory/InMemoryCacheImp.java b/client/src/main/java/io/split/storages/memory/InMemoryCacheImp.java index 4a4c97a67..db5732346 100644 --- a/client/src/main/java/io/split/storages/memory/InMemoryCacheImp.java +++ b/client/src/main/java/io/split/storages/memory/InMemoryCacheImp.java @@ -142,7 +142,12 @@ public void increaseTrafficType(String trafficType) { public void decreaseTrafficType(String trafficType) { _concurrentTrafficTypeNameSet.remove(trafficType); } - + + @Override + public void updateFeatureFlag(ParsedSplit parsedSplit) { + _concurrentMap.put(parsedSplit.feature(), parsedSplit); + } + public Set getSegments() { return _concurrentMap.values().stream() .flatMap(parsedSplit -> parsedSplit.getSegmentsNames().stream()).collect(Collectors.toSet()); diff --git a/client/src/main/java/io/split/storages/pluggable/adapters/UserCustomSplitAdapterProducer.java b/client/src/main/java/io/split/storages/pluggable/adapters/UserCustomSplitAdapterProducer.java index a98391443..2aa4f12eb 100644 --- a/client/src/main/java/io/split/storages/pluggable/adapters/UserCustomSplitAdapterProducer.java +++ b/client/src/main/java/io/split/storages/pluggable/adapters/UserCustomSplitAdapterProducer.java @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; import pluggable.CustomStorageWrapper; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -98,6 +99,11 @@ public void decreaseTrafficType(String trafficType) { } } + @Override + public void updateFeatureFlag(ParsedSplit parsedSplit) { + putMany(Collections.singletonList(parsedSplit)); + } + @Override public Set getSegments() { //NoOp diff --git a/client/src/test/java/io/split/engine/common/PushManagerTest.java b/client/src/test/java/io/split/engine/common/PushManagerTest.java index d95d13ce5..12664ad1d 100644 --- a/client/src/test/java/io/split/engine/common/PushManagerTest.java +++ b/client/src/test/java/io/split/engine/common/PushManagerTest.java @@ -7,7 +7,7 @@ import io.split.engine.sse.client.SSEClient; import io.split.engine.sse.dtos.AuthenticationResponse; import io.split.engine.sse.workers.SegmentsWorkerImp; -import io.split.engine.sse.workers.SplitsWorker; +import io.split.engine.sse.workers.FeatureFlagsWorker; import io.split.telemetry.storage.InMemoryTelemetryStorage; import io.split.telemetry.storage.TelemetryStorage; import org.junit.Assert; @@ -32,7 +32,7 @@ public void setUp() { _telemetryStorage = new InMemoryTelemetryStorage(); _pushManager = new PushManagerImp(_authApiClient, _eventSourceClient, - Mockito.mock(SplitsWorker.class), + Mockito.mock(FeatureFlagsWorker.class), Mockito.mock(SegmentsWorkerImp.class), _pushStatusTracker, _telemetryStorage, diff --git a/client/src/test/java/io/split/engine/common/SynchronizerTest.java b/client/src/test/java/io/split/engine/common/SynchronizerTest.java index a25ec4139..036115a93 100644 --- a/client/src/test/java/io/split/engine/common/SynchronizerTest.java +++ b/client/src/test/java/io/split/engine/common/SynchronizerTest.java @@ -5,7 +5,11 @@ import io.split.client.impressions.UniqueKeysTracker; import io.split.engine.segments.SegmentChangeFetcher; import io.split.engine.segments.SegmentSynchronizationTaskImp; -import io.split.storages.*; +import io.split.storages.SegmentCache; +import io.split.storages.SegmentCacheProducer; +import io.split.storages.SplitCache; +import io.split.storages.SplitCacheConsumer; +import io.split.storages.SplitCacheProducer; import io.split.storages.memory.InMemoryCacheImp; import io.split.engine.experiments.FetchResult; import io.split.engine.experiments.SplitFetcherImp; @@ -113,7 +117,7 @@ public void stopPeriodicFetching() { public void streamingRetryOnSplit() { when(_splitCacheProducer.getChangeNumber()).thenReturn(0l).thenReturn(0l).thenReturn(1l); when(_splitFetcher.forceRefresh(Mockito.anyObject())).thenReturn(new FetchResult(true, new HashSet<>())); - _synchronizer.refreshSplits(1l); + _synchronizer.refreshSplits(1L); Mockito.verify(_splitCacheProducer, Mockito.times(3)).getChangeNumber(); } @@ -138,14 +142,14 @@ public void streamingRetryOnSplitAndSegment() { SegmentFetcher fetcher = Mockito.mock(SegmentFetcher.class); when(_segmentCacheProducer.getChangeNumber(Mockito.anyString())).thenReturn(0l).thenReturn(0l).thenReturn(1l); when(_segmentFetcher.getFetcher(Mockito.anyString())).thenReturn(fetcher); - _synchronizer.refreshSplits(1l); + _synchronizer.refreshSplits(1L); Mockito.verify(_splitCacheProducer, Mockito.times(3)).getChangeNumber(); Mockito.verify(_segmentFetcher, Mockito.times(2)).getFetcher(Mockito.anyString()); } @Test - public void testCDNBypassIsRequestedAfterNFailures() throws NoSuchFieldException, IllegalAccessException { + public void testCDNBypassIsRequestedAfterNFailures() { SplitCache cache = new InMemoryCacheImp(); Synchronizer imp = new SynchronizerImp(_splitTasks, @@ -317,4 +321,4 @@ public void testDataRecording(){ Mockito.verify(_uniqueKeysTracker, Mockito.times(1)).stop(); Mockito.verify(_telemetrySyncTask, Mockito.times(1)).stopScheduledTask(); } -} +} \ No newline at end of file diff --git a/client/src/test/java/io/split/engine/sse/NotificationProcessorTest.java b/client/src/test/java/io/split/engine/sse/NotificationProcessorTest.java index 99aef62c5..a56f05dd1 100644 --- a/client/src/test/java/io/split/engine/sse/NotificationProcessorTest.java +++ b/client/src/test/java/io/split/engine/sse/NotificationProcessorTest.java @@ -8,39 +8,40 @@ import io.split.engine.sse.dtos.SegmentQueueDto; import io.split.engine.sse.dtos.SplitKillNotification; import io.split.engine.sse.workers.SegmentsWorkerImp; -import io.split.engine.sse.workers.SplitsWorker; +import io.split.engine.sse.workers.FeatureFlagsWorker; import io.split.engine.sse.workers.Worker; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import java.io.UnsupportedEncodingException; - public class NotificationProcessorTest { - private SplitsWorker _splitsWorker; + private FeatureFlagsWorker _featureFlagsWorker; private Worker _segmentWorker; private NotificationProcessor _notificationProcessor; private PushStatusTracker _pushStatusTracker; @Before public void setUp() { - _splitsWorker = Mockito.mock(SplitsWorker.class); + _featureFlagsWorker = Mockito.mock(FeatureFlagsWorker.class); _segmentWorker = Mockito.mock(SegmentsWorkerImp.class); _pushStatusTracker = Mockito.mock(PushStatusTracker.class); - _notificationProcessor = new NotificationProcessorImp(_splitsWorker, _segmentWorker, _pushStatusTracker); + _notificationProcessor = new NotificationProcessorImp(_featureFlagsWorker, _segmentWorker, _pushStatusTracker); } @Test - public void processSplitUpdateAddToQueueInWorker() throws UnsupportedEncodingException { + public void processSplitUpdateAddToQueueInWorker() { long changeNumber = 1585867723838L; String channel = "splits"; - GenericNotificationData genericNotificationData = new GenericNotificationData(changeNumber, null, null, null, null, null, null, channel, null, null, null); + GenericNotificationData genericNotificationData = GenericNotificationData.builder() + .changeNumber(changeNumber) + .channel(channel) + .build(); FeatureFlagChangeNotification splitChangeNotification = new FeatureFlagChangeNotification(genericNotificationData); _notificationProcessor.process(splitChangeNotification); - Mockito.verify(_splitsWorker, Mockito.times(1)).addToQueue(splitChangeNotification.getChangeNumber()); + Mockito.verify(_featureFlagsWorker, Mockito.times(1)).addToQueue(Mockito.anyObject()); } @Test @@ -49,13 +50,18 @@ public void processSplitKillAndAddToQueueInWorker() { String defaultTreatment = "off"; String splitName = "test-split"; String channel = "splits"; - GenericNotificationData genericNotificationData = new GenericNotificationData(changeNumber, defaultTreatment, splitName, null, null, null, null, channel, null, null, null); + GenericNotificationData genericNotificationData = GenericNotificationData.builder() + .changeNumber(changeNumber) + .defaultTreatment(defaultTreatment) + .featureFlagName(splitName) + .channel(channel) + .build(); SplitKillNotification splitKillNotification = new SplitKillNotification(genericNotificationData); _notificationProcessor.process(splitKillNotification); - Mockito.verify(_splitsWorker, Mockito.times(1)).killSplit(splitKillNotification.getChangeNumber(), splitKillNotification.getSplitName(), splitKillNotification.getDefaultTreatment()); - Mockito.verify(_splitsWorker, Mockito.times(1)).addToQueue(splitKillNotification.getChangeNumber()); + Mockito.verify(_featureFlagsWorker, Mockito.times(1)).kill(splitKillNotification); + Mockito.verify(_featureFlagsWorker, Mockito.times(1)).addToQueue(Mockito.anyObject()); } @Test @@ -63,7 +69,11 @@ public void processSegmentUpdateAddToQueueInWorker() { long changeNumber = 1585867723838L; String segmentName = "segment-test"; String channel = "segments"; - GenericNotificationData genericNotificationData = new GenericNotificationData(changeNumber, null, null, null, null, segmentName, null, channel, null, null, null); + GenericNotificationData genericNotificationData = GenericNotificationData.builder() + .changeNumber(changeNumber) + .segmentName(segmentName) + .channel(channel) + .build(); SegmentChangeNotification segmentChangeNotification = new SegmentChangeNotification(genericNotificationData); _notificationProcessor.process(segmentChangeNotification); @@ -83,7 +93,9 @@ public void processControlNotification() { @Test public void processOccupancyNotification() { - GenericNotificationData genericNotificationData = new GenericNotificationData(null, null, null, null, null, null, null, "control_pri", null, null, null); + GenericNotificationData genericNotificationData = GenericNotificationData.builder() + .channel("control_pri") + .build(); OccupancyNotification occupancyNotification = new OccupancyNotification(genericNotificationData); _notificationProcessor.process(occupancyNotification); diff --git a/client/src/test/java/io/split/engine/sse/PushStatusTrackerTest.java b/client/src/test/java/io/split/engine/sse/PushStatusTrackerTest.java index 2d08c49d8..2660f9e1c 100644 --- a/client/src/test/java/io/split/engine/sse/PushStatusTrackerTest.java +++ b/client/src/test/java/io/split/engine/sse/PushStatusTrackerTest.java @@ -162,17 +162,11 @@ private OccupancyNotification buildOccupancyNotification(int publishers, String } private GenericNotificationData buildGenericData(ControlType controlType, IncomingNotification.Type type, Integer publishers, String channel) { - return new GenericNotificationData( - null, - null, - null, - controlType, - publishers != null ? new OccupancyMetrics(publishers) : null, - null, - type, - channel == null ? "channel-test" : channel, - null, - null, - null); + return GenericNotificationData.builder() + .controlType(controlType) + .metrics(publishers != null ? new OccupancyMetrics(publishers) : null) + .type(type) + .channel(channel == null ? "channel-test" : channel) + .build(); } } \ No newline at end of file diff --git a/client/src/test/java/io/split/engine/sse/workers/FeatureFlagWorkerImpTest.java b/client/src/test/java/io/split/engine/sse/workers/FeatureFlagWorkerImpTest.java new file mode 100644 index 000000000..ec926590b --- /dev/null +++ b/client/src/test/java/io/split/engine/sse/workers/FeatureFlagWorkerImpTest.java @@ -0,0 +1,46 @@ +package io.split.engine.sse.workers; + +import io.split.client.utils.Json; +import io.split.engine.common.Synchronizer; +import io.split.engine.common.SynchronizerImp; +import io.split.engine.experiments.SplitParser; +import io.split.engine.sse.dtos.FeatureFlagChangeNotification; +import io.split.engine.sse.dtos.GenericNotificationData; +import io.split.engine.sse.dtos.RawMessageNotification; +import io.split.storages.SplitCacheProducer; +import org.junit.Test; +import org.mockito.Mockito; + +public class FeatureFlagWorkerImpTest { + + @Test + public void testRefreshSplitsWithCorrectFF(){ + SplitParser splitParser = new SplitParser(); + Synchronizer synchronizer = Mockito.mock(SynchronizerImp.class); + SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); + FeatureFlagWorkerImp featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer); + String notification = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":2,\\\"d\\\":\\\"eJzMk99u2kwQxV8lOtdryQZj8N6hD5QPlThSTVNVEUKDPYZt1jZar1OlyO9emf8lVFWv2ss5zJyd82O8hTWUZSqZvW04opwhUVdsIKBSSKR+10vS1HWW7pIdz2NyBjRwHS8IXEopTLgbQqDYT+ZUm3LxlV4J4mg81LpMyKqygPRc94YeM6eQTtjphp4fegLVXvD6Qdjt9wPXF6gs2bqCxPC/2eRpDIEXpXXblpGuWCDljGptZ4bJ5lxYSJRZBoFkTcWKozpfsoH0goHfCXpB6PfcngDpVQnZEUjKIlOr2uwWqiC3zU5L1aF+3p7LFhUkPv8/mY2nk3gGgZxssmZzb8p6A9n25ktVtA9iGI3ODXunQ3HDp+AVWT6F+rZWlrWq7MN+YkSWWvuTDvkMSnNV7J6oTdl6qKTEvGnmjcCGjL2IYC/ovPYgUKnvvPtbmrmApiVryLM7p2jE++AfH6fTx09/HvuF32LWnNjStM0Xh3c8ukZcsZlEi3h8/zCObsBpJ0acqYLTmFdtqitK1V6NzrfpdPBbLmVx4uK26e27izpDu/r5yf/16AXun2Cr4u6w591xw7+LfDidLj6Mv8TXwP8xbofv/c7UmtHMmx8BAAD//0fclvU=\\\"}\"}"; + RawMessageNotification rawMessageNotification = Json.fromJson(notification, RawMessageNotification.class); + GenericNotificationData genericNotificationData = Json.fromJson(rawMessageNotification.getData(), GenericNotificationData.class); + FeatureFlagChangeNotification featureFlagChangeNotification = new FeatureFlagChangeNotification(genericNotificationData); + featureFlagsWorker.executeRefresh(featureFlagChangeNotification); + Mockito.verify(splitCacheProducer, Mockito.times(1)).updateFeatureFlag(Mockito.anyObject()); + Mockito.verify(synchronizer, Mockito.times(0)).refreshSplits(1684265694505L); + } + + @Test + public void testRefreshSplitsWithEmptyData(){ + SplitParser splitParser = new SplitParser(); + Synchronizer synchronizer = Mockito.mock(SynchronizerImp.class); + SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); + FeatureFlagWorkerImp featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer); + String notification = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505}\"}"; + RawMessageNotification rawMessageNotification = Json.fromJson(notification, RawMessageNotification.class); + GenericNotificationData genericNotificationData = Json.fromJson(rawMessageNotification.getData(), GenericNotificationData.class); + FeatureFlagChangeNotification featureFlagChangeNotification = new FeatureFlagChangeNotification(genericNotificationData); + featureFlagsWorker.executeRefresh(featureFlagChangeNotification); + Mockito.verify(splitCacheProducer, Mockito.times(0)).updateFeatureFlag(Mockito.anyObject()); + Mockito.verify(synchronizer, Mockito.times(1)).refreshSplits(1684265694505L); + } + +} \ No newline at end of file diff --git a/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java b/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java index 8db49054b..864ea58ed 100644 --- a/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java +++ b/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java @@ -1,6 +1,13 @@ package io.split.engine.sse.workers; +import io.split.client.utils.Json; import io.split.engine.common.Synchronizer; +import io.split.engine.experiments.SplitParser; +import io.split.engine.sse.dtos.FeatureFlagChangeNotification; +import io.split.engine.sse.dtos.GenericNotificationData; +import io.split.engine.sse.dtos.RawMessageNotification; +import io.split.engine.sse.dtos.SplitKillNotification; +import io.split.storages.SplitCacheProducer; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -15,70 +22,99 @@ public class SplitsWorkerTest { @Test public void addToQueueWithoutElementsWShouldNotTriggerFetch() throws InterruptedException { Synchronizer splitFetcherMock = Mockito.mock(Synchronizer.class); + SplitParser splitParser = new SplitParser(); + SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); - SplitsWorker splitsWorker = new SplitsWorkerImp(splitFetcherMock); - splitsWorker.start(); + FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(splitFetcherMock, splitParser, splitCacheProducer); + featureFlagsWorker.start(); Thread.sleep(500); - Mockito.verify(splitFetcherMock, Mockito.never()).refreshSplits(Mockito.anyLong()); - splitsWorker.stop(); + Mockito.verify(splitFetcherMock, Mockito.never()).refreshSplits(Mockito.anyObject()); + featureFlagsWorker.stop(); } @Test public void addToQueueWithElementsWShouldTriggerFetch() throws InterruptedException { Synchronizer syncMock = Mockito.mock(Synchronizer.class); + SplitParser splitParser = new SplitParser(); + SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); - SplitsWorker splitsWorker = new SplitsWorkerImp(syncMock); - splitsWorker.start(); + FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(syncMock, splitParser, splitCacheProducer); + featureFlagsWorker.start(); ArgumentCaptor cnCaptor = ArgumentCaptor.forClass(Long.class); - splitsWorker.addToQueue(1585956698457L); - splitsWorker.addToQueue(1585956698467L); - splitsWorker.addToQueue(1585956698477L); - splitsWorker.addToQueue(1585956698476L); + + featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + .changeNumber(1585956698457L) + .build())); + featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + .changeNumber(1585956698467L) + .build())); + featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + .changeNumber(1585956698477L) + .build())); + featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + .changeNumber(1585956698476L) + .build())); Thread.sleep(1000); Mockito.verify(syncMock, Mockito.times(4)).refreshSplits(cnCaptor.capture()); List captured = cnCaptor.getAllValues(); assertThat(captured, contains(1585956698457L, 1585956698467L, 1585956698477L, 1585956698476L)); - splitsWorker.stop(); + featureFlagsWorker.stop(); } @Test public void killShouldTriggerFetch() { long changeNumber = 1585956698457L; - String splitName = "split-test"; + String featureFlagName = "feature-flag-test"; String defaultTreatment = "off"; Synchronizer syncMock = Mockito.mock(Synchronizer.class); - SplitsWorker splitsWorker = new SplitsWorkerImp(syncMock); - splitsWorker.start(); - - splitsWorker.killSplit(changeNumber, splitName, defaultTreatment); - Mockito.verify(syncMock, Mockito.times(1)).localKillSplit(splitName, defaultTreatment, changeNumber); - splitsWorker.stop(); + SplitParser splitParser = new SplitParser(); + SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); + FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(syncMock, splitParser, splitCacheProducer) { + }; + featureFlagsWorker.start(); + SplitKillNotification splitKillNotification = new SplitKillNotification(GenericNotificationData.builder() + .changeNumber(changeNumber) + .defaultTreatment(defaultTreatment) + .featureFlagName(featureFlagName) + .build()); + + featureFlagsWorker.kill(splitKillNotification); + Mockito.verify(syncMock, Mockito.times(1)).localKillSplit(splitKillNotification); + featureFlagsWorker.stop(); } @Test public void messagesNotProcessedWhenWorkerStopped() throws InterruptedException { Synchronizer syncMock = Mockito.mock(Synchronizer.class); - SplitsWorker splitsWorker = new SplitsWorkerImp(syncMock); - splitsWorker.start(); - splitsWorker.addToQueue(1585956698457L); + SplitParser splitParser = new SplitParser(); + SplitCacheProducer splitCacheProducer = Mockito.mock(SplitCacheProducer.class); + FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(syncMock, splitParser, splitCacheProducer); + featureFlagsWorker.start(); + featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + .changeNumber(1585956698457L) + .build())); Thread.sleep(500); - splitsWorker.stop(); + featureFlagsWorker.stop(); Thread.sleep(500); - splitsWorker.addToQueue(1585956698467L); - Mockito.verify(syncMock, Mockito.times(1)).refreshSplits(1585956698457L); // Previous one! + featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + .changeNumber(1585956698467L) + .build())); + Mockito.verify(syncMock, Mockito.times(1)).refreshSplits(Mockito.anyObject()); // Previous one! Mockito.reset(syncMock); - splitsWorker.start(); - splitsWorker.addToQueue(1585956698477L); + featureFlagsWorker.start(); + featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder() + .changeNumber(1585956698477L) + .build())); Thread.sleep(500); - Mockito.verify(syncMock, Mockito.times(1)).refreshSplits(1585956698477L); - splitsWorker.stop(); + Mockito.verify(syncMock, Mockito.times(1)).refreshSplits(Mockito.anyObject()); + featureFlagsWorker.stop(); } -} +} \ No newline at end of file