Skip to content

Conversation

mroeschke
Copy link
Collaborator

@mroeschke mroeschke commented Jul 26, 2021

General Flow:

  1. An Indexer class determines the start and end bounds for the mean | groupby mean | rolling mean operation
  2. A numba function prange loops over each DataFrame column applying a mean function given the start and end bounds

Mean functions tested:

  1. Sliding (window) mean
  2. np.nanmean parallelized over each start and end bounds
  3. TODO: Dispatch to Cython C func

Performance Table:

df.shape = (10_000, 10_001) Sliding Mean Pure Numba np.nanmean Pure Numba
mean 78.4 µs 229 µs
groupby mean (1000 groups) 368 µs 546 µs
rolling mean (window size =2) 111 µs 277 µs

Performance Table (parallel columns=True / parallel start & end bounds=True):

df.shape = (1_000_000, 1001) Sliding Mean Pure Numba np.nanmean Pure Numba
mean 22.7 µs 137 µs
groupby mean (10000 groups) 351 µs 561 µs
rolling mean (window size =2) 71.9 µs 301 µs

Performance Table (parallel columns=True / parallel start & end bounds=False):

df.shape = (1_000_000, 1001) Sliding Mean Pure Numba np.nanmean Pure Numba
mean 18.9 µs 18.9 µs
groupby mean (10000 groups) 307 µs 338 µs
rolling mean (window size =2) 60.6 µs 60.3 µs

Performance Table (parallel columns=False / parallel start & end bounds =True):

df.shape = (1_000_000, 1001) Sliding Mean Pure Numba np.nanmean Pure Numba
mean 7.36 µs 34 µs
groupby mean (10000 groups) 247 µs 336 µs
rolling mean (window size =2) 36.8 µs 65.6 µs

Performance Table (parallel columns=False / parallel start & end bounds=False):

df.shape = (1_000_000, 1001) Sliding Mean Pure Numba np.nanmean Pure Numba
mean 8.32 µs 9.88 µs
groupby mean (10000 groups) 271 µs 284 µs
rolling mean (window size =2) 36.8 µs 40.6 µs

@mroeschke
Copy link
Collaborator Author

Test script:

In [2]: import warnings
   ...:
   ...: from numba.core.errors import NumbaPerformanceWarning
   ...: import numpy as np
   ...: import pandas as pd
   ...:
   ...: from pandas.core import shared_executer, kernels
   ...:
   ...: warnings.simplefilter('ignore', NumbaPerformanceWarning)
   ...:
   ...: configs = [
   ...:     ["mean", shared_executer.mean, {"min_periods": 0}, lambda x: x.mean()],
   ...:     ["groupby mean", shared_executer.groupby_mean, {"groupby_key": "A", "min_periods": 0}, lambda x: x.groupby("A").mean()],
   ...:     ["rolling mean", shared_executer.rolling_mean, {"window": 2, "min_periods": 0}, lambda x: x.rolling(2, min_periods=0).mean()]
   ...: ]
   ...:
   ...: kernel_funcs = [
   ...:     ["Sliding Window Mean", kernels.sliding_mean_pure_numba],
   ...:     ["Numpy nanmean", kernels.np_nanmean_pure_numba],
   ...: ]
   ...: for method_name, shared_func, shared_params, original_func in configs:
   ...:     for kernel_name, kernel_func in kernel_funcs:
   ...:         print(f"{method_name}: {kernel_name}")
   ...:         simple_data = pd.DataFrame({"A": np.repeat(np.arange(5), 2), "B": range(10)})
   ...:         shared_params["kernel_func"] = kernel_func
   ...:         shared_params["kernel_type"] = kernel_name
   ...:         print("Original")
   ...:         print(original_func(simple_data))
   ...:         print("Shared")
   ...:         print(shared_func(simple_data, **shared_params))
   ...:         print()
   ...:         perf_data = pd.DataFrame(np.random.rand(10000, 10000))
   ...:         perf_data["A"] = np.repeat(np.arange(1000), 10)
   ...:         print(f"Performance on {perf_data.shape}")
   ...:         %timeit shared_func(simple_data, **shared_params)
   ...:
mean: Sliding Window Mean
Original
A    2.0
B    4.5
dtype: float64
Shared
[[2.  4.5]]

Performance on (10000, 10001)
78.4 µs ± 1.44 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
mean: Numpy nanmean
Original
A    2.0
B    4.5
dtype: float64
Shared
[[2.  4.5]]

Performance on (10000, 10001)
229 µs ± 5.14 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
groupby mean: Sliding Window Mean
Original
     B
A
0  0.5
1  2.5
2  4.5
3  6.5
4  8.5
Shared
[[0.  0.5]
 [1.  2.5]
 [2.  4.5]
 [3.  6.5]
 [4.  8.5]]

