Skip to content

Commit

Permalink
finish off implementing iLink3 retransmit queuing
Browse files Browse the repository at this point in the history
  • Loading branch information
RichardWarburton committed Sep 21, 2020
1 parent ced2824 commit c530c17
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 33 deletions.
Expand Up @@ -17,7 +17,10 @@

import iLinkBinary.*;
import io.aeron.exceptions.TimeoutException;
import org.agrona.*;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.EpochNanoClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.sbe.MessageEncoderFlyweight;
Expand All @@ -35,9 +38,7 @@
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayDeque;
import java.util.Base64;
import java.util.Deque;
import java.util.*;
import java.util.function.Consumer;

import static iLinkBinary.KeepAliveLapsed.Lapsed;
Expand Down Expand Up @@ -186,6 +187,8 @@ public final class InternalILink3Connection extends ILink3Connection

private long retransmitFillTimeoutInMs = NOT_AWAITING_RETRANSMIT;
private long retransmitFillSeqNo = NOT_AWAITING_RETRANSMIT;
private long retransmitContiguousSeqNo = NOT_AWAITING_RETRANSMIT;
private long retransmitMaxSeqNo = NOT_AWAITING_RETRANSMIT;
private long nextRetransmitSeqNo = NOT_AWAITING_RETRANSMIT;

