In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
from numba import njit

In [2]:
# Placeholder index table: integers drawn uniformly from [0, 100), shape (100, 50).
# NOTE(review): no RNG seed is set anywhere, so this and every later draw differ between runs.
# `index` is rebound by In [43] before it is used.
index = np.random.choice(100, (100, 50))

In [3]:
@njit
def _random_sample_with_replacement(ndat, nrep):
    """
    Draw an index table for resampling with replacement.

    Returns an int64 array of shape (ndat, nrep).  Each entry is an
    independent ``np.random.randint(0, ndat)`` draw, filled row by row.
    """
    table = np.empty((ndat, nrep), dtype=np.int64)
    for row in range(ndat):
        row_view = table[row]
        for col in range(nrep):
            row_view[col] = np.random.randint(0, ndat)
    return table

In [4]:
@njit(nogil=True)
def _random_sample_with_replacement_freq_out(freq):
    """
    Accumulate a resampling frequency table into ``freq`` in place.

    For each row ``i`` of ``freq`` (one replicate), draw ``freq.shape[1]``
    indices uniformly from ``[0, freq.shape[1])``.  Each draw increments
    ``freq[i, index]`` once.  Existing contents are added to, not cleared,
    so callers should pass a zeroed array.

    ``nogil=True`` releases the GIL while the kernel runs.  Without it, the
    Python threads started by ``random_sample_with_replacement_freq_thread``
    on disjoint row blocks would run one after another.  Per the Numba docs,
    ``np.random`` in nopython mode keeps independent per-thread state.
    """
    nrep = freq.shape[0]
    ndat = freq.shape[1]
    for i in range(nrep):
        for _ in range(ndat):
            index = np.random.randint(0, ndat)
            freq[i, index] += 1


@njit
def _random_sample_with_replacement_freq(ndat, nrep):
    """
    Build a resampling frequency table instead of an index table.

    ``freq[irep, jdat]`` is the number of times data point ``jdat`` was drawn
    in replicate ``irep``; every row sums to ``ndat``.

    Note the layout is (nrep, ndat).  That is transposed relative to the
    (ndat, nrep) index tables from ``np.random.choice`` /
    ``_random_sample_with_replacement``.  The table is, however, faster to
    construct.

    Returns
    -------
    ndarray of int64, shape (nrep, ndat)
    """
    freq = np.zeros((nrep, ndat), dtype=np.int64)
    # Reuse the in-place kernel rather than duplicating its sampling loop.
    # The draw order (row by row) is the same, so results match for a given seed.
    _random_sample_with_replacement_freq_out(freq)
    return freq


import threading


def random_sample_with_replacement_freq_thread(ndat, nrep, nthread):
    """
    Build an (nrep, ndat) int64 resampling frequency table using Python threads.

    The rows of a zeroed table are split into contiguous blocks.  Each block
    is filled in place by ``_random_sample_with_replacement_freq_out`` on its
    own thread.  The threads only overlap in time if that kernel releases
    the GIL.

    Parameters
    ----------
    ndat : int
        Number of data points (columns).
    nrep : int
        Number of replicates (rows).
    nthread : int
        Number of threads.  Values <= 0 mean one thread per CPU, consistent
        with ``random_sample_with_replacement_freq_mult`` treating negatives
        as "all CPUs".

    Returns
    -------
    ndarray of int64, shape (nrep, ndat)

    Raises
    ------
    Exception
        The first exception raised inside a worker thread is re-raised here.
    """
    import os

    if nthread <= 0:
        nthread = os.cpu_count() or 1

    out = np.zeros((nrep, ndat), dtype=np.int64)
    if nrep == 0:
        return out

    chunk_size = (nrep + nthread - 1) // nthread
    errors = []

    def _fill(block):
        # By default, exceptions in a Thread target are only printed.  Keep
        # them so a failed block is not silently left as zeros.
        try:
            _random_sample_with_replacement_freq_out(block)
        except Exception as exc:
            errors.append(exc)

    # Leading-axis slices of the C-contiguous ``out`` are views, so each
    # thread writes straight into ``out``.  Empty trailing chunks are never
    # created.
    threads = [
        threading.Thread(target=_fill, args=(out[start : start + chunk_size],))
        for start in range(0, nrep, chunk_size)
    ]

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    if errors:
        raise errors[0]

    return out


import multiprocessing


