Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
package io.split.engine.sse;

import io.split.client.utils.Json;
import io.split.engine.sse.dtos.*;

import io.split.engine.sse.dtos.ControlNotification;
import io.split.engine.sse.dtos.ErrorNotification;
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
import io.split.engine.sse.dtos.GenericNotificationData;
import io.split.engine.sse.dtos.IncomingNotification;
import io.split.engine.sse.dtos.OccupancyNotification;
import io.split.engine.sse.dtos.RawMessageNotification;
import io.split.engine.sse.dtos.SegmentChangeNotification;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.engine.sse.exceptions.EventParsingException;

public class NotificationParserImp implements NotificationParser {
Expand All @@ -13,11 +22,9 @@ public IncomingNotification parseMessage(String payload) throws EventParsingExce
RawMessageNotification rawMessageNotification = Json.fromJson(payload, RawMessageNotification.class);
GenericNotificationData genericNotificationData = Json.fromJson(rawMessageNotification.getData(), GenericNotificationData.class);
genericNotificationData.setChannel(rawMessageNotification.getChannel());

if (rawMessageNotification.getChannel().contains(OCCUPANCY_PREFIX)) {
return parseControlChannelMessage(genericNotificationData);
}

return parseNotification(genericNotificationData);
} catch (Exception ex) {
throw new EventParsingException("Error parsing event.", ex, payload);
Expand All @@ -28,11 +35,9 @@ public IncomingNotification parseMessage(String payload) throws EventParsingExce
public ErrorNotification parseError(String payload) throws EventParsingException {
try {
ErrorNotification messageError = Json.fromJson(payload, ErrorNotification.class);

if (messageError.getMessage() == null || messageError.getStatusCode() == null) {
throw new Exception("Wrong notification format.");
}

return messageError;
} catch (Exception ex) {
throw new EventParsingException("Error parsing event.", ex, payload);
Expand All @@ -59,7 +64,6 @@ private IncomingNotification parseControlChannelMessage(GenericNotificationData
if (genericNotificationData.getControlType() != null) {
return new ControlNotification(genericNotificationData);
}

return new OccupancyNotification(genericNotificationData);
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,56 @@
package io.split.engine.sse.dtos;

import io.split.client.dtos.Split;
import io.split.client.utils.Json;
import io.split.engine.segments.SegmentSynchronizationTaskImp;
import io.split.engine.sse.NotificationProcessor;
import io.split.engine.sse.enums.CompressType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Base64;
import java.util.zip.DataFormatException;

import static io.split.engine.sse.utils.DecompressionUtil.gZipDecompress;
import static io.split.engine.sse.utils.DecompressionUtil.zLibDecompress;

public class FeatureFlagChangeNotification extends IncomingNotification {
private static final Logger _log = LoggerFactory.getLogger(SegmentSynchronizationTaskImp.class);
private final long changeNumber;
private long previousChangeNumber;
private String featureFlagDefinition;
private Split featureFlagDefinition;
private CompressType compressType;


public FeatureFlagChangeNotification(GenericNotificationData genericNotificationData) {
super(Type.SPLIT_UPDATE, genericNotificationData.getChannel());
changeNumber = genericNotificationData.getChangeNumber();
if(genericNotificationData.getPreviousChangeNumber() != null) {
previousChangeNumber = genericNotificationData.getPreviousChangeNumber();
}
featureFlagDefinition = genericNotificationData.getFeatureFlagDefinition();
compressType = CompressType.from(genericNotificationData.getCompressType());
if (compressType == null || genericNotificationData.getFeatureFlagDefinition() == null) {
return;
}
try {
byte[] decodedBytes = Base64.getDecoder().decode(genericNotificationData.getFeatureFlagDefinition());
switch (compressType) {
case GZIP:
decodedBytes = gZipDecompress(decodedBytes);
break;
case ZLIB:
decodedBytes = zLibDecompress(decodedBytes);
break;
}
featureFlagDefinition = Json.fromJson(new String(decodedBytes, 0, decodedBytes.length, "UTF-8"), Split.class);
} catch (UnsupportedEncodingException | IllegalArgumentException e) {
_log.warn("Could not decode base64 data in flag definition", e);
} catch (DataFormatException d) {
_log.warn("Could not decompress feature flag definition with zlib algorithm", d);
} catch (IOException i) {
_log.warn("Could not decompress feature flag definition with gzip algorithm", i);
}
}

public long getChangeNumber() {
Expand All @@ -27,7 +60,7 @@ public long getPreviousChangeNumber() {
return previousChangeNumber;
}

public String getFeatureFlagDefinition() {
public Split getFeatureFlagDefinition() {
return featureFlagDefinition;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ private String addPrefixControlChannels(String channels) {
.replace("control_pri", "[?occupancy=metrics.publishers]control_pri")
.replace("control_sec", "[?occupancy=metrics.publishers]control_sec");
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package io.split.engine.sse.utils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.DataFormatException;
import java.util.zip.GZIPInputStream;
import java.util.zip.Inflater;

public class DecompressionUtil {

public static byte[] zLibDecompress(byte[] toDecompress){
public static byte[] zLibDecompress(byte[] toDecompress) throws DataFormatException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(toDecompress.length);
Inflater decompressor = new Inflater();
try {
Expand All @@ -16,11 +19,24 @@ public static byte[] zLibDecompress(byte[] toDecompress){
int count = decompressor.inflate(buf);
byteArrayOutputStream.write(buf, 0, count);
}
} catch (DataFormatException e) {
throw new RuntimeException(e);
} finally {
decompressor.end();
}
return byteArrayOutputStream.toByteArray();
}

public static byte[] gZipDecompress(byte[] toDecompress) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (GZIPInputStream gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(toDecompress))){
int res = 0;
byte buf[] = new byte[toDecompress.length];
while (res >= 0) {
res = gzipInputStream.read(buf, 0, buf.length);
if (res > 0) {
out.write(buf, 0, res);
}
}
}
return out.toByteArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
import io.split.engine.sse.enums.CompressType;
import io.split.engine.sse.exceptions.EventParsingException;

import org.junit.Assert;
import org.junit.Test;

Expand All @@ -14,26 +15,32 @@ public void validateZlibCompressType() throws EventParsingException {
NotificationParserImp notificationParserImp = new NotificationParserImp();

FeatureFlagChangeNotification incomingNotification = (FeatureFlagChangeNotification) notificationParserImp.parseMessage(payload);
Assert.assertEquals(incomingNotification.getFeatureFlagDefinition().name, "mauro_java");
Assert.assertEquals(incomingNotification.getFeatureFlagDefinition().changeNumber, 1684265694505L);
Assert.assertEquals(CompressType.ZLIB, incomingNotification.getCompressType());
Assert.assertEquals(0, incomingNotification.getPreviousChangeNumber());
}

@Test
public void validateGzipCompressType() throws EventParsingException {
String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":1,\\\"d\\\":\\\"eJzMk99u2kwQxV8lOtdryQZj8N6hD5QPlThSTVNVEUKDPYZt1jZar1OlyO9emf8lVFWv2ss5zJyd82O8hTWUZSqZvW04opwhUVdsIKBSSKR+10vS1HWW7pIdz2NyBjRwHS8IXEopTLgbQqDYT+ZUm3LxlV4J4mg81LpMyKqygPRc94YeM6eQTtjphp4fegLVXvD6Qdjt9wPXF6gs2bqCxPC/2eRpDIEXpXXblpGuWCDljGptZ4bJ5lxYSJRZBoFkTcWKozpfsoH0goHfCXpB6PfcngDpVQnZEUjKIlOr2uwWqiC3zU5L1aF+3p7LFhUkPv8/mY2nk3gGgZxssmZzb8p6A9n25ktVtA9iGI3ODXunQ3HDp+AVWT6F+rZWlrWq7MN+YkSWWvuTDvkMSnNV7J6oTdl6qKTEvGnmjcCGjL2IYC/ovPYgUKnvvPtbmrmApiVryLM7p2jE++AfH6fTx09/HvuF32LWnNjStM0Xh3c8ukZcsZlEi3h8/zCObsBpJ0acqYLTmFdtqitK1V6NzrfpdPBbLmVx4uK26e27izpDu/r5yf/16AXun2Cr4u6w591xw7+LfDidLj6Mv8TXwP8xbofv/c7UmtHMmx8BAAD//0fclvU=\\\"}\"}";
String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":1,\\\"d\\\":\\\"H4sIAAAAAAAA/8yT327aTBDFXyU612vJxoTgvUMfKB8qcaSapqoihAZ7DNusvWi9TpUiv3tl/pdQVb1qL+cwc3bOj/EGzlKeq3T6tuaYCoZEXbGFgMogkXXDIM0y31v4C/aCgMnrU9/3gl7Pp4yilMMIAuVusqDamvlXeiWIg/FAa5OSU6aEDHz/ip4wZ5Be1AmjoBsFAtVOCO56UXh31/O7ApUjV1eQGPw3HT+NIPCitG7bctIVC2ScU63d1DK5gksHCZPnEEhXVC45rosFW8ig1++GYej3g85tJEB6aSA7Aqkpc7Ws7XahCnLTbLVM7evnzalsUUHi8//j6WgyTqYQKMilK7b31tRryLa3WKiyfRCDeHhq2Dntiys+JS/J8THUt5VyrFXlHnYTQ3LU2h91yGdQVqhy+0RtTeuhUoNZ08wagTVZdxbBndF5vYVApb7z9m9pZgKaFqwhT+6coRHvg398nEweP/157Bd+S1hz6oxtm88O73B0jbhgM47nyej+YRRfgdNODDlXJWcJL9tUF5SqnRqfbtPr4LdcTHnk4rfp3buLOkG7+Pmp++vRM9w/wVblzX7Pm8OGfxf5YDKZfxh9SS6B/2Pc9t/7ja01o5k1PwIAAP//uTipVskEAAA=\\\"}\"}";
NotificationParserImp notificationParserImp = new NotificationParserImp();

FeatureFlagChangeNotification incomingNotification = (FeatureFlagChangeNotification) notificationParserImp.parseMessage(payload);
Assert.assertEquals(incomingNotification.getFeatureFlagDefinition().name, "mauro_java");
Assert.assertEquals(incomingNotification.getFeatureFlagDefinition().changeNumber, 1684333081259L);
Assert.assertEquals(CompressType.GZIP, incomingNotification.getCompressType());
Assert.assertEquals(0, incomingNotification.getPreviousChangeNumber());
}

@Test
public void validateNotCompressType() throws EventParsingException {
String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684265694505,\\\"pcn\\\":0,\\\"c\\\":0,\\\"d\\\":\\\"eJzMk99u2kwQxV8lOtdryQZj8N6hD5QPlThSTVNVEUKDPYZt1jZar1OlyO9emf8lVFWv2ss5zJyd82O8hTWUZSqZvW04opwhUVdsIKBSSKR+10vS1HWW7pIdz2NyBjRwHS8IXEopTLgbQqDYT+ZUm3LxlV4J4mg81LpMyKqygPRc94YeM6eQTtjphp4fegLVXvD6Qdjt9wPXF6gs2bqCxPC/2eRpDIEXpXXblpGuWCDljGptZ4bJ5lxYSJRZBoFkTcWKozpfsoH0goHfCXpB6PfcngDpVQnZEUjKIlOr2uwWqiC3zU5L1aF+3p7LFhUkPv8/mY2nk3gGgZxssmZzb8p6A9n25ktVtA9iGI3ODXunQ3HDp+AVWT6F+rZWlrWq7MN+YkSWWvuTDvkMSnNV7J6oTdl6qKTEvGnmjcCGjL2IYC/ovPYgUKnvvPtbmrmApiVryLM7p2jE++AfH6fTx09/HvuF32LWnNjStM0Xh3c8ukZcsZlEi3h8/zCObsBpJ0acqYLTmFdtqitK1V6NzrfpdPBbLmVx4uK26e27izpDu/r5yf/16AXun2Cr4u6w591xw7+LfDidLj6Mv8TXwP8xbofv/c7UmtHMmx8BAAD//0fclvU=\\\"}\"}";
NotificationParserImp notificationParserImp = new NotificationParserImp();
String payload = "{\"id\":\"vQQ61wzBRO:0:0\",\"clientId\":\"pri:MTUxNzg3MDg1OQ==\",\"timestamp\":1684265694676,\"encoding\":\"json\",\"channel\":\"NzM2MDI5Mzc0_MjkyNTIzNjczMw==_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1684329854385,\\\"pcn\\\":0,\\\"c\\\":0,\\\"d\\\":\\\"eyJ0cmFmZmljVHlwZU5hbWUiOiJ1c2VyIiwiaWQiOiJkNDMxY2RkMC1iMGJlLTExZWEtOGE4MC0xNjYwYWRhOWNlMzkiLCJuYW1lIjoibWF1cm9famF2YSIsInRyYWZmaWNBbGxvY2F0aW9uIjoxMDAsInRyYWZmaWNBbGxvY2F0aW9uU2VlZCI6LTkyMzkxNDkxLCJzZWVkIjotMTc2OTM3NzYwNCwic3RhdHVzIjoiQUNUSVZFIiwia2lsbGVkIjpmYWxzZSwiZGVmYXVsdFRyZWF0bWVudCI6Im9mZiIsImNoYW5nZU51bWJlciI6MTY4NDMyOTg1NDM4NSwiYWxnbyI6MiwiY29uZmlndXJhdGlvbnMiOnt9LCJjb25kaXRpb25zIjpbeyJjb25kaXRpb25UeXBlIjoiV0hJVEVMSVNUIiwibWF0Y2hlckdyb3VwIjp7ImNvbWJpbmVyIjoiQU5EIiwibWF0Y2hlcnMiOlt7Im1hdGNoZXJUeXBlIjoiV0hJVEVMSVNUIiwibmVnYXRlIjpmYWxzZSwid2hpdGVsaXN0TWF0Y2hlckRhdGEiOnsid2hpdGVsaXN0IjpbImFkbWluIiwibWF1cm8iLCJuaWNvIl19fV19LCJwYXJ0aXRpb25zIjpbeyJ0cmVhdG1lbnQiOiJvZmYiLCJzaXplIjoxMDB9XSwibGFiZWwiOiJ3aGl0ZWxpc3RlZCJ9LHsiY29uZGl0aW9uVHlwZSI6IlJPTExPVVQiLCJtYXRjaGVyR3JvdXAiOnsiY29tYmluZXIiOiJBTkQiLCJtYXRjaGVycyI6W3sia2V5U2VsZWN0b3IiOnsidHJhZmZpY1R5cGUiOiJ1c2VyIn0sIm1hdGNoZXJUeXBlIjoiSU5fU0VHTUVOVCIsIm5lZ2F0ZSI6ZmFsc2UsInVzZXJEZWZpbmVkU2VnbWVudE1hdGNoZXJEYXRhIjp7InNlZ21lbnROYW1lIjoibWF1ci0yIn19XX0sInBhcnRpdGlvbnMiOlt7InRyZWF0bWVudCI6Im9uIiwic2l6ZSI6MH0seyJ0cmVhdG1lbnQiOiJvZmYiLCJzaXplIjoxMDB9LHsidHJlYXRtZW50IjoiVjQiLCJzaXplIjowfSx7InRyZWF0bWVudCI6InY1Iiwic2l6ZSI6MH1dLCJsYWJlbCI6ImluIHNlZ21lbnQgbWF1ci0yIn0seyJjb25kaXRpb25UeXBlIjoiUk9MTE9VVCIsIm1hdGNoZXJHcm91cCI6eyJjb21iaW5lciI6IkFORCIsIm1hdGNoZXJzIjpbeyJrZXlTZWxlY3RvciI6eyJ0cmFmZmljVHlwZSI6InVzZXIifSwibWF0Y2hlclR5cGUiOiJBTExfS0VZUyIsIm5lZ2F0ZSI6ZmFsc2V9XX0sInBhcnRpdGlvbnMiOlt7InRyZWF0bWVudCI6Im9uIiwic2l6ZSI6MH0seyJ0cmVhdG1lbnQiOiJvZmYiLCJzaXplIjoxMDB9LHsidHJlYXRtZW50IjoiVjQiLCJzaXplIjowfSx7InRyZWF0bWVudCI6InY1Iiwic2l6ZSI6MH1dLCJsYWJlbCI6ImRlZmF1bHQgcnVsZSJ9XX0=\\\"}\"}";
NotificationParserImp notificationParserImp = new NotificationParserImp();

FeatureFlagChangeNotification incomingNotification = (FeatureFlagChangeNotification) notificationParserImp.parseMessage(payload);
Assert.assertEquals(incomingNotification.getFeatureFlagDefinition().name, "mauro_java");
Assert.assertEquals(incomingNotification.getFeatureFlagDefinition().changeNumber, 1684329854385L);
Assert.assertEquals(CompressType.NOT_COMPRESSED, incomingNotification.getCompressType());
Assert.assertEquals(0, incomingNotification.getPreviousChangeNumber());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package io.split.engine.sse;

import io.split.engine.sse.dtos.*;
import io.split.engine.sse.dtos.ControlNotification;
import io.split.engine.sse.dtos.ControlType;
import io.split.engine.sse.dtos.ErrorNotification;
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
import io.split.engine.sse.dtos.IncomingNotification;
import io.split.engine.sse.dtos.OccupancyNotification;
import io.split.engine.sse.dtos.SegmentChangeNotification;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.engine.sse.exceptions.EventParsingException;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;

public class NotificationParserTest {
private NotificationParser notificationParser;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
package io.split.engine.sse;

import io.split.engine.sse.dtos.*;
import io.split.engine.sse.dtos.ControlNotification;
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
import io.split.engine.sse.dtos.GenericNotificationData;
import io.split.engine.sse.dtos.OccupancyNotification;
import io.split.engine.sse.dtos.SegmentChangeNotification;
import io.split.engine.sse.dtos.SegmentQueueDto;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.engine.sse.workers.SegmentsWorkerImp;
import io.split.engine.sse.workers.SplitsWorker;
import io.split.engine.sse.workers.Worker;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.UnsupportedEncodingException;

public class NotificationProcessorTest {
private SplitsWorker _splitsWorker;
private Worker<SegmentQueueDto> _segmentWorker;
Expand All @@ -24,7 +32,7 @@ public void setUp() {
}

@Test
public void processSplitUpdateAddToQueueInWorker() {
public void processSplitUpdateAddToQueueInWorker() throws UnsupportedEncodingException {
long changeNumber = 1585867723838L;
String channel = "splits";
GenericNotificationData genericNotificationData = new GenericNotificationData(changeNumber, null, null, null, null, null, null, channel, null, null, null);
Expand Down
Loading