Skip to content

Commit

Permalink
Merge branch 'master' into issue-5737-SLTS-improve-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
andreipaduroiu committed Feb 16, 2021
2 parents 6d71d0d + cf1c2a9 commit 820db4c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public void setPosition(String process, String readerGroup, String readerId, Pos
public Map<String, Position> getPositions(String process, String readerGroup) throws CheckpointStoreException {
Map<String, Position> map = new HashMap<>();
String path = getReaderGroupPath(process, readerGroup);
ReaderGroupData rgData = groupDataSerializer.deserialize(ByteBuffer.wrap(getData(path)));
rgData.getReaderIds().forEach(x -> map.put(x, null));
for (String child : getChildren(path)) {
Position position = null;
byte[] data = getData(path + "/" + child);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;

Expand Down Expand Up @@ -85,5 +89,54 @@ public void failingTests() {
AssertExtensions.assertThrows("failed removeReaderGroup",
() -> checkpointStore.removeReaderGroup(process1, readerGroup1), predicate);
}

@Test
public void readerWithoutCheckpointTest() throws Exception {
final String process1 = "process1";
final String readerGroup1 = "rg1";
final String reader1 = "reader1";
final String reader2 = "reader2";

Set<String> processes = checkpointStore.getProcesses();
Assert.assertEquals(0, processes.size());

checkpointStore.addReaderGroup(process1, readerGroup1);
List<String> result = checkpointStore.getReaderGroups(process1);
Assert.assertNotNull(result);
Assert.assertEquals(1, result.size());
Assert.assertEquals(readerGroup1, result.get(0));

processes = checkpointStore.getProcesses();
Assert.assertEquals(1, processes.size());

checkpointStore.addReader(process1, readerGroup1, reader1);
Map<String, Position> resultMap = checkpointStore.getPositions(process1, readerGroup1);
Assert.assertNotNull(resultMap);
Assert.assertEquals(1, resultMap.size());
Assert.assertNull(resultMap.get(reader1));

checkpointStore.addReader(process1, readerGroup1, reader2);
resultMap = checkpointStore.getPositions(process1, readerGroup1);
Assert.assertNotNull(resultMap);
Assert.assertEquals(2, resultMap.size());

// delete the node for the reader.
cli.delete().forPath(String.format("/%s/%s/%s/%s", "eventProcessors", process1, readerGroup1, reader1));

Map<String, Position> map = checkpointStore.sealReaderGroup(process1, readerGroup1);
Assert.assertEquals(map.size(), 2);
Assert.assertTrue(map.containsKey(reader1));
Assert.assertNull(map.get(reader1));
Assert.assertTrue(map.containsKey(reader2));
Assert.assertNull(map.get(reader2));
map.keySet().forEach(x -> {
try {
checkpointStore.removeReader(process1, readerGroup1, x);
} catch (CheckpointStoreException e) {
throw new RuntimeException(e);
}
});
checkpointStore.removeReaderGroup(process1, readerGroup1);
}
}

0 comments on commit 820db4c

Please sign in to comment.