def random_sample_with_replacement_freq_mult(ndat, nrep, nthread):
    """
    Build an (nrep, ndat) int64 resampling frequency table using worker processes.

    ``nrep`` is split into nearly equal chunks.  Each worker returns
    ``_random_sample_with_replacement_freq(ndat, chunk)``, and the per-worker
    tables are stacked along axis 0.

    Parameters
    ----------
    ndat : int
        Number of data points (columns).
    nrep : int
        Total number of replicates (rows of the result).
    nthread : int
        Number of worker processes.  Values <= 0 use
        ``multiprocessing.cpu_count()``.

    Returns
    -------
    ndarray of int64, shape (nrep, ndat)
    """
    if nthread <= 0:
        nthread = multiprocessing.cpu_count()

    if nrep == 0:
        return np.zeros((0, ndat), dtype=np.int64)

    chunk_size = (nrep + nthread - 1) // nthread

    # Each chunk is min(chunk_size, rows remaining), so the sizes sum to
    # exactly nrep.  Empty chunks are never scheduled.
    sizes = [min(chunk_size, nrep - start) for start in range(0, nrep, chunk_size)]

    # The context manager guarantees the worker processes are torn down.
    # Results are collected before the block exits.
    with multiprocessing.Pool(processes=len(sizes)) as pool:
        pending = [
            pool.apply_async(_random_sample_with_replacement_freq, args=(ndat, size))
            for size in sizes
        ]
        outputs = [p.get() for p in pending]

    return np.concatenate(outputs, axis=0)

In [39]:
# NOTE(review): superseded by In [41] (ndat=300, nrep=30000) before either value is used.
ndat = 100
nrep = ndat * 10

In [40]:
from importlib import reload
# `accumulator` is a project-local module (accumulator.py, not shown here).
# reload() picks up edits to it; `%autoreload 2` in the setup cell would do this automatically.
import accumulator as acc

reload(acc)

<module 'accumulator' from '/Users/wpk/Documents/python/projects/python_dropin_files/accumulator.py'>

In [41]:
# Problem size used from here on: ndat data points, nrep = 100 * ndat replicates.
ndat = 300
nrep = ndat * 100

In [42]:
# Re-import accumulator.py after editing it.
reload(acc)

<module 'accumulator' from '/Users/wpk/Documents/python/projects/python_dropin_files/accumulator.py'>

In [43]:
# Index and frequency tables from accumulator.py (implementation not visible here).
# The kernels below read freq[idat, irep], so freq is presumably (ndat, nrep) -- TODO confirm.
# This rebinds the placeholder `index` from In [2].
index, freq = acc.randsamp_numpy(ndat, nrep)

In [127]:
# Create synthetic data: uniform draws of shape (10000, ndat, 1).
# from_vals presumably reduces axis 0; In [130] shows s.data is (ndat, 1, 3).
# NOTE(review): unseeded, so values differ between runs.
x = np.random.rand(10000, ndat, 1)

In [128]:
s = acc.StatsAccumVec.from_vals(x)

In [129]:
g = s.to_array()
# Reference result: resample with the index table from In [43].
s2 = g.resample(index)

In [130]:
s.data.shape

(300, 1, 3)

In [132]:
# Frequency-table resampling via accumulator.py.
# `out` is the reference that the local kernels below are checked against.
out = acc.resample_data(s.data, freq, parallel=True)

In [133]:
# Index-based and frequency-based resampling must agree.
np.testing.assert_allclose(s2.data, out)

In [134]:
%%timeit -n 1 -r 1
# NOTE: -n 1 -r 1 takes a single sample (std shows 0) and may include JIT compilation.
out = acc.resample_data(s.data, freq, parallel=False)

176 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [135]:
%%timeit -n 1 -r 1
out = acc.resample_data(s.data, freq, parallel=True)

12.7 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [136]:
%%timeit -n 1 -r 1
out = acc.resample_data(s.data, freq, nthread=100, parallel=False)

135 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [137]:
%%timeit -n 1 -r 1
out = acc.resample_data(s.data, freq, nproc=5, parallel=False)

277 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [138]:
from numba import prange


@njit(parallel=True)
def _resample_2(data, freq):
    """
    Reduce ``data`` along axis 0 into one accumulator per replicate.

    The loop order is idat -> imeta -> irep.  The weight/average/variance
    merge is written inline rather than calling ``_push_stat``.

    Parameters
    ----------
    data : ndarray, shape (ndat, nmeta, 3)
        Slots 0/1/2 of the last axis are read as weight, average and variance.
    freq : ndarray, shape (ndat, nrep)
        Multiplier applied to the weight of sample ``idat`` in replicate
        ``irep`` (presumably a draw count).

    Returns
    -------
    out : float64 ndarray, shape (nrep, nmeta, 3)

    NOTE(review): every loop here uses ``range`` (no ``prange``), so
    ``parallel=True`` has no loop to parallelise.  The reduction itself runs
    serially.
    """
    ndata = data.shape[0]
    nmeta = data.shape[1]

    ndat = freq.shape[0]
    nrep = freq.shape[1]
    assert ndat == ndata

    out = np.zeros((nrep, nmeta, 3))
    for idat in range(ndat):
        for imeta in range(nmeta):
            _w = data[idat, imeta, 0]
            # A zero-weight sample contributes nothing to any replicate.
            if _w == 0.0:
                continue
            a = data[idat, imeta, 1]
            v = data[idat, imeta, 2]
            for irep in range(nrep):
                f = freq[idat, irep]
                # Sample not drawn into this replicate.
                if f == 0.0:
                    continue
                w = _w * f

                # Total weight after this push.
                weight = out[irep, imeta, 0] + w
                ave = out[irep, imeta, 1]
                var = out[irep, imeta, 2]

                # alpha: fraction of the merged weight contributed by the new value.
                alpha = w / weight
                delta = a - ave
                incr = delta * alpha

                out[irep, imeta, 0] += w
                out[irep, imeta, 1] += incr
                # Weighted variance merge: (1-alpha)*var + alpha*v + alpha*(1-alpha)*delta**2.
                out[irep, imeta, 2] += (v - var) * alpha + (1.0 - alpha) * delta * incr

    return out


