Skip to content

Commit

Permalink
The LogTest made the implicit assumption in many tests that read mess…
Browse files Browse the repository at this point in the history
…ages preserves the writing order which is not generally true for how the log implementations work when dealing with multiple partitions. Adjusted the test case to make that assumption explicit.
  • Loading branch information
mbroecheler committed Apr 30, 2015
1 parent 8a98854 commit 4f45806
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 17 deletions.
Expand Up @@ -47,12 +47,12 @@ public class KCVSLogManager implements LogManager {
ConfigOption.Type.GLOBAL_OFFLINE, false);

public static final ConfigOption<Integer> LOG_MAX_PARTITIONS = new ConfigOption<Integer>(LOG_NS,"max-partitions",
"The maximum number of partitions to use for logging. Setting up this many actual or virtual partitions. Must be bigger than 1" +
"The maximum number of partitions to use for logging. Setting up this many actual or virtual partitions. Must be bigger than 0" +
"and a power of 2.",
ConfigOption.Type.FIXED, Integer.class, new Predicate<Integer>() {
@Override
public boolean apply(@Nullable Integer integer) {
return integer!=null && integer>1 && NumberUtil.isPowerOf2(integer);
return integer!=null && integer>0 && NumberUtil.isPowerOf2(integer);
}
});

Expand Down
Expand Up @@ -25,11 +25,14 @@ public abstract class KCVSLogTest extends LogTest {
private KeyColumnValueStoreManager storeManager;

@Override
public LogManager openLogManager(String senderId) throws BackendException {
public LogManager openLogManager(String senderId, boolean requiresOrderPreserving) throws BackendException {
storeManager = openStorageManager();
ModifiableConfiguration config = GraphDatabaseConfiguration.buildGraphConfiguration();
config.set(GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID,senderId);
config.set(GraphDatabaseConfiguration.LOG_READ_INTERVAL, new StandardDuration(500L, TimeUnit.MILLISECONDS), LOG_NAME);
//To ensure that the write order is preserved in reading, we need to ensure that all writes go to the same partition
//otherwise readers will independently read from the partitions out-of-order by design to avoid having to synchronize
config.set(KCVSLogManager.LOG_FIXED_PARTITION, requiresOrderPreserving, LOG_NAME);
return new KCVSLogManager(storeManager,config.restrictTo(LOG_NAME));
}

Expand Down
Expand Up @@ -34,7 +34,14 @@ public abstract class LogTest {

private static final long TIMEOUT_MS = 30000;

public abstract LogManager openLogManager(String senderId) throws BackendException;
/**
*
* @param senderId The unique id identifying the sending instance
* @param requiresOrderPreserving whether it is required by the test case that write order is preserved when reading.
* @return
* @throws BackendException
*/
public abstract LogManager openLogManager(String senderId, boolean requiresOrderPreserving) throws BackendException;

private LogManager manager;

Expand All @@ -45,8 +52,10 @@ public abstract class LogTest {

@Before
public void setup() throws Exception {
log.debug("Starting {}.{}", getClass().getSimpleName(), testName.getMethodName());
manager = openLogManager(DEFAULT_SENDER_ID);
//Tests that assume that write order is preserved when reading from the log must suffix their test names with "serial"
boolean requiresOrderPreserving = testName.getMethodName().toLowerCase().endsWith("serial");
log.debug("Starting {}.{} - Order preserving {}", getClass().getSimpleName(), testName.getMethodName(), requiresOrderPreserving);
manager = openLogManager(DEFAULT_SENDER_ID,requiresOrderPreserving);
}

@After
Expand All @@ -60,18 +69,23 @@ public void close() throws Exception {
}

@Test
public void smallSendReceive() throws Exception {
simpleSendReceive(100,50);
public void smallSendReceiveSerial() throws Exception {
simpleSendReceive(100, 50);
}

@Test
public void mediumSendReceive() throws Exception {
public void mediumSendReceiveSerial() throws Exception {
simpleSendReceive(2000,1);
}

@Test
public void testMultipleReadersOnSingleLogSerial() throws Exception {
sendReceive(4, 2000, 5, true);
}

@Test
public void testMultipleReadersOnSingleLog() throws Exception {
sendReceive(4, 2000, 5);
sendReceive(4, 2000, 5, false);
}

@Test
Expand All @@ -89,7 +103,7 @@ public void testReadMarkerResumesInMiddleOfLog() throws Exception {
}

@Test
public void testLogIsDurableAcrossReopen() throws Exception {
public void testLogIsDurableAcrossReopenSerial() throws Exception {
final long past = System.currentTimeMillis() - 10L;
Log l;
l = manager.openLog("durable");
Expand All @@ -109,7 +123,7 @@ public void testLogIsDurableAcrossReopen() throws Exception {
}

@Test
public void testMultipleLogsWithSingleReader() throws Exception {
public void testMultipleLogsWithSingleReaderSerial() throws Exception {
final int nl = 3;
Log logs[] = new Log[nl];
CountingReader count = new CountingReader(3, false);
Expand Down Expand Up @@ -158,7 +172,7 @@ public void testSeparateReadersAndLogsInSharedManager() throws Exception {
}

@Test
public void testFuzzMessages() throws Exception {
public void testFuzzMessagesSerial() throws Exception {
final int maxLen = 1024 * 4;
final int rounds = 32;

Expand Down Expand Up @@ -202,7 +216,7 @@ public void testReadMarkerCompatibility() throws Exception {
}

@Test
public void testUnregisterReader() throws Exception {
public void testUnregisterReaderSerial() throws Exception {
Log log = manager.openLog("test1");

// Register two readers and verify they receive messages.
Expand All @@ -224,16 +238,16 @@ public void testUnregisterReader() throws Exception {
}

private void simpleSendReceive(int numMessages, int delayMS) throws Exception {
sendReceive(1, numMessages, delayMS);
sendReceive(1, numMessages, delayMS, true);
}

public void sendReceive(int readers, int numMessages, int delayMS) throws Exception {
public void sendReceive(int readers, int numMessages, int delayMS, boolean expectMessageOrder) throws Exception {
Preconditions.checkState(0 < readers);
Log log1 = manager.openLog("test1");
assertEquals("test1",log1.getName());
CountingReader counts[] = new CountingReader[readers];
for (int i = 0; i < counts.length; i++) {
counts[i] = new CountingReader(numMessages, true);
counts[i] = new CountingReader(numMessages, expectMessageOrder);
log1.registerReader(ReadMarker.fromNow(),counts[i]);
}
for (long i=1;i<=numMessages;i++) {
Expand Down

0 comments on commit 4f45806

Please sign in to comment.