Skip to content

Commit

Permalink
Issue 201: Fix the flaky test TestBKDistributedLogManager.deleteDurin…
Browse files Browse the repository at this point in the history
…gRead

This closes apache#204, apache#201
  • Loading branch information
sijie committed Oct 18, 2017
2 parents ecca567 + 2de1f6d commit b737e9c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
Expand Up @@ -707,6 +707,9 @@ private void unsafeProcessLogSegments(List<LogSegmentMetadata> segments) {
private boolean updateLogSegmentMetadata(SegmentReader reader,
LogSegmentMetadata newMetadata) {
if (reader.getSegment().getLogSegmentSequenceNumber() != newMetadata.getLogSegmentSequenceNumber()) {
logger.error("Inconsistent state found in entry reader for {} : "
+ "current segment = {}, new segment = {}",
new Object[] { streamName, reader.getSegment(), newMetadata });
setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
+ streamName + " : current segment = " + reader.getSegment() + ", new segment = " + newMetadata));
return false;
Expand Down Expand Up @@ -746,6 +749,9 @@ private void unsafeReinitializeLogSegments(List<LogSegmentMetadata> segments) {
}
} else {
if (currentSegmentSequenceNumber != segment.getLogSegmentSequenceNumber()) {
logger.error("Inconsistent state found in entry reader for {} : "
+ "current segment sn = {}, new segment sn = {}",
new Object[] { streamName, currentSegmentSequenceNumber, segment.getLogSegmentSequenceNumber() });
setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
+ streamName + " : current segment sn = " + currentSegmentSequenceNumber
+ ", new segment sn = " + segment.getLogSegmentSequenceNumber()));
Expand Down
Expand Up @@ -17,10 +17,13 @@
*/
package org.apache.distributedlog;


import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
Expand All @@ -47,6 +50,7 @@
import org.apache.distributedlog.callback.LogSegmentListener;
import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.LogEmptyException;
Expand All @@ -63,15 +67,12 @@
import org.apache.distributedlog.metadata.MetadataUpdater;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



/**
* Test Cases for {@link DistributedLogManager}.
*/
Expand Down Expand Up @@ -765,31 +766,27 @@ public void deleteDuringRead() throws Exception {
}

LogReader reader = dlm.getInputStream(1);
long numTrans = 1;
LogRecord record = reader.readNext(false);
assert (null != record);
DLMTestUtil.verifyLogRecord(record);
long lastTxId = record.getTransactionId();

dlm.delete();

boolean exceptionEncountered = false;
boolean exceptionEncountered;
try {
record = reader.readNext(false);
while (null != record) {
DLMTestUtil.verifyLogRecord(record);
assert (lastTxId < record.getTransactionId());
lastTxId = record.getTransactionId();
numTrans++;
record = reader.readNext(false);
}
// make sure the exception is thrown from readahead
while (true) {
reader.readNext(false);
}
} catch (LogReadException readexc) {
exceptionEncountered = true;
} catch (LogNotFoundException exc) {
} catch (LogReadException | LogNotFoundException | DLIllegalStateException e) {
exceptionEncountered = true;
}
assertTrue(exceptionEncountered);
Expand Down Expand Up @@ -1136,26 +1133,26 @@ public void testTruncationValidation() throws Exception {
Utils.ioResult(updater.setLogSegmentActive(segmentList.get(2L)));

BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
Assert.assertTrue(Utils.ioResult(writer.truncate(truncDLSN)));
assertTrue(Utils.ioResult(writer.truncate(truncDLSN)));
BKLogWriteHandler handler = writer.getCachedWriteHandler();
List<LogSegmentMetadata> cachedSegments = handler.getCachedLogSegments(LogSegmentMetadata.COMPARATOR);
for (LogSegmentMetadata segment: cachedSegments) {
if (segment.getLastDLSN().compareTo(truncDLSN) < 0) {
Assert.assertTrue(segment.isTruncated());
Assert.assertTrue(!segment.isPartiallyTruncated());
assertTrue(segment.isTruncated());
assertTrue(!segment.isPartiallyTruncated());
} else if (segment.getFirstDLSN().compareTo(truncDLSN) < 0) {
Assert.assertTrue(!segment.isTruncated());
Assert.assertTrue(segment.isPartiallyTruncated());
assertTrue(!segment.isTruncated());
assertTrue(segment.isPartiallyTruncated());
} else {
Assert.assertTrue(!segment.isTruncated());
Assert.assertTrue(!segment.isPartiallyTruncated());
assertTrue(!segment.isTruncated());
assertTrue(!segment.isPartiallyTruncated());
}
}

segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
LogMetadata.getLogSegmentsPath(uri, name, conf.getUnpartitionedStreamName()));

Assert.assertTrue(segmentList.get(truncDLSN.getLogSegmentSequenceNo())
assertTrue(segmentList.get(truncDLSN.getLogSegmentSequenceNo())
.getMinActiveDLSN().compareTo(truncDLSN) == 0);

{
Expand Down

0 comments on commit b737e9c

Please sign in to comment.