@njit
def _push_stat(data, w, a, v):
    """
    Merge one weighted triple into the accumulator ``data`` in place.

    data : 1-D view whose slots 0/1/2 hold the running weight, average and
        variance.  It is updated in place.
    w, a, v : weight, average and variance of the value being pushed.

    A zero weight is a no-op.  Nothing is returned.
    """
    if w != 0.0:
        w_old = data[0]
        mean_old = data[1]
        var_old = data[2]

        weight = w_old + w
        alpha = w / weight
        delta = a - mean_old
        incr = delta * alpha

        data[0] = weight
        data[1] = mean_old + incr
        data[2] = var_old + ((v - var_old) * alpha + (1.0 - alpha) * (delta * incr))


@njit(parallel=True)
def _resample_3(data, freq, out):
    """
    Accumulate resampled statistics into ``out`` in place.

    Loop order is idat -> imeta -> prange(irep).  Each update goes through
    ``acc._push_stat`` (the accumulator.py helper, not this notebook's
    ``_push_stat``).  ``out`` is added to, not zeroed; the notebook passes
    ``np.zeros_like(out)``, presumably (nrep, nmeta, 3).

    Because ``prange`` is the innermost loop, a parallel region is entered
    once per (idat, imeta) pair.  That is the likely reason this variant
    times slower than the irep-outer kernels.

    Parameters
    ----------
    data : ndarray, shape (ndat, nmeta, 3)
    freq : ndarray, shape (ndat, nrep)
    out : ndarray, indexed as out[irep, imeta]
    """
    ndata = data.shape[0]
    nmeta = data.shape[1]

    ndat = freq.shape[0]
    nrep = freq.shape[1]
    assert ndat == ndata

    for idat in range(ndat):
        for imeta in range(nmeta):
            _w = data[idat, imeta, 0]
            if _w == 0.0:
                continue
            a = data[idat, imeta, 1]
            v = data[idat, imeta, 2]
            # Each parallel iteration touches only out[irep, imeta] for its own irep.
            for irep in prange(nrep):
                f = freq[idat, irep]
                if f == 0.0:
                    continue
                acc._push_stat(out[irep, imeta], _w * f, a, v)


@njit(parallel=True)
def _resample_4(data, freq, out):
    """
    Accumulate resampled statistics into ``out`` in place, with prange over replicates outermost.

    Each update calls ``acc._push_stat`` (the accumulator.py helper).
    ``_resample_4b`` has the same loop nest but calls this notebook's
    ``_push_stat``.  ``out`` is added to, not zeroed.

    Parameters
    ----------
    data : ndarray, shape (ndat, nmeta, 3)
    freq : ndarray, shape (ndat, nrep)
    out : ndarray, indexed as out[irep, imeta]
    """
    ndata = data.shape[0]
    nmeta = data.shape[1]

    ndat = freq.shape[0]
    nrep = freq.shape[1]
    assert ndat == ndata

    # Each parallel iteration owns out[irep, :], so no two iterations write the same slot.
    for irep in prange(nrep):
        for idat in range(ndat):
            f = freq[idat, irep]
            if f == 0.0:
                continue

            for imeta in range(nmeta):
                _w = data[idat, imeta, 0]
                if _w == 0.0:
                    continue
                a = data[idat, imeta, 1]
                v = data[idat, imeta, 2]

                acc._push_stat(out[irep, imeta], _w * f, a, v)


@njit(parallel=True)
def _resample_4b(data, freq, out):
    """
    Same loop nest as ``_resample_4``, but calls this notebook's ``_push_stat``.

    Accumulates into ``out`` in place (added to, not zeroed), with prange
    over replicates outermost.

    Parameters
    ----------
    data : ndarray, shape (ndat, nmeta, 3)
    freq : ndarray, shape (ndat, nrep)
    out : ndarray, indexed as out[irep, imeta]
    """
    ndata = data.shape[0]
    nmeta = data.shape[1]

    ndat = freq.shape[0]
    nrep = freq.shape[1]
    assert ndat == ndata

    # Each parallel iteration owns out[irep, :], so no two iterations write the same slot.
    for irep in prange(nrep):
        for idat in range(ndat):
            f = freq[idat, irep]
            if f == 0.0:
                continue

            for imeta in range(nmeta):
                _w = data[idat, imeta, 0]
                if _w == 0.0:
                    continue
                a = data[idat, imeta, 1]
                v = data[idat, imeta, 2]

                _push_stat(out[irep, imeta], _w * f, a, v)


