Skip to content

Commit

Permalink
wrapper: add network throttling
Browse files Browse the repository at this point in the history
Add network throttling using cgroups and tc.

This is choice of tools was not the best one and it has quite important
caveats:

 1) Not easy to (re-)configure when there is already some trafic control
    setup in place.

 2) We don't know in advance which interface should be throttled, that
    means we need to apply limit to all interfaces. We do this only in
    limited way because setting it up (and cleaning!) it properly is
    difficult (especially in connection with #1).

 3) Most importantly, we can limit only output traffic! That is too bad
    as we are more interested in limiting the input traffic.

Signed-off-by: Tomáš Golembiovský <tgolembi@redhat.com>
  • Loading branch information
nyoxi committed Mar 24, 2019
1 parent 03011c1 commit 19dc243
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 7 deletions.
24 changes: 21 additions & 3 deletions docs/Virt-v2v-wrapper.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,25 @@ Right before the wrapper terminates it updates the state with:
## Conversion throttling (rate limiting)

It is possible to throttle resources used by the conversion. At the moment one
can limit only CPU.
can limit CPU and network bandwidth. Before wrapper detaches to background

Example of throttling file content:

```
{
"cpu": "50%",
"network": "1048576"
}
```

This will assign half of single CPU to the process.
This will assign half of single CPU to the process and limit network bandwidth
to 1 MB/s.

Limits read from the throttling file are added to those already in place. That
means one does not strictly need to include all the possible limits in the
file. Although it is suggested to do so. Otherwise some information can be lost
if multiple changes occur to the throttling file before the wrapper manages to
read them.

To remove a limit one should assign value `unlimited`.

Expand All @@ -251,4 +259,14 @@ From systemd.resource-control(5):
hierarchy and "cpu.cfs_quota_us" on legacy.

For example, to assign half of one CPU use "50%" or to assign two CPUs use
"200%".
"200%". To remove any limit on CPU one can pass the string "unlimited".


### Network Bandwidth

Due to design of cgroup filter on tc the network is limited on output only.
(Input is limited only indirectly and is unreliable.)

The limit is specified in bytes per seconds without any units. E.g. "12345".
Note that despite being a number it should be passed as string in JSON. To
remove any limit one can pass the string "unlimited".
2 changes: 2 additions & 0 deletions v2v-conversion-host.spec.in
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ Ansible role to setup hosts as conversion host for ManageIQ

%package wrapper
Summary: Daemonizing wrapper for virt-v2v
Requires: libcgroup-tools
Requires: python
BuildArch: noarch

%description wrapper
Expand Down
252 changes: 248 additions & 4 deletions wrapper/virt-v2v-wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# limitations under the License.
#

import atexit
from contextlib import contextmanager
import copy
import errno
Expand Down Expand Up @@ -781,6 +782,7 @@ def __init__(self):
'failed': False,
'throttling': {
'cpu': None,
'network': None,
}
}
self._filename = None
Expand Down Expand Up @@ -814,6 +816,23 @@ def __getattr__(self, name):


# }}}

def atexit_command(cmd):
"""
Run command ignoring any errors. This is supposed to be used with atexit.
"""
def remove(cmd):
try:
logging.info('Running command at exit: %r', cmd)
subprocess.check_call(cmd)
except subprocess.CalledProcessError as e:
logging.warning(
'Ignoring failed command at exit,'
'returncode=%d, output=\n%s\n',
e.returncode, e.output)
atexit.register(lambda: remove(cmd))


