diff --git a/src/core/api.c b/src/core/api.c index 0dfed4d4d5..37d0e2994f 100644 --- a/src/core/api.c +++ b/src/core/api.c @@ -828,6 +828,19 @@ MsQuicStreamStart( goto Exit; } + if (Flags & QUIC_STREAM_START_FLAG_FAIL_BLOCKED_INLINE) { + BOOLEAN NewStreamBlocked; + Status = + QuicStreamSetNewLocalStreamID( + &Connection->Streams, + TRUE, + Stream, + &NewStreamBlocked); + if (QUIC_FAILED(Status)) { + goto Exit; + } + } + QUIC_OPERATION* Oper = QuicOperationAlloc(Connection->Worker, QUIC_OPER_TYPE_API_CALL); if (Oper == NULL) { diff --git a/src/core/stream.c b/src/core/stream.c index 0d5973bb2e..c93e3c76be 100644 --- a/src/core/stream.c +++ b/src/core/stream.c @@ -244,20 +244,10 @@ QuicStreamStart( } if (!IsRemoteStream) { - uint8_t Type = - QuicConnIsServer(Stream->Connection) ? - STREAM_ID_FLAG_IS_SERVER : - STREAM_ID_FLAG_IS_CLIENT; - - if (Stream->Flags.Unidirectional) { - Type |= STREAM_ID_FLAG_IS_UNI_DIR; - } - Status = QuicStreamSetNewLocalStream( &Stream->Connection->Streams, - Type, - !!(Flags & QUIC_STREAM_START_FLAG_FAIL_BLOCKED), + Flags & QUIC_STREAM_START_FLAG_FAIL_BLOCKED, // Inline flag was already handled Stream); if (QUIC_FAILED(Status)) { goto Exit; @@ -763,7 +753,7 @@ QuicStreamParamGet( break; } - if (!Stream->Flags.Started) { + if (Stream->ID == UINT64_MAX) { Status = QUIC_STATUS_INVALID_STATE; break; } diff --git a/src/core/stream_set.c b/src/core/stream_set.c index 5d37a8a15c..16b8efa9d9 100644 --- a/src/core/stream_set.c +++ b/src/core/stream_set.c @@ -49,6 +49,7 @@ QuicStreamSetInitialize( ) { CxPlatListInitializeHead(&StreamSet->ClosedStreams); + CxPlatDispatchLockInitialize(&StreamSet->TypesLock); #if DEBUG CxPlatListInitializeHead(&StreamSet->AllStreams); CxPlatDispatchLockInitialize(&StreamSet->AllStreamsLock); @@ -64,6 +65,7 @@ QuicStreamSetUninitialize( if (StreamSet->StreamTable != NULL) { CxPlatHashtableUninitialize(StreamSet->StreamTable); } + CxPlatDispatchLockUninitialize(&StreamSet->TypesLock); #if DEBUG CxPlatDispatchLockUninitialize(&StreamSet->AllStreamsLock); #endif @@ -191,8 +193,10 @@ QuicStreamSetReleaseStream( uint8_t Flags = (uint8_t)(Stream->ID & STREAM_ID_MASK); QUIC_STREAM_TYPE_INFO* Info = &StreamSet->Types[Flags]; + CxPlatDispatchLockAcquire(&StreamSet->TypesLock); CXPLAT_DBG_ASSERT(Info->CurrentStreamCount != 0); Info->CurrentStreamCount--; + CxPlatDispatchLockRelease(&StreamSet->TypesLock); if ((Flags & STREAM_ID_FLAG_IS_SERVER) == QuicConnIsServer(Stream->Connection)) { // @@ -520,57 +524,88 @@ QuicStreamSetGetFlowControlSummary( } } -_IRQL_requires_max_(PASSIVE_LEVEL) +_IRQL_requires_max_(DISPATCH_LEVEL) QUIC_STATUS -QuicStreamSetNewLocalStream( +QuicStreamSetNewLocalStreamID( _Inout_ QUIC_STREAM_SET* StreamSet, - _In_ uint8_t Type, _In_ BOOLEAN FailOnBlocked, - _In_ QUIC_STREAM* Stream + _In_ QUIC_STREAM* Stream, + _Out_ BOOLEAN* NewStreamBlocked ) { + CXPLAT_DBG_ASSERT(Stream->ID == UINT64_MAX); + QUIC_STATUS Status = QUIC_STATUS_SUCCESS; + uint8_t Type = + QuicConnIsServer(Stream->Connection) ? + STREAM_ID_FLAG_IS_SERVER : + STREAM_ID_FLAG_IS_CLIENT; + if (Stream->Flags.Unidirectional) { + Type |= STREAM_ID_FLAG_IS_UNI_DIR; + } QUIC_STREAM_TYPE_INFO* Info = &StreamSet->Types[Type]; + + CxPlatDispatchLockAcquire(&StreamSet->TypesLock); + uint64_t NewStreamId = Type + (Info->TotalStreamCount << 2); - BOOLEAN NewStreamBlocked = Info->TotalStreamCount >= Info->MaxTotalStreamCount; + *NewStreamBlocked = Info->TotalStreamCount >= Info->MaxTotalStreamCount; - if (FailOnBlocked && NewStreamBlocked) { - if (Stream->Connection->State.PeerTransportParameterValid) { - QuicSendSetSendFlag( - &Stream->Connection->Send, - STREAM_ID_IS_UNI_DIR(Type) ? - QUIC_CONN_SEND_FLAG_UNI_STREAMS_BLOCKED : QUIC_CONN_SEND_FLAG_BIDI_STREAMS_BLOCKED); - } + if (FailOnBlocked && *NewStreamBlocked) { Status = QUIC_STATUS_STREAM_LIMIT_REACHED; - goto Exit; + } else { + Stream->ID = NewStreamId; + Info->CurrentStreamCount++; + Info->TotalStreamCount++; } - Stream->ID = NewStreamId; + CxPlatDispatchLockRelease(&StreamSet->TypesLock); - if (!QuicStreamSetInsertStream(StreamSet, Stream)) { - Status = QUIC_STATUS_OUT_OF_MEMORY; - Stream->ID = UINT64_MAX; - goto Exit; - } + return Status; +} - if (NewStreamBlocked) { - // - // We don't call QuicStreamAddOutFlowBlockedReason here because we haven't - // logged the stream created event yet at this point. We will log the event - // after that. - // - Stream->OutFlowBlockedReasons |= QUIC_FLOW_BLOCKED_STREAM_ID_FLOW_CONTROL; - Stream->BlockedTimings.StreamIdFlowControl.LastStartTimeUs = CxPlatTimeUs64(); - if (Stream->Connection->State.PeerTransportParameterValid) { - QuicSendSetSendFlag( - &Stream->Connection->Send, - STREAM_ID_IS_UNI_DIR(Stream->ID) ? - QUIC_CONN_SEND_FLAG_UNI_STREAMS_BLOCKED : QUIC_CONN_SEND_FLAG_BIDI_STREAMS_BLOCKED); +_IRQL_requires_max_(PASSIVE_LEVEL) +QUIC_STATUS +QuicStreamSetNewLocalStream( + _Inout_ QUIC_STREAM_SET* StreamSet, + _In_ BOOLEAN FailOnBlocked, + _In_ QUIC_STREAM* Stream + ) +{ + QUIC_STATUS Status = QUIC_STATUS_SUCCESS; + + if (Stream->ID == UINT64_MAX) { + BOOLEAN NewStreamBlocked; + Status = + QuicStreamSetNewLocalStreamID( + StreamSet, + FailOnBlocked, + Stream, + &NewStreamBlocked); + if (QUIC_FAILED(Status)) { + goto Exit; + } + if (NewStreamBlocked) { + // + // We don't call QuicStreamAddOutFlowBlockedReason here because we + // haven't logged the stream created event yet at this point. We + // will log the event after that. + // + Stream->OutFlowBlockedReasons |= QUIC_FLOW_BLOCKED_STREAM_ID_FLOW_CONTROL; + Stream->BlockedTimings.StreamIdFlowControl.LastStartTimeUs = CxPlatTimeUs64(); + if (Stream->Connection->State.PeerTransportParameterValid) { + QuicSendSetSendFlag( + &Stream->Connection->Send, + STREAM_ID_IS_UNI_DIR(Stream->ID) ? + QUIC_CONN_SEND_FLAG_UNI_STREAMS_BLOCKED : QUIC_CONN_SEND_FLAG_BIDI_STREAMS_BLOCKED); + } } } - Info->CurrentStreamCount++; - Info->TotalStreamCount++; + if (!QuicStreamSetInsertStream(StreamSet, Stream)) { + Status = QUIC_STATUS_OUT_OF_MEMORY; + QuicConnFatalError(Stream->Connection, Status, "StreamSetInsertStream failed"); + goto Exit; + } QuicStreamAddRef(Stream, QUIC_STREAM_REF_STREAM_SET); diff --git a/src/core/stream_set.h b/src/core/stream_set.h index b489fa8b67..a539488f76 100644 --- a/src/core/stream_set.h +++ b/src/core/stream_set.h @@ -51,6 +51,11 @@ typedef struct QUIC_STREAM_SET { // CXPLAT_LIST_ENTRY ClosedStreams; + // + // Used to protect access to the Type variable across threads. + // + CXPLAT_DISPATCH_LOCK TypesLock; + #if DEBUG // // The list of allocated streams for leak tracking. @@ -172,6 +177,19 @@ QuicStreamSetGetFlowControlSummary( _Out_ uint64_t* SendWindow ); +// +// Assigns a new local stream ID to the stream. May be called on the app thread, +// and at dispatch level. +// +_IRQL_requires_max_(DISPATCH_LEVEL) +QUIC_STATUS +QuicStreamSetNewLocalStreamID( + _Inout_ QUIC_STREAM_SET* StreamSet, + _In_ BOOLEAN FailOnBlocked, + _In_ QUIC_STREAM* Stream, + _Out_ BOOLEAN* NewStreamBlocked + ); + // // Creates a new local stream. // @@ -179,7 +197,6 @@ _IRQL_requires_max_(PASSIVE_LEVEL) QUIC_STATUS QuicStreamSetNewLocalStream( _Inout_ QUIC_STREAM_SET* StreamSet, - _In_ uint8_t Type, _In_ BOOLEAN FailOnBlocked, _In_ QUIC_STREAM* Stream ); diff --git a/src/cs/lib/msquic_generated.cs b/src/cs/lib/msquic_generated.cs index 46157c55dd..850cb72952 100644 --- a/src/cs/lib/msquic_generated.cs +++ b/src/cs/lib/msquic_generated.cs @@ -162,6 +162,7 @@ internal enum QUIC_STREAM_START_FLAGS FAIL_BLOCKED = 0x0002, SHUTDOWN_ON_FAIL = 0x0004, INDICATE_PEER_ACCEPT = 0x0008, + FAIL_BLOCKED_INLINE = 0x0010, } [System.Flags] diff --git a/src/inc/msquic.h b/src/inc/msquic.h index bc63bf20ae..fb11a53fea 100644 --- a/src/inc/msquic.h +++ b/src/inc/msquic.h @@ -208,6 +208,7 @@ typedef enum QUIC_STREAM_START_FLAGS { QUIC_STREAM_START_FLAG_FAIL_BLOCKED = 0x0002, // Only opens the stream if flow control allows. QUIC_STREAM_START_FLAG_SHUTDOWN_ON_FAIL = 0x0004, // Shutdown the stream immediately after start failure. QUIC_STREAM_START_FLAG_INDICATE_PEER_ACCEPT = 0x0008, // Indicate PEER_ACCEPTED event if not accepted at start. + QUIC_STREAM_START_FLAG_FAIL_BLOCKED_INLINE = 0x0010, // Only opens the stream if flow control allows (inline to API call). } QUIC_STREAM_START_FLAGS; DEFINE_ENUM_FLAG_OPERATORS(QUIC_STREAM_START_FLAGS) diff --git a/src/test/lib/ApiTest.cpp b/src/test/lib/ApiTest.cpp index 96f201fba8..9518c33acc 100644 --- a/src/test/lib/ApiTest.cpp +++ b/src/test/lib/ApiTest.cpp @@ -1459,6 +1459,35 @@ void QuicTestValidateStream(bool Connect) TEST_FALSE(Context.ShutdownComplete); } + // + // Fail on blocked inline. + // + { + TestScopeLogger logScope("Fail on blocked inline"); + ShutdownStreamContext Context; + StreamScope Stream; + TEST_QUIC_SUCCEEDED( + MsQuic->StreamOpen( + Client.GetConnection(), + QUIC_STREAM_OPEN_FLAG_NONE, + ShutdownStreamCallback, + &Context, + &Stream.Handle)); + if (Connect) { + TEST_QUIC_SUCCEEDED( + MsQuic->StreamStart( + Stream.Handle, + QUIC_STREAM_START_FLAG_FAIL_BLOCKED_INLINE)); + } else { + TEST_QUIC_STATUS( + QUIC_STATUS_STREAM_LIMIT_REACHED, + MsQuic->StreamStart( + Stream.Handle, + QUIC_STREAM_START_FLAG_FAIL_BLOCKED_INLINE)); + } + TEST_FALSE(Context.ShutdownComplete); + } + // // Shutdown on fail. //