# this seems to be the best one by far.
# (Recorded timings: about 11-15 ms, versus 282 ms for the idat-outer _resample_2.)
@njit(parallel=True)
def _resample_2b(data, freq):
    """
    Reduce ``data`` along axis 0 into one accumulator per replicate, in parallel over replicates.

    ``prange`` is on the outermost (irep) loop, so each parallel iteration
    owns ``out[irep]`` and no two iterations write the same slot.  The merge
    arithmetic is written inline and is the same as in ``_resample_2``.

    Parameters
    ----------
    data : ndarray, shape (ndat, nmeta, 3)
        Slots 0/1/2 of the last axis are read as weight, average and variance.
    freq : ndarray, shape (ndat, nrep)
        Multiplier for the weight of sample ``idat`` in replicate ``irep``
        (presumably a draw count).

    Returns
    -------
    out : float64 ndarray, shape (nrep, nmeta, 3)
    """
    ndata = data.shape[0]
    nmeta = data.shape[1]

    ndat = freq.shape[0]
    nrep = freq.shape[1]
    assert ndat == ndata

    out = np.zeros((nrep, nmeta, 3))
    for irep in prange(nrep):
        for idat in range(ndat):
            f = freq[idat, irep]
            # Sample not drawn into this replicate.
            if f == 0.0:
                continue
            for imeta in range(nmeta):
                _w = data[idat, imeta, 0]
                if _w == 0.0:
                    continue
                a = data[idat, imeta, 1]
                v = data[idat, imeta, 2]

                w = _w * f

                weight = out[irep, imeta, 0] + w
                ave = out[irep, imeta, 1]
                var = out[irep, imeta, 2]

                alpha = w / weight
                delta = a - ave
                incr = delta * alpha

                out[irep, imeta, 0] += w
                out[irep, imeta, 1] += incr
                out[irep, imeta, 2] += (v - var) * alpha + (1.0 - alpha) * delta * incr

    return out


# Irep-outer reduction, written with the accumulator slot loaded into
# locals and stored back once per update (same arithmetic as _resample_2b).
@njit(parallel=True)
def _resample_2b2(data, freq):
    """
    Reduce ``data`` along axis 0 into ``nrep`` replicate accumulators.

    data : (ndat, nmeta, 3) with slots weight, average, variance.
    freq : (ndat, nrep), multiplier for each sample's weight per replicate.

    Returns a float64 array of shape (nrep, nmeta, 3).  Replicates are
    processed in parallel, each owning its own output row.
    """
    n_data, n_meta = data.shape[0], data.shape[1]
    n_dat, n_rep = freq.shape[0], freq.shape[1]
    assert n_dat == n_data

    result = np.zeros((n_rep, n_meta, 3))
    for rep in prange(n_rep):
        for d in range(n_dat):
            count = freq[d, rep]
            if count == 0.0:
                continue
            for m in range(n_meta):
                base_w = data[d, m, 0]
                if base_w == 0.0:
                    continue
                mean_in = data[d, m, 1]
                var_in = data[d, m, 2]

                w = base_w * count
                w_old = result[rep, m, 0]
                mean_old = result[rep, m, 1]
                var_old = result[rep, m, 2]

                weight = w_old + w
                alpha = w / weight
                delta = mean_in - mean_old
                incr = delta * alpha

                result[rep, m, 0] = weight
                result[rep, m, 1] = mean_old + incr
                result[rep, m, 2] = var_old + ((var_in - var_old) * alpha + (1.0 - alpha) * delta * incr)

    return result


@njit(parallel=True)
def _resample_2c(data, freq):
    """
    Irep-outer parallel reduction that delegates each update to ``_push_stat``.

    Same loop nest as ``_resample_2b`` (prange over irep outermost).  Each
    merge goes through the notebook's ``_push_stat`` helper instead of inline
    arithmetic.

    Parameters
    ----------
    data : ndarray, shape (ndat, nmeta, 3)
    freq : ndarray, shape (ndat, nrep)

    Returns
    -------
    out : float64 ndarray, shape (nrep, nmeta, 3)
    """
    ndata = data.shape[0]
    nmeta = data.shape[1]

    ndat = freq.shape[0]
    nrep = freq.shape[1]
    assert ndat == ndata

    out = np.zeros((nrep, nmeta, 3))
    for irep in prange(nrep):
        for idat in range(ndat):
            f = freq[idat, irep]
            if f == 0.0:
                continue
            for imeta in range(nmeta):
                _w = data[idat, imeta, 0]
                if _w == 0.0:
                    continue
                a = data[idat, imeta, 1]
                v = data[idat, imeta, 2]
                # Combined weight, computed once and passed through.
                w = _w * f
                _push_stat(out[irep, imeta], w, a, v)

    return out


