Skip to content

Commit

Permalink
Improve Thread Creation Logic (#4531)
Browse files Browse the repository at this point in the history
  • Loading branch information
nibanks committed Sep 9, 2024
1 parent 8ee995e commit add8a3a
Show file tree
Hide file tree
Showing 13 changed files with 219 additions and 110 deletions.
21 changes: 18 additions & 3 deletions src/core/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,24 @@ QuicWorkerInitialize(
} else
#endif // _KERNEL_MODE
{
const uint16_t ThreadFlags =
ExecProfile == QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME ?
CXPLAT_THREAD_FLAG_SET_AFFINITIZE : CXPLAT_THREAD_FLAG_NONE;
uint16_t ThreadFlags;
switch (ExecProfile) {
default:
case QUIC_EXECUTION_PROFILE_LOW_LATENCY:
case QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT:
ThreadFlags = CXPLAT_THREAD_FLAG_SET_IDEAL_PROC;
break;
case QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER:
ThreadFlags = CXPLAT_THREAD_FLAG_NONE;
break;
case QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME:
ThreadFlags = CXPLAT_THREAD_FLAG_SET_AFFINITIZE | CXPLAT_THREAD_FLAG_HIGH_PRIORITY;
break;
}

if (MsQuicLib.ExecutionConfig && MsQuicLib.ExecutionConfig->Flags & QUIC_EXECUTION_CONFIG_FLAG_HIGH_PRIORITY) {
ThreadFlags |= CXPLAT_THREAD_FLAG_HIGH_PRIORITY;
}

CXPLAT_THREAD_CONFIG ThreadConfig = {
ThreadFlags,
Expand Down
1 change: 1 addition & 0 deletions src/cs/lib/msquic_generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ internal enum QUIC_EXECUTION_CONFIG_FLAGS
RIO = 0x0002,
XDP = 0x0004,
NO_IDEAL_PROC = 0x0008,
HIGH_PRIORITY = 0x0010,
}

internal unsafe partial struct QUIC_EXECUTION_CONFIG
Expand Down
14 changes: 7 additions & 7 deletions src/generated/linux/platform_winuser.c.clog.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ tracepoint(CLOG_PLATFORM_WINUSER_C, WindowsUserProcessorStateV3 , arg2, arg3, ar


/*----------------------------------------------------------
// Decoder Ring for ProcessorInfoV2
// [ dll] Proc[%u] Group[%hu] Index[%u] Active=%hhu
// Decoder Ring for ProcessorInfoV3
// [ dll] Proc[%u] Group[%hu] Index[%hhu] Active=%hhu
// QuicTraceLogInfo(
ProcessorInfoV2,
"[ dll] Proc[%u] Group[%hu] Index[%u] Active=%hhu",
ProcessorInfoV3,
"[ dll] Proc[%u] Group[%hu] Index[%hhu] Active=%hhu",
Proc,
(uint16_t)Group,
CxPlatProcessorInfo[Proc].Index,
Expand All @@ -96,9 +96,9 @@ tracepoint(CLOG_PLATFORM_WINUSER_C, WindowsUserProcessorStateV3 , arg2, arg3, ar
// arg4 = arg4 = CxPlatProcessorInfo[Proc].Index = arg4
// arg5 = arg5 = (uint8_t)!!(CxPlatProcessorGroupInfo[Group].Mask & (1ULL << CxPlatProcessorInfo[Proc].Index)) = arg5
----------------------------------------------------------*/
#ifndef _clog_6_ARGS_TRACE_ProcessorInfoV2
#define _clog_6_ARGS_TRACE_ProcessorInfoV2(uniqueId, encoded_arg_string, arg2, arg3, arg4, arg5)\
tracepoint(CLOG_PLATFORM_WINUSER_C, ProcessorInfoV2 , arg2, arg3, arg4, arg5);\
#ifndef _clog_6_ARGS_TRACE_ProcessorInfoV3
#define _clog_6_ARGS_TRACE_ProcessorInfoV3(uniqueId, encoded_arg_string, arg2, arg3, arg4, arg5)\
tracepoint(CLOG_PLATFORM_WINUSER_C, ProcessorInfoV3 , arg2, arg3, arg4, arg5);\

#endif

Expand Down
14 changes: 7 additions & 7 deletions src/generated/linux/platform_winuser.c.clog.h.lttng.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ TRACEPOINT_EVENT(CLOG_PLATFORM_WINUSER_C, WindowsUserProcessorStateV3,


/*----------------------------------------------------------
// Decoder Ring for ProcessorInfoV2
// [ dll] Proc[%u] Group[%hu] Index[%u] Active=%hhu
// Decoder Ring for ProcessorInfoV3
// [ dll] Proc[%u] Group[%hu] Index[%hhu] Active=%hhu
// QuicTraceLogInfo(
ProcessorInfoV2,
"[ dll] Proc[%u] Group[%hu] Index[%u] Active=%hhu",
ProcessorInfoV3,
"[ dll] Proc[%u] Group[%hu] Index[%hhu] Active=%hhu",
Proc,
(uint16_t)Group,
CxPlatProcessorInfo[Proc].Index,
Expand All @@ -79,16 +79,16 @@ TRACEPOINT_EVENT(CLOG_PLATFORM_WINUSER_C, WindowsUserProcessorStateV3,
// arg4 = arg4 = CxPlatProcessorInfo[Proc].Index = arg4
// arg5 = arg5 = (uint8_t)!!(CxPlatProcessorGroupInfo[Group].Mask & (1ULL << CxPlatProcessorInfo[Proc].Index)) = arg5
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_PLATFORM_WINUSER_C, ProcessorInfoV2,
TRACEPOINT_EVENT(CLOG_PLATFORM_WINUSER_C, ProcessorInfoV3,
TP_ARGS(
unsigned int, arg2,
unsigned short, arg3,
unsigned int, arg4,
unsigned char, arg4,
unsigned char, arg5),
TP_FIELDS(
ctf_integer(unsigned int, arg2, arg2)
ctf_integer(unsigned short, arg3, arg3)
ctf_integer(unsigned int, arg4, arg4)
ctf_integer(unsigned char, arg4, arg4)
ctf_integer(unsigned char, arg5, arg5)
)
)
Expand Down
1 change: 1 addition & 0 deletions src/inc/msquic.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ typedef enum QUIC_EXECUTION_CONFIG_FLAGS {
QUIC_EXECUTION_CONFIG_FLAG_RIO = 0x0002,
QUIC_EXECUTION_CONFIG_FLAG_XDP = 0x0004,
QUIC_EXECUTION_CONFIG_FLAG_NO_IDEAL_PROC = 0x0008,
QUIC_EXECUTION_CONFIG_FLAG_HIGH_PRIORITY = 0x0010,
#endif
} QUIC_EXECUTION_CONFIG_FLAGS;

Expand Down
88 changes: 6 additions & 82 deletions src/inc/quic_platform_winuser.h
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ CxPlatEventQDequeue(
)
{
ULONG out_count = 0;
if (!GetQueuedCompletionStatusEx(*queue, events, count, &out_count, wait_time, FALSE)) return FALSE;
if (!GetQueuedCompletionStatusEx(*queue, events, count, &out_count, wait_time, FALSE)) return 0;
CXPLAT_DBG_ASSERT(out_count != 0);
CXPLAT_DBG_ASSERT(events[0].lpOverlapped != NULL || out_count == 1);
#if DEBUG
Expand Down Expand Up @@ -1001,10 +1001,13 @@ CxPlatTimeAtOrBefore32(
//

typedef struct CXPLAT_PROCESSOR_INFO {
uint32_t Index; // Index in the current group
uint16_t Group; // The group number this processor is a part of
uint8_t Index; // Index in the current group
uint8_t PADDING; // Here to align with PROCESSOR_NUMBER struct
} CXPLAT_PROCESSOR_INFO;

CXPLAT_STATIC_ASSERT(sizeof(CXPLAT_PROCESSOR_INFO) == sizeof(PROCESSOR_NUMBER), "Size check");

typedef struct CXPLAT_PROCESSOR_GROUP_INFO {
KAFFINITY Mask; // Bit mask of active processors in the group
uint32_t Count; // Count of active processors in the group
Expand Down Expand Up @@ -1108,90 +1111,11 @@ CXPLAT_THREAD_CALLBACK(CxPlatThreadCustomStart, CustomContext); // CXPLAT_THREAD

#endif // CXPLAT_USE_CUSTOM_THREAD_CONTEXT

inline
QUIC_STATUS
CxPlatThreadCreate(
_In_ CXPLAT_THREAD_CONFIG* Config,
_Out_ CXPLAT_THREAD* Thread
)
{
#ifdef CXPLAT_USE_CUSTOM_THREAD_CONTEXT
CXPLAT_THREAD_CUSTOM_CONTEXT* CustomContext =
CXPLAT_ALLOC_NONPAGED(sizeof(CXPLAT_THREAD_CUSTOM_CONTEXT), QUIC_POOL_CUSTOM_THREAD);
if (CustomContext == NULL) {
QuicTraceEvent(
AllocFailure,
"Allocation of '%s' failed. (%llu bytes)",
"Custom thread context",
sizeof(CXPLAT_THREAD_CUSTOM_CONTEXT));
return QUIC_STATUS_OUT_OF_MEMORY;
}
CustomContext->Callback = Config->Callback;
CustomContext->Context = Config->Context;
*Thread =
CreateThread(
NULL,
0,
CxPlatThreadCustomStart,
CustomContext,
0,
NULL);
if (*Thread == NULL) {
CXPLAT_FREE(CustomContext, QUIC_POOL_CUSTOM_THREAD);
return GetLastError();
}
#else // CXPLAT_USE_CUSTOM_THREAD_CONTEXT
*Thread =
CreateThread(
NULL,
0,
Config->Callback,
Config->Context,
0,
NULL);
if (*Thread == NULL) {
return GetLastError();
}
#endif // CXPLAT_USE_CUSTOM_THREAD_CONTEXT
CXPLAT_DBG_ASSERT(Config->IdealProcessor < CxPlatProcCount());
const CXPLAT_PROCESSOR_INFO* ProcInfo = &CxPlatProcessorInfo[Config->IdealProcessor];
GROUP_AFFINITY Group = {0};
if (Config->Flags & CXPLAT_THREAD_FLAG_SET_AFFINITIZE) {
Group.Mask = (KAFFINITY)(1ull << ProcInfo->Index); // Fixed processor
} else {
Group.Mask = CxPlatProcessorGroupInfo[ProcInfo->Group].Mask;
}
Group.Group = ProcInfo->Group;
SetThreadGroupAffinity(*Thread, &Group, NULL);
if (Config->Flags & CXPLAT_THREAD_FLAG_SET_IDEAL_PROC) {
SetThreadIdealProcessor(*Thread, ProcInfo->Index);
}
if (Config->Flags & CXPLAT_THREAD_FLAG_HIGH_PRIORITY) {
SetThreadPriority(*Thread, THREAD_PRIORITY_HIGHEST);
}
if (Config->Name) {
WCHAR WideName[64] = L"";
size_t WideNameLength;
mbstowcs_s(
&WideNameLength,
WideName,
ARRAYSIZE(WideName) - 1,
Config->Name,
_TRUNCATE);
#if defined(QUIC_RESTRICTED_BUILD)
SetThreadDescription(*Thread, WideName);
#else
THREAD_NAME_INFORMATION_PRIVATE ThreadNameInfo;
RtlInitUnicodeString(&ThreadNameInfo.ThreadName, WideName);
NtSetInformationThread(
*Thread,
ThreadNameInformationPrivate,
&ThreadNameInfo,
sizeof(ThreadNameInfo));
#endif
}
return QUIC_STATUS_SUCCESS;
}
);
#define CxPlatThreadDelete(Thread) CxPlatCloseHandle(*(Thread))
#define CxPlatThreadWait(Thread) WaitForSingleObject(*(Thread), INFINITE)
typedef uint32_t CXPLAT_THREAD_ID;
Expand Down
29 changes: 29 additions & 0 deletions src/manifest/clog.sidecar
Original file line number Diff line number Diff line change
Expand Up @@ -8874,6 +8874,30 @@
],
"macroName": "QuicTraceLogInfo"
},
"ProcessorInfoV3": {
"ModuleProperites": {},
"TraceString": "[ dll] Proc[%u] Group[%hu] Index[%hhu] Active=%hhu",
"UniqueId": "ProcessorInfoV3",
"splitArgs": [
{
"DefinationEncoding": "u",
"MacroVariableName": "arg2"
},
{
"DefinationEncoding": "hu",
"MacroVariableName": "arg3"
},
{
"DefinationEncoding": "hhu",
"MacroVariableName": "arg4"
},
{
"DefinationEncoding": "hhu",
"MacroVariableName": "arg5"
}
],
"macroName": "QuicTraceLogInfo"
},
"ProviderAttachClient": {
"ModuleProperites": {},
"TraceString": "[ nmr][%p] Client attached Ver %hu Size %hu Number %u ModuleID { %x-%x-%x-%llx }",
Expand Down Expand Up @@ -16002,6 +16026,11 @@
"TraceID": "ProcessorInfoV2",
"EncodingString": "[ dll] Proc[%u] Group[%hu] Index[%u] Active=%hhu"
},
{
"UniquenessHash": "c224289c-70bc-0be5-bc16-ea902175e280",
"TraceID": "ProcessorInfoV3",
"EncodingString": "[ dll] Proc[%u] Group[%hu] Index[%hhu] Active=%hhu"
},
{
"UniquenessHash": "6509226e-b945-543f-bf07-0147bcb93ca5",
"TraceID": "ProviderAttachClient",
Expand Down
9 changes: 8 additions & 1 deletion src/perf/lib/PerfClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,15 @@ PerfClient::Start(
//
// Configure and start all the workers.
//
uint16_t ThreadFlags =
AffinitizeWorkers ?
(uint16_t)CXPLAT_THREAD_FLAG_SET_AFFINITIZE :
(uint16_t)CXPLAT_THREAD_FLAG_SET_IDEAL_PROC;
if (PerfDefaultHighPriority) {
ThreadFlags |= CXPLAT_THREAD_FLAG_HIGH_PRIORITY;
}
CXPLAT_THREAD_CONFIG ThreadConfig = {
(uint16_t)(AffinitizeWorkers ? CXPLAT_THREAD_FLAG_SET_AFFINITIZE : CXPLAT_THREAD_FLAG_SET_IDEAL_PROC),
ThreadFlags,
0,
"Perf Worker",
PerfClientWorker::s_WorkerThread,
Expand Down
1 change: 1 addition & 0 deletions src/perf/lib/SecNetPerf.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ extern TCP_EXECUTION_PROFILE TcpDefaultExecutionProfile;
extern QUIC_CONGESTION_CONTROL_ALGORITHM PerfDefaultCongestionControl;
extern uint8_t PerfDefaultEcnEnabled;
extern uint8_t PerfDefaultQeoAllowed;
extern uint8_t PerfDefaultHighPriority;

extern CXPLAT_DATAPATH* Datapath;

Expand Down
12 changes: 10 additions & 2 deletions src/perf/lib/SecNetPerfMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ TCP_EXECUTION_PROFILE TcpDefaultExecutionProfile = TCP_EXECUTION_PROFILE_LOW_LAT
QUIC_CONGESTION_CONTROL_ALGORITHM PerfDefaultCongestionControl = QUIC_CONGESTION_CONTROL_ALGORITHM_CUBIC;
uint8_t PerfDefaultEcnEnabled = false;
uint8_t PerfDefaultQeoAllowed = false;
uint8_t PerfDefaultHighPriority = false;

#ifdef _KERNEL_MODE
volatile int BufferCurrent;
Expand Down Expand Up @@ -94,11 +95,12 @@ PrintHelp(
" -pollidle:<time_us> Amount of time to poll while idle before sleeping (default: 0).\n"
" -ecn:<0/1> Enables/disables sender-side ECN support. (def:0)\n"
" -qeo:<0/1> Allows/disallowes QUIC encryption offload. (def:0)\n"
#ifndef _KERNEL_MODE
" -io:<mode> Configures a requested network IO model to be used.\n"
" - {iocp, rio, xdp, qtip, wsk, epoll, kqueue}\n"
#ifndef _KERNEL_MODE
" -cpu:<cpu_index> Specify the processor(s) to use.\n"
" -cipher:<value> Decimal value of 1 or more QUIC_ALLOWED_CIPHER_SUITE_FLAGS.\n"
" -highpri:<0/1> Configures MsQuic to run threads at high priority. (def:0)\n"
#endif // _KERNEL_MODE
"\n",
PERF_DEFAULT_PORT,
Expand Down Expand Up @@ -137,7 +139,7 @@ QuicMainStart(

uint8_t RawConfig[QUIC_EXECUTION_CONFIG_MIN_SIZE + 256 * sizeof(uint16_t)] = {0};
QUIC_EXECUTION_CONFIG* Config = (QUIC_EXECUTION_CONFIG*)RawConfig;
Config->PollingIdleTimeoutUs = UINT32_MAX; // Default to no sleep.
Config->PollingIdleTimeoutUs = 0; // Default to no polling.
bool SetConfig = false;

#ifndef _KERNEL_MODE
Expand Down Expand Up @@ -173,6 +175,12 @@ QuicMainStart(
} while (*CpuStr && Config->ProcessorCount < 256);
}
}

TryGetValue(argc, argv, "highpri", &PerfDefaultHighPriority);
if (PerfDefaultHighPriority) {
Config->Flags |= QUIC_EXECUTION_CONFIG_FLAG_HIGH_PRIORITY;
SetConfig = true;
}
#endif // _KERNEL_MODE

if (TryGetValue(argc, argv, "pollidle", &Config->PollingIdleTimeoutUs)) {
Expand Down
6 changes: 5 additions & 1 deletion src/perf/lib/Tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,11 @@ bool TcpWorker::Initialize(TcpEngine* _Engine, uint16_t PartitionIndex)
}
#endif

CXPLAT_THREAD_CONFIG Config = { 0, PartitionIndex, "TcpPerfWorker", WorkerThread, this };
uint16_t ThreadFlags = CXPLAT_THREAD_FLAG_SET_IDEAL_PROC;
if (PerfDefaultHighPriority) {
ThreadFlags |= CXPLAT_THREAD_FLAG_HIGH_PRIORITY;
}
CXPLAT_THREAD_CONFIG Config = { ThreadFlags, PartitionIndex, "TcpPerfWorker", WorkerThread, this };
if (QUIC_FAILED(
CxPlatThreadCreate(
&Config,
Expand Down
Loading

0 comments on commit add8a3a

Please sign in to comment.