Skip to content

Commit

Permalink
Stream Multi Receive implementation (#4383)
Browse files Browse the repository at this point in the history
  • Loading branch information
ami-GS committed Jul 3, 2024
1 parent 394f1d1 commit db66ab9
Show file tree
Hide file tree
Showing 3 changed files with 892 additions and 59 deletions.
183 changes: 126 additions & 57 deletions src/core/recv_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ QuicRecvBufferInitialize(
RecvBuffer->ReadStart = 0;
RecvBuffer->ReadPendingLength = 0;
RecvBuffer->ReadLength = 0;
RecvBuffer->Capacity = AllocBufferLength;
RecvBuffer->VirtualBufferLength = VirtualBufferLength;
RecvBuffer->RecvMode = RecvMode;
Status = QUIC_STATUS_SUCCESS;
Expand Down Expand Up @@ -243,6 +244,7 @@ QuicRecvBufferResize(
Span - LengthTillWrap);
}
RecvBuffer->ReadStart = 0;
RecvBuffer->Capacity = TargetBufferLength;

} else {
//
Expand Down Expand Up @@ -419,58 +421,100 @@ QuicRecvBufferCopyIntoChunks(
RecvBuffer->Chunks.Flink, // First chunk
QUIC_RECV_CHUNK,
Link);

uint32_t ChunkLength;
BOOLEAN IsFirstChunk = TRUE;
uint64_t RelativeOffset = WriteOffset - RecvBuffer->BaseOffset;
uint32_t ChunkOffset = RecvBuffer->ReadStart;
uint64_t BaseOffset = RecvBuffer->BaseOffset;
if (Chunk->Link.Flink == &RecvBuffer->Chunks) {
CXPLAT_DBG_ASSERT(WriteLength <= Chunk->AllocLength); // Should always fit if we only have one
ChunkLength = Chunk->AllocLength;
RecvBuffer->ReadLength =
(uint32_t)(QuicRangeGet(&RecvBuffer->WrittenRanges, 0)->Count - RecvBuffer->BaseOffset);
} else {
ChunkLength = RecvBuffer->ReadLength;
while (BaseOffset + ChunkLength <= WriteOffset) {
BaseOffset += ChunkLength;
Chunk =
CXPLAT_CONTAINING_RECORD(
Chunk->Link.Flink,
QUIC_RECV_CHUNK,
Link);
ChunkOffset = 0;
ChunkLength = Chunk->AllocLength;
//
// In multiple mode, the first chunk may not start at the beginning.
//
ChunkLength = RecvBuffer->Capacity;

if (RelativeOffset < RecvBuffer->Capacity) {
//
// The write starts from the first chunk. The write spans to the next chunk.
//
RecvBuffer->ReadLength =
(uint32_t)(QuicRangeGet(&RecvBuffer->WrittenRanges, 0)->Count - RecvBuffer->BaseOffset);
if (RecvBuffer->Capacity < RecvBuffer->ReadLength) {
RecvBuffer->ReadLength = RecvBuffer->Capacity;
}
} else {
//
// When the RelativeOffset is larger than the Capacity, the write starts from later chunk.
// Shrink RelativeOffset to represent the offset from beginning of the other chunk.
//
while (ChunkLength <= RelativeOffset) {
RelativeOffset -= ChunkLength;
IsFirstChunk = FALSE;
Chunk =
CXPLAT_CONTAINING_RECORD(
Chunk->Link.Flink,
QUIC_RECV_CHUNK,
Link);
ChunkLength = Chunk->AllocLength;
}
}
}

BOOLEAN IsFirstLoop = TRUE;
do {
uint64_t RelativeOffset = WriteOffset - BaseOffset;
uint32_t ChunkWriteOffset = (ChunkOffset + RelativeOffset) % Chunk->AllocLength;
uint32_t ChunkWriteLength = WriteLength;
if (ChunkWriteLength > ChunkLength) {
ChunkWriteLength = ChunkLength;
if (!IsFirstChunk) {
// This RelativeOffset is already shrunk to represent the offset from beginning of the current chunk.
ChunkWriteOffset = (uint32_t)RelativeOffset;
}
if (!IsFirstLoop) {
// We are continue writing from previous chunk. So, start from the beginning of the currnet chunk.
ChunkWriteOffset = 0;
}

if (ChunkWriteOffset + WriteLength > Chunk->AllocLength) {
uint32_t Part1Len = ChunkLength - ChunkWriteOffset;
CxPlatCopyMemory(Chunk->Buffer + ChunkWriteOffset, WriteBuffer, Part1Len);
CxPlatCopyMemory(Chunk->Buffer, WriteBuffer + Part1Len, WriteLength - Part1Len);
uint32_t ChunkWriteLength = WriteLength;
if (IsFirstChunk) {
if (RecvBuffer->Capacity < RelativeOffset + ChunkWriteLength) {
//
// Trying to write beyond the capacity of the first chunk.
// Limit the write length to the capacity of the first chunk.
//
ChunkWriteLength = RecvBuffer->Capacity - (uint32_t)RelativeOffset;
}
if (Chunk->AllocLength < ChunkWriteOffset + ChunkWriteLength) {
// Circular buffer wrap around case.
CxPlatCopyMemory(Chunk->Buffer + ChunkWriteOffset, WriteBuffer, Chunk->AllocLength - ChunkWriteOffset);
CxPlatCopyMemory(Chunk->Buffer, WriteBuffer + Chunk->AllocLength - ChunkWriteOffset, ChunkWriteLength - (Chunk->AllocLength - ChunkWriteOffset));
} else {
CxPlatCopyMemory(Chunk->Buffer + ChunkWriteOffset, WriteBuffer, ChunkWriteLength);
}
} else {
CxPlatCopyMemory(Chunk->Buffer + ChunkWriteOffset, WriteBuffer, WriteLength);
if (ChunkWriteOffset + ChunkWriteLength >= ChunkLength) {
ChunkWriteLength = ChunkLength - ChunkWriteOffset;
}
CxPlatCopyMemory(Chunk->Buffer + ChunkWriteOffset, WriteBuffer, ChunkWriteLength);
}

if (WriteLength == ChunkWriteLength) {
// Run out of data to write. Exit the loop.
break;
}
WriteOffset += ChunkWriteLength;
WriteLength -= (uint16_t)ChunkWriteLength;
WriteBuffer += ChunkWriteLength;
BaseOffset += ChunkLength;
Chunk =
CXPLAT_CONTAINING_RECORD(
Chunk->Link.Flink,
QUIC_RECV_CHUNK,
Link);
ChunkOffset = 0;
ChunkLength = Chunk->AllocLength;
IsFirstChunk = FALSE;
IsFirstLoop = FALSE;

} while (TRUE);
}
Expand Down Expand Up @@ -542,7 +586,7 @@ QuicRecvBufferWrite(
RecvBuffer->Chunks.Blink,
QUIC_RECV_CHUNK,
Link)->AllocLength << 1;
while (AbsoluteLength > RecvBuffer->BaseOffset + NewBufferLength) {
while (AbsoluteLength > RecvBuffer->BaseOffset + NewBufferLength + RecvBuffer->ReadPendingLength) {
NewBufferLength <<= 1;
}
if (!QuicRecvBufferResize(RecvBuffer, NewBufferLength)) {
Expand Down Expand Up @@ -670,87 +714,92 @@ QuicRecvBufferRead(

} else {
CXPLAT_DBG_ASSERT(RecvBuffer->ReadPendingLength < ContiguousLength); // Shouldn't call read if there is nothing new to read
uint64_t WrittenLength = ContiguousLength - RecvBuffer->ReadPendingLength;
uint64_t UnreadLength = ContiguousLength - RecvBuffer->ReadPendingLength;
CXPLAT_DBG_ASSERT(UnreadLength > 0);

//
// Walk the chunks to find the data after ReadPendingLength, up to
// WrittenLength, to return.
// UnreadLength, to return.
//
uint64_t ReadOffset = RecvBuffer->ReadPendingLength;
uint64_t ChunkReadOffset = RecvBuffer->ReadPendingLength;
QUIC_RECV_CHUNK* Chunk =
CXPLAT_CONTAINING_RECORD(
RecvBuffer->Chunks.Flink,
QUIC_RECV_CHUNK,
Link);
BOOLEAN IsFirstChunk = TRUE;
uint32_t ChunkLength = RecvBuffer->ReadLength;
while ((uint64_t)ChunkLength <= ReadOffset) {
CXPLAT_DBG_ASSERT(ChunkLength);
uint32_t ChunkReadLength = RecvBuffer->ReadLength;
while ((uint64_t)ChunkReadLength <= ChunkReadOffset) {
CXPLAT_DBG_ASSERT(ChunkReadLength);
CXPLAT_DBG_ASSERT(Chunk->ExternalReference);
CXPLAT_DBG_ASSERT(Chunk->Link.Flink != &RecvBuffer->Chunks);
ReadOffset -= ChunkLength;
ChunkReadOffset -= ChunkReadLength;
IsFirstChunk = FALSE;
Chunk =
CXPLAT_CONTAINING_RECORD(
Chunk->Link.Flink,
QUIC_RECV_CHUNK,
Link);
ChunkLength = Chunk->AllocLength;
ChunkReadLength = Chunk->AllocLength;
}
CXPLAT_DBG_ASSERT(*BufferCount >= 3);
CXPLAT_DBG_ASSERT(ReadOffset <= UINT32_MAX);
CXPLAT_DBG_ASSERT(ChunkReadOffset <= UINT32_MAX);

ChunkReadLength -= (uint32_t)ChunkReadOffset;
if (IsFirstChunk) {
//
// Only the first chunk may be used in a circular buffer fashion and
// therefore use the RecvBuffer->ReadStart offset.
//
ChunkLength = RecvBuffer->ReadLength - (uint32_t)ReadOffset;
ReadOffset = (RecvBuffer->ReadStart + ReadOffset) % Chunk->AllocLength;
CXPLAT_DBG_ASSERT(ChunkLength <= WrittenLength);
} else {
ChunkReadOffset = (RecvBuffer->ReadStart + ChunkReadOffset) % Chunk->AllocLength;
CXPLAT_DBG_ASSERT(ChunkReadLength <= UnreadLength);
} else if (ChunkReadLength > UnreadLength) {
//
// Subsequent chunks do not use ReadStart or ReadLength, so we start
// with a chunk length up to the entire length of the chunk.
//
ChunkLength = Chunk->AllocLength - (uint32_t)ReadOffset;
if (ChunkLength > WrittenLength) {
ChunkLength = (uint32_t)WrittenLength;
}
ChunkReadLength = (uint32_t)UnreadLength;
}

CXPLAT_DBG_ASSERT(ChunkLength <= Chunk->AllocLength);
if (ReadOffset + ChunkLength > Chunk->AllocLength) {
CXPLAT_DBG_ASSERT(ChunkReadLength <= Chunk->AllocLength);
if (ChunkReadOffset + ChunkReadLength > Chunk->AllocLength) {
*BufferCount = 2; // Circular buffer wrap around case.
Buffers[0].Length = (uint32_t)(Chunk->AllocLength - ReadOffset);
Buffers[0].Buffer = Chunk->Buffer + ReadOffset;
Buffers[1].Length = ChunkLength - Buffers[0].Length;
Buffers[0].Length = (uint32_t)(Chunk->AllocLength - ChunkReadOffset);
Buffers[0].Buffer = Chunk->Buffer + ChunkReadOffset;
Buffers[1].Length = ChunkReadLength - Buffers[0].Length;
Buffers[1].Buffer = Chunk->Buffer;

} else {
*BufferCount = 1;
Buffers[0].Length = ChunkLength;
Buffers[0].Buffer = Chunk->Buffer + ReadOffset;
Buffers[0].Length = ChunkReadLength;
Buffers[0].Buffer = Chunk->Buffer + ChunkReadOffset;
}
Chunk->ExternalReference = TRUE;

if (WrittenLength > ChunkLength) {
if (UnreadLength > ChunkReadLength) {
CXPLAT_DBG_ASSERT(Chunk->Link.Flink != &RecvBuffer->Chunks); // There must be another chunk to read from
WrittenLength -= ChunkLength;
ChunkReadLength = (uint32_t)UnreadLength - ChunkReadLength;
Chunk =
CXPLAT_CONTAINING_RECORD(
Chunk->Link.Flink,
QUIC_RECV_CHUNK,
Link);
CXPLAT_DBG_ASSERT(WrittenLength <= Chunk->AllocLength); // Shouldn't be able to read more than the chunk size
Buffers[*BufferCount].Length = (uint32_t)WrittenLength;
CXPLAT_DBG_ASSERT(ChunkReadLength <= Chunk->AllocLength); // Shouldn't be able to read more than the chunk size
Buffers[*BufferCount].Length = ChunkReadLength;
Buffers[*BufferCount].Buffer = Chunk->Buffer;
*BufferCount = *BufferCount + 1;
Chunk->ExternalReference = TRUE;
}

*BufferOffset = RecvBuffer->BaseOffset + RecvBuffer->ReadPendingLength;
RecvBuffer->ReadPendingLength += WrittenLength;
RecvBuffer->ReadPendingLength += UnreadLength;

#if DEBUG
uint64_t TotalBuffersLength = 0;
for (uint32_t i = 0; i < *BufferCount; ++i) {
TotalBuffersLength += Buffers[i].Length;
}
CXPLAT_DBG_ASSERT(TotalBuffersLength <= RecvBuffer->ReadPendingLength);
#endif
}
}

Expand Down Expand Up @@ -815,8 +864,15 @@ QuicRecvBufferPartialDrain(
//
RecvBuffer->ReadStart =
(uint32_t)((RecvBuffer->ReadStart + DrainLength) % Chunk->AllocLength);
if (Chunk->Link.Flink != &RecvBuffer->Chunks) {
//
// If there is another chunk, then the capacity of first chunk is shrunk.
//
RecvBuffer->Capacity -= (uint32_t)DrainLength;
}
}

CXPLAT_DBG_ASSERT(RecvBuffer->ReadLength >= (uint32_t)DrainLength);
RecvBuffer->ReadLength -= (uint32_t)DrainLength;
}

Expand All @@ -826,6 +882,13 @@ QuicRecvBufferPartialDrain(
// referencing any chunks anymore.
//
Chunk->ExternalReference = FALSE;
} else {
//
// If all ReadPending data is drained, then we can release the external reference
//
Chunk->ExternalReference = RecvBuffer->ReadPendingLength != DrainLength;
CXPLAT_DBG_ASSERT(DrainLength <= RecvBuffer->ReadPendingLength);
RecvBuffer->ReadPendingLength -= DrainLength;
}
}

Expand Down Expand Up @@ -853,6 +916,9 @@ QuicRecvBufferFullDrain(
DrainLength -= RecvBuffer->ReadLength;
RecvBuffer->ReadStart = 0;
RecvBuffer->BaseOffset += RecvBuffer->ReadLength;
if (RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_MULTIPLE) {
RecvBuffer->ReadPendingLength -= RecvBuffer->ReadLength;
}
RecvBuffer->ReadLength =
(uint32_t)(QuicRangeGet(&RecvBuffer->WrittenRanges, 0)->Count - RecvBuffer->BaseOffset);

Expand All @@ -879,12 +945,14 @@ QuicRecvBufferFullDrain(
// The rest of the contiguous data might not fit in just the next chunk
// so we need to update the ReadLength of the first chunk to be no more
// than the next chunk's allocation length.
// Capacity is also updated to reflect the new first chunk's allocation length.
//
Chunk =
CXPLAT_CONTAINING_RECORD(
RecvBuffer->Chunks.Flink,
QUIC_RECV_CHUNK,
Link);
RecvBuffer->Capacity = Chunk->AllocLength;
if (Chunk->AllocLength < RecvBuffer->ReadLength) {
RecvBuffer->ReadLength = Chunk->AllocLength;
}
Expand All @@ -903,19 +971,20 @@ QuicRecvBufferDrain(
CXPLAT_DBG_ASSERT(DrainLength <= RecvBuffer->ReadPendingLength);
if (RecvBuffer->RecvMode != QUIC_RECV_BUF_MODE_MULTIPLE) {
RecvBuffer->ReadPendingLength = 0;
} else {
RecvBuffer->ReadPendingLength -= DrainLength;
}

QUIC_SUBRANGE* FirstRange = QuicRangeGet(&RecvBuffer->WrittenRanges, 0);
CXPLAT_DBG_ASSERT(FirstRange);
CXPLAT_DBG_ASSERT(FirstRange->Low == 0);
do {
BOOLEAN PartialDrain = (uint64_t)RecvBuffer->ReadLength > DrainLength;
if (PartialDrain ||
(QuicRangeSize(&RecvBuffer->WrittenRanges) > 1 &&
RecvBuffer->BaseOffset + RecvBuffer->ReadLength == FirstRange->Count)) {
//
// If there are 2 or more written ranges, it means that there may be
// If there are 2 or more written ranges in the first chunk, it means that there may be
// more data later in the chunk that couldn't be read because there is a gap.
// Reuse the partial drain logic to preserve data after the gap.
//
QuicRangeSize(&RecvBuffer->WrittenRanges) > 1) {
QuicRecvBufferPartialDrain(RecvBuffer, DrainLength);
return !PartialDrain;
}
Expand Down
6 changes: 6 additions & 0 deletions src/core/recv_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ typedef struct QUIC_RECV_BUFFER {
//
uint32_t VirtualBufferLength;

//
// Basically same as Chunk->AllocLength of first chunk, but start shrinking
// by drain operation after next chunk is allocated.
//
uint32_t Capacity;

//
// Controls the behavior of the buffer, which changes the logic for
// writing, reading and draining.
Expand Down
Loading

0 comments on commit db66ab9

Please sign in to comment.