Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Processor group support #13

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ bld/
[Oo]bj/
[Ll]og/
[Ll]ogs/

[Oo]ut/
# Visual Studio 2015/2017 cache/options directory
.vs/
# Uncomment if you have tasks that create the project's static files in wwwroot
Expand Down
62 changes: 52 additions & 10 deletions src/ntttcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,15 @@ typedef struct _FLAGS {
BOOL roundtrip;
BOOL hide_per_thread_stats;
BOOL udp_receive_coalescing;
BOOL group_aware;

} FLAGS, *PFLAGS;

// Worker thread context (with 1 session per worker thread)
typedef struct _PHP {
int index;
int proc;
WORD group;
int port;
PCHAR receiver_name;
PCHAR sender_name;
Expand All @@ -86,6 +89,7 @@ typedef struct _PHP {
// One parsed "-m" mapping entry: how many worker threads to start, which
// processor (and processor group) they target, and which receiver they use.
// Entries are reset in SetDefaultFlags and filled in by ProcessMappings.
typedef struct _MAP {
// Number of threads requested by this mapping.
int threads;
// Processor number used for thread affinity.
// NOTE(review): the usage text says "*" means "not affinitized"; the value
// stored for that case is not visible here - confirm against ProcessMappings.
int proc;
// Processor group (kgroup) that proc belongs to. Only parsed from the
// mapping string when flags.group_aware is set; otherwise it keeps the 0
// assigned in SetDefaultFlags.
WORD group;
// Receiver address or name; ProcessMappings stores the token pointer itself,
// not a copy.
PCHAR receiver_name;
} MAP, *PMAP;

Expand Down Expand Up @@ -1213,11 +1217,13 @@ SetDefaultFlags(
mappings[i] = NULL;
maps[i].threads = 0;
maps[i].proc = 0;
maps[i].group = 0;
maps[i].receiver_name = NULL;
}

GetSystemInfo(&sbi);
num_processors = sbi.dwNumberOfProcessors;
num_processors = sbi.dwNumberOfProcessors; // Needs attention: GetSystemInfo reports only the logical processors in the current processor group. In the fairly rare scenario where a processor group with < 64 CPUs is present (48-core CPUs with SMT, various other oddly-arranged products), calling GetSystemInfo from the small group yields a count that is too low and prevents worker threads from being scheduled on all processors of the larger groups.
// TODO: also collect the number of processor groups (num_groups) alongside num_processors, using a similar method.
VMSG("NumberOfProcessors: %d\n", num_processors);

proc_speed = GetProcessorSpeed();
Expand Down Expand Up @@ -1639,13 +1645,14 @@ PrintUsage(
printf("\t Packet spacing is only supported for 1 thread synchronous sending.\n");
printf("\t The spacing must be between %d and %d ms.\n", PS_MIN_PACKET_PERIOD, PS_MAX_PACKET_PERIOD);
printf("\t -thr and -brn are not supported options when -ps is used.\n");
printf("\t-ga Enable processor group awareness for systems with more than 64 logical processors.\n");
printf("\t-m <mappings>\n"
"\t One or more mapping 3-tuples separated by spaces:\n"
"\t (number of threads, processor number, receiver address or name)\n"
"\t Processor number must be in the process kgroup. If processor number\n"
"\t is \"*\", the threads are not affinitized.\n"
"\t e.g. \"-m 4,0,1.2.3.4 2,*,contoso\" sets up:\n"
"\t -4 threads on processor 0 to connect to 1.2.3.4\n"
"\t -4 threads on processor 0 in processor group 1 to connect to 1.2.3.4\n"
"\t -2 unaffinitized threads to connect to contoso\n");
}

Expand Down Expand Up @@ -1706,6 +1713,7 @@ PrintFlags(
// Parser states for one mapping tuple in ProcessMappings. The parser moves to
// the next field with ++state, so the enumerators must stay in field order.
enum {
S_THREADS = 0, // next token is the number of threads to run
S_PROCESSOR, // next token is the processor used for thread affinity
S_KGROUP, // next token is the processor's kgroup; when flags.group_aware is not set this state is stepped over without consuming a token
S_HOST, // next token is the receiver address or name
S_DONE // final state; ProcessMappings checks for it after the token loop
};
Expand All @@ -1730,6 +1738,7 @@ ProcessMappings(
//
// - Number of threads to run
// - The processor mask for thread affinity
// - The kgroup of the processor for thread affinity, if enabled
// - The host machine IP address

while (NULL != token) {
Expand Down Expand Up @@ -1766,6 +1775,12 @@ ProcessMappings(

maps[i].proc = processor;
++state;
} else if (S_KGROUP == state) {
if (flags.group_aware)
{
maps[i].group = (WORD)strtoul(token, NULL, 0);
}
++state;
} else if (S_HOST == state) {
maps[i].receiver_name = token;

Expand All @@ -1776,7 +1791,10 @@ ProcessMappings(
goto exit;
}

token = strtok(NULL, ",");
if (!(S_KGROUP == state && !flags.group_aware))
{ //Don't advance the token if there wasn't a kgroup token specified
token = strtok(NULL, ",");
}
}

if (S_DONE != state) {
Expand Down Expand Up @@ -2086,6 +2104,10 @@ ProcessArgs(

++i;
}
} else if (0 == _stricmp(argv[i], "-ga")) {
++i;
flags.group_aware = TRUE;

} else if (0 == _stricmp(argv[i], "-wu")) {
++i;

Expand Down Expand Up @@ -2475,6 +2497,7 @@ BOOL SetupThreads(
int num_threads,
int start_index,
int processor,
WORD group,
__in PCHAR receiver_name,
HANDLE io_compl_port,
HANDLE send_token,
Expand Down Expand Up @@ -2504,6 +2527,7 @@ BOOL SetupThreads(

php[i].index = index;
php[i].proc = processor;
php[i].group = group;
php[i].receiver_name = receiver_name;
php[i].sender_name = sender_name;
php[i].port = port;
Expand Down Expand Up @@ -3990,13 +4014,30 @@ StartSenderReceiver(
VMSG("StartSenderReceiver start thread %d port %d\n", php->index, php->port);

if (php->proc != NO_HARD_AFFINITY) {
// Note SetThreadAffinityMask requires the processor to be in the process's kgroup.
if (0 == SetThreadAffinityMask(GetCurrentThread(),
((KAFFINITY)(1ULL << php->proc)))) {
PrintThreadError(php->index,
"StartSenderReceiver",
"SetThreadAffinityMask failed");
goto exit;
if (flags.group_aware)
{
GROUP_AFFINITY group_affinity;
group_affinity.Mask = (KAFFINITY)1 << php->proc;
group_affinity.Group = php->group;
memset(group_affinity.Reserved, 0, sizeof(WORD) * 3);

if (0 == SetThreadGroupAffinity(GetCurrentThread(), &group_affinity, NULL))
{
PrintThreadError(php->index, "StartSenderReceiver", "SetThreadGroupAffinity failed");
goto exit;
}

}
else
{
// Note SetThreadAffinityMask requires the processor to be in the process's kgroup.
if (0 == SetThreadAffinityMask(GetCurrentThread(),
((KAFFINITY)(1ULL << php->proc)))) {
PrintThreadError(php->index,
"StartSenderReceiver",
"SetThreadAffinityMask failed");
goto exit;
}
}
} else if (node_affinity >= 0) {
GROUP_AFFINITY group;
Expand Down Expand Up @@ -4444,6 +4485,7 @@ DoWork(
SetupThreads(maps[i].threads,
index,
maps[i].proc,
maps[i].group,
maps[i].receiver_name,
io_compl_port,
send_token,
Expand Down