Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public ErrorNotification parseError(String payload) throws EventParsingException
private IncomingNotification parseNotification(GenericNotificationData genericNotificationData) throws Exception {
switch (genericNotificationData.getType()) {
case SPLIT_UPDATE:
return new SplitChangeNotification(genericNotificationData);
return new FeatureFlagChangeNotification(genericNotificationData);
case SPLIT_KILL:
return new SplitKillNotification(genericNotificationData);
case SEGMENT_UPDATE:
Expand All @@ -62,4 +62,4 @@ private IncomingNotification parseControlChannelMessage(GenericNotificationData

return new OccupancyNotification(genericNotificationData);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.split.engine.sse.dtos;

import io.split.engine.sse.NotificationProcessor;
import io.split.engine.sse.enums.CompressType;

public class FeatureFlagChangeNotification extends IncomingNotification {
private final long changeNumber;
private long previousChangeNumber;
private String featureFlagDefinition;
private CompressType compressType;


public FeatureFlagChangeNotification(GenericNotificationData genericNotificationData) {
super(Type.SPLIT_UPDATE, genericNotificationData.getChannel());
changeNumber = genericNotificationData.getChangeNumber();
if(genericNotificationData.getPreviousChangeNumber() != null) {
previousChangeNumber = genericNotificationData.getPreviousChangeNumber();
}
featureFlagDefinition = genericNotificationData.getFeatureFlagDefinition();
compressType = CompressType.from(genericNotificationData.getCompressType());
}

public long getChangeNumber() {
return changeNumber;
}
public long getPreviousChangeNumber() {
return previousChangeNumber;
}

public String getFeatureFlagDefinition() {
return featureFlagDefinition;
}

public CompressType getCompressType() {
return compressType;
}

@Override
public void handler(NotificationProcessor notificationProcessor) {
notificationProcessor.processSplitUpdate(getChangeNumber());
}

@Override
public String toString() {
return String.format("Type: %s; Channel: %s; ChangeNumber: %s", getType(), getChannel(), getChangeNumber());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.split.engine.sse.dtos;

import com.google.gson.annotations.SerializedName;

public class GenericNotificationData {
private final Long changeNumber;
private final String defaultTreatment;
Expand All @@ -9,6 +11,12 @@ public class GenericNotificationData {
private final String segmentName;
private final IncomingNotification.Type type;
private String channel;
@SerializedName("pcn")
private Long previousChangeNumber;
@SerializedName("d")
private String featureFlagDefinition;
@SerializedName("c")
private Integer compressType;

public GenericNotificationData (Long changeNumber,
String defaultTreatment,
Expand All @@ -17,7 +25,10 @@ public GenericNotificationData (Long changeNumber,
OccupancyMetrics occupancyMetrics,
String segmentName,
IncomingNotification.Type type,
String channel) {
String channel,
Long previousChangeNumber,
String data,
Integer compressType) {
this.changeNumber = changeNumber;
this.defaultTreatment = defaultTreatment;
this.splitName = splitName;
Expand All @@ -26,6 +37,9 @@ public GenericNotificationData (Long changeNumber,
this.segmentName = segmentName;
this.type = type;
this.channel = channel;
this.previousChangeNumber = previousChangeNumber;
this.featureFlagDefinition = data;
this.compressType = compressType;
}

public long getChangeNumber() {
Expand Down Expand Up @@ -57,8 +71,19 @@ public IncomingNotification.Type getType() {
}

public String getChannel() { return channel; }
public Long getPreviousChangeNumber() {
return previousChangeNumber;
}

public String getFeatureFlagDefinition() {
return featureFlagDefinition;
}

public Integer getCompressType() {
return compressType;
}

public void setChannel(String channel) {
this.channel = channel;
}
}
}

This file was deleted.

39 changes: 39 additions & 0 deletions client/src/main/java/io/split/engine/sse/enums/CompressType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.split.engine.sse.enums;

import java.util.HashMap;
import java.util.Map;

public enum CompressType {
NOT_COMPRESSED(0),
GZIP(1),
ZLIB(2);

private final Integer value;

CompressType(Integer value) {
this.value = value;
}

public long getValue() {
return value;
}

// Mapping compress type to compress type id
private static final Map<Integer, CompressType> _map = new HashMap<>();
static {
for (CompressType compressType : CompressType.values())
_map.put(compressType.value, compressType);
}

/**
* Get compress type from value
* @param value value
* @return CompressType
*/
public static CompressType from(Integer value) {
if (value == null || _map.size() <= value){
return null;
}
return _map.get(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.split.SSEMockServer;
import io.split.engine.sse.client.SSEClient;
import io.split.engine.sse.dtos.ErrorNotification;
import io.split.engine.sse.dtos.SplitChangeNotification;
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
import io.split.telemetry.storage.InMemoryTelemetryStorage;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.apache.hc.client5.http.config.RequestConfig;
Expand Down Expand Up @@ -44,7 +44,7 @@ public void startShouldConnect() throws IOException {

EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null);

boolean result = eventSourceClient.start("channel-test","token-test");
boolean result = eventSourceClient.start("channel-test", "token-test");

Assert.assertTrue(result);

Expand All @@ -59,7 +59,7 @@ public void startShouldReconnect() throws IOException {
sseServer.start();
EventSourceClient eventSourceClient = new EventSourceClientImp("http://fake:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null);

boolean result = eventSourceClient.start("channel-test","token-test");
boolean result = eventSourceClient.start("channel-test", "token-test");

Assert.assertFalse(result);

Expand All @@ -76,7 +76,7 @@ public void startAndReceiveNotification() throws IOException {
sseServer.start();
EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null);

boolean result = eventSourceClient.start("channel-test","token-test");
boolean result = eventSourceClient.start("channel-test", "token-test");

Assert.assertTrue(result);

Expand All @@ -95,7 +95,7 @@ public void startAndReceiveNotification() throws IOException {

Awaitility.await()
.atMost(50L, TimeUnit.SECONDS)
.untilAsserted(() -> Mockito.verify(_notificationProcessor, Mockito.times(1)).process(Mockito.any(SplitChangeNotification.class)));
.untilAsserted(() -> Mockito.verify(_notificationProcessor, Mockito.times(1)).process(Mockito.any(FeatureFlagChangeNotification.class)));

OutboundSseEvent sseEventError = new OutboundEvent
.Builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.split.engine.sse;

import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
import io.split.engine.sse.enums.CompressType;
import io.split.engine.sse.exceptions.EventParsingException;
import org.junit.Assert;
import org.junit.Test;

public class NotificationParserImpTest {

@Test
public void validateZlibCompressType() throws EventParsingException {
String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":2,\\\"d\\\":\\\"eJzMk99u2kwQxV8lOtdryQZj8N6hD5QPlThSTVNVEUKDPYZt1jZar1OlyO9emf8lVFWv2ss5zJyd82O8hTWUZSqZvW04opwhUVdsIKBSSKR+10vS1HWW7pIdz2NyBjRwHS8IXEopTLgbQqDYT+ZUm3LxlV4J4mg81LpMyKqygPRc94YeM6eQTtjphp4fegLVXvD6Qdjt9wPXF6gs2bqCxPC/2eRpDIEXpXXblpGuWCDljGptZ4bJ5lxYSJRZBoFkTcWKozpfsoH0goHfCXpB6PfcngDpVQnZEUjKIlOr2uwWqiC3zU5L1aF+3p7LFhUkPv8/mY2nk3gGgZxssmZzb8p6A9n25ktVtA9iGI3ODXunQ3HDp+AVWT6F+rZWlrWq7MN+YkSWWvuTDvkMSnNV7J6oTdl6qKTEvGnmjcCGjL2IYC/ovPYgUKnvvPtbmrmApiVryLM7p2jE++AfH6fTx09/HvuF32LWnNjStM0Xh3c8ukZcsZlEi3h8/zCObsBpJ0acqYLTmFdtqitK1V6NzrfpdPBbLmVx4uK26e27izpDu/r5yf/16AXun2Cr4u6w591xw7+LfDidLj6Mv8TXwP8xbofv/c7UmtHMmx8BAAD//0fclvU=\\\"}\"}";
NotificationParserImp notificationParserImp = new NotificationParserImp();

FeatureFlagChangeNotification incomingNotification = (FeatureFlagChangeNotification) notificationParserImp.parseMessage(payload);
Assert.assertEquals(CompressType.ZLIB, incomingNotification.getCompressType());
Assert.assertEquals(0, incomingNotification.getPreviousChangeNumber());
}

@Test
public void validateGzipCompressType() throws EventParsingException {
String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":1,\\\"d\\\":\\\"eJzMk99u2kwQxV8lOtdryQZj8N6hD5QPlThSTVNVEUKDPYZt1jZar1OlyO9emf8lVFWv2ss5zJyd82O8hTWUZSqZvW04opwhUVdsIKBSSKR+10vS1HWW7pIdz2NyBjRwHS8IXEopTLgbQqDYT+ZUm3LxlV4J4mg81LpMyKqygPRc94YeM6eQTtjphp4fegLVXvD6Qdjt9wPXF6gs2bqCxPC/2eRpDIEXpXXblpGuWCDljGptZ4bJ5lxYSJRZBoFkTcWKozpfsoH0goHfCXpB6PfcngDpVQnZEUjKIlOr2uwWqiC3zU5L1aF+3p7LFhUkPv8/mY2nk3gGgZxssmZzb8p6A9n25ktVtA9iGI3ODXunQ3HDp+AVWT6F+rZWlrWq7MN+YkSWWvuTDvkMSnNV7J6oTdl6qKTEvGnmjcCGjL2IYC/ovPYgUKnvvPtbmrmApiVryLM7p2jE++AfH6fTx09/HvuF32LWnNjStM0Xh3c8ukZcsZlEi3h8/zCObsBpJ0acqYLTmFdtqitK1V6NzrfpdPBbLmVx4uK26e27izpDu/r5yf/16AXun2Cr4u6w591xw7+LfDidLj6Mv8TXwP8xbofv/c7UmtHMmx8BAAD//0fclvU=\\\"}\"}";
NotificationParserImp notificationParserImp = new NotificationParserImp();

FeatureFlagChangeNotification incomingNotification = (FeatureFlagChangeNotification) notificationParserImp.parseMessage(payload);
Assert.assertEquals(CompressType.GZIP, incomingNotification.getCompressType());
Assert.assertEquals(0, incomingNotification.getPreviousChangeNumber());
}

@Test
public void validateNotCompressType() throws EventParsingException {
String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":0,\\\"d\\\":\\\"eJzMk99u2kwQxV8lOtdryQZj8N6hD5QPlThSTVNVEUKDPYZt1jZar1OlyO9emf8lVFWv2ss5zJyd82O8hTWUZSqZvW04opwhUVdsIKBSSKR+10vS1HWW7pIdz2NyBjRwHS8IXEopTLgbQqDYT+ZUm3LxlV4J4mg81LpMyKqygPRc94YeM6eQTtjphp4fegLVXvD6Qdjt9wPXF6gs2bqCxPC/2eRpDIEXpXXblpGuWCDljGptZ4bJ5lxYSJRZBoFkTcWKozpfsoH0goHfCXpB6PfcngDpVQnZEUjKIlOr2uwWqiC3zU5L1aF+3p7LFhUkPv8/mY2nk3gGgZxssmZzb8p6A9n25ktVtA9iGI3ODXunQ3HDp+AVWT6F+rZWlrWq7MN+YkSWWvuTDvkMSnNV7J6oTdl6qKTEvGnmjcCGjL2IYC/ovPYgUKnvvPtbmrmApiVryLM7p2jE++AfH6fTx09/HvuF32LWnNjStM0Xh3c8ukZcsZlEi3h8/zCObsBpJ0acqYLTmFdtqitK1V6NzrfpdPBbLmVx4uK26e27izpDu/r5yf/16AXun2Cr4u6w591xw7+LfDidLj6Mv8TXwP8xbofv/c7UmtHMmx8BAAD//0fclvU=\\\"}\"}";
NotificationParserImp notificationParserImp = new NotificationParserImp();

FeatureFlagChangeNotification incomingNotification = (FeatureFlagChangeNotification) notificationParserImp.parseMessage(payload);
Assert.assertEquals(CompressType.NOT_COMPRESSED, incomingNotification.getCompressType());
Assert.assertEquals(0, incomingNotification.getPreviousChangeNumber());
}

@Test
public void validateCompressTypeIncorrect() throws EventParsingException {
String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":3,\\\"d\\\":\\\"eJzMk99u2kwQxV8lOtdryQZj8N6hD5QPlThSTVNVEUKDPYZt1jZar1OlyO9emf8lVFWv2ss5zJyd82O8hTWUZSqZvW04opwhUVdsIKBSSKR+10vS1HWW7pIdz2NyBjRwHS8IXEopTLgbQqDYT+ZUm3LxlV4J4mg81LpMyKqygPRc94YeM6eQTtjphp4fegLVXvD6Qdjt9wPXF6gs2bqCxPC/2eRpDIEXpXXblpGuWCDljGptZ4bJ5lxYSJRZBoFkTcWKozpfsoH0goHfCXpB6PfcngDpVQnZEUjKIlOr2uwWqiC3zU5L1aF+3p7LFhUkPv8/mY2nk3gGgZxssmZzb8p6A9n25ktVtA9iGI3ODXunQ3HDp+AVWT6F+rZWlrWq7MN+YkSWWvuTDvkMSnNV7J6oTdl6qKTEvGnmjcCGjL2IYC/ovPYgUKnvvPtbmrmApiVryLM7p2jE++AfH6fTx09/HvuF32LWnNjStM0Xh3c8ukZcsZlEi3h8/zCObsBpJ0acqYLTmFdtqitK1V6NzrfpdPBbLmVx4uK26e27izpDu/r5yf/16AXun2Cr4u6w591xw7+LfDidLj6Mv8TXwP8xbofv/c7UmtHMmx8BAAD//0fclvU=\\\"}\"}";
NotificationParserImp notificationParserImp = new NotificationParserImp();

FeatureFlagChangeNotification incomingNotification = (FeatureFlagChangeNotification) notificationParserImp.parseMessage(payload);
Assert.assertNull(incomingNotification.getCompressType());
Assert.assertEquals(0, incomingNotification.getPreviousChangeNumber());
}

@Test
public void validateCompressTypeNull() throws EventParsingException {
String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"d\\\":\\\"eJzMk99u2kwQxV8lOtdryQZj8N6hD5QPlThSTVNVEUKDPYZt1jZar1OlyO9emf8lVFWv2ss5zJyd82O8hTWUZSqZvW04opwhUVdsIKBSSKR+10vS1HWW7pIdz2NyBjRwHS8IXEopTLgbQqDYT+ZUm3LxlV4J4mg81LpMyKqygPRc94YeM6eQTtjphp4fegLVXvD6Qdjt9wPXF6gs2bqCxPC/2eRpDIEXpXXblpGuWCDljGptZ4bJ5lxYSJRZBoFkTcWKozpfsoH0goHfCXpB6PfcngDpVQnZEUjKIlOr2uwWqiC3zU5L1aF+3p7LFhUkPv8/mY2nk3gGgZxssmZzb8p6A9n25ktVtA9iGI3ODXunQ3HDp+AVWT6F+rZWlrWq7MN+YkSWWvuTDvkMSnNV7J6oTdl6qKTEvGnmjcCGjL2IYC/ovPYgUKnvvPtbmrmApiVryLM7p2jE++AfH6fTx09/HvuF32LWnNjStM0Xh3c8ukZcsZlEi3h8/zCObsBpJ0acqYLTmFdtqitK1V6NzrfpdPBbLmVx4uK26e27izpDu/r5yf/16AXun2Cr4u6w591xw7+LfDidLj6Mv8TXwP8xbofv/c7UmtHMmx8BAAD//0fclvU=\\\"}\"}";
NotificationParserImp notificationParserImp = new NotificationParserImp();

FeatureFlagChangeNotification incomingNotification = (FeatureFlagChangeNotification) notificationParserImp.parseMessage(payload);
Assert.assertNull(incomingNotification.getCompressType());
Assert.assertEquals(0, incomingNotification.getPreviousChangeNumber());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void parseSplitUpdateShouldReturnParsedEvent() throws EventParsingExcepti
IncomingNotification result = notificationParser.parseMessage(payload);
assertEquals(IncomingNotification.Type.SPLIT_UPDATE, result.getType());
assertEquals("xxxx_xxxx_splits", result.getChannel());
assertEquals(1592590435115L, ((SplitChangeNotification) result).getChangeNumber());
assertEquals(1592590435115L, ((FeatureFlagChangeNotification) result).getChangeNumber());
}

@Test
Expand Down Expand Up @@ -149,4 +149,4 @@ public void parseControlStreamingDisabledShouldReturnParsedEvent() throws EventP
assertEquals("control_pri", result.getChannel());
assertEquals(ControlType.STREAMING_DISABLED, ((ControlNotification)result).getControlType());
}
}
}
Loading