DO NOT MERGE. Wip/get thread id schedules #6674

Closed
83 changes: 83 additions & 0 deletions docs/source/user/parallel.rst
@@ -551,4 +551,87 @@ The report is split into the following sections:
``$const58.3 = const(int, 1)`` comes from the source ``b[j + 1]``, the
number ``1`` is clearly a constant and so can be hoisted out of the loop.

.. _numba-parallel-scheduling:

Scheduling
==========

By default, Numba divides the iterations of a parallel region into approximately
equal-sized chunks and gives one such chunk to each configured thread
(see :ref:`setting_the_number_of_threads`).
This scheduling approach is equivalent to OpenMP's static schedule with no specified
chunk size and is appropriate when the work required per iteration is nearly constant.
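For intuition, the default static split can be sketched in plain Python. This is
illustrative only; ``static_chunks`` is a hypothetical helper, not a Numba API:

```python
def static_chunks(n, num_threads):
    """Split the iteration range(0, n) into one approximately
    equal-sized chunk per configured thread."""
    base, extra = divmod(n, num_threads)
    chunks, start = [], 0
    for t in range(num_threads):
        size = base + (1 if t < extra else 0)  # spread the remainder
        chunks.append(range(start, start + size))
        start += size
    return chunks

# 10 iterations over 3 threads -> chunks of 4, 3 and 3 iterations.
print([len(c) for c in static_chunks(10, 3)])  # [4, 3, 3]
```

Each thread then runs exactly one of these chunks, so there is no scheduling
overhead once the region starts.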

Conversely, if the work required per iteration varies significantly, this static
scheduling approach can lead to load imbalance and longer execution times. For such
cases, Numba provides a mechanism to specify how many iterations of a parallel region
go into each chunk (i.e., the chunk size).
Numba then computes the number of required chunks, which is
equal to the number of iterations divided by the chunk size, truncated to an
integer. All of these chunks are approximately equal in size.
As above, Numba gives one such chunk to each configured
thread, and when a thread finishes a chunk, Numba gives that thread the next
available chunk. This scheduling approach is similar to OpenMP's dynamic scheduling
option with a specified chunk size. To minimize execution time, the programmer must
pick a chunk size that balances the better load balancing of smaller chunks against
the lower scheduling overhead of larger chunks.
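This trade-off can be seen in a toy pure-Python simulation of dynamic scheduling
(not Numba's implementation; the function and workload are made up for
illustration): iterations are grouped into chunks, and each chunk goes to the
thread that frees up first, so smaller chunks spread uneven work more evenly.

```python
def simulated_makespan(work, chunk_size, num_threads):
    """Simulate dynamic scheduling: split the per-iteration costs in
    `work` into chunks and always hand the next chunk to the thread
    with the least accumulated work (i.e. the one that frees up first).
    Returns the finish time of the busiest thread (the makespan)."""
    chunks = [work[i:i + chunk_size] for i in range(0, len(work), chunk_size)]
    loads = [0] * num_threads
    for chunk in chunks:
        tid = loads.index(min(loads))  # thread that becomes free first
        loads[tid] += sum(chunk)
    return max(loads)

# Eight cheap iterations followed by two expensive ones.
work = [1] * 8 + [10] * 2

# With 2 threads, a chunk size of 5 packs both expensive iterations into
# one chunk (makespan 23); a chunk size of 1 lets the threads share them
# (makespan 14), at the cost of more scheduling events.
print(simulated_makespan(work, 5, 2))  # 23
print(simulated_makespan(work, 1, 2))  # 14
```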

There are some cases in which the actual chunk sizes may differ from the requested
chunk size. First, if the number of required chunks based on the specified chunk size
is less than the number of configured threads, then Numba uses all of the configured
threads to execute the parallel region. In this case, the actual chunk size is
less than the requested chunk size. Second, due to truncation, when the
iteration count is slightly less than a multiple of the chunk size
(e.g., 14 iterations and a specified chunk size of 5), the actual chunk size is
larger than the specified chunk size. In that example, the number of chunks
is 2 and the actual chunk size is 7 (i.e., 14 / 2). Lastly, since Numba
divides an N-dimensional iteration space into N-dimensional (hyper)rectangular chunks,
there may not be N integer factors whose product is equal to the requested chunk
size. In this case, some chunks will have an area/volume larger than the requested
chunk size whereas others will be smaller.
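The chunk-count computation described above can be sketched in a few lines of
Python for a one-dimensional iteration space (a simplification of the
scheduler's behaviour; the function name is illustrative):

```python
def num_chunks(iterations, chunk_size, num_threads):
    # chunk_size == 0 selects the default static schedule:
    # one approximately equal chunk per configured thread.
    if chunk_size == 0:
        return num_threads
    # Truncating division: 14 iterations / chunk size 5 -> 2 chunks.
    chunks = iterations // chunk_size
    # Never use fewer chunks than configured threads, so that
    # every thread receives some work.
    return max(chunks, num_threads)

print(num_chunks(14, 5, 2))   # 2  (actual chunk size is 14 / 2 = 7)
print(num_chunks(6, 4, 4))    # 4  (actual chunk size smaller than requested)
print(num_chunks(100, 0, 8))  # 8  (static schedule: one chunk per thread)
```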

The number of iterations of a parallel region per chunk (i.e., the chunk size) is
stored in a thread-local variable and can be set using
:func:`numba.set_parallel_chunksize`. This function takes one integer parameter
whose value must be greater than
or equal to 0. A value of 0, the default, instructs Numba to use the
static scheduling approach above. Values greater than 0 instruct Numba to use that value
as the chunk size in the dynamic scheduling approach described above.
The current value of this thread-local variable is used by all subsequent parallel regions
invoked by the thread.
:func:`numba.set_parallel_chunksize` returns the previous value of the chunk size.
The current value of the parallel chunk size can be obtained from
:func:`numba.get_parallel_chunksize`.
Both of these functions can be used from standard Python and from Numba jitted functions,
as shown below. Both invocations of ``func1`` are executed with a chunk size of 4, whereas
``func2`` uses a chunk size of 8.

.. literalinclude:: ../../../numba/tests/doc_examples/test_parallel_chunksize.py
:language: python
:caption: from ``test_chunksize_manual`` of ``numba/tests/doc_examples/test_parallel_chunksize.py``
:start-after: magictoken.ex_chunksize_manual.begin
:end-before: magictoken.ex_chunksize_manual.end
:dedent: 12
:linenos:

Since this idiom of saving and restoring the chunk size is so common, Numba provides
the :func:`parallel_chunksize` context manager to simplify it.
As shown below, this context manager can be used from both standard Python and
within Numba jitted functions. As with other Numba context managers, be
aware that ``return`` and ``raise`` are not supported from within a context-managed
block that is part of a Numba jitted function.

.. literalinclude:: ../../../numba/tests/doc_examples/test_parallel_chunksize.py
:language: python
:caption: from ``test_chunksize_with`` of ``numba/tests/doc_examples/test_parallel_chunksize.py``
:start-after: magictoken.ex_chunksize_with.begin
:end-before: magictoken.ex_chunksize_with.end
:dedent: 12
:linenos:

Note that these functions to set the chunk size only affect Numba's
automatic parallelization with the :ref:`parallel_jit_option` option;
they have no effect on the :func:`~numba.vectorize` or
:func:`~numba.guvectorize` decorators.

.. seealso:: :ref:`parallel_jit_option`, :ref:`Parallel FAQs <parallel_FAQs>`
6 changes: 5 additions & 1 deletion numba/__init__.py
@@ -41,7 +41,8 @@

# Re-export vectorize decorators and the thread layer querying function
 from numba.np.ufunc import (vectorize, guvectorize, threading_layer,
-                            get_num_threads, set_num_threads)
+                            get_num_threads, set_num_threads,
+                            set_parallel_chunksize, get_parallel_chunksize)

# Re-export Numpy helpers
from numba.np.numpy_support import carray, farray, from_dtype
@@ -52,6 +53,7 @@
# Initialize withcontexts
import numba.core.withcontexts
from numba.core.withcontexts import objmode_context as objmode
from numba.core.withcontexts import parallel_chunksize

# Keep this for backward compatibility.
test = runtests.main
@@ -76,6 +78,8 @@
literal_unroll
get_num_threads
set_num_threads
set_parallel_chunksize
get_parallel_chunksize
""".split() + types.__all__ + errors.__all__


2 changes: 1 addition & 1 deletion numba/core/cgutils.py
@@ -373,7 +373,7 @@ def alloca_once(builder, ty, size=None, name='', zfill=False):
     with builder.goto_entry_block():
         ptr = builder.alloca(ty, size=size, name=name)
         # Always zero-fill at init-site. This is safe.
-        builder.store(ptr.type.pointee(None), ptr)
+        builder.store(ty(None), ptr)
     # Also zero-fill at the use-site
     if zfill:
         builder.store(ptr.type.pointee(None), ptr)
2 changes: 1 addition & 1 deletion numba/core/ir_utils.py
@@ -1928,7 +1928,7 @@ def find_global_value(func_ir, var):
     or raise GuardException otherwise.
     """
     dfn = get_definition(func_ir, var)
-    if isinstance(dfn, ir.Global):
+    if isinstance(dfn, (ir.Global, ir.FreeVar)):
         return dfn.value
 
     if isinstance(dfn, ir.Expr) and dfn.op == 'getattr':
62 changes: 62 additions & 0 deletions numba/core/withcontexts.py
@@ -1,6 +1,7 @@
from numba.core import types, errors, ir, sigutils, ir_utils
from numba.core.typing.typeof import typeof_impl
from numba.core.transforms import find_region_inout_vars
import numba


class WithContext(object):
@@ -399,3 +400,64 @@ def _mutate_with_block_callee(blocks, blk_start, blk_end, inputs, outputs):
block=ir.Block(scope=scope, loc=loc),
outputs=outputs,
)

class _ParallelChunksize(WithContext):
    """A context manager that remembers and saves away the current
    chunksize for executing parfors and then changes that chunksize
    to the programmer-specified value. At the end of the region,
    the original chunksize is restored.
    """
    is_callable = True

    def mutate_with_body(self, func_ir, blocks, blk_start, blk_end,
                         body_blocks, dispatcher_factory, extra):
        ir_utils.dprint_func_ir(func_ir, "Before with changes", blocks=blocks)
        assert extra is not None
        args = extra["args"]
        assert len(args) == 1
        arg = args[0]
        scope = blocks[blk_start].scope
        loc = blocks[blk_start].loc
        if isinstance(arg, ir.Arg):
            arg = ir.Var(scope, arg.name, loc)

        set_state = []
        restore_state = []

        # global for Numba itself
        gvar = ir.Var(scope, ir_utils.mk_unique_var("$ngvar"), loc)
        set_state.append(ir.Assign(ir.Global('numba', numba, loc), gvar, loc))
        # getattr for the set chunksize function in Numba
        spcattr = ir.Expr.getattr(gvar, 'set_parallel_chunksize', loc)
        spcvar = ir.Var(scope, ir_utils.mk_unique_var("$spc"), loc)
        set_state.append(ir.Assign(spcattr, spcvar, loc))
        # call set_parallel_chunksize
        orig_pc_var = ir.Var(scope, ir_utils.mk_unique_var("$save_pc"), loc)
        cs_var = ir.Var(scope, ir_utils.mk_unique_var("$cs_var"), loc)
        set_state.append(ir.Assign(arg, cs_var, loc))
        spc_call = ir.Expr.call(spcvar, [cs_var], (), loc)
        set_state.append(ir.Assign(spc_call, orig_pc_var, loc))

        restore_spc_call = ir.Expr.call(spcvar, [orig_pc_var], (), loc)
        restore_state.append(ir.Assign(restore_spc_call, orig_pc_var, loc))

        blocks[blk_start].body = (blocks[blk_start].body[1:-1] +
                                  set_state +
                                  [blocks[blk_start].body[-1]])
        blocks[blk_end].body = restore_state + blocks[blk_end].body
        ir_utils.dprint_func_ir(func_ir, "After with changes", blocks=blocks)

    def __call__(self, *args, **kwargs):
        assert len(args) == 1
        assert not kwargs
        self.chunksize = args[0]
        return self

    def __enter__(self):
        self.orig_chunksize = numba.get_parallel_chunksize()
        numba.set_parallel_chunksize(self.chunksize)

    def __exit__(self, typ, val, tb):
        numba.set_parallel_chunksize(self.orig_chunksize)


parallel_chunksize = _ParallelChunksize()
4 changes: 3 additions & 1 deletion numba/np/ufunc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
from numba.np.ufunc._internal import PyUFunc_None, PyUFunc_Zero, PyUFunc_One
from numba.np.ufunc import _internal, array_exprs
 from numba.np.ufunc.parallel import (threading_layer, get_num_threads,
-                                     set_num_threads, _get_thread_id)
+                                     set_num_threads, _get_thread_id,
+                                     set_parallel_chunksize,
+                                     get_parallel_chunksize)


if hasattr(_internal, 'PyUFunc_ReorderableNone'):
78 changes: 78 additions & 0 deletions numba/np/ufunc/gufunc_scheduler.cpp
@@ -11,6 +11,20 @@
#include <stdio.h>
#include "gufunc_scheduler.h"

#ifdef _MSC_VER
#define THREAD_LOCAL(ty) __declspec(thread) ty
#else
/* Non-standard C99 extension that's understood by gcc and clang */
#define THREAD_LOCAL(ty) __thread ty
#endif

// Default 0 value means one evenly-sized chunk of work per worker thread.
static THREAD_LOCAL(uintp) parallel_chunksize = 0;
#define _NO_SCHEDULE_SET_SIZE uintp(-1)
// Default -1 value means schedule size has not been computed.
static THREAD_LOCAL(uintp) _TLS_schedule_size = _NO_SCHEDULE_SET_SIZE;
THREAD_LOCAL(void *) _TLS_schedule = NULL;

// round not available on VS2010.
double guround (double number) {
return number < 0.0 ? ceil(number - 0.5) : floor(number + 0.5);
@@ -68,8 +82,56 @@ class RangeActual {
}
return ret;
}

uintp total_size() const {
std::vector<intp> per_dim = iters_per_dim();
uintp res = 1;
for (unsigned i = 0; i < per_dim.size(); ++i) {
res *= per_dim[i];
}
return res;
}
};


extern "C" uintp set_parallel_chunksize(uintp n) {
uintp orig = parallel_chunksize;
parallel_chunksize = n;
return orig;
}

extern "C" uintp get_parallel_chunksize() {
return parallel_chunksize;
}

extern "C" uintp compute_sched_size(uintp num_threads, uintp num_dim, intp *starts, intp *ends) {
if (parallel_chunksize == 0) {
return num_threads;
}
RangeActual ra(num_dim, starts, ends);
uintp total_work_size = ra.total_size();
uintp num_divisions = total_work_size / parallel_chunksize;
uintp ret = num_divisions < num_threads ? num_threads : num_divisions;
// printf("total_work_size=%zd num_divisions=%zd return=%zd\n", total_work_size, num_divisions, ret);
return ret;
}

extern "C" uintp get_sched_size(void) {
return _TLS_schedule_size;
}

extern "C" void set_sched_size(uintp size) {
_TLS_schedule_size = size;
}

extern "C" void * get_sched(void) {
return _TLS_schedule;
}

extern "C" void set_sched(void * sched) {
_TLS_schedule = sched;
}

class dimlength {
public:
uintp dim;
@@ -348,6 +410,14 @@ extern "C" void do_scheduling_signed(uintp num_dim, intp *starts, intp *ends, ui
RangeActual full_space(num_dim, starts, ends);
std::vector<RangeActual> ret = create_schedule(full_space, num_threads);
flatten_schedule(ret, sched);
// This isn't ideal, but the schedule and its size need to be accessible
// somewhere so that they can be sanity checked in testing.
_TLS_schedule_size = compute_sched_size(num_threads, num_dim, starts, ends);
if (_TLS_schedule_size != _NO_SCHEDULE_SET_SIZE) {
_TLS_schedule = (void *)sched;
} else {
_TLS_schedule = NULL; // clear any stale schedule pointer
}
}

extern "C" void do_scheduling_unsigned(uintp num_dim, intp *starts, intp *ends, uintp num_threads, uintp *sched, intp debug) {
@@ -367,4 +437,12 @@ extern "C" void do_scheduling_unsigned(uintp num_dim, intp *starts, intp *ends,
RangeActual full_space(num_dim, starts, ends);
std::vector<RangeActual> ret = create_schedule(full_space, num_threads);
flatten_schedule(ret, sched);
// This isn't ideal, but the schedule and its size need to be accessible
// somewhere so that they can be sanity checked in testing.
_TLS_schedule_size = compute_sched_size(num_threads, num_dim, starts, ends);
if (_TLS_schedule_size != _NO_SCHEDULE_SET_SIZE) {
_TLS_schedule = (void *)sched;
} else {
_TLS_schedule = NULL; // clear any stale schedule pointer
}
}
8 changes: 8 additions & 0 deletions numba/np/ufunc/gufunc_scheduler.h
@@ -40,6 +40,14 @@ extern "C"

void do_scheduling_signed(uintp num_dim, intp *starts, intp *ends, uintp num_threads, intp *sched, intp debug);
void do_scheduling_unsigned(uintp num_dim, intp *starts, intp *ends, uintp num_threads, uintp *sched, intp debug);
uintp set_parallel_chunksize(uintp);
uintp get_parallel_chunksize(void);
uintp compute_sched_size(uintp num_threads, uintp num_dim, intp *starts, intp *ends);
uintp get_sched_size(void);
void set_sched_size(uintp);
void * get_sched(void);
void set_sched(void*);


#ifdef __cplusplus
}
25 changes: 22 additions & 3 deletions numba/np/ufunc/omppool.cpp
@@ -82,7 +82,8 @@ get_thread_id(void)
}

static void
add_task(void *fn, void *args, void *dims, void *steps, void *data)
add_task(void *fn, void *args, void *dims, void *steps, void *data,
bool broadcast)
{
puts("Running add_task() with omppool sequentially");
typedef void (*func_ptr_t)(void *args, void *dims, void *steps, void *data);
@@ -152,16 +153,24 @@ parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *dat
printf("\n");
}

// Fetch the schedule data from the main thread, this will be broadcast
// in the parallel region.
uintp sched_size = get_sched_size();
void * schedule = get_sched();

// Set the thread mask on the pragma such that the state is scope-limited
// and passed via a register at the OMP region call site, limiting
// global state and racing.
-#pragma omp parallel num_threads(num_threads), shared(agreed_nthreads)
+#pragma omp parallel num_threads(num_threads), shared(agreed_nthreads, sched_size, schedule)
{
size_t * count_space = (size_t *)alloca(sizeof(size_t) * arg_len);
char ** array_arg_space = (char**)alloca(sizeof(char*) * array_count);

-// tell the active thread team about the number of threads
+// tell the active thread team about the number of threads, schedule
+// size and the schedule pointer
set_num_threads(agreed_nthreads);
set_sched_size(sched_size);
set_sched(schedule);

#pragma omp for
for(ptrdiff_t r = 0; r < size; r++)
@@ -265,5 +274,15 @@ MOD_INIT(omppool)
PyLong_FromVoidPtr((void*)&get_num_threads));
PyObject_SetAttrString(m, "get_thread_id",
PyLong_FromVoidPtr((void*)&get_thread_id));
PyObject_SetAttrString(m, "set_parallel_chunksize",
PyLong_FromVoidPtr((void*)&set_parallel_chunksize));
PyObject_SetAttrString(m, "get_parallel_chunksize",
PyLong_FromVoidPtr((void*)&get_parallel_chunksize));
PyObject_SetAttrString(m, "compute_sched_size",
PyLong_FromVoidPtr((void*)&compute_sched_size));
PyObject_SetAttrString(m, "get_sched_size",
PyLong_FromVoidPtr((void*)&get_sched_size));
PyObject_SetAttrString(m, "get_sched",
PyLong_FromVoidPtr((void*)&get_sched));
return MOD_SUCCESS_VAL(m);
}