Skip to content

Commit bd4b04a

Browse files
committed
ansible: new multiplexer/workers configuration
Following on from 152effc, * Pin mux to CPU 0 * Pin top-level CPU 1 * Pin workers sequentially to CPU 2..n Nets 19.5% improvement on issue_140__thread_pileup.yml when targetting 64 Docker containers on the same 8 core/16 thread machine. Before (prior to last scheme, no affinity at all): 2294528.731458 task-clock (msec) # 6.443 CPUs utilized 10,429,745 context-switches # 0.005 M/sec 2,049,618 cpu-migrations # 0.893 K/sec 8,258,952 page-faults # 0.004 M/sec 5,532,719,253,824 cycles # 2.411 GHz (83.35%) 3,267,471,616,230 instructions # 0.59 insn per cycle # 1.22 stalled cycles per insn (83.35%) 662,006,455,943 branches # 288.515 M/sec (83.33%) 39,453,895,977 branch-misses # 5.96% of all branches (83.37%) 356.148064576 seconds time elapsed After: 2226463.958975 task-clock (msec) # 7.784 CPUs utilized 9,831,466 context-switches # 0.004 M/sec 180,065 cpu-migrations # 0.081 K/sec 5,082,278 page-faults # 0.002 M/sec 5,592,548,587,259 cycles # 2.512 GHz (83.35%) 3,135,038,855,414 instructions # 0.56 insn per cycle # 1.32 stalled cycles per insn (83.32%) 636,397,509,232 branches # 285.833 M/sec (83.30%) 39,135,441,790 branch-misses # 6.15% of all branches (83.35%) 286.036681644 seconds time elapsed
1 parent 3d0bdc8 commit bd4b04a

File tree

5 files changed

+138
-54
lines changed

5 files changed

+138
-54
lines changed

ansible_mitogen/affinity.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
# Copyright 2017, David Wilson
2+
#
3+
# Redistribution and use in source and binary forms, with or without
4+
# modification, are permitted provided that the following conditions are met:
5+
#
6+
# 1. Redistributions of source code must retain the above copyright notice,
7+
# this list of conditions and the following disclaimer.
8+
#
9+
# 2. Redistributions in binary form must reproduce the above copyright notice,
10+
# this list of conditions and the following disclaimer in the documentation
11+
# and/or other materials provided with the distribution.
12+
#
13+
# 3. Neither the name of the copyright holder nor the names of its contributors
14+
# may be used to endorse or promote products derived from this software without
15+
# specific prior written permission.
16+
#
17+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20+
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21+
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22+
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23+
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24+
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25+
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26+
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27+
# POSSIBILITY OF SUCH DAMAGE.
28+
29+
import ctypes
30+
import mmap
31+
import multiprocessing
32+
import os
33+
import struct
34+
35+
import mitogen.parent
36+
37+
38+
try:
39+
_libc = ctypes.CDLL(None, use_errno=True)
40+
_strerror = _libc.strerror
41+
_strerror.restype = ctypes.c_char_p
42+
_pthread_mutex_init = _libc.pthread_mutex_init
43+
_pthread_mutex_lock = _libc.pthread_mutex_lock
44+
_pthread_mutex_unlock = _libc.pthread_mutex_unlock
45+
_sched_setaffinity = _libc.sched_setaffinity
46+
except (OSError, AttributeError):
47+
_libc = None
48+
49+
50+
class pthread_mutex_t(ctypes.Structure):
51+
_fields_ = [
52+
('data', ctypes.c_uint8 * 512),
53+
]
54+
55+
def init(self):
56+
if _pthread_mutex_init(self.data, 0):
57+
raise Exception(_strerror(ctypes.get_errno()))
58+
59+
def acquire(self):
60+
if _pthread_mutex_lock(self.data):
61+
raise Exception(_strerror(ctypes.get_errno()))
62+
63+
def release(self):
64+
if _pthread_mutex_unlock(self.data):
65+
raise Exception(_strerror(ctypes.get_errno()))
66+
67+
68+
class State(ctypes.Structure):
69+
_fields_ = [
70+
('lock', pthread_mutex_t),
71+
('counter', ctypes.c_uint8),
72+
]
73+
74+
75+
class Manager(object):
76+
"""
77+
Bind this process to a randomly selected CPU. If done prior to starting
78+
threads, all threads will be bound to the same CPU. This call is a no-op on
79+
systems other than Linux.
80+
81+
A hook is installed that causes `reset_affinity(clear=True)` to run in the
82+
child of any process created with :func:`mitogen.parent.detach_popen`,
83+
ensuring CPU-intensive children like SSH are not forced to share the same
84+
core as the (otherwise potentially very busy) parent.
85+
86+
Threads bound to the same CPU share cache and experience the lowest
87+
possible inter-thread roundtrip latency, for example ensuring the minimum
88+
possible time required for :class:`mitogen.service.Pool` to interact with
89+
:class:`mitogen.core.Broker`, as required for every message transmitted or
90+
received.
91+
92+
Binding threads of a Python process to one CPU makes sense, as they are
93+
otherwise unable to operate in parallel, and all must acquire the same lock
94+
prior to executing.
95+
"""
96+
def __init__(self):
97+
self.mem = mmap.mmap(-1, 4096)
98+
self.state = State.from_buffer(self.mem)
99+
self.state.lock.init()
100+
101+
def _set_affinity(self, mask):
102+
mitogen.parent._preexec_hook = self.clear
103+
s = struct.pack('L', mask)
104+
_sched_setaffinity(os.getpid(), len(s), s)
105+
106+
def cpu_count(self):
107+
return multiprocessing.cpu_count()
108+
109+
def clear(self):
110+
"""
111+
Clear any prior binding, except for reserved CPUs.
112+
"""
113+
self._set_affinity(0xffffffff & ~3)
114+
115+
def set_cpu(self, cpu):
116+
"""
117+
Bind to 0-based `cpu`.
118+
"""
119+
self._set_affinity(1 << cpu)
120+
121+
def assign(self):
122+
self.state.lock.acquire()
123+
try:
124+
n = self.state.counter
125+
self.state.counter += 1
126+
finally:
127+
self.state.lock.release()
128+
129+
self.set_cpu(2 + (n % (self.cpu_count() - 2)))
130+
131+
132+
manager = Manager()

ansible_mitogen/process.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import ansible_mitogen.services
5757

5858
from mitogen.core import b
59+
import ansible_mitogen.affinity
5960

6061

6162
LOG = logging.getLogger(__name__)
@@ -172,11 +173,12 @@ def start(cls, _init_logging=True):
172173
if _init_logging:
173174
ansible_mitogen.logging.setup()
174175
if cls.child_pid:
176+
ansible_mitogen.affinity.manager.set_cpu(1)
175177
cls.child_sock.close()
176178
cls.child_sock = None
177179
mitogen.core.io_op(cls.worker_sock.recv, 1)
178180
else:
179-
mitogen.utils.reset_affinity()
181+
ansible_mitogen.affinity.manager.set_cpu(0)
180182
cls.worker_sock.close()
181183
cls.worker_sock = None
182184
self = cls()

ansible_mitogen/strategy.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import threading
3232

3333
import mitogen.core
34+
import ansible_mitogen.affinity
3435
import ansible_mitogen.loaders
3536
import ansible_mitogen.mixins
3637
import ansible_mitogen.process
@@ -105,6 +106,7 @@ def wrap_worker__run(*args, **kwargs):
105106
import signal
106107
signal.signal(signal.SIGTERM, signal.SIG_IGN)
107108

109+
ansible_mitogen.affinity.manager.assign()
108110
return mitogen.core._profile_hook('WorkerProcess',
109111
lambda: worker__run(*args, **kwargs)
110112
)

mitogen/utils.py

Lines changed: 0 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,9 @@
2828

2929
import datetime
3030
import logging
31-
import multiprocessing
3231
import os
33-
import random
34-
import struct
3532
import sys
3633

37-
try:
38-
import ctypes
39-
except ImportError:
40-
ctypes = None
41-
4234
import mitogen
4335
import mitogen.core
4436
import mitogen.master
@@ -53,50 +45,6 @@
5345
else:
5446
iteritems = dict.iteritems
5547

56-
if ctypes:
57-
try:
58-
_libc = ctypes.CDLL(None)
59-
_sched_setaffinity = _libc.sched_setaffinity
60-
except (OSError, AttributeError):
61-
_sched_setaffinity = None
62-
63-
64-
def reset_affinity(clear=False):
65-
"""
66-
Bind this process to a randomly selected CPU. If done prior to starting
67-
threads, all threads will be bound to the same CPU. This call is a no-op on
68-
systems other than Linux.
69-
70-
:param bool clear:
71-
If :data:`True`, clear any prior binding.
72-
73-
A hook is installed that causes `reset_affinity(clear=True)` to run in the
74-
child of any process created with :func:`mitogen.parent.detach_popen`,
75-
ensuring CPU-intensive children like SSH are not forced to share the same
76-
core as the (otherwise potentially very busy) parent.
77-
78-
Threads bound to the same CPU share cache and experience the lowest
79-
possible inter-thread roundtrip latency, for example ensuring the minimum
80-
possible time required for :class:`mitogen.service.Pool` to interact with
81-
:class:`mitogen.core.Broker`, as required for every message transmitted or
82-
received.
83-
84-
Binding threads of a Python process to one CPU makes sense, as they are
85-
otherwise unable to operate in parallel, and all must acquire the same lock
86-
prior to executing.
87-
"""
88-
if _sched_setaffinity is None:
89-
return
90-
91-
if clear:
92-
mask = 0xffffffff
93-
else:
94-
mask = 1 << random.randint(0, multiprocessing.cpu_count() - 1)
95-
96-
s = struct.pack('L', mask)
97-
_sched_setaffinity(os.getpid(), len(s), s)
98-
mitogen.parent._preexec_hook = lambda: reset_affinity(clear=True)
99-
10048

10149
def setup_gil():
10250
"""

run_tests

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ fi
4242
--pattern '*_test.py' \
4343
"$@"
4444
else
45-
coverage run -a "${UNIT2}" discover \
45+
"${UNIT2}" discover \
4646
--start-directory "tests/ansible" \
4747
--pattern '*_test.py' \
4848
"$@"

0 commit comments

Comments
 (0)