Skip to content

Commit

Permalink
Fix usage of sequential execution branches (#2783)
Browse files Browse the repository at this point in the history
* Fix usage of sequential execution branches

* clang-format-14 specific formatting fix

* Completely remove unnecessary TBB layer flag
  • Loading branch information
Alexsandruss authored May 22, 2024
1 parent d6f4dc3 commit 99570f8
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 348 deletions.
1 change: 0 additions & 1 deletion cpp/daal/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ daal_module(
name = "threading_tbb",
srcs = glob(["src/threading/**/*.cpp"]),
local_defines = [
"__DO_TBB_LAYER__",
"__TBB_NO_IMPLICIT_LINKAGE",
"__TBB_LEGACY_MODE",
"TBB_SUPPRESS_DEPRECATED_MESSAGES",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,6 @@ using namespace daal::data_management;
using namespace daal::internal;
using namespace daal::services::internal;

/// Invokes processIteration(i) once for every index i in [0, n).
///
/// @param condition         When true, the whole range is handed to daal::threader_for;
///                          when false, the indices are visited in ascending order by a
///                          plain loop on the calling thread.
/// @param n                 Number of iterations.
/// @param threadsRequest    Forwarded unchanged to daal::threader_for; ignored on the
///                          sequential path.
/// @param processIteration  Callable taking a single size_t index.
///
/// Note: the `cpu` template parameter is not referenced inside the body.
template <CpuType cpu, typename F>
void conditional_threader_for(bool condition, size_t n, size_t threadsRequest, const F & processIteration)
{
    if (!condition)
    {
        // Sequential branch: run every iteration in order on the current thread.
        size_t idx = 0;
        while (idx < n)
        {
            processIteration(idx);
            ++idx;
        }
        return;
    }

    // Threaded branch: delegate the full range to the threading layer.
    daal::threader_for(n, threadsRequest, processIteration);
}

template <typename algorithmFPType, CpuType cpu>
Status MergeKernel<algorithmFPType, cpu>::merge(const NumericTable & partialTable, algorithmFPType * result, bool threadingCondition)
{
Expand All @@ -69,7 +53,7 @@ Status MergeKernel<algorithmFPType, cpu>::merge(const NumericTable & partialTabl
algorithmFPType * partialResult = const_cast<algorithmFPType *>(block.get());

size_t resultSize = nRows * partialTable.getNumberOfColumns();
conditional_threader_for<cpu>(threadingCondition, resultSize, resultSize, [=](size_t i) { result[i] += partialResult[i]; });
daal::conditional_threader_for(threadingCondition, resultSize, [=](size_t i) { result[i] += partialResult[i]; });
return Status();
}

Expand Down
122 changes: 45 additions & 77 deletions cpp/daal/src/threading/service_thread_pinner.cpp
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,37 @@
#include "services/daal_memory.h"
#include "src/threading/threading.h"

#if defined(__DO_TBB_LAYER__)

#define USE_TASK_ARENA_CURRENT_SLOT 1
#define LOG_PINNING 1
#define TBB_PREVIEW_TASK_ARENA 1
#define TBB_PREVIEW_LOCAL_OBSERVER 1

#include "tbb/tbb.h"
#include <tbb/task_arena.h>
#include <tbb/task_scheduler_observer.h>
#include <tbb/parallel_reduce.h>
#include <tbb/blocked_range.h>
#include <tbb/tick_count.h>
#include <tbb/scalable_allocator.h>
#include "services/daal_atomic_int.h"
#define USE_TASK_ARENA_CURRENT_SLOT 1
#define LOG_PINNING 1
#define TBB_PREVIEW_TASK_ARENA 1
#define TBB_PREVIEW_LOCAL_OBSERVER 1

#include "tbb/tbb.h"
#include <tbb/task_arena.h>
#include <tbb/task_scheduler_observer.h>
#include <tbb/parallel_reduce.h>
#include <tbb/blocked_range.h>
#include <tbb/tick_count.h>
#include <tbb/scalable_allocator.h>
#include "services/daal_atomic_int.h"
using namespace daal::services;

#if defined(_WIN32) || defined(_WIN64)
#include <Windows.h>
#define __PINNER_WINDOWS__
#if defined(_WIN32) || defined(_WIN64)
#include <Windows.h>
#define __PINNER_WINDOWS__

#if defined(_WIN64)
#define MASK_WIDTH 64
#else
#define MASK_WIDTH 32
#endif
#if defined(_WIN64)
#define MASK_WIDTH 64
#else
#define MASK_WIDTH 32
#endif

#else // LINUX
#include <sched.h>
#define __PINNER_LINUX__
#else // LINUX
#include <sched.h>
#define __PINNER_LINUX__

#ifdef __FreeBSD__
#include <pthread_np.h>
#ifdef __FreeBSD__
#include <pthread_np.h>

cpu_set_t * __sched_cpualloc(size_t count)
{
Expand All @@ -73,25 +71,25 @@ int sched_getaffinity(pid_t pid, size_t cpusetsize, cpu_set_t * mask)
{
return cpuset_getaffinity(CPU_LEVEL_WHICH, CPU_WHICH_PID, pid == 0 ? -1 : pid, cpusetsize, mask);
}
#endif

#endif

#endif

struct cpu_mask_t
{
int status;
#if defined(_WIN32) || defined(_WIN64)
#if defined(_WIN32) || defined(_WIN64)
GROUP_AFFINITY ga;
#else
#else
int ncpus;
int bit_parts_size;
cpu_set_t * cpu_set;
#endif
#endif
cpu_mask_t()
{
status = 0;

#if defined __PINNER_LINUX__
#if defined __PINNER_LINUX__

ncpus = 0;
bit_parts_size = 0;
Expand All @@ -113,10 +111,10 @@ struct cpu_mask_t
}

if (cpu_set == NULL)
#else // defined __PINNER_WINDOWS__
#else // defined __PINNER_WINDOWS__
bool retval = GetThreadGroupAffinity(GetCurrentThread(), &ga);
if (!retval)
#endif
#endif
{
status--;
}
Expand All @@ -128,13 +126,13 @@ struct cpu_mask_t
{
if (status == 0)
{
#if defined __PINNER_LINUX__
#if defined __PINNER_LINUX__
int err = pthread_getaffinity_np(pthread_self(), bit_parts_size, cpu_set);
if (err)
#else // defined __PINNER_WINDOWS__
#else // defined __PINNER_WINDOWS__
bool retval = GetThreadGroupAffinity(GetCurrentThread(), &ga);
if (!retval)
#endif
#endif
{
status--;
}
Expand All @@ -147,15 +145,15 @@ struct cpu_mask_t
{
if (status == 0)
{
#if defined __PINNER_LINUX__
#if defined __PINNER_LINUX__

int err = pthread_setaffinity_np(pthread_self(), bit_parts_size, cpu_set);
if (err)
#else // defined __PINNER_WINDOWS__
#else // defined __PINNER_WINDOWS__

bool retval = SetThreadGroupAffinity(GetCurrentThread(), &ga, NULL);
if (!retval)
#endif
#endif
{
status--;
}
Expand All @@ -168,13 +166,13 @@ struct cpu_mask_t
{
if (status == 0)
{
#if defined __PINNER_LINUX__
#if defined __PINNER_LINUX__
CPU_ZERO_S(bit_parts_size, cpu_set);
CPU_SET_S(cpu_idx, bit_parts_size, cpu_set);
#else // defined __PINNER_WINDOWS__
#else // defined __PINNER_WINDOWS__
ga.Group = cpu_idx / MASK_WIDTH;
ga.Mask = cpu_idx % MASK_WIDTH;
#endif
#endif
}

return status;
Expand All @@ -184,12 +182,12 @@ struct cpu_mask_t

~cpu_mask_t()
{
#if defined __PINNER_LINUX__
#if defined __PINNER_LINUX__
if (cpu_set != NULL)
{
CPU_FREE(cpu_set);
}
#endif
#endif

return;
} // ~cpu_mask_t()
Expand Down Expand Up @@ -388,34 +386,4 @@ DAAL_EXPORT void _thread_pinner_on_scheduler_exit(bool p)
IMPL->on_scheduler_exit(p);
}

#else /* if __DO_TBB_LAYER__ is not defined */

/// Stub compiled when __DO_TBB_LAYER__ is not defined: no pinner object is
/// ever created, so a null pointer is always returned and all arguments are ignored.
DAAL_EXPORT void * _getThreadPinner(bool create_pinner, void (*read_topo)(int &, int &, int &, int **), void (*deleter)(void *))
{
    // Arguments are intentionally unused in this configuration.
    static_cast<void>(create_pinner);
    static_cast<void>(read_topo);
    static_cast<void>(deleter);
    return nullptr;
}

/// Stub compiled when __DO_TBB_LAYER__ is not defined: initialization is a no-op
/// and both callbacks are ignored.
DAAL_EXPORT void _thread_pinner_thread_pinner_init(void (*f)(int &, int &, int &, int **), void (*deleter)(void *))
{
    // Nothing to initialize in this configuration.
    static_cast<void>(f);
    static_cast<void>(deleter);
}
/// Stub compiled when __DO_TBB_LAYER__ is not defined: the task is simply
/// invoked directly on the calling thread, with no pinning logic around it.
DAAL_EXPORT void _thread_pinner_execute(daal::services::internal::thread_pinner_task_t & task)
{
    // Direct call; no affinity handling is performed here.
    task();
}
/// Stub compiled when __DO_TBB_LAYER__ is not defined: always reports false.
DAAL_EXPORT bool _thread_pinner_get_pinning()
{
    const bool pinningState = false; // fixed answer in this configuration
    return pinningState;
}
/// Stub compiled when __DO_TBB_LAYER__ is not defined: the requested value is
/// ignored and true is always returned.
DAAL_EXPORT bool _thread_pinner_set_pinning(bool p)
{
    static_cast<void>(p); // request is not stored anywhere in this configuration
    const bool result = true;
    return result;
}
/// Stub compiled when __DO_TBB_LAYER__ is not defined: always returns 0.
DAAL_EXPORT int _thread_pinner_get_status()
{
    const int statusCode = 0; // fixed value in this configuration
    return statusCode;
}

/// Stub compiled when __DO_TBB_LAYER__ is not defined: scheduler-entry hook that does nothing.
DAAL_EXPORT void _thread_pinner_on_scheduler_entry(bool p)
{
    static_cast<void>(p); // intentionally unused
}
/// Stub compiled when __DO_TBB_LAYER__ is not defined: scheduler-exit hook that does nothing.
DAAL_EXPORT void _thread_pinner_on_scheduler_exit(bool p)
{
    static_cast<void>(p); // intentionally unused
}

#endif /* if __DO_TBB_LAYER__ is not defined */

#endif /* #if !defined (DAAL_THREAD_PINNING_DISABLED) */
Loading

0 comments on commit 99570f8

Please sign in to comment.