Skip to content

Commit

Permalink
fix #438
Browse files Browse the repository at this point in the history
  • Loading branch information
RichardWarburton committed Jan 7, 2022
1 parent 48f949a commit e0e4000
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

public class BinaryEntryPointArchiveScannerIntegrationTest extends AbstractBinaryEntryPointSystemTest
{
private boolean follow = false;

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void canScanArchiveWhilstGatewayRunningOneStream() throws IOException
{
Expand All @@ -46,6 +48,16 @@ public void canScanArchiveWhilstGatewayRunningBothStreams() throws IOException
assertArchiveContainsBothMessages();
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void canScanArchiveWhilstGatewayRunningBothStreamsFollowMode() throws IOException
{
follow = true;

setupAndExchangeMessages();

assertArchiveContainsBothMessages();
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void canScanArchiveWhenGatewayStoppedOneStream() throws IOException
{
Expand Down Expand Up @@ -89,11 +101,24 @@ private void assertArchiveContains(final EngineConfiguration configuration, fina
{
final IntArrayList templateIds = new IntArrayList();
final MessageHeaderDecoder header = new MessageHeaderDecoder();
getMessagesFromArchive(configuration, queryStreamIds, null, (fixPMessage, buffer, offset, ignore) ->
try
{
getMessagesFromArchive(configuration, queryStreamIds, null,
(fixPMessage, buffer, offset, ignore) ->
{
header.wrap(buffer, offset + SOFH_LENGTH);
templateIds.add(header.templateId());

if (follow && templateIds.size() >= 7)
{
throw new TestTerminationException();
}
}, follow);
}
catch (final TestTerminationException e)
{
header.wrap(buffer, offset + SOFH_LENGTH);
templateIds.add(header.templateId());
});
// Deliberately blank. This is just used to force termination in follow mode.
}

assertThat(templateIds, Matchers.hasItems(
NegotiateResponseDecoder.TEMPLATE_ID,
Expand All @@ -108,4 +133,8 @@ private void setupAndExchangeMessages() throws IOException

connectAndExchangeBusinessMessage();
}

static class TestTerminationException extends RuntimeException
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ private static void printHelp()
printOption(
"fixp",
"Suppresses the need to provide a fix dictionary on the classpath - used for situations where" +
" only FIXP messages will be printed out",
" only FIXP messages will be printed out",
false);
printOption(
"fixp-protocol",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ public void scan(
scan(aeronChannel, queryStreamIds, fixHandler, fixPHandler, follow, archiveScannerStreamId);
}

@SuppressWarnings("WhileLoopReplaceableByForEach")
public void scan(
final String aeronChannel,
final IntHashSet queryStreamIds,
Expand All @@ -210,16 +209,8 @@ public void scan(

try (Subscription replaySubscription = aeron.addSubscription(IPC_CHANNEL, archiveScannerStreamId))
{
final RecordingPoller[] pollers = new RecordingPoller[queryStreamIds.size()];
int i = 0;

final IntHashSet.IntIterator iterator = queryStreamIds.iterator();
while (iterator.hasNext())
{
final int id = iterator.next();
pollers[i] = makePoller(id, replaySubscription, follow, aeronChannel, recordingIdToPositionRange);
i++;
}
final RecordingPoller[] pollers = makeRecordingPollers(
aeronChannel, queryStreamIds, follow, recordingIdToPositionRange, replaySubscription);

if (DEBUG_LOG_ARCHIVE_SCAN)
{
Expand Down Expand Up @@ -249,6 +240,27 @@ public void scan(
}
}

private RecordingPoller[] makeRecordingPollers(
final String aeronChannel,
final IntHashSet queryStreamIds,
final boolean follow,
final Long2ObjectHashMap<PositionRange> recordingIdToPositionRange,
final Subscription replaySubscription)
{
return queryStreamIds
.stream()
.map(id ->
{
final List<ArchiveLocation> archiveLocations = lookupArchiveLocations(
id, follow, aeronChannel, recordingIdToPositionRange);

archiveLocations.sort(BY_REVERSE_POSITION);

return new RecordingPoller(replaySubscription, id, archiveLocations);
})
.toArray(RecordingPoller[]::new);
}

private Long2ObjectHashMap<PositionRange> scanIndexIfPossible(
final FixMessageConsumer fixHandler, final boolean follow, final IntHashSet queryStreamIds)
{
Expand Down Expand Up @@ -325,21 +337,6 @@ private boolean checkCompletion(final RecordingPoller[] pollers)
return true;
}

private RecordingPoller makePoller(
final int queryStreamId,
final Subscription replaySubscription,
final boolean follow,
final String aeronChannel,
final Long2ObjectHashMap<PositionRange> recordingIdToPositionRange)
{
final List<ArchiveLocation> archiveLocations = lookupArchiveLocations(
queryStreamId, follow, aeronChannel, recordingIdToPositionRange);

archiveLocations.sort(BY_REVERSE_POSITION);

return new RecordingPoller(replaySubscription, queryStreamId, archiveLocations);
}

private List<ArchiveLocation> lookupArchiveLocations(
final int queryStreamId,
final boolean follow,
Expand Down Expand Up @@ -523,7 +520,7 @@ public int poll(final FragmentAssembler fragmentAssembler)
}
else
{
if (image.position() >= stopPosition)
if (stopPosition != NULL_POSITION && image.position() >= stopPosition)
{
image = null;
return 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ class LogEntryHandler implements FragmentHandler
{
private final MessageHeaderDecoder messageHeader = new MessageHeaderDecoder();
private final FixMessageDecoder fixMessage = new FixMessageDecoder();
private final FixPMessageDecoder iLinkMessage = new FixPMessageDecoder();
private final FixPMessageDecoder fixpMessage = new FixPMessageDecoder();
private final ReplayerTimestampDecoder replayerTimestamp = new ReplayerTimestampDecoder();

private final FixMessageConsumer fixHandler;
Expand Down Expand Up @@ -371,16 +371,16 @@ else if (templateId == FixPMessageDecoder.TEMPLATE_ID)
{
offset += MessageHeaderDecoder.ENCODED_LENGTH;

iLinkMessage.wrap(buffer, offset, blockLength, version);
fixpMessage.wrap(buffer, offset, blockLength, version);

offset += FixPMessageDecoder.BLOCK_LENGTH;

final long timestamp = iLinkMessage.enqueueTime();
final long timestamp = fixpMessage.enqueueTime();

if (timestamp <= maxTimestampToHandle)
{
owner.handledTimestamp(timestamp);
fixPHandler.onMessage(iLinkMessage, buffer, offset, owner.header);
fixPHandler.onMessage(fixpMessage, buffer, offset, owner.header);
}
else
{
Expand Down Expand Up @@ -433,11 +433,11 @@ else if (templateId == FixPMessageDecoder.TEMPLATE_ID)
{
offset += MessageHeaderDecoder.ENCODED_LENGTH;

iLinkMessage.wrap(buffer, offset, blockLength, version);
fixpMessage.wrap(buffer, offset, blockLength, version);

offset += FixPMessageDecoder.BLOCK_LENGTH;

fixPHandler.onMessage(iLinkMessage, buffer, offset, owner.header);
fixPHandler.onMessage(fixpMessage, buffer, offset, owner.header);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ List<String> getMessagesFromArchive(final EngineConfiguration configuration, fin
final FixMessageConsumer fixMessageConsumer =
(message, buffer, offset, length, header) -> messages.add(message.body());

SystemTestUtil.getMessagesFromArchive(configuration, queryStreamIds, fixMessageConsumer, null);
SystemTestUtil.getMessagesFromArchive(configuration, queryStreamIds, fixMessageConsumer, null, false);

return messages;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private void canIndexScanArchive(final boolean close)

SystemTestUtil.getMessagesFromArchive(configuration, queryStreamIds,
FixMessagePredicates.filterBy(fixMessageConsumer, predicate),
null);
null, false);

assertThat(messages, hasSize(1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,8 @@ public static void getMessagesFromArchive(
final EngineConfiguration configuration,
final IntHashSet queryStreamIds,
final FixMessageConsumer fixMessageConsumer,
final FixPMessageConsumer fixPConsumer)
final FixPMessageConsumer fixPConsumer,
final boolean follow)
{
final FixArchiveScanner.Configuration context = new FixArchiveScanner.Configuration()
.aeronDirectoryName(configuration.aeronContext().aeronDirectoryName())
Expand All @@ -588,7 +589,7 @@ public static void getMessagesFromArchive(
queryStreamIds,
fixMessageConsumer,
fixPConsumer,
false,
follow,
DEFAULT_ARCHIVE_SCANNER_STREAM);
}
}
Expand Down

0 comments on commit e0e4000

Please sign in to comment.