private long resendTime;
Expand Down Expand Up @@ -1132,7 +1135,7 @@ public long onMessage(
final int possRetrans = offsets.possRetrans(templateId, buffer, offset);
if (possRetrans == BOOLEAN_FLAG_TRUE)
{
return onPossRetransMessage(buffer, offset, templateId, blockLength, version, seqNum);
return onPossRetransMessage(buffer, offset, templateId, blockLength, version, totalLength, seqNum);
}

final long nextRecvSeqNo = this.nextRecvSeqNo;
Expand Down Expand Up @@ -1161,18 +1164,14 @@ public long onMessage(
// We could queue this instead of just passing it on to the customer's application but this
// hasn't been requested as of yet
checkBusinessRejectSequenceNumber(buffer, offset, templateId, blockLength, version);
// old callback for immediate processing:
// onBusinessMessage(buffer, offset, templateId, blockLength, version, false);

if (retransmitFillSeqNo == NOT_AWAITING_RETRANSMIT)
{
// Don't check the enqueue - buffer should always be big enough for any iLink3 message.
// TODO: add validation for the minimum buffer size
retransmitEnqueueMessage(buffer, offset, totalLength, seqNum);
}
else
retransmitEnqueueMessage(buffer, offset, totalLength, seqNum);
if (retransmitFillSeqNo != NOT_AWAITING_RETRANSMIT)
{
// TODO: gap in sequence numbers.
// Detected a gap within the normal sequence of messages,
// we need to keep track of the sequence number position of this message to ensure that we
// don't accidentally hand it off during the process of the current retransmit enqueue buffer.
retransmitMaxSeqNo = nextRecvSeqNo - 1;
}

return onInvalidSequenceNumber(seqNum);
Expand Down Expand Up @@ -1215,19 +1214,32 @@ private long onPossRetransMessage(
final int templateId,
final int blockLength,
final int version,
final int totalLength,
final long seqNum)
{
if (seqNum > nextRetransmitSeqNo)
final long expectedSeqNo = this.nextRetransmitSeqNo;
if (seqNum > expectedSeqNo)
{
retransmitContiguousSeqNo = expectedSeqNo - 1;

final long position = onInvalidSequenceNumber(
lastUUIDNullValue(), seqNum, nextRetransmitSeqNo, nextRecvSeqNo);
lastUUIDNullValue(), seqNum, expectedSeqNo, nextRecvSeqNo);
if (Pressure.isBackPressured(position))
{
return position;
}

retransmitEnqueueMessage(buffer, offset, totalLength, seqNum);
}
else
{
if (seqNum == retransmitContiguousSeqNo + 1)
{
retransmitContiguousSeqNo = seqNum;
}

onBusinessMessage(buffer, offset, templateId, blockLength, version, true);
onBusinessMessage(buffer, offset, templateId, blockLength, version, true);
}

if (seqNum == retransmitFillSeqNo)
{
Expand Down Expand Up @@ -1255,7 +1267,7 @@ private void retransmitEnqueueMessage(
final RetransmitRequest retransmitRequest = retransmitRequests.peekFirst();
if (retransmitRequest == null)
{
addRetransmitRequest(lastUuid, seqNum, 1);
addRetransmitRequest(lastUUIDNullValue(), seqNum, 1);
}
else
{
Expand All @@ -1268,7 +1280,7 @@ private void retransmitEnqueueMessage(
}
else
{
addRetransmitRequest(lastUuid, seqNum, 1);
addRetransmitRequest(retransmitRequest.lastUuid, seqNum, 1);
}
}

Expand Down Expand Up @@ -1418,6 +1430,24 @@ private void retransmitFillTimeoutInNs(final long requestTimestampInNs)

private void processRetransmitQueue()
{
if (retransmitContiguousSeqNo == NOT_AWAITING_RETRANSMIT)
{
processInOrderRetransmitQueue();
}
else if (retransmitRequests.isEmpty())
{
processOutOfOrderRetransmitQueue();
}
}

private void processOutOfOrderRetransmitQueue()
{
// A retransmit within a retransmit happened - messages might be out of order and need sorting.
final ExpandableArrayBuffer retransmitQueue = this.retransmitQueue;
final MessageHeaderDecoder headerDecoder = this.headerDecoder;
final SortedSet<RetransmitQueueEntry> entries = new TreeSet<>();
long retransmitContiguousSeqNo = this.retransmitContiguousSeqNo;

int offset = 0;
while (offset < retransmitQueueOffset)
{
Expand All @@ -1430,13 +1460,80 @@ private void processRetransmitQueue()
final int version = headerDecoder.version();

final int messageOffset = headerOffset + MessageHeaderDecoder.ENCODED_LENGTH;
onBusinessMessage(retransmitQueue, messageOffset, templateId, blockLength, version, false);
final int seqNum = offsets.seqNum(templateId, retransmitQueue, messageOffset);
if (seqNum == retransmitContiguousSeqNo + 1)
{
onBusinessMessage(retransmitQueue, messageOffset, templateId, blockLength, version, false);
retransmitContiguousSeqNo++;
}
else
{
entries.add(new RetransmitQueueEntry(seqNum, offset));
}

offset += length;
}

for (final RetransmitQueueEntry entry : entries)
{
final int headerOffset = entry.offset + SOFH_LENGTH;
headerDecoder.wrap(retransmitQueue, headerOffset);
final int blockLength = headerDecoder.blockLength();
final int templateId = headerDecoder.templateId();
final int version = headerDecoder.version();

final int messageOffset = headerOffset + MessageHeaderDecoder.ENCODED_LENGTH;
onBusinessMessage(retransmitQueue, messageOffset, templateId, blockLength, version, false);
}

this.retransmitContiguousSeqNo = NOT_AWAITING_RETRANSMIT;
retransmitQueueOffset = 0;
}

private void processInOrderRetransmitQueue()
{
// Simple retransmit queue case - messages are all in order and can all be sent.
final ExpandableArrayBuffer retransmitQueue = this.retransmitQueue;
final MessageHeaderDecoder headerDecoder = this.headerDecoder;
int offset = 0;
while (offset < retransmitQueueOffset)
{
final int length = readSofhMessageSize(retransmitQueue, offset);

final int headerOffset = offset + SOFH_LENGTH;
headerDecoder.wrap(retransmitQueue, headerOffset);
final int blockLength = headerDecoder.blockLength();
final int templateId = headerDecoder.templateId();
final int version = headerDecoder.version();

final int messageOffset = headerOffset + MessageHeaderDecoder.ENCODED_LENGTH;
final int seqNum = offsets.seqNum(templateId, retransmitQueue, messageOffset);
if (retransmitMaxSeqNo == NOT_AWAITING_RETRANSMIT || seqNum <= retransmitMaxSeqNo)
{
onBusinessMessage(retransmitQueue, messageOffset, templateId, blockLength, version, false);

offset += length;
}
else
{
break;
}
}

// shuffle up remaining bytes
final int remainder = retransmitQueueOffset - offset;
if (remainder > 0)
{
retransmitQueue.putBytes(0, retransmitQueue, offset, remainder);
retransmitQueueOffset = remainder;
}
else
{
retransmitQueueOffset = 0;
}
retransmitMaxSeqNo = NOT_AWAITING_RETRANSMIT;
}

private void addRemainingRetransmitRequests(
final long lastUuid,
final long initialFromSeqNo,
Expand Down Expand Up @@ -1491,6 +1588,23 @@ static final class RetransmitRequest
}
}

static final class RetransmitQueueEntry implements Comparable<RetransmitQueueEntry>
{
final long seqNum;
final int offset;

RetransmitQueueEntry(final long seqNum, final int offset)
{
this.seqNum = seqNum;
this.offset = offset;
}

public int compareTo(final RetransmitQueueEntry o)
{
return Long.compare(seqNum, o.seqNum);
}
}

public long onRetransmitReject(
final String reason, final long uuid, final long lastUuid, final long requestTimestamp, final int errorCodes)
{
Expand Down
Expand Up @@ -26,6 +26,7 @@
import uk.co.real_logic.artio.ilink.SimpleOpenFramingHeader;
import uk.co.real_logic.artio.protocol.GatewayPublication;

import static iLinkBinary.RetransmitRequest508Decoder.lastUUIDNullValue;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -103,17 +104,14 @@ public void setUp()
// We're ready for a retransmit in all cases
assertSeqNos(6, 4);

verifyRetransmitRequest(2L, 3, -1);
}
verifyRetransmitRequest(2L, 3);

private void verifyRetransmitRequest(final long fromSeqNo, final int msgCount)
{
verifyRetransmitRequest(fromSeqNo, msgCount, UUID);
reset(proxy);
}

private void verifyRetransmitRequest(final long fromSeqNo, final int msgCount, final long lastUuid)
private void verifyRetransmitRequest(final long fromSeqNo, final int msgCount)
{
verify(proxy).sendRetransmitRequest(eq(UUID), eq(lastUuid), anyLong(), eq(fromSeqNo), eq(msgCount));
verify(proxy).sendRetransmitRequest(eq(UUID), eq(lastUUIDNullValue()), anyLong(), eq(fromSeqNo), eq(msgCount));
}

private void assertSeqNos(final long nextRecvSeqNo, final long retransmitFillSeqNo)
Expand Down Expand Up @@ -176,7 +174,7 @@ public void done()
}

@Test
public void shouldBufferWhenReceivingOutOfOrder()
public void shouldQueueWhenReceivingOutOfOrder()
{
// @5,2R,3R,4R,done.
onExecutionReport(2, true);
Expand Down Expand Up @@ -266,7 +264,7 @@ public void shouldNotifyWhenTimeoutBreached()
}

@Test
public void shouldReplayBufferWhenReceivingSequenceMessage()
public void shouldReplayQueueWhenReceivingSequenceMessage()
{
// @5,6,Seq8,done.
onExecutionReport(6, false);
Expand All @@ -276,7 +274,40 @@ public void shouldReplayBufferWhenReceivingSequenceMessage()
assertThat(handler.sequenceNumbers(), contains(5L, 6L));
}

// TODO: gaps within the retransmit
// TODO: gaps within the normal message sequence
// TODO: shouldNotifyAndQueueReRequestWhenMaxSizeBreachedMultipleMessges with a gap in the retransmit if possible
@Test
public void shouldQueueRetransmitForRetransmitGapWithinRetransmit()
{
// @5,2R,3R,4R,done.
onExecutionReport(2, true);
onExecutionReport(4, true);
onExecutionReport(6, false);

assertSeqNos(7, 3);
assertThat(handler.sequenceNumbers(), contains(2L));
handler.sequenceNumbers().clear();

verifyRetransmitRequest(3, 1);
onExecutionReport(3, true);
assertThat(handler.sequenceNumbers(), contains(3L, 4L, 5L, 6L));
assertSeqNos(7, NOT_AWAITING_RETRANSMIT);
}

@Test
public void shouldQueueRetransmitForNormalGapWithinRetransmit()
{
// @5,7,2R,3R,4R,6,done.
onExecutionReport(7, false);
onExecutionReport(2, true);
onExecutionReport(3, true);
onExecutionReport(4, true);

assertSeqNos(8, 6);
assertThat(handler.sequenceNumbers(), contains(2L, 3L, 4L, 5L));
handler.sequenceNumbers().clear();

verifyRetransmitRequest(6, 1);
onExecutionReport(6, true);
assertThat(handler.sequenceNumbers(), contains(6L, 7L));
assertSeqNos(8, NOT_AWAITING_RETRANSMIT);
}
}

0 comments on commit c530c17

Please sign in to comment.