Performance on (10000, 10001)
368 µs ± 3.96 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
groupby mean: Numpy nanmean
Original
     B
A
0  0.5
1  2.5
2  4.5
3  6.5
4  8.5
Shared
[[0.  0.5]
 [1.  2.5]
 [2.  4.5]
 [3.  6.5]
 [4.  8.5]]

Performance on (10000, 10001)
546 µs ± 2.74 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
rolling mean: Sliding Window Mean
Original
     A    B
0  0.0  0.0
1  0.0  0.5
2  0.5  1.5
3  1.0  2.5
4  1.5  3.5
5  2.0  4.5
6  2.5  5.5
7  3.0  6.5
8  3.5  7.5
9  4.0  8.5
Shared
[[0.  0. ]
 [0.  0.5]
 [0.5 1.5]
 [1.  2.5]
 [1.5 3.5]
 [2.  4.5]
 [2.5 5.5]
 [3.  6.5]
 [3.5 7.5]
 [4.  8.5]]

Performance on (10000, 10001)
111 µs ± 580 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
rolling mean: Numpy nanmean
Original
     A    B
0  0.0  0.0
1  0.0  0.5
2  0.5  1.5
3  1.0  2.5
4  1.5  3.5
5  2.0  4.5
6  2.5  5.5
7  3.0  6.5
8  3.5  7.5
9  4.0  8.5
Shared
[[0.  0. ]
 [0.  0.5]
 [0.5 1.5]
 [1.  2.5]
 [1.5 3.5]
 [2.  4.5]
 [2.5 5.5]
 [3.  6.5]
 [3.5 7.5]
 [4.  8.5]]

Performance on (10000, 10001)
277 µs ± 7.15 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

@mroeschke
Copy link
Collaborator Author

cc @max-sixty you might interested in the performance benchmarking of sliding mean vs np.nanmean

pandas-dev#38895 (comment)

@max-sixty
Copy link

max-sixty commented Jul 26, 2021

Thanks for the CC @mroeschke . That's a great speedup!

Do you know whether numba will inline the add_mean_numba etc functions in the sliding window loop? I'm guessing it does from the performance numbers...

Here's the equivalent code in numbagg: https://github.com/numbagg/numbagg/blob/main/numbagg/moving.py#L82-L123. Assuming the function inline, I guess the performance should be similar — though I'm not sure whether the gufunc machinery gives prange for free or not at all, which will have a big impact.

@max-sixty
Copy link

max-sixty commented Jul 27, 2021

It looks like numba's gufuncs do parallelize, equivalent to parallel=True. If you want another candidate to compare to, here's the numbagg equivalent:

In [14]: import numbagg

In [16]: df = pd.DataFrame(np.random.rand(10000, 10000))

In [21]: %timeit numbagg.move_mean(df.values, 2)
698 ms ± 25.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [22]: %timeit df.rolling(2).mean()
5.37 s ± 208 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

@mroeschke
Copy link
Collaborator Author

Thanks for the numbagg comparison!

I am not exactly sure how to check if numba in inlining add_mean_numba/remove_mean_numba. I can try directly inserting the logic in this function in the loop to see if the performance is similar.

@mroeschke
Copy link
Collaborator Author

mroeschke commented Aug 9, 2021

Note to self. Using np.nanmean with parallel=True over the columns and parallel=True over the start and end bounds doesn't seem to parallelize all the loops:

================================================================================
 Parallel Accelerator Optimizing:  Function
generate_numba_func.<locals>.column_looper, /Users/matthewroeschke/pandas-
twosigma/pandas/core/shared_executer.py (13)
================================================================================


Parallel loop listing for  Function generate_numba_func.<locals>.column_looper, /Users/matthewroeschke/pandas-twosigma/pandas/core/shared_executer.py (13)
------------------------------------------------------------------|loop #ID
    @numba.jit(nopython=True, nogil=True, parallel=True)          |
    def column_looper(                                            |
        df_values,                                                |
        start,                                                    |
        end,                                                      |
        min_periods,                                              |
    ):                                                            |
        result = np.empty((len(start), df_values.shape[1]))       |
        for i in numba.prange(df_values.shape[1]):----------------| #4
            values = df_values[:, i]                              |
            sub_result = np.empty(len(start))                     |
            for j in numba.prange(len(start)):--------------------| #3
                s = start[j]                                      |
                e = end[j]                                        |
                window = values[s:e]                              |
                num_nan = np.sum(np.isnan(window))----------------| #1, 2
                if num_nan > min_periods:                         |
                    sub_result[j] = np.nan                        |
                else:                                             |
                    sub_result[j] = np.nanmean(window)            |
            result[:, i] = sub_result-----------------------------| #0
        return result                                             |