def hard_error(msg):
"""
Function to produce an error and terminate the wrapper.
Expand Down Expand Up @@ -1142,6 +1161,12 @@ def run(self):

# }}}
class SystemdRunner(BaseRunner): # {{{

def __init__(self, host, arguments, environment, log):
super(SystemdRunner, self).__init__(host, arguments, environment, log)
self._service_name = None
self._tc = None

def is_running(self):
try:
subprocess.check_call([
Expand All @@ -1164,6 +1189,7 @@ def kill(self):
error('Failed to kill virt-v2v unit', exception=True)

def run(self):
net_cls_dir = self._prepare_net_cls()
unit = [
'systemd-run',
'--description=virt-v2v conversion',
Expand All @@ -1173,9 +1199,11 @@ def run(self):
for k, v in six.iteritems(self._environment):
unit.append('--setenv=%s=%s' % (k, v))
unit.extend([
'cgexec', '-g', 'net_cls:%s' % net_cls_dir,
'/bin/sh', '-c',
'exec "%s" "$@" > "%s" 2>&1' % (VIRT_V2V, self._log),
VIRT_V2V]) # First argument is command name
logging.info('systemd-run invocation: %r', unit)
unit.extend(self._arguments)

proc = subprocess.Popen(
Expand Down Expand Up @@ -1248,6 +1276,15 @@ def systemd_set_property(self, property_name, value):
property_name)
return False

def set_network_limit(self, limit):
if self._tc is None:
return False
return self._tc.set_limit(limit)

def _prepare_net_cls(self):
self._tc = TcController(self._host.get_tag())
return self._tc.cgroup

def _systemd_return_code(self):
""" Return code after the unit exited """
code = self._systemd_property('ExecMainStatus')
Expand All @@ -1262,6 +1299,187 @@ def _systemd_return_code(self):


# }}}
class TcController(object):
"""
Handles communication with tc (traffic control) and associated net_cls
cgroup.
"""

# TC store rates as a 32-bit unsigned integer in bps internally
MAX_RATE = 0xffffffff

@staticmethod
def class_id_to_hex(class_id):
"""
Convert class ID in the form <major>:<minor> into hex string where
upper 16b are for major and lower 16b are for minor number.
e.g.: '1a:2b' -> '0x001a002b'
"""
parts = class_id.split(':')
major = int(parts[0], base=16)
minor = int(parts[1], base=16)
return '0x{:04x}{:04x}'.format(major, minor)

def __init__(self, tag):
self._cgroup = 'v2v-conversion/%s' % tag
self._class_id = None
self._interfaces = []
self._prepare()

@property
def class_id(self):
return self._class_id

@property
def cgroup(self):
return self._cgroup

def set_limit(self, limit):
if limit is None or limit == 'unlimited':
limit = TcController.MAX_RATE
ret = True
for iface in self._interfaces:
if self._run_tc([
'class', 'change', 'dev', iface,
'classid', self._class_id, 'htb',
'rate', '{}bps'.format(limit),
]) is None:
ret = False
return ret

def _prepare(self):
logging.info('Preparing tc')
root_handle = self._create_qdiscs()
if root_handle is None:
return
for iface in self._interfaces[:]:
if not self._create_filter(root_handle, iface) or \
not self._create_class(root_handle, iface):
self._interfaces.remove(iface)
self._prepare_cgroup()

def _prepare_cgroup(self):
logging.info('Preparing net_cls cgroup %s', self._cgroup)
# Create cgroup -- we do this even when tc is not properly set
# otherwise cgexec would fail
cgroup_dir = '/sys/fs/cgroup/net_cls/%s' % self._cgroup
atexit_command(['/usr/bin/rmdir', '-p', cgroup_dir])
os.makedirs(cgroup_dir)
# Store class ID
if self._class_id is not None:
with open(os.path.join(cgroup_dir, 'net_cls.classid'), 'w') as f:
f.write(TcController.class_id_to_hex(self._class_id))
else:
logging.info(
'Not assigning class ID to net_cls cgroup'
' because of previous errors')

def _create_qdiscs(self):
qdiscs = self._run_tc(['qdisc', 'show'])
if qdiscs is None:
logging.error('Failed to query existing qdiscs')
return None
logging.debug('Found following qdiscs: %r', qdiscs)

root_handle = 'abc:'
ifaces = []
roots = None
try:
# (interface, type, root handle)
roots = [(qdisc[4], qdisc[1], qdisc[2])
for qdisc in qdiscs if qdisc[5] == 'root']
except IndexError:
logging.exception('Failed to process tc output')
logging.error('%r', qdiscs)
return None
logging.debug('Found following root qdiscs: %r', roots)
#
# Here we go through all interfaces and try to set our root handle.
# For interfaces that already have some configuration this will likely
# fail, we ignore those (but we give it a try first).
#
for qdisc in roots:
if qdisc[1] == 'htb' and qdisc[2] == root_handle:
# Already ours
ifaces.append(qdisc[0])
continue
# Try to change the qdisc type
if self._run_tc([
'qdisc', 'add', 'dev', qdisc[0],
'root', 'handle', root_handle, 'htb'
]) is None:
logging.info('Failed to setup HTB qdisc on %s', qdisc[0])
else:
ifaces.append(qdisc[0])
self._interfaces = ifaces
return root_handle

def _create_class(self, handle, iface):
# If there is no class ID assigned yet, try to find first free
if self._class_id is None:
# First list existing classes
classes = self._run_tc([
'class', 'show', 'dev', iface, 'parent', handle])
if classes is None:
logging.error(
'Failed to query existing classes for parent %s on %s',
handle, iface)
return False
logging.debug('Found existing tc classes: %r', classes)
# Gather IDs and find first free
ids = [class_[2] for class_ in classes]
new_id = None
logging.debug('Existing class IDs on %s: %r', iface, ids)
for i in xrange(1, 0x10000):
test_id = '{}{:x}'.format(handle, i)
if test_id not in ids:
new_id = test_id
break
if new_id is None:
logging.error(
'Could not find any free class ID on %s under %s',
iface, handle)
return False
else:
# We already chose ID before
new_id = self._class_id
# Create new class
logging.info('Creating new tc class on %s with class ID: %s',
iface, new_id)
if self._run_tc([
'class', 'add', 'dev', iface,
'parent', handle, 'classid', new_id,
'htb', 'rate', '{}bps'.format(TcController.MAX_RATE),
]) is None:
logging.error('Failed to create tc class')
return False
atexit_command(['tc', 'class', 'del', 'dev', iface, 'classid', new_id])
self._class_id = new_id
return True

def _create_filter(self, handle, iface):
# It is OK if same filter already exists. However, if there is already
# a different filter we're in trouble.
return self._run_tc([
'filter', 'add', 'dev', iface, 'parent', handle,
'protocol', 'ip', 'prio', '10', 'handle', '1:', 'cgroup'
]) is not None

def _run_tc(self, args):
try:
output = subprocess.check_output(['tc'] + args)
except subprocess.CalledProcessError as e:
logging.exception(
'tc command failed; return code %d, output:\n%s\n',
e.returncode, e.output)
return None
# Split into words by line
output = output.splitlines()
output = list(map(str.split, output))
return output


@contextmanager
def log_parser(v2v_log):
parser = None
Expand Down Expand Up @@ -1334,6 +1552,7 @@ def throttling_update(runner, initial=None):
# Remove file when finished to prevent spamming logs with repeated
# messages
os.remove(state['internal']['throttling_file'])
logging.info('Fetched updated throttling info from file')
except IOError as e:
if e.errno != errno.ENOENT:
error('Failed to read throttling file', exception=True)
Expand Down Expand Up @@ -1364,16 +1583,41 @@ def throttling_update(runner, initial=None):
set_val = val
else:
error(
'Failed to parse value for CPU quota',
'Failed to parse value for CPU quota: %s', v)
'Failed to parse value for CPU limit',
'Failed to parse value for CPU limit: %s', v)
continue
if val != state['throttling']['cpu'] and \
runner.systemd_set_property('CPUQuota', set_val):
processed[k] = val
else:
error(
'Failed to set CPU quota',
'Failed to set CPU quota to %s', val)
'Failed to set CPU limit',
'Failed to set CPU limit to %s', val)
elif k == 'network':
if v is None or v == 'unlimited':
# Treat empty value and 'unlimited' in the same way
val = 'unlimited'
set_val = 'unlimited'
else:
m = re.match("([+0-9]+)$", v)
if m is not None:
val = m.group(1)
set_val = val
else:
error(
'Failed to parse value for network limit',
'Failed to parse value for network limit: %s', v)
continue
if val != state['throttling']['network'] and \
runner.set_network_limit(set_val):
logging.debug(
'Changing network throttling to %s (previous: %s)',
val, state['throttling']['network'])
processed[k] = val
else:
error(
'Failed to set network limit',
'Failed to set network limit to %s', val)
else:
logging.debug('Ignoring unknown throttling request: %s', k)
state['throttling'].update(processed)
Expand Down

0 comments on commit 19dc243

Please sign in to comment.