Skip to content

Commit

Permalink
Small cleanup of ThreadsInternal::initialize
Browse files Browse the repository at this point in the history
  • Loading branch information
Rombur committed Oct 26, 2023
1 parent 6ac5aa8 commit 7d31c22
Showing 1 changed file with 28 additions and 41 deletions.
69 changes: 28 additions & 41 deletions core/src/Threads/Kokkos_Threads_Instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,7 @@ void ThreadsInternal::print_configuration(std::ostream &s, const bool detail) {
// Reports whether the Threads backend has been set up: yields 1 when slot 0
// of s_threads_exec holds a non-null pointer, and 0 otherwise.
int ThreadsInternal::is_initialized() {
  return s_threads_exec[0] != nullptr;
}

void ThreadsInternal::initialize(int thread_count_arg) {
// legacy arguments
unsigned thread_count = thread_count_arg == -1 ? 0 : thread_count_arg;
unsigned use_numa_count = 0;
unsigned use_cores_per_numa = 0;
bool allow_asynchronous_threadpool = false;
unsigned thread_count = thread_count_arg == -1 ? 0 : thread_count_arg;

const bool is_initialized = 0 != s_thread_pool_size[0];

Expand All @@ -546,10 +542,8 @@ void ThreadsInternal::initialize(int thread_count_arg) {
s_threads_exec[i] = nullptr;

if (!is_initialized) {
// If thread_count, use_numa_count, or use_cores_per_numa are zero
// then they will be given default values based upon hwloc detection
// and allowed asynchronous execution.

// If thread_count is zero then it will be given a default value based upon
// hwloc detection.
const bool hwloc_avail = Kokkos::hwloc::available();
const bool hwloc_can_bind =
hwloc_avail && Kokkos::hwloc::can_bind_threads();
Expand All @@ -562,25 +556,26 @@ void ThreadsInternal::initialize(int thread_count_arg) {
: 1;
}

const unsigned thread_spawn_begin = hwloc::thread_mapping(
"Kokkos::Threads::initialize", allow_asynchronous_threadpool,
thread_count, use_numa_count, use_cores_per_numa, s_threads_coord);
const bool allow_asynchronous_threadpool = false;
unsigned use_numa_count = 0;
unsigned use_cores_per_numa = 0;
hwloc::thread_mapping("Kokkos::Threads::initialize",
allow_asynchronous_threadpool, thread_count,
use_numa_count, use_cores_per_numa, s_threads_coord);

const std::pair<unsigned, unsigned> proc_coord = s_threads_coord[0];

if (thread_spawn_begin) {
// Synchronous with s_threads_coord[0] as the process core
// Claim entry #0 for binding the process core.
s_threads_coord[0] = std::pair<unsigned, unsigned>(~0u, ~0u);
}
// Synchronous with s_threads_coord[0] as the process core
// Claim entry #0 for binding the process core.
s_threads_coord[0] = std::pair<unsigned, unsigned>(~0u, ~0u);

s_thread_pool_size[0] = thread_count;
s_thread_pool_size[1] = s_thread_pool_size[0] / use_numa_count;
s_thread_pool_size[2] = s_thread_pool_size[1] / use_cores_per_numa;
s_current_function =
&execute_function_noop; // Initialization work function

for (unsigned ith = thread_spawn_begin; ith < thread_count; ++ith) {
for (unsigned ith = 1; ith < thread_count; ++ith) {
s_threads_process.m_pool_state = ThreadState::Inactive;

// If hwloc available then spawned thread will
Expand All @@ -604,10 +599,10 @@ void ThreadsInternal::initialize(int thread_count_arg) {

// Wait for all spawned threads to deactivate before zeroing the function.

for (unsigned ith = thread_spawn_begin; ith < thread_count; ++ith) {
for (unsigned ith = 1; ith < thread_count; ++ith) {
// Try to protect against cache coherency failure by casting to volatile.
ThreadsInternal *const th =
((ThreadsInternal * volatile *)s_threads_exec)[ith];
((ThreadsInternal *volatile *)s_threads_exec)[ith];
if (th) {
wait_yield(th->m_pool_state, ThreadState::Active);
} else {
Expand All @@ -628,27 +623,19 @@ void ThreadsInternal::initialize(int thread_count_arg) {
Kokkos::hwloc::bind_this_thread(proc_coord);
}

if (thread_spawn_begin) { // Include process in pool.
const std::pair<unsigned, unsigned> coord =
Kokkos::hwloc::get_this_thread_coordinate();

s_threads_exec[0] = &s_threads_process;
s_threads_process.m_numa_rank = coord.first;
s_threads_process.m_numa_core_rank = coord.second;
s_threads_process.m_pool_base = s_threads_exec;
s_threads_process.m_pool_rank =
thread_count - 1; // Reversed for scan-compatible reductions
s_threads_process.m_pool_size = thread_count;
s_threads_process.m_pool_fan_size = fan_size(
s_threads_process.m_pool_rank, s_threads_process.m_pool_size);
s_threads_pid[s_threads_process.m_pool_rank] =
std::this_thread::get_id();
} else {
s_threads_process.m_pool_base = nullptr;
s_threads_process.m_pool_rank = 0;
s_threads_process.m_pool_size = 0;
s_threads_process.m_pool_fan_size = 0;
}
const std::pair<unsigned, unsigned> coord =
Kokkos::hwloc::get_this_thread_coordinate();

s_threads_exec[0] = &s_threads_process;
s_threads_process.m_numa_rank = coord.first;
s_threads_process.m_numa_core_rank = coord.second;
s_threads_process.m_pool_base = s_threads_exec;
s_threads_process.m_pool_rank =
thread_count - 1; // Reversed for scan-compatible reductions
s_threads_process.m_pool_size = thread_count;
s_threads_process.m_pool_fan_size = fan_size(
s_threads_process.m_pool_rank, s_threads_process.m_pool_size);
s_threads_pid[s_threads_process.m_pool_rank] = std::this_thread::get_id();

// Initial allocations:
ThreadsInternal::resize_scratch(1024, 1024);
Expand Down

0 comments on commit 7d31c22

Please sign in to comment.