--------------------------------- Fusing loops ---------------------------------
Attempting fusion of parallel loops (combines loops with similar properties)...
----------------------------- Before Optimisation ------------------------------
Parallel region 0:
+--4 (parallel)
   +--0 (parallel)
   +--3 (parallel)
      +--1 (parallel)
      +--2 (parallel)


--------------------------------------------------------------------------------
------------------------------ After Optimisation ------------------------------
Parallel region 0:
+--4 (parallel)
   +--0 (serial)
   +--3 (serial)
      +--1 (serial)
      +--2 (serial)



Parallel region 0 (loop #4) had 0 loop(s) fused and 4 loop(s) serialized as part
 of the larger parallel loop (#4).
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------

---------------------------Loop invariant code motion---------------------------
Allocation hoisting:
The memory allocation derived from the instruction at
/Users/matthewroeschke/pandas-twosigma/pandas/core/shared_executer.py (23) is
hoisted out of the parallel loop labelled #4 (it will be performed before the
loop is executed and reused inside the loop):
   Allocation:: sub_result = np.empty(len(start))
    - numpy.empty() is used for the allocation.

Instruction hoisting:
loop #4:
  Has the following hoisted:
    $const48.3 = const(NoneType, None)
    $const50.4 = const(NoneType, None)
    $52build_slice.5 = global(slice: <class 'slice'>)
    $52build_slice.6 = call $52build_slice.5($const48.3, $const50.4, func=$52build_slice.5, args=(Var($const48.3, shared_executer.py:22), Var($const50.4, shared_executer.py:22)), kws=(), vararg=None)
    $62load_global.10 = global(np: <module 'numpy' from '/Users/matthewroeschke/opt/miniconda3/envs/pandas-dev-twosigma/lib/python3.8/site-packages/numpy/__init__.py'>)
    $64load_method.11 = getattr(value=$62load_global.10, attr=empty)
    sub_result = call $64load_method.11(start__size0_34, func=$64load_method.11, args=[Var(start__size0_34, shared_executer.py:20)], kws=(), vararg=None)
    $const178.3 = const(NoneType, None)
    $const180.4 = const(NoneType, None)
    $182build_slice.5 = global(slice: <class 'slice'>)
    $182build_slice.6 = call $182build_slice.5($const178.3, $const180.4, func=$182build_slice.5, args=(Var($const178.3, shared_executer.py:33), Var($const180.4, shared_executer.py:33)), kws=(), vararg=None)
  Failed to hoist the following:
    dependency: i = $parfor__index_71.197
    dependency: $56build_tuple.8 = build_tuple(items=[Var($52build_slice.6, shared_executer.py:22), Var($parfor__index_71.197, <string>:2)])
    dependency: values = getitem(value=df__values, index=$56build_tuple.8, fn=<built-in function getitem>)
    dependency: $186build_tuple.8 = build_tuple(items=[Var($182build_slice.6, shared_executer.py:33), Var($parfor__index_71.197, <string>:2)])

(This is the version that was originally being called)

================================================================================
 Parallel Accelerator Optimizing:  Function
generate_numba_func.<locals>.column_looper, /Users/matthewroeschke/pandas-
twosigma/pandas/core/shared_executer.py (13)
================================================================================


Parallel loop listing for  Function generate_numba_func.<locals>.column_looper, /Users/matthewroeschke/pandas-twosigma/pandas/core/shared_executer.py (13)
-----------------------------------------------------------------------------|loop #ID
    @numba.jit(nopython=True, nogil=True, parallel=True)                     |
    def column_looper(                                                       |
        df_values,                                                           |
        start,                                                               |
        end,                                                                 |
        min_periods,                                                         |
    ):                                                                       |
        result = np.empty((len(start), df_values.shape[1]))                  |
        for i in numba.prange(df_values.shape[1]):---------------------------| #1
            result[:, i] = func(df_values[:, i], start, end, min_periods)----| #0
        return result                                                        |
--------------------------------- Fusing loops ---------------------------------
Attempting fusion of parallel loops (combines loops with similar properties)...
----------------------------- Before Optimisation ------------------------------
Parallel region 0:
+--1 (parallel)
   +--0 (parallel)


--------------------------------------------------------------------------------
------------------------------ After Optimisation ------------------------------
Parallel region 0:
+--1 (parallel)
   +--0 (serial)



Parallel region 0 (loop #1) had 0 loop(s) fused and 1 loop(s) serialized as part
 of the larger parallel loop (#1).
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------

---------------------------Loop invariant code motion---------------------------
Allocation hoisting:
No allocation hoisting found

Instruction hoisting:
loop #1:
  Has the following hoisted:
    $46load_deref.2 = freevar(func: CPUDispatcher(<function sliding_mean_pure_numba at 0x7fc4cc503430>))
    $const50.4 = const(NoneType, None)
    $const52.5 = const(NoneType, None)
    $54build_slice.6 = global(slice: <class 'slice'>)
    $54build_slice.7 = call $54build_slice.6($const50.4, $const52.5, func=$54build_slice.6, args=(Var($const50.4, shared_executer.py:22), Var($const52.5, shared_executer.py:22)), kws=(), vararg=None)
    $const72.16 = const(NoneType, None)
    $const74.17 = const(NoneType, None)
    $76build_slice.18 = global(slice: <class 'slice'>)
    $76build_slice.19 = call $76build_slice.18($const72.16, $const74.17, func=$76build_slice.18, args=(Var($const72.16, shared_executer.py:22), Var($const74.17, shared_executer.py:22)), kws=(), vararg=None)
    msg.95 = const(str, Sizes of result, $68call_function.14 do not match on /Users/matthewroeschke/pandas-twosigma/pandas/core/shared_executer.py (22))
    assert.96 = global(assert_equiv: <intrinsic assert_equiv>)
  Failed to hoist the following:
    dependency: i = $i.172
    dependency: $58build_tuple.9 = build_tuple(items=[Var($54build_slice.7, shared_executer.py:22), Var($i.172, <string>:2)])
    dependency: $68call_function.14 = call $46load_deref.2($60binary_subscr.10, start, end, min__periods, func=$46load_deref.2, args=[Var($60binary_subscr.10, shared_executer.py:22), Var(start, shared_executer.py:20), Var(end, shared_executer.py:20), Var(min__periods, shared_executer.py:20)], kws=(), vararg=None)
    dependency: $68call_function.14_shape.91 = getattr(value=$68call_function.14, attr=shape)
    dependency: $68call_function.14_size0.92 = static_getitem(value=$68call_function.14_shape.91, index=0, index_var=None, fn=<built-in function getitem>)
    dependency: $80build_tuple.21 = build_tuple(items=[Var($76build_slice.19, shared_executer.py:22), Var($i.172, <string>:2)])
    dependency: ret.97 = call $push_global_to_block.181(msg.95, start__size0_82, $68call_function.14_size0.92, func=$push_global_to_block.181, args=[Var(msg.95, shared_executer.py:22), Var(start__size0_82, shared_executer.py:20), Var($68call_function.14_size0.92, shared_executer.py:22)], kws={}, vararg=None)

numba version 0.53.1

cc @stuartarchibald if you have any ideas if the function above could be refactored to not serialize the inner loops (PS thanks for being a great resource for answering numba questions)

@stuartarchibald
Copy link

@mroeschke RE serialization of inner loops.

Numba will by default serialize the nested parallelism. This is because the outer most loop contains the most work and distributing this often gives best performance.

Consider the following:

@njit(parallel=True)
def foo(x, y, z):
  for i in prange(x):
    for j in prange(y):
      for k in prange(z):
        #<work>

The default is that the loop in x will be parallel (it has the most work) and the loops in y and z will be serialized. Numba's threadpool has a default size of "number of available cores" (call this nthreads), were Numba to actually create a triple nested parallel loop, this could end up with nthreads running nthreads running nthreads and would potentially lead to massive over subscription of compute resources. In practice it's only the OpenMP and TBB backends that support such a thing and they'd likely prevent a degree of over subscription, but this ends up backend-specific-detail orientated.

Obviously there are use cases where you might want to either oversubscribe resources or run a n x m thread pool with n thread teams of m threads, e.g. consider locality/affinity. To do this you need to wrap the inner loops in a function call as the compiler won't serialize loops through function call boundaries. Numba also provides routines to set the number of threads in a given region (docs).

Example:

@njit(parallel=True)
def inner_foo(y, z)
    for j in prange(y):  # parallel loop
      for k in prange(z): # serial loop
        #<work>

@njit(parallel=True)
def foo(x, y, z):
  for i in prange(x): # parallel loop
    inner_foo(y, z)

As an aside, you'll also note that there's numerous "other" parallel loops spotted, this is because Numba is analysing array expressions for parallel loops and also has a number of specialised parallel versions of routines e.g. np.zeros. Once the parallelism is identified Numba then tries to fuse loops that are the same size and will do the serialisation of nested loops as described above.

if you have any ideas if the function above could be refactored to not serialize the inner loops

Hopefully the above answers this, but I think the question might need to be "What do I really want to run in parallel"?

(PS thanks for being a great resource for answering numba questions)

No problem, thanks for trying all these things out. Practical use cases and feedback help drive Numba's design and development! So thank you!!!

@mroeschke
Copy link
Collaborator Author

Closing as a POC

@mroeschke mroeschke closed this Aug 23, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants