In [5]:
import numa
from IPython.parallel import Client
# Connect to the IPython.parallel cluster; the cell output is len(client),
# presumably the number of connected engines (confirm against the Client docs).
client = Client()
len(client)

30

In [6]:
!echo 3 | sudo tee /proc/sys/vm/drop_caches

3


In [7]:
# Print numa.get_node_size() for every node id from 0 to get_max_node().
# NOTE(review): the printed pair is presumably (free, total) bytes - confirm
# against the numa binding's documentation.
for node_id in range(numa.get_max_node() + 1):
    print(numa.get_node_size(node_id))

(127552450560L, 131071533056L)
(127627759616L, 131072000000L)


In [8]:
import numpy as np
from os.path import exists, join

# Backing file of the benchmark array and number of tasks submitted per run.
filename = 'a.mmap'
n_jobs = 60

# Open an existing file read/write; otherwise create it.
if exists(filename):
    mode = 'r+'
else:
    mode = 'w+'
a = np.memmap(filename, dtype=np.float32, mode=mode, shape=(int(8e6), 784))

# Size in bytes of the memory-mapped array (displayed as the cell output).
a.nbytes

25088000000

In [9]:
def do_stuff(params):
    """Scan a memory-mapped float32 array once, then read random row chunks.

    Parameters
    ----------
    params : tuple or list
        ``(seed, filename, n_iter, numa_aware)`` or, optionally,
        ``(seed, filename, n_iter, numa_aware, (n_samples, n_features))``.

        * seed: seed of the ``RandomState`` used to draw row indices.
        * filename: path of the file backing the array.
        * n_iter: number of random-access iterations (1000 rows each).
        * numa_aware: when true, map the data through a hard link named
          ``"<filename>_<numa.get_preferred()>"`` instead of ``filename``.
        * shape (optional): shape of the array stored in the file;
          defaults to ``(int(8e6), 784)``.

    Returns
    -------
    str
        The path that was actually memory-mapped.

    Raises
    ------
    OSError
        If the hard link cannot be created for a reason other than it
        already existing.
    """
    # Imports are kept local: the function is submitted through an
    # IPython.parallel view, so it presumably cannot rely on notebook globals.
    import errno
    import os
    import numpy as np

    seed, filename, n_iter, numa_aware = params[:4]
    # An optional fifth item overrides the default benchmark shape.
    if len(params) > 4:
        n_samples, n_features = params[4]
    else:
        n_samples, n_features = int(8e6), 784
    rng = np.random.RandomState(seed)

    if numa_aware:
        # Create a hard link for the current process numa node
        import numa
        effective_filename = "%s_%d" % (filename, numa.get_preferred())
        if not os.path.exists(effective_filename):
            try:
                os.link(filename, effective_filename)
            except OSError as exc:
                # Another engine on the same node may have created the link
                # between the exists() check and link(); that is not an error.
                if exc.errno != errno.EEXIST:
                    raise
    else:
        effective_filename = filename
    data = np.memmap(effective_filename, shape=(n_samples, n_features),
                     dtype=np.float32, mode='r')
    # Trigger one sequential scan of the whole readonly data
    data.max()

    # Trigger n_iter random accesses to chunks of data
    for _ in range(n_iter):
        # randint's upper bound is exclusive; this draws the same values as
        # the deprecated random_integers(low=0, high=n_samples - 1).
        idx = rng.randint(0, n_samples, size=1000)
        np.mean(data[idx])

    return effective_filename

# Load-balanced view used by the timing cells to dispatch do_stuff tasks.
workers = client.load_balanced_view()

In [10]:
%%px

# Executed on the engines: report each engine's preferred NUMA node and CPU
# affinity (pid 0 presumably designates the calling process) before any pinning.
import numa
print(numa.get_preferred(), numa.get_affinity(0))

[stdout:0] (0, set([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]))
[stdout:1] (0, set([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]))
[stdout:2] (0, set([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]))
[stdout:3] (0, set([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]))
[stdout:4] (0, set([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]))
[stdout:5] (0, set([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]))
[stdout:6] (0, set([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]))
[stdout:7] (0, set([

In [11]:
# Baseline run: numa_aware=False, so every task maps the original file;
# n_jobs tasks, each seeded with its index and doing 50 random-access iterations.
%time workers.map(do_stuff, [(i, filename, 50, False) for i in range(n_jobs)]).get()

CPU times: user 6.56 s, sys: 592 ms, total: 7.15 s
Wall time: 1min 11s


['a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap']

In [12]:
def assign_cpu_numa_node(engine_id, fix_cpu=False):
    """Bind the calling process to a NUMA node derived from ``engine_id``.

    The node is ``engine_id % n_nodes``; it becomes the preferred node of
    the process. With ``fix_cpu=True`` the process is additionally pinned
    to a single CPU of that node, chosen round-robin from
    ``engine_id // n_nodes``; otherwise it may run on any CPU of the node.
    """
    import numa

    node_count = numa.get_max_node() + 1
    slot = engine_id // node_count
    node_id = engine_id % node_count

    # Assign the current process to a fixed numa node
    numa.set_preferred(node_id)
    node_cpus = sorted(numa.node_to_cpus(node_id))

    if not fix_cpu:
        # Allow any of the CPUs attached to node_id
        numa.set_affinity(0, node_cpus)
        return

    # Pin the engine to one specific CPU attached to node_id
    numa.set_affinity(0, {node_cpus[slot % len(node_cpus)]})

# Bind every engine to a NUMA node (any CPU of that node). apply_sync waits
# for each engine to finish and re-raises remote errors, whereas a plain
# non-blocking apply() would return an AsyncResult that is silently dropped.
for engine_id in client.ids:
    client[engine_id].apply_sync(assign_cpu_numa_node, engine_id, fix_cpu=False)

In [13]:
%%px

# Executed on the engines: check the preferred node / CPU affinity after the
# assign_cpu_numa_node(..., fix_cpu=False) calls.
import numa
print(numa.get_preferred(), numa.get_affinity(0))

[stdout:0] (0, set([0, 1, 2, 3, 4, 5, 6, 7, 16, 17, 18, 19, 20, 21, 22, 23]))
[stdout:1] (1, set([8, 9, 10, 11, 12, 13, 14, 15, 24, 25, 26, 27, 28, 29, 30, 31]))
[stdout:2] (0, set([0, 1, 2, 3, 4, 5, 6, 7, 16, 17, 18, 19, 20, 21, 22, 23]))
[stdout:3] (1, set([8, 9, 10, 11, 12, 13, 14, 15, 24, 25, 26, 27, 28, 29, 30, 31]))
[stdout:4] (0, set([0, 1, 2, 3, 4, 5, 6, 7, 16, 17, 18, 19, 20, 21, 22, 23]))
[stdout:5] (1, set([8, 9, 10, 11, 12, 13, 14, 15, 24, 25, 26, 27, 28, 29, 30, 31]))
[stdout:6] (0, set([0, 1, 2, 3, 4, 5, 6, 7, 16, 17, 18, 19, 20, 21, 22, 23]))
[stdout:7] (1, set([8, 9, 10, 11, 12, 13, 14, 15, 24, 25, 26, 27, 28, 29, 30, 31]))
[stdout:8] (0, set([0, 1, 2, 3, 4, 5, 6, 7, 16, 17, 18, 19, 20, 21, 22, 23]))
[stdout:9] (1, set([8, 9, 10, 11, 12, 13, 14, 15, 24, 25, 26, 27, 28, 29, 30, 31]))
[stdout:10] (0, set([0, 1, 2, 3, 4, 5, 6, 7, 16, 17, 18, 19, 20, 21, 22, 23]))
[stdout:11] (1, set([8, 9, 10, 11, 12, 13, 14, 15, 24, 25, 26, 27, 28, 29, 30, 31]))
[stdout:12] (0, set([0, 1,

In [14]:
!echo 3 | sudo tee /proc/sys/vm/drop_caches

3


In [15]:
# NUMA-aware run (numa_aware=True): each task maps a per-node hard link of the
# file; engines were bound to a node (fix_cpu=False) in the preceding cells.
%time workers.map(do_stuff, [(i, filename, 50, True) for i in range(n_jobs)]).get()

CPU times: user 8.42 s, sys: 508 ms, total: 8.93 s
Wall time: 1min 24s


['a.mmap_0',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1']

In [16]:
# Pin every engine to a single CPU of its NUMA node. apply_sync waits for
# each engine to finish and re-raises remote errors, whereas a plain
# non-blocking apply() would return an AsyncResult that is silently dropped.
for engine_id in client.ids:
    client[engine_id].apply_sync(assign_cpu_numa_node, engine_id, fix_cpu=True)

In [17]:
%%px

# Executed on the engines: check the preferred node / CPU affinity after the
# assign_cpu_numa_node(..., fix_cpu=True) calls.
import numa
print(numa.get_preferred(), numa.get_affinity(0))

[stdout:0] (0, set([0]))
[stdout:1] (1, set([8]))
[stdout:2] (0, set([1]))
[stdout:3] (1, set([9]))
[stdout:4] (0, set([2]))
[stdout:5] (1, set([10]))
[stdout:6] (0, set([3]))
[stdout:7] (1, set([11]))
[stdout:8] (0, set([4]))
[stdout:9] (1, set([12]))
[stdout:10] (0, set([5]))
[stdout:11] (1, set([13]))
[stdout:12] (0, set([6]))
[stdout:13] (1, set([14]))
[stdout:14] (0, set([7]))
[stdout:15] (1, set([15]))
[stdout:16] (0, set([16]))
[stdout:17] (1, set([24]))
[stdout:18] (0, set([17]))
[stdout:19] (1, set([25]))
[stdout:20] (0, set([18]))
[stdout:21] (1, set([26]))
[stdout:22] (0, set([19]))
[stdout:23] (1, set([27]))
[stdout:24] (0, set([20]))
[stdout:25] (1, set([28]))
[stdout:26] (0, set([21]))
[stdout:27] (1, set([29]))
[stdout:28] (0, set([22]))
[stdout:29] (1, set([30]))


In [18]:
!echo 3 | sudo tee /proc/sys/vm/drop_caches

3


In [19]:
# NUMA-aware run (numa_aware=True) with every engine pinned to a single CPU
# (fix_cpu=True in the preceding cells).
%time workers.map(do_stuff, [(i, filename, 50, True) for i in range(n_jobs)]).get()

CPU times: user 5.54 s, sys: 520 ms, total: 6.06 s
Wall time: 1min 11s


['a.mmap_0',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_0',
 'a.mmap_1',
 'a.mmap_0',
 'a.mmap_1']

In [20]:
!echo 3 | sudo tee /proc/sys/vm/drop_caches

3


In [21]:
# Non NUMA-aware run again (numa_aware=False, original file). NOTE(review):
# no visible cell resets the engines, so they are presumably still pinned by
# the earlier assign_cpu_numa_node(..., fix_cpu=True) calls.
%time workers.map(do_stuff, [(i, filename, 50, False) for i in range(n_jobs)]).get()

CPU times: user 7.08 s, sys: 480 ms, total: 7.56 s
Wall time: 1min 7s


['a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap',
 'a.mmap']

Conclusion: apparently, none of the NUMA-aware modifications has any impact on the total runtime. Watching `top` during the computation shows that, in all cases, a significant part of the engines' CPU time is spent in "sys" rather than in "user".