Skip to content

Commit

Permalink
Merge pull request #502 from ZachBray/bugfix/catalog-with-deleted-rec…
Browse files Browse the repository at this point in the history
…ordings

Allows Catalog to be constructed over a directory where old recordings have been deleted
  • Loading branch information
mjpt777 committed Apr 25, 2018
2 parents e68da3f + fb4709b commit 48c7c2f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 16 deletions.
4 changes: 2 additions & 2 deletions aeron-archive/src/main/java/io/aeron/archive/Catalog.java
Expand Up @@ -638,8 +638,8 @@ private void refreshAndFixDescriptor(
if (headerDecoder.valid() == VALID && decoder.stopTimestamp() == NULL_TIMESTAMP)
{
final String prefix = recordingId + "-";
String[] segmentFiles =
archiveDir.list((dir, name) -> name.endsWith(RECORDING_SEGMENT_POSTFIX));
String[] segmentFiles = // Only the segments for recordingId
archiveDir.list((dir, name) -> name.startsWith(prefix) && name.endsWith(RECORDING_SEGMENT_POSTFIX));
int maxSegmentIndex = -1;

if (null == segmentFiles)
Expand Down
56 changes: 42 additions & 14 deletions aeron-archive/src/test/java/io/aeron/archive/CatalogTest.java
Expand Up @@ -25,8 +25,12 @@
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import static io.aeron.archive.Archive.segmentFileName;
import static io.aeron.archive.Catalog.PAGE_SIZE;
Expand All @@ -37,6 +41,7 @@
import static java.nio.file.StandardOpenOption.*;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.*;
import static org.junit.Assume.assumeThat;

public class CatalogTest
{
Expand Down Expand Up @@ -205,20 +210,7 @@ public void shouldFixTimestampAndPositionAfterFailurePageStraddle() throws Excep
{
final long newRecordingId = newRecording();

final File segmentFile = new File(archiveDir, segmentFileName(newRecordingId, 0));
try (FileChannel log = FileChannel.open(segmentFile.toPath(), READ, WRITE, CREATE))
{
final ByteBuffer bb = ByteBuffer.allocateDirect(HEADER_LENGTH);
final DataHeaderFlyweight flyweight = new DataHeaderFlyweight(bb);
flyweight.frameLength(PAGE_SIZE - 32);
log.write(bb);
bb.clear();
flyweight.frameLength(128);
log.write(bb, PAGE_SIZE - 32);
bb.clear();
flyweight.frameLength(0);
log.write(bb, PAGE_SIZE - 32 + 128);
}
createSegmentFile(newRecordingId);

try (Catalog catalog = new Catalog(archiveDir, clock))
{
Expand Down Expand Up @@ -313,4 +305,40 @@ public void shouldFixTimestampAndPositionAfterFailureFullSegment() throws Except
newRecordingId);
}
}

@Test
public void shouldNotThrowWhenOldRecordingLogsAreDeleted() throws IOException
{
// Simulate the scenario where old recordings have been deleted. Here those are the
// recordings 1 and 2.
createSegmentFile(recordingThreeId);

// Check the new recording is in place
final Path segmentFilePath = Paths.get(segmentFileName(recordingThreeId, 0));
final boolean segmentFileExists = Files.exists(archiveDir.toPath().resolve(segmentFilePath));
assumeThat(segmentFileExists, is(true));

// Check creating a Catalog does not throw
try (Catalog ignored = new Catalog(archiveDir, null, 0, MAX_ENTRIES, clock))
{
}
}

private void createSegmentFile(final long newRecordingId) throws IOException
{
final File segmentFile = new File(archiveDir, segmentFileName(newRecordingId, 0));
try (FileChannel log = FileChannel.open(segmentFile.toPath(), READ, WRITE, CREATE))
{
final ByteBuffer bb = ByteBuffer.allocateDirect(HEADER_LENGTH);
final DataHeaderFlyweight flyweight = new DataHeaderFlyweight(bb);
flyweight.frameLength(PAGE_SIZE - 32);
log.write(bb);
bb.clear();
flyweight.frameLength(128);
log.write(bb, PAGE_SIZE - 32);
bb.clear();
flyweight.frameLength(0);
log.write(bb, PAGE_SIZE - 32 + 128);
}
}
}

0 comments on commit 48c7c2f

Please sign in to comment.