From c530c176b0fe242974bb1c2071e7de8f3ce7732c Mon Sep 17 00:00:00 2001 From: Richard Warburton Date: Mon, 21 Sep 2020 20:39:44 +0100 Subject: [PATCH] finish off implementing iLink3 retransmit queuing --- .../library/InternalILink3Connection.java | 156 +++++++++++++++--- .../artio/library/RetransmitQueueTest.java | 55 ++++-- 2 files changed, 178 insertions(+), 33 deletions(-) diff --git a/artio-ilink3-impl/src/main/java/uk/co/real_logic/artio/library/InternalILink3Connection.java b/artio-ilink3-impl/src/main/java/uk/co/real_logic/artio/library/InternalILink3Connection.java index e2f5f7648a..10d6213036 100644 --- a/artio-ilink3-impl/src/main/java/uk/co/real_logic/artio/library/InternalILink3Connection.java +++ b/artio-ilink3-impl/src/main/java/uk/co/real_logic/artio/library/InternalILink3Connection.java @@ -17,7 +17,10 @@ import iLinkBinary.*; import io.aeron.exceptions.TimeoutException; -import org.agrona.*; +import org.agrona.DirectBuffer; +import org.agrona.ExpandableArrayBuffer; +import org.agrona.LangUtil; +import org.agrona.MutableDirectBuffer; import org.agrona.concurrent.EpochNanoClock; import org.agrona.concurrent.UnsafeBuffer; import org.agrona.sbe.MessageEncoderFlyweight; @@ -35,9 +38,7 @@ import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; -import java.util.ArrayDeque; -import java.util.Base64; -import java.util.Deque; +import java.util.*; import java.util.function.Consumer; import static iLinkBinary.KeepAliveLapsed.Lapsed; @@ -186,6 +187,8 @@ public final class InternalILink3Connection extends ILink3Connection private long retransmitFillTimeoutInMs = NOT_AWAITING_RETRANSMIT; private long retransmitFillSeqNo = NOT_AWAITING_RETRANSMIT; + private long retransmitContiguousSeqNo = NOT_AWAITING_RETRANSMIT; + private long retransmitMaxSeqNo = NOT_AWAITING_RETRANSMIT; private long nextRetransmitSeqNo = NOT_AWAITING_RETRANSMIT; private long resendTime; @@ -1132,7 +1135,7 @@ public long onMessage( final int possRetrans = offsets.possRetrans(templateId, buffer, offset); if (possRetrans == BOOLEAN_FLAG_TRUE) { - return onPossRetransMessage(buffer, offset, templateId, blockLength, version, seqNum); + return onPossRetransMessage(buffer, offset, templateId, blockLength, version, totalLength, seqNum); } final long nextRecvSeqNo = this.nextRecvSeqNo; @@ -1161,18 +1164,14 @@ public long onMessage( // We could queue this instead of just passing it on to the customer's application but this // hasn't been requested as of yet checkBusinessRejectSequenceNumber(buffer, offset, templateId, blockLength, version); - // old callback for immediate processing: - // onBusinessMessage(buffer, offset, templateId, blockLength, version, false); - if (retransmitFillSeqNo == NOT_AWAITING_RETRANSMIT) - { - // Don't check the enqueue - buffer should always be big enough for any iLink3 message. - // TODO: add validation for the minimum buffer size - retransmitEnqueueMessage(buffer, offset, totalLength, seqNum); - } - else + retransmitEnqueueMessage(buffer, offset, totalLength, seqNum); + if (retransmitFillSeqNo != NOT_AWAITING_RETRANSMIT) { - // TODO: gap in sequence numbers. + // Detected a gap within the normal sequence of messages, + // we need to keep track of the sequence number position of this message to ensure that we + // don't accidentally hand it off during the process of the current retransmit enqueue buffer. + retransmitMaxSeqNo = nextRecvSeqNo - 1; } return onInvalidSequenceNumber(seqNum); @@ -1215,19 +1214,32 @@ private long onPossRetransMessage( final int templateId, final int blockLength, final int version, + final int totalLength, final long seqNum) { - if (seqNum > nextRetransmitSeqNo) + final long expectedSeqNo = this.nextRetransmitSeqNo; + if (seqNum > expectedSeqNo) { + retransmitContiguousSeqNo = expectedSeqNo - 1; + final long position = onInvalidSequenceNumber( - lastUUIDNullValue(), seqNum, nextRetransmitSeqNo, nextRecvSeqNo); + lastUUIDNullValue(), seqNum, expectedSeqNo, nextRecvSeqNo); if (Pressure.isBackPressured(position)) { return position; } + + retransmitEnqueueMessage(buffer, offset, totalLength, seqNum); } + else + { + if (seqNum == retransmitContiguousSeqNo + 1) + { + retransmitContiguousSeqNo = seqNum; + } - onBusinessMessage(buffer, offset, templateId, blockLength, version, true); + onBusinessMessage(buffer, offset, templateId, blockLength, version, true); + } if (seqNum == retransmitFillSeqNo) { @@ -1255,7 +1267,7 @@ private void retransmitEnqueueMessage( final RetransmitRequest retransmitRequest = retransmitRequests.peekFirst(); if (retransmitRequest == null) { - addRetransmitRequest(lastUuid, seqNum, 1); + addRetransmitRequest(lastUUIDNullValue(), seqNum, 1); } else { @@ -1268,7 +1280,7 @@ private void retransmitEnqueueMessage( } else { - addRetransmitRequest(lastUuid, seqNum, 1); + addRetransmitRequest(retransmitRequest.lastUuid, seqNum, 1); } } @@ -1418,6 +1430,24 @@ private void retransmitFillTimeoutInNs(final long requestTimestampInNs) private void processRetransmitQueue() { + if (retransmitContiguousSeqNo == NOT_AWAITING_RETRANSMIT) + { + processInOrderRetransmitQueue(); + } + else if (retransmitRequests.isEmpty()) + { + processOutOfOrderRetransmitQueue(); + } + } + + private void processOutOfOrderRetransmitQueue() + { + // A retransmit within a retransmit happened - messages might be out of order and need sorting. + final ExpandableArrayBuffer retransmitQueue = this.retransmitQueue; + final MessageHeaderDecoder headerDecoder = this.headerDecoder; + final SortedSet entries = new TreeSet<>(); + long retransmitContiguousSeqNo = this.retransmitContiguousSeqNo; + int offset = 0; while (offset < retransmitQueueOffset) { @@ -1430,13 +1460,80 @@ private void processRetransmitQueue() final int version = headerDecoder.version(); final int messageOffset = headerOffset + MessageHeaderDecoder.ENCODED_LENGTH; - onBusinessMessage(retransmitQueue, messageOffset, templateId, blockLength, version, false); + final int seqNum = offsets.seqNum(templateId, retransmitQueue, messageOffset); + if (seqNum == retransmitContiguousSeqNo + 1) + { + onBusinessMessage(retransmitQueue, messageOffset, templateId, blockLength, version, false); + retransmitContiguousSeqNo++; + } + else + { + entries.add(new RetransmitQueueEntry(seqNum, offset)); + } offset += length; } + + for (final RetransmitQueueEntry entry : entries) + { + final int headerOffset = entry.offset + SOFH_LENGTH; + headerDecoder.wrap(retransmitQueue, headerOffset); + final int blockLength = headerDecoder.blockLength(); + final int templateId = headerDecoder.templateId(); + final int version = headerDecoder.version(); + + final int messageOffset = headerOffset + MessageHeaderDecoder.ENCODED_LENGTH; + onBusinessMessage(retransmitQueue, messageOffset, templateId, blockLength, version, false); + } + + this.retransmitContiguousSeqNo = NOT_AWAITING_RETRANSMIT; retransmitQueueOffset = 0; } + private void processInOrderRetransmitQueue() + { + // Simple retransmit queue case - messages are all in order and can all be sent. + final ExpandableArrayBuffer retransmitQueue = this.retransmitQueue; + final MessageHeaderDecoder headerDecoder = this.headerDecoder; + int offset = 0; + while (offset < retransmitQueueOffset) + { + final int length = readSofhMessageSize(retransmitQueue, offset); + + final int headerOffset = offset + SOFH_LENGTH; + headerDecoder.wrap(retransmitQueue, headerOffset); + final int blockLength = headerDecoder.blockLength(); + final int templateId = headerDecoder.templateId(); + final int version = headerDecoder.version(); + + final int messageOffset = headerOffset + MessageHeaderDecoder.ENCODED_LENGTH; + final int seqNum = offsets.seqNum(templateId, retransmitQueue, messageOffset); + if (retransmitMaxSeqNo == NOT_AWAITING_RETRANSMIT || seqNum <= retransmitMaxSeqNo) + { + onBusinessMessage(retransmitQueue, messageOffset, templateId, blockLength, version, false); + + offset += length; + } + else + { + break; + } + } + + // shuffle up remaining bytes + final int remainder = retransmitQueueOffset - offset; + if (remainder > 0) + { + retransmitQueue.putBytes(0, retransmitQueue, offset, remainder); + retransmitQueueOffset = remainder; + } + else + { + retransmitQueueOffset = 0; + } + retransmitMaxSeqNo = NOT_AWAITING_RETRANSMIT; + } + private void addRemainingRetransmitRequests( final long lastUuid, final long initialFromSeqNo, @@ -1491,6 +1588,23 @@ static final class RetransmitRequest } } + static final class RetransmitQueueEntry implements Comparable + { + final long seqNum; + final int offset; + + RetransmitQueueEntry(final long seqNum, final int offset) + { + this.seqNum = seqNum; + this.offset = offset; + } + + public int compareTo(final RetransmitQueueEntry o) + { + return Long.compare(seqNum, o.seqNum); + } + } + public long onRetransmitReject( final String reason, final long uuid, final long lastUuid, final long requestTimestamp, final int errorCodes) { diff --git a/artio-ilink3-impl/src/test/java/uk/co/real_logic/artio/library/RetransmitQueueTest.java b/artio-ilink3-impl/src/test/java/uk/co/real_logic/artio/library/RetransmitQueueTest.java index 6a52021b9c..cccb6acd11 100644 --- a/artio-ilink3-impl/src/test/java/uk/co/real_logic/artio/library/RetransmitQueueTest.java +++ b/artio-ilink3-impl/src/test/java/uk/co/real_logic/artio/library/RetransmitQueueTest.java @@ -26,6 +26,7 @@ import uk.co.real_logic.artio.ilink.SimpleOpenFramingHeader; import uk.co.real_logic.artio.protocol.GatewayPublication; +import static iLinkBinary.RetransmitRequest508Decoder.lastUUIDNullValue; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -103,17 +104,14 @@ public void setUp() // We're ready for a retransmit in all cases assertSeqNos(6, 4); - verifyRetransmitRequest(2L, 3, -1); - } + verifyRetransmitRequest(2L, 3); - private void verifyRetransmitRequest(final long fromSeqNo, final int msgCount) - { - verifyRetransmitRequest(fromSeqNo, msgCount, UUID); + reset(proxy); } - private void verifyRetransmitRequest(final long fromSeqNo, final int msgCount, final long lastUuid) + private void verifyRetransmitRequest(final long fromSeqNo, final int msgCount) { - verify(proxy).sendRetransmitRequest(eq(UUID), eq(lastUuid), anyLong(), eq(fromSeqNo), eq(msgCount)); + verify(proxy).sendRetransmitRequest(eq(UUID), eq(lastUUIDNullValue()), anyLong(), eq(fromSeqNo), eq(msgCount)); } private void assertSeqNos(final long nextRecvSeqNo, final long retransmitFillSeqNo) @@ -176,7 +174,7 @@ public void done() } @Test - public void shouldBufferWhenReceivingOutOfOrder() + public void shouldQueueWhenReceivingOutOfOrder() { // @5,2R,3R,4R,done. onExecutionReport(2, true); @@ -266,7 +264,7 @@ public void shouldNotifyWhenTimeoutBreached() } @Test - public void shouldReplayBufferWhenReceivingSequenceMessage() + public void shouldReplayQueueWhenReceivingSequenceMessage() { // @5,6,Seq8,done. onExecutionReport(6, false); @@ -276,7 +274,40 @@ public void shouldReplayBufferWhenReceivingSequenceMessage() assertThat(handler.sequenceNumbers(), contains(5L, 6L)); } - // TODO: gaps within the retransmit - // TODO: gaps within the normal message sequence - // TODO: shouldNotifyAndQueueReRequestWhenMaxSizeBreachedMultipleMessges with a gap in the retransmit if possible + @Test + public void shouldQueueRetransmitForRetransmitGapWithinRetransmit() + { + // @5,2R,3R,4R,done. + onExecutionReport(2, true); + onExecutionReport(4, true); + onExecutionReport(6, false); + + assertSeqNos(7, 3); + assertThat(handler.sequenceNumbers(), contains(2L)); + handler.sequenceNumbers().clear(); + + verifyRetransmitRequest(3, 1); + onExecutionReport(3, true); + assertThat(handler.sequenceNumbers(), contains(3L, 4L, 5L, 6L)); + assertSeqNos(7, NOT_AWAITING_RETRANSMIT); + } + + @Test + public void shouldQueueRetransmitForNormalGapWithinRetransmit() + { + // @5,7,2R,3R,4R,6,done. + onExecutionReport(7, false); + onExecutionReport(2, true); + onExecutionReport(3, true); + onExecutionReport(4, true); + + assertSeqNos(8, 6); + assertThat(handler.sequenceNumbers(), contains(2L, 3L, 4L, 5L)); + handler.sequenceNumbers().clear(); + + verifyRetransmitRequest(6, 1); + onExecutionReport(6, true); + assertThat(handler.sequenceNumbers(), contains(6L, 7L)); + assertSeqNos(8, NOT_AWAITING_RETRANSMIT); + } }