@njit(parallel=True)
def _resample_2d(data, freq):
    """
    Irep-outer parallel reduction that delegates each update to ``acc._push_stat``.

    Identical to ``_resample_2c`` except that it calls the accumulator.py
    helper, so the two kernels can be timed against each other.

    Parameters
    ----------
    data : ndarray, shape (ndat, nmeta, 3)
    freq : ndarray, shape (ndat, nrep)

    Returns
    -------
    out : float64 ndarray, shape (nrep, nmeta, 3)
    """
    ndata = data.shape[0]
    nmeta = data.shape[1]

    ndat = freq.shape[0]
    nrep = freq.shape[1]
    assert ndat == ndata

    out = np.zeros((nrep, nmeta, 3))
    for irep in prange(nrep):
        for idat in range(ndat):
            f = freq[idat, irep]
            if f == 0.0:
                continue
            for imeta in range(nmeta):
                _w = data[idat, imeta, 0]
                if _w == 0.0:
                    continue
                a = data[idat, imeta, 1]
                v = data[idat, imeta, 2]
                # Combined weight, computed once and passed through.
                w = _w * f
                acc._push_stat(out[irep, imeta], w, a, v)

    return out

In [165]:
# this seems to be the best one by far.
# NOTE(review): this cell redefines _resample_2b from In [138] with an identical body.
# Keep a single definition; duplicate defs silently shadow each other.
@njit(parallel=True)
def _resample_2b(data, freq):
    """
    Reduce ``data`` along axis 0 into one accumulator per replicate, in parallel over replicates.

    Parameters
    ----------
    data : ndarray, shape (ndat, nmeta, 3)
        Slots 0/1/2 of the last axis are read as weight, average and variance.
    freq : ndarray, shape (ndat, nrep)

    Returns
    -------
    out : float64 ndarray, shape (nrep, nmeta, 3)
    """
    ndata = data.shape[0]
    nmeta = data.shape[1]

    ndat = freq.shape[0]
    nrep = freq.shape[1]
    assert ndat == ndata

    out = np.zeros((nrep, nmeta, 3))
    # Each parallel iteration owns out[irep], so there are no write conflicts.
    for irep in prange(nrep):
        for idat in range(ndat):
            f = freq[idat, irep]
            if f == 0.0:
                continue
            for imeta in range(nmeta):
                _w = data[idat, imeta, 0]
                if _w == 0.0:
                    continue
                a = data[idat, imeta, 1]
                v = data[idat, imeta, 2]

                w = _w * f

                weight = out[irep, imeta, 0] + w
                ave = out[irep, imeta, 1]
                var = out[irep, imeta, 2]

                alpha = w / weight
                delta = a - ave
                incr = delta * alpha

                out[irep, imeta, 0] += w
                out[irep, imeta, 1] += incr
                out[irep, imeta, 2] += (v - var) * alpha + (1.0 - alpha) * delta * incr

    return out

In [168]:
# out2 = np.zeros_like(out)

# Check the irep-outer kernel against the accumulator.py reference `out` from In [132].
out2 = _resample_2b(s.data, freq)

np.testing.assert_allclose(out, out2)

In [169]:
%timeit -n 1 -r 1 out2 = _resample_2b(s.data, freq)

11.4 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [141]:
# _resample_3/_4/_4b add into a caller-supplied array, so each check starts from zeros.
out3 = np.zeros_like(out)

_resample_3(s.data, freq, out3)

np.testing.assert_allclose(out, out3)

In [142]:
out4 = np.zeros_like(out)

_resample_4(s.data, freq, out4)

np.testing.assert_allclose(out, out4)

In [143]:
out4b = np.zeros_like(out)

_resample_4b(s.data, freq, out4b)

np.testing.assert_allclose(out, out4b)

In [424]:
# Check, then time, each irep-outer variant (one timing line per function is printed below).
for func in [_resample_2b, _resample_2b2, _resample_2c, _resample_2d]:
    out2 = func(s.data, freq)
    np.testing.assert_allclose(out, out2)
    
    %timeit -n 5 -r 3 func(s.data, freq)

# out2 = _resample_2b(s.data, freq)
# np.testing.assert_allclose(out, out2)

# out2 = _resample_2b2(s.data, freq)
# np.testing.assert_allclose(out, out2)

# out2 = _resample_2c(s.data, freq)
# np.testing.assert_allclose(out, out2)

# out2 = _resample_2d(s.data, freq)
# np.testing.assert_allclose(out, out2)



13.5 ms ± 1.03 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)
14.3 ms ± 1.84 ms per loop (mean ± std. dev. of 3 runs, 5 loops each)
21.9 ms ± 827 µs per loop (mean ± std. dev. of 3 runs, 5 loops each)
21.4 ms ± 628 µs per loop (mean ± std. dev. of 3 runs, 5 loops each)


In [405]:
%%timeit -n 1 -r 1
_resample_2(s.data, freq)  # , out2)

282 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [231]:
%%timeit -n 1 -r 1
# NOTE(review): out3 is not re-zeroed, so each timed call keeps accumulating into it.
# This is fine for timing, but out3 no longer holds a valid resample afterwards.
_resample_3(s.data, freq, out3)

126 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [232]:
%%timeit -n 1 -r 1
# NOTE(review): reuses the already-populated out3 from the previous cell.
_resample_4(s.data, freq, out3)

28.3 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [233]:
%%timeit -n 5 -r 1
_resample_4b(s.data, freq, out3)

22.7 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 5 loops each)


In [234]:
%%timeit -n 5 -r 1
_resample_2b(s.data, freq)

15 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 5 loops each)


In [235]:
%%timeit -n 5 -r 1
_resample_2c(s.data, freq)

23.7 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 5 loops each)


In [236]:
%%timeit -n 5 -r 1
_resample_2d(s.data, freq)

22.8 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 5 loops each)


In [23]:
# NOTE(review): execution counts 23/24 predate this section.  The recorded output indexes
# axis 1 up to 2, but s.data here has nmeta == 1 (In [130]), so that output is stale.
d = np.abs(out - out2)
w = np.where(d > 1)

In [24]:
w

(array([   62,    66,   420, ..., 29961, 29994, 29997]),
 array([0, 0, 0, ..., 2, 2, 2]),
 array([0, 0, 0, ..., 0, 0, 0]))

In [596]:
np.testing.assert_allclose(out, out2)

In [567]:
%%timeit -n 1 -r 1
# NOTE(review): _resample_2 in In [138] takes (data, freq), so this 3-argument call
# fails on a fresh kernel; the 451 ms timing is presumably from an older signature.
_resample_2(s.data, freq, out2)

451 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [564]:
np.testing.assert_allclose(out2, out)

In [534]:
np.testing.assert_allclose(out, s2.data)

In [540]:
%%timeit -n 1 -r 1
s2 = g.resample(index)

1.14 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [542]:
%%timeit -n 1 -r 1
# resample_and_reduce lives in accumulator.py (not shown); nthread presumably sets its thread count.
g.resample_and_reduce(freq, nthread=100)

328 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [503]:
np.testing.assert_allclose(g.resample_and_reduce(freq, nthread=100).data, s2.data)

In [None]:
# Never executed (In [None]).
%timeit -n 1 -r 1 out = acc.resample_data(s.data, freq)#, nproc=10)

In [467]:
%timeit -n 1 -r 1 out = acc.resample_data(s.data, freq, nthread=200)

773 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [463]:
# NOTE(review): the ForkPoolWorker tracebacks recorded after this cell suggest pool workers
# were not shut down cleanly; check how acc.resample_data manages its multiprocessing.Pool.
%timeit -n 1 -r 1 out = acc.resample_data(s.data, freq, nproc=-1)

717 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


Process ForkPoolWorker-408:
Process ForkPoolWorker-409:
Process ForkPoolWorker-405:
Process ForkPoolWorker-414:
Process ForkPoolWorker-413:
Process ForkPoolWorker-406:
Process ForkPoolWorker-407:
Process ForkPoolWorker-404:
Process ForkPoolWorker-403:
Process ForkPoolWorker-411:
Process ForkPoolWorker-410:
Process ForkPoolWorker-412:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/wpk/.conda/envs/py37/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/wpk/.conda/envs/py37/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/wpk/.conda/envs/py37/lib/python3.7/multiprocessing/process.py",

In [459]:
# NOTE(review): rebinds `out`, the reference that the kernel checks above compare against.
out = acc.resample_data(s.data, freq, nproc=-1)

Process ForkPoolWorker-398:
Process ForkPoolWorker-391:
Process ForkPoolWorker-393:
Process ForkPoolWorker-397:
Process ForkPoolWorker-402:
Process ForkPoolWorker-396:
Process ForkPoolWorker-395:
Process ForkPoolWorker-392:
Process ForkPoolWorker-394:
Process ForkPoolWorker-399:
Process ForkPoolWorker-400:
Process ForkPoolWorker-401:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/wpk/.conda/envs/py37/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/wpk/.conda/envs/py37/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/wpk/.conda/envs/py37/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/wpk/.conda/envs/py37/lib/python3.7/multiprocessing

In [476]:
np.testing.assert_allclose(out, s2.data)

In [500]:
%%timeit -n 1 -r 1
# g = s.to_array()
s2 = g.resample(index)

1.04 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [501]:
%%timeit -n 1 -r 1
# NOTE(review): resample_data is defined in In [21] near the end of the notebook, so this
# cell raises NameError under Restart & Run All; move the definition above its first use.
out = resample_data(s.data, freq)

447 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [356]:
%%timeit -n 1 -r 1
# NOTE(review): resample_data_thread is defined later (In [423]), so this has the same ordering problem.
out = resample_data_thread(s.data, freq, 10)

985 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [357]:
%%timeit -n 1 -r 1
# NOTE(review): resample_data_mult is defined later (In [179]), so this has the same ordering problem.
out = resample_data_mult(s.data, freq, -1)

704 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


Process ForkPoolWorker-346:
Process ForkPoolWorker-342:
Process ForkPoolWorker-350:
Process ForkPoolWorker-344:
Process ForkPoolWorker-343:
Process ForkPoolWorker-341:
Process ForkPoolWorker-345:
Process ForkPoolWorker-349:
Process ForkPoolWorker-348:
Process ForkPoolWorker-339:
Process ForkPoolWorker-340:
Process ForkPoolWorker-347:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/wpk/.conda/envs/py37/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/wpk/.conda/envs/py37/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/wpk/.conda/envs/py37/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call la

In [228]:
%%timeit -n 3 -r 1
# NOTE(review): `freqTT` is not defined in any visible cell, so this fails on a fresh kernel.
# The "3 (100, 10, 3) ..." lines presumably came from the debug print now commented out in resample_data.
resample_data(s.data, freqTT)

3 (100, 10, 3) (100, 10, 3)
3 (100, 10, 3) (100, 10, 3)
3 (100, 10, 3) (100, 10, 3)
114 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)


In [341]:
g = s.to_array()
s2 = g.resample(index)

# Cross-check the local resample_data against the index-based reference.
out = resample_data(s.data, freq)

np.testing.assert_allclose(out, s2.data)

In [423]:
import threading


def resample_data_thread(data, freq, nthread):
    """
    Run ``resample_data`` over column blocks of ``freq`` on Python threads.

    Each thread handles a contiguous block of replicates and writes into the
    matching leading-axis slice of a shared output.  The threads only run
    concurrently if the kernel behind ``resample_data`` releases the GIL.

    Parameters
    ----------
    data : ndarray, shape (ndat, ..., 3)
    freq : ndarray, shape (ndat, nrep)
    nthread : int
        Number of threads.  Values <= 0 mean one thread per CPU, consistent
        with ``resample_data_mult`` treating negatives as "all CPUs".

    Returns
    -------
    out : ndarray, shape (nrep,) + data.shape[1:], same dtype as ``data``

    Raises
    ------
    Exception
        The first exception raised inside a worker thread is re-raised here.
    """
    import os

    if nthread <= 0:
        nthread = os.cpu_count() or 1

    nrep = freq.shape[1]
    out = np.zeros((nrep,) + data.shape[1:], dtype=data.dtype)
    if nrep == 0:
        return out

    chunk_size = (nrep + nthread - 1) // nthread
    errors = []

    def _run(*args):
        # By default, exceptions in a Thread target are only printed.  Keep
        # them so a failed block is not silently left as zeros.
        try:
            resample_data(*args)
        except Exception as exc:
            errors.append(exc)

    # Leading-axis slices of the C-contiguous ``out`` are views, so
    # resample_data writes straight into ``out``.  Empty chunks are skipped.
    threads = [
        threading.Thread(
            target=_run,
            args=(
                data,
                freq[:, start : start + chunk_size],
                out[start : start + chunk_size, ...],
            ),
        )
        for start in range(0, nrep, chunk_size)
    ]

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    if errors:
        raise errors[0]

    return out

In [188]:
%%timeit -n 1 -r 1
# Stress test: 400 threads, each handling one block of replicate columns.
out_thread = resample_data_thread(s.data, freq, nthread=400)

2.19 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [179]:
import multiprocessing

# Scratch parameters for the manual threading walk-through in In [107]-[115]
# (not multiprocessing); nthread is reset to 2 in In [108].
nthread = 8

# fr: the first nr replicate columns of freq.
nr = 1000
fr = freq[:, :nr]


def resample_data_mult(data, freq, nthread):
    """
    Run ``resample_data`` over column blocks of ``freq`` in worker processes.

    ``freq`` is split into nearly equal column blocks.  Each worker returns
    ``resample_data(data, block)``, and the results are stacked along axis 0.

    Parameters
    ----------
    data : ndarray, shape (ndat, ..., 3)
    freq : ndarray, shape (ndat, nrep)
    nthread : int
        Number of worker processes.  Values <= 0 use
        ``multiprocessing.cpu_count()``.

    Returns
    -------
    out : float64 ndarray, shape (nrep,) + data.shape[1:]
    """
    if nthread <= 0:
        nthread = multiprocessing.cpu_count()

    nrep = freq.shape[1]
    if nrep == 0:
        return np.zeros((0,) + data.shape[1:])

    chunk_size = (nrep + nthread - 1) // nthread
    chunks = [freq[:, start : start + chunk_size] for start in range(0, nrep, chunk_size)]

    # The context manager tears the workers down, so stale pools don't pile up.
    # Results are collected before the block exits.
    with multiprocessing.Pool(processes=len(chunks)) as pool:
        pending = [pool.apply_async(resample_data, args=(data, chunk)) for chunk in chunks]
        outputs = [p.get() for p in pending]

    return np.concatenate(outputs, axis=0)

In [190]:
%%timeit -n 1 -r 1
# nthread=-1 asks resample_data_mult for one worker per CPU.
out_mult = resample_data_mult(s.data, freq, nthread=-1)

1.52 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


Process ForkPoolWorker-249:
Process ForkPoolWorker-251:
Process ForkPoolWorker-245:
Process ForkPoolWorker-243:
Process ForkPoolWorker-244:
Process ForkPoolWorker-246:
Process ForkPoolWorker-247:
Process ForkPoolWorker-248:
Process ForkPoolWorker-252:
Process ForkPoolWorker-250:
Process ForkPoolWorker-241:
Process ForkPoolWorker-242:
Traceback (most recent call last):
  File "/Users/wpk/.conda/envs/py37/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/wpk/.conda/envs/py37/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/wpk/.conda/envs/py37/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call la

In [107]:
# Manual, step-by-step version of resample_data_thread on the first nr replicates (fr).
# NOTE(review): rebinds `out`, which earlier cells use as the reference result.
out = np.zeros((nr,) + s.data.shape[1:], dtype=s.data.dtype)

In [108]:
nthread = 2

In [109]:
chunk_size = (nr + nthread - 1) // nthread

In [110]:
# Leading-axis slices of `out` are views, so each thread writes into `out` directly.
args_list = [
    (
        s.data,
        fr[:, chunk_size * i : chunk_size * (i + 1)],
        out[chunk_size * i : chunk_size * (i + 1), ...],
    )
    for i in range(nthread)
]

In [111]:
# NOTE(review): with nr=1000, nthread=2 and s.data of shape (300, 1, 3) this is (500, 1, 3).
# The recorded (525, 10, 3) is stale output from a run with different sizes/data.
args_list[1][2].shape

(525, 10, 3)

In [112]:
threads = [threading.Thread(target=resample_data, args=args) for args in args_list]

In [113]:
for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

In [115]:
# NOTE(review): `data` is not defined in any visible cell; presumably s.data was meant.
out - resample_data(data, fr)

array([[[0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.],
        ...,
        [0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.]],

       [[0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.],
        ...,
        [0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.]],

       [[0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.],
        ...,
        [0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.]],

       ...,

       [[0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.],
        ...,
        [0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.]],

       [[0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.],
        ...,
        [0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.]],

       [[0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.],
        ...,
        [0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.]]])

In [21]:
def resample_data(data, freq, out=None):
    """
    Reduce ``data`` along axis 0 into ``nrep`` replicate accumulators using a frequency table.

    Parameters
    ----------
    data : ndarray, shape (ndat, ..., 3)
        Per-sample triples.  The last axis must have length 3; ``_resample``
        reads its slots as weight, average and variance.
    freq : ndarray, shape (ndat, nrep)
        Weight multiplier for each sample in each replicate (presumably a
        draw count).
    out : ndarray, shape (nrep,) + data.shape[1:], optional
        Accumulator to add into.  Defaults to a new float64 zero array.

    Returns
    -------
    out : ndarray, shape (nrep,) + data.shape[1:]
    """
    data_shape = data.shape
    assert data_shape[-1] == 3, "last axis of data must have length 3"

    ndim = data.ndim
    assert ndim > 1, "data must have at least 2 dimensions"

    assert data_shape[0] == freq.shape[0], "data and freq disagree on ndat"
    nrep = freq.shape[-1]

    out_shape = (nrep,) + data_shape[1:]

    if out is not None:
        assert out.shape == out_shape, "out has the wrong shape"
    else:
        out = np.zeros(out_shape)

    # Collapse all middle ("meta") axes into one, so the kernel only sees 3-D
    # arrays.  The product of an empty tuple is 1, which covers 2-D input.
    # (``np.int`` was removed in NumPy 1.24, so use an explicit int64 product.)
    nmeta = int(np.prod(data_shape[1:-1], dtype=np.int64))
    datar = data.reshape((data_shape[0], nmeta, data_shape[-1]))
    outr = out.reshape((nrep, nmeta, data_shape[-1]))

    _resample(datar, freq, outr)

    # reshape returns a copy when ``out`` is not contiguous.  In that case,
    # copy the results back so they are not silently lost.
    if not np.may_share_memory(outr, out):
        out[...] = outr.reshape(out_shape)

    return out


@njit
def _resample(data, freq, out):
    """
    Serial kernel: push every (sample, meta) triple into each replicate that draws it.

    data : 3-D; slots 0/1/2 of the last axis are read (presumably weight,
        average, variance).
    freq : (ndat, nrep).  Entries that are not > 0 are skipped.
    out : indexed as out[irep, imeta].  Updated in place through
        ``acc._push_stat`` with weight ``freq[idat, irep] * data[idat, imeta, 0]``.
    """
    n_data = data.shape[0]
    n_meta = data.shape[1]

    n_dat = freq.shape[0]
    n_rep = freq.shape[1]
    assert n_dat == n_data

    for d in range(n_dat):
        for m in range(n_meta):
            weight_in = data[d, m, 0]
            mean_in = data[d, m, 1]
            var_in = data[d, m, 2]
            for r in range(n_rep):
                count = freq[d, r]
                if count > 0:
                    acc._push_stat(out[r, m], count * weight_in, mean_in, var_in)