From 16cec2d941a1c67d46af60a3dfc87330e3d5570c Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Fri, 3 May 2024 08:52:04 -0400 Subject: [PATCH 01/17] Support Priority Work on Connections --- src/core/api.c | 35 +++++++++++++++++++++++++++------- src/core/connection.c | 22 +++++++++++++++++++++ src/core/connection.h | 7 +++++++ src/core/datagram.c | 7 ++++++- src/core/operation.c | 30 +++++++++++++++++++++++++++++ src/core/operation.h | 12 ++++++++++++ src/cs/lib/msquic_generated.cs | 5 +++++ src/inc/msquic.h | 6 +++++- src/tools/spin/spinquic.cpp | 19 ++++++++++++++++-- 9 files changed, 132 insertions(+), 11 deletions(-) diff --git a/src/core/api.c b/src/core/api.c index 0dfed4d4d5..f90f9010a9 100644 --- a/src/core/api.c +++ b/src/core/api.c @@ -853,7 +853,11 @@ MsQuicStreamStart( // // Queue the operation but don't wait for the completion. // - QuicConnQueueOper(Connection, Oper); + if (Flags & QUIC_STREAM_START_FLAG_PRIORITY_WORK) { + QuicConnQueueHighPriorityOper(Connection, Oper); + } else { + QuicConnQueueOper(Connection, Oper); + } Status = QUIC_STATUS_PENDING; Exit: @@ -1004,6 +1008,7 @@ MsQuicStreamSend( uint64_t TotalLength; QUIC_SEND_REQUEST* SendRequest; BOOLEAN QueueOper = TRUE; + const BOOLEAN IsPriority = Flags & QUIC_SEND_FLAG_PRIORITY_WORK; BOOLEAN SendInline; QUIC_OPERATION* Oper; @@ -1170,7 +1175,7 @@ MsQuicStreamSend( Oper->API_CALL.Context->CONN_SHUTDOWN.ErrorCode = (QUIC_VAR_INT)QUIC_STATUS_OUT_OF_MEMORY; Oper->API_CALL.Context->CONN_SHUTDOWN.RegistrationShutdown = FALSE; Oper->API_CALL.Context->CONN_SHUTDOWN.TransportShutdown = TRUE; - QuicConnQueueOper(Connection, Oper); + QuicConnQueueHighestPriorityOper(Connection, Oper); goto Exit; } @@ -1180,7 +1185,11 @@ MsQuicStreamSend( // // Queue the operation but don't wait for the completion. // - QuicConnQueueOper(Connection, Oper); + if (IsPriority) { + QuicConnQueueHighPriorityOper(Connection, Oper); + } else { + QuicConnQueueOper(Connection, Oper); + } } Exit: @@ -1372,6 +1381,8 @@ MsQuicSetParam( Handle); QUIC_STATUS Status; + const BOOLEAN IsPriority = !!(Param & QUIC_PARAM_HIGH_PRIORITY); + Param &= ~QUIC_PARAM_HIGH_PRIORITY; if (QUIC_PARAM_IS_GLOBAL(Param)) { // @@ -1442,7 +1453,11 @@ MsQuicSetParam( // // Queue the operation and wait for it to be processed. // - QuicConnQueueOper(Connection, &Oper); + if (IsPriority) { + QuicConnQueueHighPriorityOper(Connection, &Oper); + } else { + QuicConnQueueOper(Connection, &Oper); + } QuicTraceEvent( ApiWaitOperation, "[ api] Waiting on operation"); @@ -1483,14 +1498,16 @@ MsQuicGetParam( return QUIC_STATUS_INVALID_PARAMETER; } - QUIC_STATUS Status; - QuicTraceEvent( ApiEnter, "[ api] Enter %u (%p).", QUIC_TRACE_API_GET_PARAM, Handle); + QUIC_STATUS Status; + const BOOLEAN IsPriority = !!(Param & QUIC_PARAM_HIGH_PRIORITY); + Param &= ~QUIC_PARAM_HIGH_PRIORITY; + if (QUIC_PARAM_IS_GLOBAL(Param)) { // // Global parameters are processed inline. @@ -1560,7 +1577,11 @@ MsQuicGetParam( // // Queue the operation and wait for it to be processed. // - QuicConnQueueOper(Connection, &Oper); + if (IsPriority) { + QuicConnQueueHighPriorityOper(Connection, &Oper); + } else { + QuicConnQueueOper(Connection, &Oper); + } QuicTraceEvent( ApiWaitOperation, "[ api] Waiting on operation"); diff --git a/src/core/connection.c b/src/core/connection.c index 687bf3c790..cb974ae3b6 100644 --- a/src/core/connection.c +++ b/src/core/connection.c @@ -716,6 +716,28 @@ QuicConnQueueOper( } } +_IRQL_requires_max_(DISPATCH_LEVEL) +void +QuicConnQueueHighPriorityOper( + _In_ QUIC_CONNECTION* Connection, + _In_ QUIC_OPERATION* Oper + ) +{ +#if DEBUG + if (!Connection->State.Initialized) { + CXPLAT_DBG_ASSERT(QuicConnIsServer(Connection)); + CXPLAT_DBG_ASSERT(Connection->SourceCids.Next != NULL || CxPlatIsRandomMemoryFailureEnabled()); + } +#endif + if (QuicOperationEnqueuePriority(&Connection->OperQ, Oper)) { + // + // The connection needs to be queued on the worker because this was the + // first operation in our OperQ. + // + QuicWorkerQueueConnection(Connection->Worker, Connection); // TODO - Support priority connections on worker? + } +} + _IRQL_requires_max_(DISPATCH_LEVEL) void QuicConnQueueHighestPriorityOper( diff --git a/src/core/connection.h b/src/core/connection.h index eb819506f2..2f4178df47 100644 --- a/src/core/connection.h +++ b/src/core/connection.h @@ -1157,6 +1157,13 @@ QuicConnQueueOper( _In_ QUIC_OPERATION* Oper ); +_IRQL_requires_max_(DISPATCH_LEVEL) +void +QuicConnQueueHighPriorityOper( + _In_ QUIC_CONNECTION* Connection, + _In_ QUIC_OPERATION* Oper + ); + _IRQL_requires_max_(DISPATCH_LEVEL) void QuicConnQueueHighestPriorityOper( diff --git a/src/core/datagram.c b/src/core/datagram.c index a972a05d6d..73af7f4fe8 100644 --- a/src/core/datagram.c +++ b/src/core/datagram.c @@ -328,6 +328,7 @@ QuicDatagramQueueSend( { QUIC_STATUS Status; BOOLEAN QueueOper = TRUE; + BOOLEAN IsPriority = !!(SendRequest->Flags & QUIC_SEND_FLAG_PRIORITY_WORK); QUIC_CONNECTION* Connection = QuicDatagramGetConnection(Datagram); CxPlatDispatchLockAcquire(&Datagram->ApiQueueLock); @@ -388,7 +389,11 @@ QuicDatagramQueueSend( // // Queue the operation but don't wait for the completion. // - QuicConnQueueOper(Connection, Oper); + if (IsPriority) { + QuicConnQueueHighPriorityOper(Connection, Oper); + } else { + QuicConnQueueOper(Connection, Oper); + } } Exit: diff --git a/src/core/operation.c b/src/core/operation.c index 802a8c9229..05744d2d66 100644 --- a/src/core/operation.c +++ b/src/core/operation.c @@ -36,6 +36,7 @@ QuicOperationQueueInitialize( OperQ->ActivelyProcessing = FALSE; CxPlatDispatchLockInitialize(&OperQ->Lock); CxPlatListInitializeHead(&OperQ->List); + OperQ->PriorityTail = &OperQ->List; } _IRQL_requires_max_(DISPATCH_LEVEL) @@ -46,6 +47,7 @@ QuicOperationQueueUninitialize( { UNREFERENCED_PARAMETER(OperQ); CXPLAT_DBG_ASSERT(CxPlatListIsEmpty(&OperQ->List)); + CXPLAT_DBG_ASSERT(OperQ->PriorityTail == &OperQ->List); CxPlatDispatchLockUninitialize(&OperQ->Lock); } @@ -149,6 +151,27 @@ QuicOperationEnqueue( return StartProcessing; } +_IRQL_requires_max_(DISPATCH_LEVEL) +BOOLEAN +QuicOperationEnqueuePriority( + _In_ QUIC_OPERATION_QUEUE* OperQ, + _In_ QUIC_OPERATION* Oper + ) +{ + BOOLEAN StartProcessing; + CxPlatDispatchLockAcquire(&OperQ->Lock); +#if DEBUG + CXPLAT_DBG_ASSERT(Oper->Link.Flink == NULL); +#endif + StartProcessing = CxPlatListIsEmpty(&OperQ->List) && !OperQ->ActivelyProcessing; + CxPlatListInsertTail(OperQ->PriorityTail, &Oper->Link); + OperQ->PriorityTail = &Oper->Link; + CxPlatDispatchLockRelease(&OperQ->Lock); + QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUED); + QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUE_DEPTH); + return StartProcessing; +} + _IRQL_requires_max_(DISPATCH_LEVEL) BOOLEAN QuicOperationEnqueueFront( @@ -162,6 +185,9 @@ QuicOperationEnqueueFront( CXPLAT_DBG_ASSERT(Oper->Link.Flink == NULL); #endif StartProcessing = CxPlatListIsEmpty(&OperQ->List) && !OperQ->ActivelyProcessing; + if (OperQ->PriorityTail == &OperQ->List) { + OperQ->PriorityTail = &Oper->Link; + } CxPlatListInsertHead(&OperQ->List, &Oper->Link); CxPlatDispatchLockRelease(&OperQ->Lock); QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUED); @@ -188,6 +214,9 @@ QuicOperationDequeue( #if DEBUG Oper->Link.Flink = NULL; #endif + if (OperQ->PriorityTail == &Oper->Link) { + OperQ->PriorityTail = &OperQ->List; + } } CxPlatDispatchLockRelease(&OperQ->Lock); @@ -210,6 +239,7 @@ QuicOperationQueueClear( CxPlatDispatchLockAcquire(&OperQ->Lock); OperQ->ActivelyProcessing = FALSE; CxPlatListMoveItems(&OperQ->List, &OldList); + OperQ->PriorityTail = &OperQ->List; CxPlatDispatchLockRelease(&OperQ->Lock); int64_t OperationsDequeued = 0; diff --git a/src/core/operation.h b/src/core/operation.h index 323c830142..b646c16732 100644 --- a/src/core/operation.h +++ b/src/core/operation.h @@ -296,6 +296,7 @@ typedef struct QUIC_OPERATION_QUEUE { // CXPLAT_DISPATCH_LOCK Lock; CXPLAT_LIST_ENTRY List; + CXPLAT_LIST_ENTRY* PriorityTail; // Tail of the priority queue. } QUIC_OPERATION_QUEUE; @@ -348,6 +349,17 @@ QuicOperationEnqueue( _In_ QUIC_OPERATION* Oper ); +// +// Enqueues an operation into the priority part of the queue. Returns TRUE if +// the queue was previously empty and not already being processed. +// +_IRQL_requires_max_(DISPATCH_LEVEL) +BOOLEAN +QuicOperationEnqueuePriority( + _In_ QUIC_OPERATION_QUEUE* OperQ, + _In_ QUIC_OPERATION* Oper + ); + // // Enqueues an operation at the front of the queue. Returns TRUE if the queue // was previously empty and not already being processed. diff --git a/src/cs/lib/msquic_generated.cs b/src/cs/lib/msquic_generated.cs index 46157c55dd..7584a573f2 100644 --- a/src/cs/lib/msquic_generated.cs +++ b/src/cs/lib/msquic_generated.cs @@ -162,6 +162,7 @@ internal enum QUIC_STREAM_START_FLAGS FAIL_BLOCKED = 0x0002, SHUTDOWN_ON_FAIL = 0x0004, INDICATE_PEER_ACCEPT = 0x0008, + PRIORITY_WORK = 0x0010, } [System.Flags] @@ -194,6 +195,7 @@ internal enum QUIC_SEND_FLAGS DGRAM_PRIORITY = 0x0008, DELAY_SEND = 0x0010, CANCEL_ON_LOSS = 0x0020, + PRIORITY_WORK = 0x0040, } internal enum QUIC_DATAGRAM_SEND_STATE @@ -3260,6 +3262,9 @@ internal static unsafe partial class MsQuic [NativeTypeName("#define QUIC_PARAM_PREFIX_STREAM 0x08000000")] internal const uint QUIC_PARAM_PREFIX_STREAM = 0x08000000; + [NativeTypeName("#define QUIC_PARAM_HIGH_PRIORITY 0x40000000")] + internal const uint QUIC_PARAM_HIGH_PRIORITY = 0x40000000; + [NativeTypeName("#define QUIC_PARAM_GLOBAL_RETRY_MEMORY_PERCENT 0x01000000")] internal const uint QUIC_PARAM_GLOBAL_RETRY_MEMORY_PERCENT = 0x01000000; diff --git a/src/inc/msquic.h b/src/inc/msquic.h index bc63bf20ae..ff25cfcddd 100644 --- a/src/inc/msquic.h +++ b/src/inc/msquic.h @@ -208,6 +208,7 @@ typedef enum QUIC_STREAM_START_FLAGS { QUIC_STREAM_START_FLAG_FAIL_BLOCKED = 0x0002, // Only opens the stream if flow control allows. QUIC_STREAM_START_FLAG_SHUTDOWN_ON_FAIL = 0x0004, // Shutdown the stream immediately after start failure. QUIC_STREAM_START_FLAG_INDICATE_PEER_ACCEPT = 0x0008, // Indicate PEER_ACCEPTED event if not accepted at start. + QUIC_STREAM_START_FLAG_PRIORITY_WORK = 0x0010, // Higher priority than other connection work. } QUIC_STREAM_START_FLAGS; DEFINE_ENUM_FLAG_OPERATORS(QUIC_STREAM_START_FLAGS) @@ -241,6 +242,7 @@ typedef enum QUIC_SEND_FLAGS { QUIC_SEND_FLAG_DGRAM_PRIORITY = 0x0008, // Indicates the datagram is higher priority than others. QUIC_SEND_FLAG_DELAY_SEND = 0x0010, // Indicates the send should be delayed because more will be queued soon. QUIC_SEND_FLAG_CANCEL_ON_LOSS = 0x0020, // Indicates that a stream is to be cancelled when packet loss is detected. + QUIC_SEND_FLAG_PRIORITY_WORK = 0x0040, // Higher priority than other connection work. } QUIC_SEND_FLAGS; DEFINE_ENUM_FLAG_OPERATORS(QUIC_SEND_FLAGS) @@ -827,7 +829,9 @@ void #define QUIC_PARAM_PREFIX_TLS_SCHANNEL 0x07000000 #define QUIC_PARAM_PREFIX_STREAM 0x08000000 -#define QUIC_PARAM_IS_GLOBAL(Param) ((Param & 0x7F000000) == QUIC_PARAM_PREFIX_GLOBAL) +#define QUIC_PARAM_HIGH_PRIORITY 0x40000000 // Combine with any param to make it high priority. + +#define QUIC_PARAM_IS_GLOBAL(Param) ((Param & 0x3F000000) == QUIC_PARAM_PREFIX_GLOBAL) // // Parameters for Global. diff --git a/src/tools/spin/spinquic.cpp b/src/tools/spin/spinquic.cpp index ec5fb5c290..5e03f2f7ee 100644 --- a/src/tools/spin/spinquic.cpp +++ b/src/tools/spin/spinquic.cpp @@ -580,6 +580,9 @@ struct SetParamHelper { void SetUint64(uint32_t _Type, uint64_t Value) { Type = _Type; Param.u64 = Value; Size = sizeof(Value); } + void SetPriority() { + Type |= QUIC_PARAM_HIGH_PRIORITY; + } void Apply(HQUIC Handle) { if (Type != -1) { MsQuic.SetParam(Handle, Type, Size, IsPtr ? Param.ptr : &Param); @@ -826,6 +829,10 @@ void SpinQuicSetRandomConnectionParam(HQUIC Connection, uint16_t ThreadID) break; } + if (GetRandom(2)) { + Helper.SetPriority(); + } + Helper.Apply(Connection); } @@ -851,6 +858,10 @@ void SpinQuicSetRandomStreamParam(HQUIC Stream, uint16_t ThreadID) break; } + if (GetRandom(2)) { + Helper.SetPriority(); + } + Helper.Apply(Stream); } @@ -878,6 +889,10 @@ void SpinQuicGetRandomParam(HQUIC Handle, uint16_t ThreadID) uint32_t Param = (uint32_t)GetRandom(((ParamCounts[Level] & 0xFFFFFFF)) + 1); uint32_t Combined = ((Level+1) << 28) + Param; + if (GetRandom(2)) { + Combined |= QUIC_PARAM_HIGH_PRIORITY; + } + uint8_t OutBuffer[200]; uint32_t OutBufferLength = (uint32_t)GetRandom(sizeof(OutBuffer) + 1); @@ -1007,7 +1022,7 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste Buffer->Buffer = Gb.SendBuffer + StreamCtx->SendOffset; Buffer->Length = Length; if (QUIC_SUCCEEDED( - MsQuic.StreamSend(Stream, Buffer, 1, (QUIC_SEND_FLAGS)GetRandom(16), Buffer))) { + MsQuic.StreamSend(Stream, Buffer, 1, (QUIC_SEND_FLAGS)GetRandom(128), Buffer))) { StreamCtx->SendOffset = (uint8_t)(StreamCtx->SendOffset + Length); } else { delete Buffer; @@ -1142,7 +1157,7 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste if (Buffer) { Buffer->Buffer = Gb.SendBuffer; Buffer->Length = MaxBufferSizes[GetRandom(BufferCount)]; - if (QUIC_FAILED(MsQuic.DatagramSend(Connection, Buffer, 1, (QUIC_SEND_FLAGS)GetRandom(8), Buffer))) { + if (QUIC_FAILED(MsQuic.DatagramSend(Connection, Buffer, 1, (QUIC_SEND_FLAGS)GetRandom(128), Buffer))) { delete Buffer; } } From 4b6b857a1a1d7ee36e32a0244787649bf1ef4c0e Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Fri, 3 May 2024 08:58:26 -0400 Subject: [PATCH 02/17] few nits --- src/core/api.c | 2 +- src/core/datagram.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/api.c b/src/core/api.c index f90f9010a9..d1a347fd4e 100644 --- a/src/core/api.c +++ b/src/core/api.c @@ -1008,7 +1008,7 @@ MsQuicStreamSend( uint64_t TotalLength; QUIC_SEND_REQUEST* SendRequest; BOOLEAN QueueOper = TRUE; - const BOOLEAN IsPriority = Flags & QUIC_SEND_FLAG_PRIORITY_WORK; + const BOOLEAN IsPriority = !!(Flags & QUIC_SEND_FLAG_PRIORITY_WORK); BOOLEAN SendInline; QUIC_OPERATION* Oper; diff --git a/src/core/datagram.c b/src/core/datagram.c index 73af7f4fe8..74df255af8 100644 --- a/src/core/datagram.c +++ b/src/core/datagram.c @@ -328,7 +328,7 @@ QuicDatagramQueueSend( { QUIC_STATUS Status; BOOLEAN QueueOper = TRUE; - BOOLEAN IsPriority = !!(SendRequest->Flags & QUIC_SEND_FLAG_PRIORITY_WORK); + const BOOLEAN IsPriority = !!(SendRequest->Flags & QUIC_SEND_FLAG_PRIORITY_WORK); QUIC_CONNECTION* Connection = QuicDatagramGetConnection(Datagram); CxPlatDispatchLockAcquire(&Datagram->ApiQueueLock); From 1901d1d605fd5fe349d101e391f7486706e98f45 Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Fri, 3 May 2024 10:46:27 -0400 Subject: [PATCH 03/17] Make CodeCheck builds happy (hopefully) --- src/core/api.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/core/api.c b/src/core/api.c index d1a347fd4e..3937d11bd3 100644 --- a/src/core/api.c +++ b/src/core/api.c @@ -1366,6 +1366,9 @@ MsQuicSetParam( { CXPLAT_PASSIVE_CODE(); + const BOOLEAN IsPriority = !!(Param & QUIC_PARAM_HIGH_PRIORITY); + Param &= ~QUIC_PARAM_HIGH_PRIORITY; + if ((Handle == NULL) ^ QUIC_PARAM_IS_GLOBAL(Param)) { // // Ensure global parameters don't have a handle passed in, and vice @@ -1381,8 +1384,6 @@ MsQuicSetParam( Handle); QUIC_STATUS Status; - const BOOLEAN IsPriority = !!(Param & QUIC_PARAM_HIGH_PRIORITY); - Param &= ~QUIC_PARAM_HIGH_PRIORITY; if (QUIC_PARAM_IS_GLOBAL(Param)) { // @@ -1489,6 +1490,9 @@ MsQuicGetParam( { CXPLAT_PASSIVE_CODE(); + const BOOLEAN IsPriority = !!(Param & QUIC_PARAM_HIGH_PRIORITY); + Param &= ~QUIC_PARAM_HIGH_PRIORITY; + if ((Handle == NULL) ^ QUIC_PARAM_IS_GLOBAL(Param) || BufferLength == NULL) { // @@ -1505,8 +1509,6 @@ MsQuicGetParam( Handle); QUIC_STATUS Status; - const BOOLEAN IsPriority = !!(Param & QUIC_PARAM_HIGH_PRIORITY); - Param &= ~QUIC_PARAM_HIGH_PRIORITY; if (QUIC_PARAM_IS_GLOBAL(Param)) { // From 1f22fa10c530207cfb6f6ed8ee5a41b0dc857fad Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Fri, 3 May 2024 11:12:35 -0400 Subject: [PATCH 04/17] Name tweak --- src/core/api.c | 8 ++++---- src/core/connection.c | 2 +- src/core/connection.h | 2 +- src/core/datagram.c | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/core/api.c b/src/core/api.c index 3937d11bd3..96f59638fe 100644 --- a/src/core/api.c +++ b/src/core/api.c @@ -854,7 +854,7 @@ MsQuicStreamStart( // Queue the operation but don't wait for the completion. // if (Flags & QUIC_STREAM_START_FLAG_PRIORITY_WORK) { - QuicConnQueueHighPriorityOper(Connection, Oper); + QuicConnQueuePriorityOper(Connection, Oper); } else { QuicConnQueueOper(Connection, Oper); } @@ -1186,7 +1186,7 @@ MsQuicStreamSend( // Queue the operation but don't wait for the completion. // if (IsPriority) { - QuicConnQueueHighPriorityOper(Connection, Oper); + QuicConnQueuePriorityOper(Connection, Oper); } else { QuicConnQueueOper(Connection, Oper); } @@ -1455,7 +1455,7 @@ MsQuicSetParam( // Queue the operation and wait for it to be processed. // if (IsPriority) { - QuicConnQueueHighPriorityOper(Connection, &Oper); + QuicConnQueuePriorityOper(Connection, &Oper); } else { QuicConnQueueOper(Connection, &Oper); } @@ -1580,7 +1580,7 @@ MsQuicGetParam( // Queue the operation and wait for it to be processed. // if (IsPriority) { - QuicConnQueueHighPriorityOper(Connection, &Oper); + QuicConnQueuePriorityOper(Connection, &Oper); } else { QuicConnQueueOper(Connection, &Oper); } diff --git a/src/core/connection.c b/src/core/connection.c index cb974ae3b6..2acefddf23 100644 --- a/src/core/connection.c +++ b/src/core/connection.c @@ -718,7 +718,7 @@ QuicConnQueueOper( _IRQL_requires_max_(DISPATCH_LEVEL) void -QuicConnQueueHighPriorityOper( +QuicConnQueuePriorityOper( _In_ QUIC_CONNECTION* Connection, _In_ QUIC_OPERATION* Oper ) diff --git a/src/core/connection.h b/src/core/connection.h index 2f4178df47..8441da65ca 100644 --- a/src/core/connection.h +++ b/src/core/connection.h @@ -1159,7 +1159,7 @@ QuicConnQueueOper( _IRQL_requires_max_(DISPATCH_LEVEL) void -QuicConnQueueHighPriorityOper( +QuicConnQueuePriorityOper( _In_ QUIC_CONNECTION* Connection, _In_ QUIC_OPERATION* Oper ); diff --git a/src/core/datagram.c b/src/core/datagram.c index 74df255af8..233c0f84cc 100644 --- a/src/core/datagram.c +++ b/src/core/datagram.c @@ -390,7 +390,7 @@ QuicDatagramQueueSend( // Queue the operation but don't wait for the completion. // if (IsPriority) { - QuicConnQueueHighPriorityOper(Connection, Oper); + QuicConnQueuePriorityOper(Connection, Oper); } else { QuicConnQueueOper(Connection, Oper); } From 3bb2ad6f489784880af607afc0a9f34e824a026e Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Fri, 7 Jun 2024 16:46:05 -0700 Subject: [PATCH 05/17] Initial test --- src/test/MsQuicTests.h | 4 ++ src/test/bin/quic_gtest.cpp | 9 ++++ src/test/lib/DataTest.cpp | 84 +++++++++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+) diff --git a/src/test/MsQuicTests.h b/src/test/MsQuicTests.h index 8b451b1089..d0b87da63d 100644 --- a/src/test/MsQuicTests.h +++ b/src/test/MsQuicTests.h @@ -575,6 +575,10 @@ QuicTestStreamBlockUnblockConnFlowControl( _In_ BOOLEAN Bidirectional ); +void +QuicTestConnectionPriority( + ); + void QuicTestEcn( _In_ int Family diff --git a/src/test/bin/quic_gtest.cpp b/src/test/bin/quic_gtest.cpp index bbc3f8799a..59bbc9614a 100644 --- a/src/test/bin/quic_gtest.cpp +++ b/src/test/bin/quic_gtest.cpp @@ -2220,6 +2220,15 @@ TEST(Misc, StreamAbortConnFlowControl) { } } +TEST(Basic, ConnectionPriority) { + TestLogger Logger("QuicTestConnectionPriority"); + if (TestingKernelMode) { + GTEST_SKIP(); // TODO + } else { + QuicTestConnectionPriority(); + } +} + TEST(Drill, VarIntEncoder) { TestLogger Logger("QuicDrillTestVarIntEncoder"); if (TestingKernelMode) { diff --git a/src/test/lib/DataTest.cpp b/src/test/lib/DataTest.cpp index 8214f78977..2f2f822a59 100644 --- a/src/test/lib/DataTest.cpp +++ b/src/test/lib/DataTest.cpp @@ -3362,6 +3362,90 @@ QuicTestStreamAbortConnFlowControl( TEST_TRUE(Context.ClientStreamShutdownComplete.WaitTimeout(TestWaitTimeout)); } +struct ConnectionPriorityTestContext { + static const char* PriorityTag; + static const uint8_t NumSend; + uint32_t CurrentReceiveCount {0}; + uint32_t WhenPriorityReceived {0}; + CxPlatEvent AllReceivesComplete; + + static QUIC_STATUS StreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) { + UNREFERENCED_PARAMETER(Stream); + auto TestContext = (ConnectionPriorityTestContext*)Context; + if (Event->Type == QUIC_STREAM_EVENT_RECEIVE) { + TestContext->CurrentReceiveCount++; + if (memcmp(Event->RECEIVE.Buffers[0].Buffer, PriorityTag, strlen(PriorityTag)) == 0) { + TestContext->WhenPriorityReceived = TestContext->CurrentReceiveCount; + } + if (TestContext->CurrentReceiveCount == NumSend) { + TestContext->AllReceivesComplete.Set(); + } + } + return QUIC_STATUS_SUCCESS; + } + + static QUIC_STATUS ConnCallback(_In_ MsQuicConnection*, _In_opt_ void* Context, _Inout_ QUIC_CONNECTION_EVENT* Event) { + if (Event->Type == QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED) { + new(std::nothrow) MsQuicStream(Event->PEER_STREAM_STARTED.Stream, CleanUpAutoDelete, StreamCallback, Context); + } + return QUIC_STATUS_SUCCESS; + } +}; +const char* ConnectionPriorityTestContext::PriorityTag = "priority"; +const uint8_t ConnectionPriorityTestContext::NumSend = 100; + +void QuicTestConnectionPriority() +{ + MsQuicRegistration Registration(true); + TEST_QUIC_SUCCEEDED(Registration.GetInitStatus()); + + MsQuicConfiguration ServerConfiguration(Registration, "MsQuicTest", MsQuicSettings().SetPeerUnidiStreamCount(3), ServerSelfSignedCredConfig); + TEST_QUIC_SUCCEEDED(ServerConfiguration.GetInitStatus()); + + MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", MsQuicCredentialConfig()); + TEST_QUIC_SUCCEEDED(ClientConfiguration.GetInitStatus()); + + ConnectionPriorityTestContext Context; + MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, ConnectionPriorityTestContext::ConnCallback, &Context); + TEST_QUIC_SUCCEEDED(Listener.GetInitStatus()); + TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest")); + QuicAddr ServerLocalAddr; + TEST_QUIC_SUCCEEDED(Listener.GetLocalAddr(ServerLocalAddr)); + + MsQuicConnection Connection(Registration); + TEST_QUIC_SUCCEEDED(Connection.GetInitStatus()); + + uint8_t RawBuffer[100]; + uint8_t PriorityRawBuffer[100]; + CxPlatCopyMemory(PriorityRawBuffer, ConnectionPriorityTestContext::PriorityTag, strlen(ConnectionPriorityTestContext::PriorityTag)); + QUIC_BUFFER Buffer { sizeof(RawBuffer), RawBuffer }; + QUIC_BUFFER PriorityBuffer { sizeof(PriorityRawBuffer), PriorityRawBuffer }; + + MsQuicStream* Streams[ConnectionPriorityTestContext::NumSend-1] = {0}; + for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend-1; ++i) { + Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL); + TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); + } + for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend-1; ++i) { + TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); + } + MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL); + TEST_QUIC_SUCCEEDED(Stream.Send(&PriorityBuffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN | (QUIC_SEND_FLAGS)(QUIC_STREAM_START_FLAG_PRIORITY_WORK))); + + TEST_QUIC_SUCCEEDED(Connection.Start(ClientConfiguration, ServerLocalAddr.GetFamily(), QUIC_TEST_LOOPBACK_FOR_AF(ServerLocalAddr.GetFamily()), ServerLocalAddr.GetPort())); + TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout)); + TEST_TRUE(Connection.HandshakeComplete); + + TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); + // Assuming all the send operation is serialized in the Connection FIFO queue + // However the prioritized Send should be processed earlier + TEST_TRUE(Context.WhenPriorityReceived < ConnectionPriorityTestContext::NumSend); + + for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend-1; ++i) { + delete Streams[i]; + } +} + struct StreamBlockUnblockConnFlowControl { CxPlatEvent ClientStreamShutdownComplete; CxPlatEvent ClientStreamSendComplete; From 7183a93a89ad91ece10aff8b36aedc2b4325b203 Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Mon, 10 Jun 2024 17:03:06 -0700 Subject: [PATCH 06/17] blocking based --- src/test/lib/DataTest.cpp | 54 ++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/src/test/lib/DataTest.cpp b/src/test/lib/DataTest.cpp index 2f2f822a59..6c1cf7b841 100644 --- a/src/test/lib/DataTest.cpp +++ b/src/test/lib/DataTest.cpp @@ -3368,8 +3368,9 @@ struct ConnectionPriorityTestContext { uint32_t CurrentReceiveCount {0}; uint32_t WhenPriorityReceived {0}; CxPlatEvent AllReceivesComplete; + int InitialStreamStart {0}; - static QUIC_STATUS StreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) { + static QUIC_STATUS ServerStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) { UNREFERENCED_PARAMETER(Stream); auto TestContext = (ConnectionPriorityTestContext*)Context; if (Event->Type == QUIC_STREAM_EVENT_RECEIVE) { @@ -3384,9 +3385,20 @@ struct ConnectionPriorityTestContext { return QUIC_STATUS_SUCCESS; } + static QUIC_STATUS ClientStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) { + UNREFERENCED_PARAMETER(Stream); + auto TestContext = (ConnectionPriorityTestContext*)Context; + if (Event->Type == QUIC_STREAM_EVENT_START_COMPLETE) { + if (TestContext->InitialStreamStart++ == 0) { + CxPlatSleep(1000); // pseudo blocking operation + } + } + return QUIC_STATUS_SUCCESS; + } + static QUIC_STATUS ConnCallback(_In_ MsQuicConnection*, _In_opt_ void* Context, _Inout_ QUIC_CONNECTION_EVENT* Event) { if (Event->Type == QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED) { - new(std::nothrow) MsQuicStream(Event->PEER_STREAM_STARTED.Stream, CleanUpAutoDelete, StreamCallback, Context); + new(std::nothrow) MsQuicStream(Event->PEER_STREAM_STARTED.Stream, CleanUpAutoDelete, ServerStreamCallback, Context); } return QUIC_STATUS_SUCCESS; } @@ -3415,33 +3427,33 @@ void QuicTestConnectionPriority() MsQuicConnection Connection(Registration); TEST_QUIC_SUCCEEDED(Connection.GetInitStatus()); + TEST_QUIC_SUCCEEDED(Connection.Start(ClientConfiguration, ServerLocalAddr.GetFamily(), QUIC_TEST_LOOPBACK_FOR_AF(ServerLocalAddr.GetFamily()), ServerLocalAddr.GetPort())); + TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout)); + TEST_TRUE(Connection.HandshakeComplete); + uint8_t RawBuffer[100]; - uint8_t PriorityRawBuffer[100]; - CxPlatCopyMemory(PriorityRawBuffer, ConnectionPriorityTestContext::PriorityTag, strlen(ConnectionPriorityTestContext::PriorityTag)); QUIC_BUFFER Buffer { sizeof(RawBuffer), RawBuffer }; - QUIC_BUFFER PriorityBuffer { sizeof(PriorityRawBuffer), PriorityRawBuffer }; - - MsQuicStream* Streams[ConnectionPriorityTestContext::NumSend-1] = {0}; - for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend-1; ++i) { - Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL); + MsQuicStream* Streams[ConnectionPriorityTestContext::NumSend] = {0}; + for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { + Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamCallback, &Context); TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); - } - for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend-1; ++i) { + // Queueing 100 StreamSendFlush operations during the Connection thread is blocked TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); } - MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL); - TEST_QUIC_SUCCEEDED(Stream.Send(&PriorityBuffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN | (QUIC_SEND_FLAGS)(QUIC_STREAM_START_FLAG_PRIORITY_WORK))); - TEST_QUIC_SUCCEEDED(Connection.Start(ClientConfiguration, ServerLocalAddr.GetFamily(), QUIC_TEST_LOOPBACK_FOR_AF(ServerLocalAddr.GetFamily()), ServerLocalAddr.GetPort())); - TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout)); - TEST_TRUE(Connection.HandshakeComplete); + // This GetParam operation should be somewhere earlier than the 100th StreamSendFlush operation + QUIC_STATISTICS_V2 Stat = {0}; + uint32_t StatSize = sizeof(Stat); + QUIC_STATUS Status = MsQuic->GetParam( + Connection, + QUIC_PARAM_CONN_STATISTICS_V2_PLAT | QUIC_PARAM_HIGH_PRIORITY, + &StatSize, + &Stat); + fprintf(stderr, "Status:%d, SendTotalPackets:%ld, RecvTotalPackets:%ld, SendTotalBytes:%ld, RecvTotalBytes:%ld\n", + Status, Stat.SendTotalPackets, Stat.RecvTotalPackets, Stat.SendTotalBytes, Stat.RecvTotalBytes); TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); - // Assuming all the send operation is serialized in the Connection FIFO queue - // However the prioritized Send should be processed earlier - TEST_TRUE(Context.WhenPriorityReceived < ConnectionPriorityTestContext::NumSend); - - for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend-1; ++i) { + for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { delete Streams[i]; } } From 58f8abae7a10adefd749e428df55bee5fed32095 Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Tue, 11 Jun 2024 15:02:33 -0700 Subject: [PATCH 07/17] fix priority section management --- src/core/operation.c | 17 ++++++++--------- src/core/operation.h | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/core/operation.c b/src/core/operation.c index 05744d2d66..a1827dd5a6 100644 --- a/src/core/operation.c +++ b/src/core/operation.c @@ -36,7 +36,7 @@ QuicOperationQueueInitialize( OperQ->ActivelyProcessing = FALSE; CxPlatDispatchLockInitialize(&OperQ->Lock); CxPlatListInitializeHead(&OperQ->List); - OperQ->PriorityTail = &OperQ->List; + OperQ->PriorityTail = &OperQ->List.Flink; } _IRQL_requires_max_(DISPATCH_LEVEL) @@ -47,7 +47,7 @@ QuicOperationQueueUninitialize( { UNREFERENCED_PARAMETER(OperQ); CXPLAT_DBG_ASSERT(CxPlatListIsEmpty(&OperQ->List)); - CXPLAT_DBG_ASSERT(OperQ->PriorityTail == &OperQ->List); + CXPLAT_DBG_ASSERT(OperQ->PriorityTail == &OperQ->List.Flink); CxPlatDispatchLockUninitialize(&OperQ->Lock); } @@ -164,8 +164,7 @@ QuicOperationEnqueuePriority( CXPLAT_DBG_ASSERT(Oper->Link.Flink == NULL); #endif StartProcessing = CxPlatListIsEmpty(&OperQ->List) && !OperQ->ActivelyProcessing; - CxPlatListInsertTail(OperQ->PriorityTail, &Oper->Link); - OperQ->PriorityTail = &Oper->Link; + CxPlatListInsertTail(*OperQ->PriorityTail, &Oper->Link); CxPlatDispatchLockRelease(&OperQ->Lock); QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUED); QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUE_DEPTH); @@ -185,8 +184,8 @@ QuicOperationEnqueueFront( CXPLAT_DBG_ASSERT(Oper->Link.Flink == NULL); #endif StartProcessing = CxPlatListIsEmpty(&OperQ->List) && !OperQ->ActivelyProcessing; - if (OperQ->PriorityTail == &OperQ->List) { - OperQ->PriorityTail = &Oper->Link; + if (*OperQ->PriorityTail == &OperQ->List) { + OperQ->PriorityTail = &Oper->Link.Flink; } CxPlatListInsertHead(&OperQ->List, &Oper->Link); CxPlatDispatchLockRelease(&OperQ->Lock); @@ -214,8 +213,8 @@ QuicOperationDequeue( #if DEBUG Oper->Link.Flink = NULL; #endif - if (OperQ->PriorityTail == &Oper->Link) { - OperQ->PriorityTail = &OperQ->List; + if (*OperQ->PriorityTail == &Oper->Link) { + OperQ->PriorityTail = &OperQ->List.Flink; } } CxPlatDispatchLockRelease(&OperQ->Lock); @@ -239,7 +238,7 @@ QuicOperationQueueClear( CxPlatDispatchLockAcquire(&OperQ->Lock); OperQ->ActivelyProcessing = FALSE; CxPlatListMoveItems(&OperQ->List, &OldList); - OperQ->PriorityTail = &OperQ->List; + OperQ->PriorityTail = &OperQ->List.Flink; CxPlatDispatchLockRelease(&OperQ->Lock); int64_t OperationsDequeued = 0; diff --git a/src/core/operation.h b/src/core/operation.h index b646c16732..3242320792 100644 --- a/src/core/operation.h +++ b/src/core/operation.h @@ -296,7 +296,7 @@ typedef struct QUIC_OPERATION_QUEUE { // CXPLAT_DISPATCH_LOCK Lock; CXPLAT_LIST_ENTRY List; - CXPLAT_LIST_ENTRY* PriorityTail; // Tail of the priority queue. + CXPLAT_LIST_ENTRY** PriorityTail; // Tail of the priority queue. } QUIC_OPERATION_QUEUE; From 957d66512e73c9eeca664afb6b982cff95193eb1 Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Tue, 11 Jun 2024 16:21:57 -0700 Subject: [PATCH 08/17] Stream start/send priority test --- src/test/MsQuicTests.h | 6 +- src/test/bin/quic_gtest.cpp | 15 ++++- src/test/lib/DataTest.cpp | 123 +++++++++++++++++++++++++++++------- 3 files changed, 118 insertions(+), 26 deletions(-) diff --git a/src/test/MsQuicTests.h b/src/test/MsQuicTests.h index d0b87da63d..2db4a94a60 100644 --- a/src/test/MsQuicTests.h +++ b/src/test/MsQuicTests.h @@ -576,7 +576,11 @@ QuicTestStreamBlockUnblockConnFlowControl( ); void -QuicTestConnectionPriority( +QuicTestConnectionGetParamPriority( + ); + +void +QuicTestConnectionStreamStartSendPriority( ); void diff --git a/src/test/bin/quic_gtest.cpp b/src/test/bin/quic_gtest.cpp index 59bbc9614a..8c98597a80 100644 --- a/src/test/bin/quic_gtest.cpp +++ b/src/test/bin/quic_gtest.cpp @@ -2220,12 +2220,21 @@ TEST(Misc, StreamAbortConnFlowControl) { } } -TEST(Basic, ConnectionPriority) { - TestLogger Logger("QuicTestConnectionPriority"); +TEST(Basic, ConnectionGetParamPriority) { + TestLogger Logger("ConnectionGetParamPriority"); if (TestingKernelMode) { GTEST_SKIP(); // TODO } else { - QuicTestConnectionPriority(); + QuicTestConnectionGetParamPriority(); + } +} + +TEST(Basic, ConnectionStreamStartPriority) { + TestLogger Logger("ConnectionStreamStartPriority"); + if (TestingKernelMode) { + GTEST_SKIP(); // TODO + } else { + QuicTestConnectionStreamStartSendPriority(); } } diff --git a/src/test/lib/DataTest.cpp b/src/test/lib/DataTest.cpp index 6c1cf7b841..e694e1731f 100644 --- a/src/test/lib/DataTest.cpp +++ b/src/test/lib/DataTest.cpp @@ -3363,35 +3363,58 @@ QuicTestStreamAbortConnFlowControl( } struct ConnectionPriorityTestContext { - static const char* PriorityTag; static const uint8_t NumSend; - uint32_t CurrentReceiveCount {0}; - uint32_t WhenPriorityReceived {0}; CxPlatEvent AllReceivesComplete; - int InitialStreamStart {0}; + CxPlatEvent OperationQueuedComplete; + uint32_t CurrentSendCount {0}; + uint32_t CurrentStartCount {0}; + MsQuicStream* ExpectedStream {nullptr}; + bool TestSucceeded {false}; static QUIC_STATUS ServerStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) { + UNREFERENCED_PARAMETER(Stream); + UNREFERENCED_PARAMETER(Context); + UNREFERENCED_PARAMETER(Event); + return QUIC_STATUS_SUCCESS; + } + + static QUIC_STATUS ClientGetParamStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) { UNREFERENCED_PARAMETER(Stream); auto TestContext = (ConnectionPriorityTestContext*)Context; - if (Event->Type == QUIC_STREAM_EVENT_RECEIVE) { - TestContext->CurrentReceiveCount++; - if (memcmp(Event->RECEIVE.Buffers[0].Buffer, PriorityTag, strlen(PriorityTag)) == 0) { - TestContext->WhenPriorityReceived = TestContext->CurrentReceiveCount; + if (Event->Type == QUIC_STREAM_EVENT_START_COMPLETE) { + if (TestContext->CurrentSendCount == 0) { + // pseudo blocking operation. + // Needed for GetParam based test + CxPlatSleep(1000); } - if (TestContext->CurrentReceiveCount == NumSend) { + } else if (Event->Type == QUIC_STREAM_EVENT_SEND_COMPLETE) { + if (++TestContext->CurrentSendCount == TestContext->NumSend) { TestContext->AllReceivesComplete.Set(); } } return QUIC_STATUS_SUCCESS; } - static QUIC_STATUS ClientStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) { + static QUIC_STATUS ClientStreamStartStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) { UNREFERENCED_PARAMETER(Stream); auto TestContext = (ConnectionPriorityTestContext*)Context; if (Event->Type == QUIC_STREAM_EVENT_START_COMPLETE) { - if (TestContext->InitialStreamStart++ == 0) { - CxPlatSleep(1000); // pseudo blocking operation + if (TestContext->CurrentStartCount == 0) { + // initial dummy stream start to block this thread + TestContext->OperationQueuedComplete.WaitTimeout(TestWaitTimeout); + } else if (TestContext->CurrentStartCount == 1) { + fprintf(stderr, "START Stream %p started, ExpectedStream %p\n", Stream, TestContext->ExpectedStream); + TestContext->TestSucceeded = TestContext->ExpectedStream == Stream; + } + TestContext->CurrentStartCount++; + } else if (Event->Type == QUIC_STREAM_EVENT_SEND_COMPLETE) { + if (TestContext->CurrentSendCount == 1) { + fprintf(stderr, "SEND Stream %p send, ExpectedStream %p\n", Stream, TestContext->ExpectedStream); + TestContext->TestSucceeded = TestContext->TestSucceeded && (TestContext->ExpectedStream == Stream); + } else if (TestContext->CurrentSendCount == TestContext->NumSend) { + TestContext->AllReceivesComplete.Set(); } + TestContext->CurrentSendCount++; } return QUIC_STATUS_SUCCESS; } @@ -3403,10 +3426,9 @@ struct ConnectionPriorityTestContext { return QUIC_STATUS_SUCCESS; } }; -const char* ConnectionPriorityTestContext::PriorityTag = "priority"; const uint8_t ConnectionPriorityTestContext::NumSend = 100; -void QuicTestConnectionPriority() +void QuicTestConnectionGetParamPriority() { MsQuicRegistration Registration(true); TEST_QUIC_SUCCEEDED(Registration.GetInitStatus()); @@ -3431,28 +3453,85 @@ void QuicTestConnectionPriority() TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout)); TEST_TRUE(Connection.HandshakeComplete); + QUIC_STATISTICS_V2 BaseStat = {0}; + uint32_t StatSize = sizeof(BaseStat); + TEST_QUIC_SUCCEEDED(MsQuic->GetParam( + Connection, + QUIC_PARAM_CONN_STATISTICS_V2_PLAT, + &StatSize, + &BaseStat)); + uint8_t RawBuffer[100]; QUIC_BUFFER Buffer { sizeof(RawBuffer), RawBuffer }; MsQuicStream* Streams[ConnectionPriorityTestContext::NumSend] = {0}; for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { - Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamCallback, &Context); + Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientGetParamStreamCallback, &Context); TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); // Queueing 100 StreamSendFlush operations during the Connection thread is blocked + // TEST_QUIC_SUCCEEDED(Streams[i]->Start(QUIC_STREAM_START_FLAG_IMMEDIATE)); + // TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_FIN)); TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); } - // This GetParam operation should be somewhere earlier than the 100th StreamSendFlush operation - QUIC_STATISTICS_V2 Stat = {0}; - uint32_t StatSize = sizeof(Stat); - QUIC_STATUS Status = MsQuic->GetParam( + QUIC_STATISTICS_V2 ActualStat = {0}; + TEST_QUIC_SUCCEEDED(MsQuic->GetParam( Connection, QUIC_PARAM_CONN_STATISTICS_V2_PLAT | QUIC_PARAM_HIGH_PRIORITY, &StatSize, - &Stat); - fprintf(stderr, "Status:%d, SendTotalPackets:%ld, RecvTotalPackets:%ld, SendTotalBytes:%ld, RecvTotalBytes:%ld\n", - Status, Stat.SendTotalPackets, Stat.RecvTotalPackets, Stat.SendTotalBytes, Stat.RecvTotalBytes); + &ActualStat)); + // if not prioritized, 300 + TEST_EQUAL(BaseStat.SendTotalStreamBytes, ActualStat.SendTotalStreamBytes); + + TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); + for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { + delete Streams[i]; + } +} + +void QuicTestConnectionStreamStartSendPriority() +{ + MsQuicRegistration Registration(true); + TEST_QUIC_SUCCEEDED(Registration.GetInitStatus()); + + MsQuicConfiguration ServerConfiguration(Registration, "MsQuicTest", MsQuicSettings().SetPeerUnidiStreamCount(3), ServerSelfSignedCredConfig); + TEST_QUIC_SUCCEEDED(ServerConfiguration.GetInitStatus()); + + MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", MsQuicCredentialConfig()); + TEST_QUIC_SUCCEEDED(ClientConfiguration.GetInitStatus()); + + ConnectionPriorityTestContext Context; + MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, ConnectionPriorityTestContext::ConnCallback, &Context); + TEST_QUIC_SUCCEEDED(Listener.GetInitStatus()); + TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest")); + QuicAddr ServerLocalAddr; + TEST_QUIC_SUCCEEDED(Listener.GetLocalAddr(ServerLocalAddr)); + + MsQuicConnection Connection(Registration); + TEST_QUIC_SUCCEEDED(Connection.GetInitStatus()); + + TEST_QUIC_SUCCEEDED(Connection.Start(ClientConfiguration, ServerLocalAddr.GetFamily(), QUIC_TEST_LOOPBACK_FOR_AF(ServerLocalAddr.GetFamily()), ServerLocalAddr.GetPort())); + TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout)); + TEST_TRUE(Connection.HandshakeComplete); + + uint8_t RawBuffer[100]; + QUIC_BUFFER Buffer { sizeof(RawBuffer), RawBuffer }; + MsQuicStream* Streams[ConnectionPriorityTestContext::NumSend] = {0}; + for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { + Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context); + TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); + // Queueing 100 StreamSendFlush operations during the Connection thread is blocked + TEST_QUIC_SUCCEEDED(Streams[i]->Start(QUIC_STREAM_START_FLAG_IMMEDIATE)); + TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_FIN)); + } + + MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context); + Context.ExpectedStream = &Stream; + TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK)); + TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, QUIC_SEND_FLAG_FIN | QUIC_SEND_FLAG_PRIORITY_WORK)); + Context.OperationQueuedComplete.Set(); TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); + TEST_TRUE(Context.TestSucceeded); for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { delete Streams[i]; } From e554d1582c7ffbd3e677415280ebdc98faa60f0c Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Tue, 11 Jun 2024 17:31:06 -0700 Subject: [PATCH 09/17] cleanup --- src/test/lib/DataTest.cpp | 69 +++++++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 25 deletions(-) diff --git a/src/test/lib/DataTest.cpp b/src/test/lib/DataTest.cpp index e694e1731f..0144b84654 100644 --- a/src/test/lib/DataTest.cpp +++ b/src/test/lib/DataTest.cpp @@ -3382,7 +3382,7 @@ struct ConnectionPriorityTestContext { UNREFERENCED_PARAMETER(Stream); auto TestContext = (ConnectionPriorityTestContext*)Context; if (Event->Type == QUIC_STREAM_EVENT_START_COMPLETE) { - if (TestContext->CurrentSendCount == 0) { + if (TestContext->CurrentStartCount++ == 0) { // pseudo blocking operation. // Needed for GetParam based test CxPlatSleep(1000); @@ -3403,13 +3403,11 @@ struct ConnectionPriorityTestContext { // initial dummy stream start to block this thread TestContext->OperationQueuedComplete.WaitTimeout(TestWaitTimeout); } else if (TestContext->CurrentStartCount == 1) { - fprintf(stderr, "START Stream %p started, ExpectedStream %p\n", Stream, TestContext->ExpectedStream); TestContext->TestSucceeded = TestContext->ExpectedStream == Stream; } TestContext->CurrentStartCount++; } else if (Event->Type == QUIC_STREAM_EVENT_SEND_COMPLETE) { if (TestContext->CurrentSendCount == 1) { - fprintf(stderr, "SEND Stream %p send, ExpectedStream %p\n", Stream, TestContext->ExpectedStream); TestContext->TestSucceeded = TestContext->TestSucceeded && (TestContext->ExpectedStream == Stream); } else if (TestContext->CurrentSendCount == TestContext->NumSend) { TestContext->AllReceivesComplete.Set(); @@ -3439,8 +3437,7 @@ void QuicTestConnectionGetParamPriority() MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", MsQuicCredentialConfig()); TEST_QUIC_SUCCEEDED(ClientConfiguration.GetInitStatus()); - ConnectionPriorityTestContext Context; - MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, ConnectionPriorityTestContext::ConnCallback, &Context); + MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, ConnectionPriorityTestContext::ConnCallback); TEST_QUIC_SUCCEEDED(Listener.GetInitStatus()); TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest")); QuicAddr ServerLocalAddr; @@ -3463,13 +3460,12 @@ void QuicTestConnectionGetParamPriority() uint8_t RawBuffer[100]; QUIC_BUFFER Buffer { sizeof(RawBuffer), RawBuffer }; + ConnectionPriorityTestContext Context; MsQuicStream* Streams[ConnectionPriorityTestContext::NumSend] = {0}; for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientGetParamStreamCallback, &Context); TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); // Queueing 100 StreamSendFlush operations during the Connection thread is blocked - // TEST_QUIC_SUCCEEDED(Streams[i]->Start(QUIC_STREAM_START_FLAG_IMMEDIATE)); - // TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_FIN)); TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); } @@ -3499,8 +3495,7 @@ void QuicTestConnectionStreamStartSendPriority() MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", MsQuicCredentialConfig()); TEST_QUIC_SUCCEEDED(ClientConfiguration.GetInitStatus()); - ConnectionPriorityTestContext Context; - MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, ConnectionPriorityTestContext::ConnCallback, &Context); + MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, ConnectionPriorityTestContext::ConnCallback); TEST_QUIC_SUCCEEDED(Listener.GetInitStatus()); TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest")); QuicAddr ServerLocalAddr; @@ -3515,25 +3510,49 @@ void QuicTestConnectionStreamStartSendPriority() uint8_t RawBuffer[100]; QUIC_BUFFER Buffer { sizeof(RawBuffer), RawBuffer }; - MsQuicStream* Streams[ConnectionPriorityTestContext::NumSend] = {0}; - for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { - Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context); - TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); - // Queueing 100 StreamSendFlush operations during the Connection thread is blocked - TEST_QUIC_SUCCEEDED(Streams[i]->Start(QUIC_STREAM_START_FLAG_IMMEDIATE)); - TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_FIN)); + { // ooxxxxx...xxx + ConnectionPriorityTestContext Context; + MsQuicStream* Streams[ConnectionPriorityTestContext::NumSend] = {0}; + for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { + Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context); + TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); + // Queueing 100 StreamSendFlush operations during the Connection thread is blocked + TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); + } + + MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context); + Context.ExpectedStream = &Stream; + TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK)); + TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, QUIC_SEND_FLAG_FIN | QUIC_SEND_FLAG_PRIORITY_WORK)); + Context.OperationQueuedComplete.Set(); + + TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); + TEST_TRUE(Context.TestSucceeded); + for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { + delete Streams[i]; + } } - MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context); - Context.ExpectedStream = &Stream; - TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK)); - TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, QUIC_SEND_FLAG_FIN | QUIC_SEND_FLAG_PRIORITY_WORK)); - Context.OperationQueuedComplete.Set(); + { // oxxxx....xxxo + ConnectionPriorityTestContext Context; + MsQuicStream* Streams[ConnectionPriorityTestContext::NumSend] = {0}; + for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { + Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context); + TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); + TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); + } - TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); - TEST_TRUE(Context.TestSucceeded); - for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { - delete Streams[i]; + MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context); + Context.ExpectedStream = &Stream; + TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK)); + TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, QUIC_SEND_FLAG_FIN)); + Context.OperationQueuedComplete.Set(); + + TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); + TEST_FALSE(Context.TestSucceeded); + for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { + delete Streams[i]; + } } } From b9a3f1d43a95170c3086bf8c7bf7032359366f87 Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Tue, 11 Jun 2024 17:41:10 -0700 Subject: [PATCH 10/17] change test name and support Kernel mode --- src/test/MsQuicTests.h | 7 +- src/test/bin/quic_gtest.cpp | 17 +---- src/test/bin/winkernel/control.cpp | 4 + src/test/lib/DataTest.cpp | 118 +++++++++++------------------ 4 files changed, 58 insertions(+), 88 deletions(-) diff --git a/src/test/MsQuicTests.h b/src/test/MsQuicTests.h index 2db4a94a60..a7b77d45a9 100644 --- a/src/test/MsQuicTests.h +++ b/src/test/MsQuicTests.h @@ -576,7 +576,7 @@ QuicTestStreamBlockUnblockConnFlowControl( ); void -QuicTestConnectionGetParamPriority( +QuicTestOperationPriority( ); void @@ -1302,4 +1302,7 @@ typedef struct { #define IOCTL_QUIC_RUN_NTH_PACKET_DROP \ QUIC_CTL_CODE(121, METHOD_BUFFERED, FILE_WRITE_DATA) -#define QUIC_MAX_IOCTL_FUNC_CODE 121 +#define IOCTL_QUIC_RUN_OPERATION_PRIORITY \ + QUIC_CTL_CODE(122, METHOD_BUFFERED, FILE_WRITE_DATA) + +#define QUIC_MAX_IOCTL_FUNC_CODE 122 diff --git a/src/test/bin/quic_gtest.cpp b/src/test/bin/quic_gtest.cpp index 8c98597a80..dbba33c080 100644 --- a/src/test/bin/quic_gtest.cpp +++ b/src/test/bin/quic_gtest.cpp @@ -2220,21 +2220,12 @@ TEST(Misc, StreamAbortConnFlowControl) { } } -TEST(Basic, ConnectionGetParamPriority) { - TestLogger Logger("ConnectionGetParamPriority"); +TEST(Basic, OperationPriority) { + TestLogger Logger("OperationPriority"); if (TestingKernelMode) { - GTEST_SKIP(); // TODO + ASSERT_TRUE(DriverClient.Run(IOCTL_QUIC_RUN_OPERATION_PRIORITY)); } else { - QuicTestConnectionGetParamPriority(); - } -} - -TEST(Basic, ConnectionStreamStartPriority) { - TestLogger Logger("ConnectionStreamStartPriority"); - if (TestingKernelMode) { - GTEST_SKIP(); // TODO - } else { - QuicTestConnectionStreamStartSendPriority(); + QuicTestOperationPriority(); } } diff --git a/src/test/bin/winkernel/control.cpp b/src/test/bin/winkernel/control.cpp index f29866136f..ca315fec69 100644 --- a/src/test/bin/winkernel/control.cpp +++ b/src/test/bin/winkernel/control.cpp @@ -1452,6 +1452,10 @@ QuicTestCtlEvtIoDeviceControl( QuicTestCtlRun(QuicTestNthPacketDrop()); break; + case IOCTL_QUIC_RUN_OPERATION_PRIORITY: + QuicTestCtlRun(QuicTestOperationPriority()); + break; + default: Status = STATUS_NOT_IMPLEMENTED; break; diff --git a/src/test/lib/DataTest.cpp b/src/test/lib/DataTest.cpp index 0144b84654..057d35d6fe 100644 --- a/src/test/lib/DataTest.cpp +++ b/src/test/lib/DataTest.cpp @@ -3362,7 +3362,7 @@ QuicTestStreamAbortConnFlowControl( TEST_TRUE(Context.ClientStreamShutdownComplete.WaitTimeout(TestWaitTimeout)); } -struct ConnectionPriorityTestContext { +struct OperationPriorityTestContext { static const uint8_t NumSend; CxPlatEvent AllReceivesComplete; CxPlatEvent OperationQueuedComplete; @@ -3380,7 +3380,7 @@ struct ConnectionPriorityTestContext { static QUIC_STATUS ClientGetParamStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) { UNREFERENCED_PARAMETER(Stream); - auto TestContext = (ConnectionPriorityTestContext*)Context; + auto TestContext = (OperationPriorityTestContext*)Context; if (Event->Type == QUIC_STREAM_EVENT_START_COMPLETE) { if (TestContext->CurrentStartCount++ == 0) { // pseudo blocking operation. @@ -3396,8 +3396,7 @@ struct ConnectionPriorityTestContext { } static QUIC_STATUS ClientStreamStartStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) { - UNREFERENCED_PARAMETER(Stream); - auto TestContext = (ConnectionPriorityTestContext*)Context; + auto TestContext = (OperationPriorityTestContext*)Context; if (Event->Type == QUIC_STREAM_EVENT_START_COMPLETE) { if (TestContext->CurrentStartCount == 0) { // initial dummy stream start to block this thread @@ -3424,9 +3423,9 @@ struct ConnectionPriorityTestContext { return QUIC_STATUS_SUCCESS; } }; -const uint8_t ConnectionPriorityTestContext::NumSend = 100; +const uint8_t OperationPriorityTestContext::NumSend = 100; -void QuicTestConnectionGetParamPriority() +void QuicTestOperationPriority() { MsQuicRegistration Registration(true); TEST_QUIC_SUCCEEDED(Registration.GetInitStatus()); @@ -3437,7 +3436,7 @@ void QuicTestConnectionGetParamPriority() MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", MsQuicCredentialConfig()); TEST_QUIC_SUCCEEDED(ClientConfiguration.GetInitStatus()); - MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, ConnectionPriorityTestContext::ConnCallback); + MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, OperationPriorityTestContext::ConnCallback); TEST_QUIC_SUCCEEDED(Listener.GetInitStatus()); TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest")); QuicAddr ServerLocalAddr; @@ -3450,77 +3449,51 @@ void QuicTestConnectionGetParamPriority() TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout)); TEST_TRUE(Connection.HandshakeComplete); - QUIC_STATISTICS_V2 BaseStat = {0}; - uint32_t StatSize = sizeof(BaseStat); - TEST_QUIC_SUCCEEDED(MsQuic->GetParam( - Connection, - QUIC_PARAM_CONN_STATISTICS_V2_PLAT, - &StatSize, - &BaseStat)); - uint8_t RawBuffer[100]; QUIC_BUFFER Buffer { sizeof(RawBuffer), RawBuffer }; - ConnectionPriorityTestContext Context; - MsQuicStream* Streams[ConnectionPriorityTestContext::NumSend] = {0}; - for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { - Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientGetParamStreamCallback, &Context); - TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); - // Queueing 100 StreamSendFlush operations during the Connection thread is blocked - TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); - } - - QUIC_STATISTICS_V2 ActualStat = {0}; - TEST_QUIC_SUCCEEDED(MsQuic->GetParam( - Connection, - QUIC_PARAM_CONN_STATISTICS_V2_PLAT | QUIC_PARAM_HIGH_PRIORITY, - &StatSize, - &ActualStat)); - // if not prioritized, 300 - TEST_EQUAL(BaseStat.SendTotalStreamBytes, ActualStat.SendTotalStreamBytes); - - TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); - for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { - delete Streams[i]; - } -} - -void QuicTestConnectionStreamStartSendPriority() -{ - MsQuicRegistration Registration(true); - TEST_QUIC_SUCCEEDED(Registration.GetInitStatus()); - - MsQuicConfiguration ServerConfiguration(Registration, "MsQuicTest", MsQuicSettings().SetPeerUnidiStreamCount(3), ServerSelfSignedCredConfig); - TEST_QUIC_SUCCEEDED(ServerConfiguration.GetInitStatus()); - - MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", MsQuicCredentialConfig()); - TEST_QUIC_SUCCEEDED(ClientConfiguration.GetInitStatus()); + MsQuicStream* Streams[OperationPriorityTestContext::NumSend] = {0}; + { + OperationPriorityTestContext Context; + QUIC_STATISTICS_V2 BaseStat = {0}; + uint32_t StatSize = sizeof(BaseStat); + TEST_QUIC_SUCCEEDED(MsQuic->GetParam( + Connection, + QUIC_PARAM_CONN_STATISTICS_V2_PLAT, + &StatSize, + &BaseStat)); - MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, ConnectionPriorityTestContext::ConnCallback); - TEST_QUIC_SUCCEEDED(Listener.GetInitStatus()); - TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest")); - QuicAddr ServerLocalAddr; - TEST_QUIC_SUCCEEDED(Listener.GetLocalAddr(ServerLocalAddr)); + for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { + Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientGetParamStreamCallback, &Context); + TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); + // Queueing 100 StreamSendFlush operations during the Connection thread is blocked + TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); + } - MsQuicConnection Connection(Registration); - TEST_QUIC_SUCCEEDED(Connection.GetInitStatus()); + QUIC_STATISTICS_V2 ActualStat = {0}; + TEST_QUIC_SUCCEEDED(MsQuic->GetParam( + Connection, + QUIC_PARAM_CONN_STATISTICS_V2_PLAT | QUIC_PARAM_HIGH_PRIORITY, + &StatSize, + &ActualStat)); + // if not prioritized, 300 + TEST_EQUAL(BaseStat.SendTotalStreamBytes, ActualStat.SendTotalStreamBytes); - TEST_QUIC_SUCCEEDED(Connection.Start(ClientConfiguration, ServerLocalAddr.GetFamily(), QUIC_TEST_LOOPBACK_FOR_AF(ServerLocalAddr.GetFamily()), ServerLocalAddr.GetPort())); - TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout)); - TEST_TRUE(Connection.HandshakeComplete); + TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); + for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { + delete Streams[i]; + } + } - uint8_t RawBuffer[100]; - QUIC_BUFFER Buffer { sizeof(RawBuffer), RawBuffer }; { // ooxxxxx...xxx - ConnectionPriorityTestContext Context; - MsQuicStream* Streams[ConnectionPriorityTestContext::NumSend] = {0}; - for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { - Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context); + OperationPriorityTestContext Context; + for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { + Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); // Queueing 100 StreamSendFlush operations during the Connection thread is blocked TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); } - MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context); + MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); Context.ExpectedStream = &Stream; TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK)); TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, QUIC_SEND_FLAG_FIN | QUIC_SEND_FLAG_PRIORITY_WORK)); @@ -3528,21 +3501,20 @@ void QuicTestConnectionStreamStartSendPriority() TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); TEST_TRUE(Context.TestSucceeded); - for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { + for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { delete Streams[i]; } } { // oxxxx....xxxo - ConnectionPriorityTestContext Context; - MsQuicStream* Streams[ConnectionPriorityTestContext::NumSend] = {0}; - for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { - Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context); + OperationPriorityTestContext Context; + for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { + Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); } - MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, ConnectionPriorityTestContext::ClientStreamStartStreamCallback, &Context); + MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); Context.ExpectedStream = &Stream; TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK)); TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, QUIC_SEND_FLAG_FIN)); @@ -3550,7 +3522,7 @@ void QuicTestConnectionStreamStartSendPriority() TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); TEST_FALSE(Context.TestSucceeded); - for (uint8_t i = 0; i < ConnectionPriorityTestContext::NumSend; ++i) { + for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { delete Streams[i]; } } From 4fe6c399ea2c7bbd452517cf181567a01bcc7fb0 Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Tue, 11 Jun 2024 17:56:27 -0700 Subject: [PATCH 11/17] add description for each test --- src/test/lib/DataTest.cpp | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/test/lib/DataTest.cpp b/src/test/lib/DataTest.cpp index 057d35d6fe..d7658a2ba5 100644 --- a/src/test/lib/DataTest.cpp +++ b/src/test/lib/DataTest.cpp @@ -3452,6 +3452,8 @@ void QuicTestOperationPriority() uint8_t RawBuffer[100]; QUIC_BUFFER Buffer { sizeof(RawBuffer), RawBuffer }; MsQuicStream* Streams[OperationPriorityTestContext::NumSend] = {0}; + // Insert GetParam in front of 100 StreamSend ops + // Validate by comparing SendTotalStreamBytes on the statistics { OperationPriorityTestContext Context; QUIC_STATISTICS_V2 BaseStat = {0}; @@ -3478,12 +3480,21 @@ void QuicTestOperationPriority() // if not prioritized, 300 TEST_EQUAL(BaseStat.SendTotalStreamBytes, ActualStat.SendTotalStreamBytes); + TEST_QUIC_SUCCEEDED(MsQuic->GetParam( + Connection, + QUIC_PARAM_CONN_STATISTICS_V2_PLAT, + &StatSize, + &ActualStat)); + TEST_NOT_EQUAL(BaseStat.SendTotalStreamBytes, ActualStat.SendTotalStreamBytes); + TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { delete Streams[i]; } } + // Insert StreamStart and StreamSend in front of 100 StreamSend ops + // Validate by whether the first processed StreamStart/Send are from specific ExpectedStream { // ooxxxxx...xxx OperationPriorityTestContext Context; for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { @@ -3506,6 +3517,8 @@ void QuicTestOperationPriority() } } + // Insert StreamStart in front of 100 StreamSend ops, but StreamSend is not + // Validate by whether the first processed StreamStart are from specific ExpectedStream, StreamSend is not { // oxxxx....xxxo OperationPriorityTestContext Context; for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { From 9259888163835a36c3760267c8fbb7ffe853ef61 Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Tue, 11 Jun 2024 17:56:43 -0700 Subject: [PATCH 12/17] add one more element to QUIC_IOCTL_BUFFER_SIZES --- src/test/bin/winkernel/control.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/bin/winkernel/control.cpp b/src/test/bin/winkernel/control.cpp index ca315fec69..54db2cb9cf 100644 --- a/src/test/bin/winkernel/control.cpp +++ b/src/test/bin/winkernel/control.cpp @@ -520,6 +520,7 @@ size_t QUIC_IOCTL_BUFFER_SIZES[] = sizeof(uint32_t), sizeof(BOOLEAN), 0, + 0, }; CXPLAT_STATIC_ASSERT( From aed1513a2323770215cb1685652c6af5ebe56442 Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Wed, 12 Jun 2024 10:44:01 -0700 Subject: [PATCH 13/17] fix kernel build error by 'new' --- src/test/lib/DataTest.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/test/lib/DataTest.cpp b/src/test/lib/DataTest.cpp index d7658a2ba5..d2dc5edd48 100644 --- a/src/test/lib/DataTest.cpp +++ b/src/test/lib/DataTest.cpp @@ -3430,7 +3430,7 @@ void QuicTestOperationPriority() MsQuicRegistration Registration(true); TEST_QUIC_SUCCEEDED(Registration.GetInitStatus()); - MsQuicConfiguration ServerConfiguration(Registration, "MsQuicTest", MsQuicSettings().SetPeerUnidiStreamCount(3), ServerSelfSignedCredConfig); + MsQuicConfiguration ServerConfiguration(Registration, "MsQuicTest", MsQuicSettings().SetPeerUnidiStreamCount(OperationPriorityTestContext::NumSend), ServerSelfSignedCredConfig); TEST_QUIC_SUCCEEDED(ServerConfiguration.GetInitStatus()); MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", MsQuicCredentialConfig()); @@ -3465,7 +3465,7 @@ void QuicTestOperationPriority() &BaseStat)); for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { - Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientGetParamStreamCallback, &Context); + Streams[i] = new(std::nothrow) MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientGetParamStreamCallback, &Context); TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); // Queueing 100 StreamSendFlush operations during the Connection thread is blocked TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); @@ -3477,7 +3477,6 @@ void QuicTestOperationPriority() QUIC_PARAM_CONN_STATISTICS_V2_PLAT | QUIC_PARAM_HIGH_PRIORITY, &StatSize, &ActualStat)); - // if not prioritized, 300 TEST_EQUAL(BaseStat.SendTotalStreamBytes, ActualStat.SendTotalStreamBytes); TEST_QUIC_SUCCEEDED(MsQuic->GetParam( @@ -3498,7 +3497,7 @@ void QuicTestOperationPriority() { // ooxxxxx...xxx OperationPriorityTestContext Context; for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { - Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); + Streams[i] = new(std::nothrow) MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); // Queueing 100 StreamSendFlush operations during the Connection thread is blocked TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); @@ -3522,7 +3521,7 @@ void QuicTestOperationPriority() { // oxxxx....xxxo OperationPriorityTestContext Context; for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { - Streams[i] = new MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); + Streams[i] = new(std::nothrow) MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); } From 7bf61b0c654867802e0cb657da211d2a8a528b64 Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Wed, 12 Jun 2024 11:54:34 -0700 Subject: [PATCH 14/17] remove tail adjustment --- src/core/operation.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/core/operation.c b/src/core/operation.c index a1827dd5a6..3c5b7c11f8 100644 --- a/src/core/operation.c +++ b/src/core/operation.c @@ -184,9 +184,6 @@ QuicOperationEnqueueFront( CXPLAT_DBG_ASSERT(Oper->Link.Flink == NULL); #endif StartProcessing = CxPlatListIsEmpty(&OperQ->List) && !OperQ->ActivelyProcessing; - if (*OperQ->PriorityTail == &OperQ->List) { - OperQ->PriorityTail = &Oper->Link.Flink; - } CxPlatListInsertHead(&OperQ->List, &Oper->Link); CxPlatDispatchLockRelease(&OperQ->Lock); QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUED); From b44533d1f7647d6f4f44cf9ebde2d16969bbaeca Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Wed, 12 Jun 2024 13:43:01 -0700 Subject: [PATCH 15/17] fix PriorityTail management --- src/core/operation.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/core/operation.c b/src/core/operation.c index 3c5b7c11f8..96f1bfcc86 100644 --- a/src/core/operation.c +++ b/src/core/operation.c @@ -165,6 +165,7 @@ QuicOperationEnqueuePriority( #endif StartProcessing = CxPlatListIsEmpty(&OperQ->List) && !OperQ->ActivelyProcessing; CxPlatListInsertTail(*OperQ->PriorityTail, &Oper->Link); + OperQ->PriorityTail = &Oper->Link.Flink; CxPlatDispatchLockRelease(&OperQ->Lock); QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUED); QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUE_DEPTH); @@ -185,6 +186,9 @@ QuicOperationEnqueueFront( #endif StartProcessing = CxPlatListIsEmpty(&OperQ->List) && !OperQ->ActivelyProcessing; CxPlatListInsertHead(&OperQ->List, &Oper->Link); + if (OperQ->PriorityTail == &OperQ->List.Flink) { + OperQ->PriorityTail = &Oper->Link.Flink; + } CxPlatDispatchLockRelease(&OperQ->Lock); QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUED); QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUE_DEPTH); @@ -210,7 +214,7 @@ QuicOperationDequeue( #if DEBUG Oper->Link.Flink = NULL; #endif - if (*OperQ->PriorityTail == &Oper->Link) { + if (OperQ->PriorityTail == &Oper->Link.Flink) { OperQ->PriorityTail = &OperQ->List.Flink; } } From a23bd3e0dafbc08a762cfea324adc8a302a13598 Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Wed, 12 Jun 2024 13:46:43 -0700 Subject: [PATCH 16/17] update test --- src/test/lib/DataTest.cpp | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/src/test/lib/DataTest.cpp b/src/test/lib/DataTest.cpp index d2dc5edd48..76e1733d30 100644 --- a/src/test/lib/DataTest.cpp +++ b/src/test/lib/DataTest.cpp @@ -3366,6 +3366,7 @@ struct OperationPriorityTestContext { static const uint8_t NumSend; CxPlatEvent AllReceivesComplete; CxPlatEvent OperationQueuedComplete; + CxPlatEvent BlockAfterInitialStart; uint32_t CurrentSendCount {0}; uint32_t CurrentStartCount {0}; MsQuicStream* ExpectedStream {nullptr}; @@ -3400,13 +3401,15 @@ struct OperationPriorityTestContext { if (Event->Type == QUIC_STREAM_EVENT_START_COMPLETE) { if (TestContext->CurrentStartCount == 0) { // initial dummy stream start to block this thread + TestContext->BlockAfterInitialStart.Set(); + // Wait until all operations are queued TestContext->OperationQueuedComplete.WaitTimeout(TestWaitTimeout); } else if (TestContext->CurrentStartCount == 1) { TestContext->TestSucceeded = TestContext->ExpectedStream == Stream; } TestContext->CurrentStartCount++; } else if (Event->Type == QUIC_STREAM_EVENT_SEND_COMPLETE) { - if (TestContext->CurrentSendCount == 1) { + if (TestContext->CurrentSendCount == 0) { TestContext->TestSucceeded = TestContext->TestSucceeded && (TestContext->ExpectedStream == Stream); } else if (TestContext->CurrentSendCount == TestContext->NumSend) { TestContext->AllReceivesComplete.Set(); @@ -3455,6 +3458,7 @@ void QuicTestOperationPriority() // Insert GetParam in front of 100 StreamSend ops // Validate by comparing SendTotalStreamBytes on the statistics { + // NOTE: this test can be flaky if all the operations are not queued during the Connection thread is blocked OperationPriorityTestContext Context; QUIC_STATISTICS_V2 BaseStat = {0}; uint32_t StatSize = sizeof(BaseStat); @@ -3496,6 +3500,14 @@ void QuicTestOperationPriority() // Validate by whether the first processed StreamStart/Send are from specific ExpectedStream { // ooxxxxx...xxx OperationPriorityTestContext Context; + MsQuicStream Stream1(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); + MsQuicStream Stream2(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); + Context.ExpectedStream = &Stream2; + + Stream1.Start(QUIC_STREAM_START_FLAG_IMMEDIATE); + // Wait until this StreamStart operation is drained + TEST_TRUE(Context.BlockAfterInitialStart.WaitTimeout(TestWaitTimeout)); + for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { Streams[i] = new(std::nothrow) MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); @@ -3503,11 +3515,9 @@ void QuicTestOperationPriority() TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); } - MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); - Context.ExpectedStream = &Stream; - TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK)); - TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, QUIC_SEND_FLAG_FIN | QUIC_SEND_FLAG_PRIORITY_WORK)); - Context.OperationQueuedComplete.Set(); + TEST_QUIC_SUCCEEDED(Stream2.Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK)); + TEST_QUIC_SUCCEEDED(Stream2.Send(&Buffer, 1, QUIC_SEND_FLAG_FIN | QUIC_SEND_FLAG_PRIORITY_WORK)); + Context.OperationQueuedComplete.Set(); // All operations are queued. Kick off processing the operations TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); TEST_TRUE(Context.TestSucceeded); @@ -3520,17 +3530,23 @@ void QuicTestOperationPriority() // Validate by whether the first processed StreamStart are from specific ExpectedStream, StreamSend is not { // oxxxx....xxxo OperationPriorityTestContext Context; + MsQuicStream Stream1(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); + MsQuicStream Stream2(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); + Context.ExpectedStream = &Stream2; + + Stream1.Start(QUIC_STREAM_START_FLAG_IMMEDIATE); + // Wait until this StreamStart operation is drained + TEST_TRUE(Context.BlockAfterInitialStart.WaitTimeout(TestWaitTimeout)); + for (uint8_t i = 0; i < OperationPriorityTestContext::NumSend; ++i) { Streams[i] = new(std::nothrow) MsQuicStream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); TEST_QUIC_SUCCEEDED(Streams[i]->GetInitStatus()); TEST_QUIC_SUCCEEDED(Streams[i]->Send(&Buffer, 1, QUIC_SEND_FLAG_START | QUIC_SEND_FLAG_FIN)); } - MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL, CleanUpManual, OperationPriorityTestContext::ClientStreamStartStreamCallback, &Context); - Context.ExpectedStream = &Stream; - TEST_QUIC_SUCCEEDED(Stream.Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK)); - TEST_QUIC_SUCCEEDED(Stream.Send(&Buffer, 1, QUIC_SEND_FLAG_FIN)); - Context.OperationQueuedComplete.Set(); + TEST_QUIC_SUCCEEDED(Stream2.Start(QUIC_STREAM_START_FLAG_PRIORITY_WORK)); + TEST_QUIC_SUCCEEDED(Stream2.Send(&Buffer, 1, QUIC_SEND_FLAG_FIN)); + Context.OperationQueuedComplete.Set(); // All operations are queued. Kick off processing the operations TEST_TRUE(Context.AllReceivesComplete.WaitTimeout(TestWaitTimeout)); TEST_FALSE(Context.TestSucceeded); From 3d71a11bc79bbb53a8eb5e0b6da4b4caa578e0eb Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Fri, 14 Jun 2024 12:05:58 -0700 Subject: [PATCH 17/17] revert spinquic change --- src/tools/spin/spinquic.cpp | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/src/tools/spin/spinquic.cpp b/src/tools/spin/spinquic.cpp index 1ad4f439fa..ec1cafaefc 100644 --- a/src/tools/spin/spinquic.cpp +++ b/src/tools/spin/spinquic.cpp @@ -582,9 +582,6 @@ struct SetParamHelper { void SetUint64(uint32_t _Type, uint64_t Value) { Type = _Type; Param.u64 = Value; Size = sizeof(Value); } - void SetPriority() { - Type |= QUIC_PARAM_HIGH_PRIORITY; - } void Apply(HQUIC Handle) { if (Type != -1) { MsQuic.SetParam(Handle, Type, Size, IsPtr ? Param.ptr : &Param); @@ -831,10 +828,6 @@ void SpinQuicSetRandomConnectionParam(HQUIC Connection, uint16_t ThreadID) break; } - if (GetRandom(2)) { - Helper.SetPriority(); - } - Helper.Apply(Connection); } @@ -860,10 +853,6 @@ void SpinQuicSetRandomStreamParam(HQUIC Stream, uint16_t ThreadID) break; } - if (GetRandom(2)) { - Helper.SetPriority(); - } - Helper.Apply(Stream); } @@ -891,10 +880,6 @@ void SpinQuicGetRandomParam(HQUIC Handle, uint16_t ThreadID) uint32_t Param = (uint32_t)GetRandom(((ParamCounts[Level] & 0xFFFFFFF)) + 1); uint32_t Combined = ((Level+1) << 28) + Param; - if (GetRandom(2)) { - Combined |= QUIC_PARAM_HIGH_PRIORITY; - } - uint8_t OutBuffer[200]; uint32_t OutBufferLength = (uint32_t)GetRandom(sizeof(OutBuffer) + 1); @@ -1024,7 +1009,7 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste Buffer->Buffer = Gb.SendBuffer + StreamCtx->SendOffset; Buffer->Length = Length; if (QUIC_SUCCEEDED( - MsQuic.StreamSend(Stream, Buffer, 1, (QUIC_SEND_FLAGS)GetRandom(128), Buffer))) { + MsQuic.StreamSend(Stream, Buffer, 1, (QUIC_SEND_FLAGS)GetRandom(16), Buffer))) { StreamCtx->SendOffset = (uint8_t)(StreamCtx->SendOffset + Length); } else { delete Buffer; @@ -1159,7 +1144,7 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste if (Buffer) { Buffer->Buffer = Gb.SendBuffer; Buffer->Length = MaxBufferSizes[GetRandom(BufferCount)]; - if (QUIC_FAILED(MsQuic.DatagramSend(Connection, Buffer, 1, (QUIC_SEND_FLAGS)GetRandom(128), Buffer))) { + if (QUIC_FAILED(MsQuic.DatagramSend(Connection, Buffer, 1, (QUIC_SEND_FLAGS)GetRandom(8), Buffer))) { delete Buffer; } }