Skip to content

Commit

Permalink
Support cpuset and scheduler for cpuset
Browse files Browse the repository at this point in the history
This patch improves the NUMA objects by adding the mem_available and mem_total fields.

This patch adds a parameter named "cpu_policy" to the Zun API to tag the CPU mode of a container.

This patch adds node resource management methods for NUMA.

This patch also introduces a filter for cpuset.

Co-Authored-By: Hongbin Lu <hongbin034@gmail.com>
Change-Id: Ic65b86ceb6f7d37c2b4ef545425e4edd20328282
Implements: blueprint cpuset-container
  • Loading branch information
ZhaiMengdong and hongbin committed Dec 11, 2018
1 parent 3ec5806 commit c9d7639
Show file tree
Hide file tree
Showing 29 changed files with 446 additions and 23 deletions.
3 changes: 3 additions & 0 deletions bindep.txt
Expand Up @@ -9,6 +9,9 @@ graphviz [doc test]
libffi-dev [platform:dpkg]
libffi-devel [platform:rpm]

# Tool to retrieve numa topology of compute host
numactl

# MySQL and PostgreSQL databases since some jobs are set up in
# OpenStack infra that need these like
libmysqlclient-dev [platform:dpkg test]
Expand Down
1 change: 1 addition & 0 deletions devstack/files/debs/zun
@@ -0,0 +1 @@
numactl
1 change: 1 addition & 0 deletions devstack/files/rpms/zun
@@ -0,0 +1 @@
numactl
3 changes: 3 additions & 0 deletions zun/api/controllers/v1/containers.py
Expand Up @@ -375,6 +375,9 @@ def _do_post(self, run=False, **container_dict):

requested_volumes = self._build_requested_volumes(context, mounts)

cpu_policy = container_dict.pop('cpu_policy', None)
container_dict['cpu_policy'] = cpu_policy

privileged = container_dict.pop('privileged', None)
if privileged is not None:
api_utils.version_check('privileged', '1.21')
Expand Down
1 change: 1 addition & 0 deletions zun/api/controllers/v1/schemas/containers.py
Expand Up @@ -19,6 +19,7 @@
'image': parameter_types.image_name,
'command': parameter_types.command,
'cpu': parameter_types.cpu,
'cpu_policy': parameter_types.cpu_policy,
'memory': parameter_types.memory,
'workdir': parameter_types.workdir,
'auto_remove': parameter_types.auto_remove,
Expand Down
5 changes: 5 additions & 0 deletions zun/api/controllers/v1/schemas/parameter_types.py
Expand Up @@ -97,6 +97,11 @@
'maximum': CONF.maximum_cpus,
}

# Schema for the 'cpu_policy' container parameter: a string that must be
# either 'dedicated' or 'shared'.
cpu_policy = {
'type': 'string',
'enum': ['dedicated', 'shared']
}

# TODO(pksingh) Memory provided must be in MBs
# Will find another way if people dont find it useful.
memory = {
Expand Down
1 change: 1 addition & 0 deletions zun/api/controllers/v1/views/containers_view.py
Expand Up @@ -48,6 +48,7 @@
'auto_heal',
'privileged',
'healthcheck',
'cpu_policy',
)


Expand Down
3 changes: 2 additions & 1 deletion zun/api/controllers/versions.py
Expand Up @@ -60,10 +60,11 @@
* 1.25 - Encode/Decode archive file
* 1.26 - Introduce Quota support
* 1.27 - Add support for deleting networks
* 1.28 - Add support for cpuset
"""

BASE_VER = '1.1'
CURRENT_MAX_VER = '1.27'
CURRENT_MAX_VER = '1.28'


class Version(object):
Expand Down
6 changes: 6 additions & 0 deletions zun/api/rest_api_version_history.rst
Expand Up @@ -219,3 +219,9 @@ user documentation.
----

Introduce API for deleting network. By default, this is an admin API.

1.28
----

Add a new attribute 'cpu_policy'.
Users can use this attribute to determine which CPU policy the container uses.
46 changes: 44 additions & 2 deletions zun/compute/claims.py
Expand Up @@ -18,6 +18,7 @@
"""

from oslo_log import log as logging
import random

from zun.common import exception
from zun.common.i18n import _
Expand Down Expand Up @@ -85,6 +86,10 @@ def __init__(self, context, container, tracker, resources, pci_requests,
# Check claim at constructor to avoid mess code
# Raise exception ResourcesUnavailable if claim failed
self._claim_test(resources, limits)
if container.cpu_policy == 'dedicated':
container.cpuset = objects.container.Cpuset()
self.claim_cpuset_cpu_for_container(container, limits)
self.claim_cpuset_mem_for_container(container, limits)

@property
def memory(self):
Expand All @@ -103,6 +108,15 @@ def abort(self):
LOG.debug("Aborting claim: %s", self)
self.tracker.abort_container_claim(self.context, self.container)

def claim_cpuset_cpu_for_container(self, container, limits):
    """Randomly pick CPUs for the container from the unpinned ones.

    The candidates are the CPUs listed in ``limits['cpuset']['cpuset_cpu']``
    minus those in ``limits['cpuset']['cpuset_cpu_pinned']``. ``int(self.cpu)``
    of them are sampled and stored, as a set, on
    ``container.cpuset.cpuset_cpus``.

    :param container: object whose ``cpuset.cpuset_cpus`` is assigned
    :param limits: dict holding a ``'cpuset'`` dict with the keys above
    """
    cpuset_limits = limits['cpuset']
    unpinned = (set(cpuset_limits['cpuset_cpu']) -
                set(cpuset_limits['cpuset_cpu_pinned']))
    # The requested cpu count is truncated to a whole number of cores.
    wanted = int(self.cpu)
    chosen = random.sample(list(unpinned), wanted)
    container.cpuset.cpuset_cpus = set(chosen)

def claim_cpuset_mem_for_container(self, container, limits):
    """Record the NUMA node(s) the container's memory is bound to.

    Stores ``limits['cpuset']['node']`` on ``container.cpuset.cpuset_mems``
    as a set.

    :param container: object whose ``cpuset.cpuset_mems`` is assigned
    :param limits: dict holding a ``'cpuset'`` dict with a ``'node'`` entry
    """
    node = limits['cpuset']['node']
    if isinstance(node, (set, frozenset, list, tuple)):
        # A collection of node ids is copied into a set as before.
        mems = set(node)
    else:
        # A single node id must be wrapped: set(<int>) raises TypeError and
        # set('10') would wrongly split the id into {'1', '0'}.
        mems = {node}
    container.cpuset.cpuset_mems = mems

def _claim_test(self, resources, limits=None):
"""Test if this claim can be satisfied.
Expand All @@ -122,6 +136,7 @@ def _claim_test(self, resources, limits=None):
memory_limit = limits.get('memory')
cpu_limit = limits.get('cpu')
disk_limit = limits.get('disk')
cpuset_limit = limits.get('cpuset', None)

LOG.info('Attempting claim: memory %(memory)s, '
'cpu %(cpu).02f CPU, disk %(disk)s',
Expand All @@ -130,8 +145,9 @@ def _claim_test(self, resources, limits=None):
reasons = [self._test_memory(resources, memory_limit),
self._test_cpu(resources, cpu_limit),
self._test_disk(resources, disk_limit),
self._test_pci()]
# TODO(Shunli): test numa here
self._test_pci(),
self._test_cpuset_cpu(resources, cpuset_limit),
self._test_cpuset_mem(resources, cpuset_limit)]
reasons = [r for r in reasons if r is not None]
if len(reasons) > 0:
raise exception.ResourcesUnavailable(reason="; ".join(reasons))
Expand Down Expand Up @@ -163,6 +179,32 @@ def _test_cpu(self, resources, limit):

return self._test(type_, unit, total, used, requested, limit)

def _test_cpuset_cpu(self, resources, limit):
if limit:
type_ = _("cpuset_cpu")
unit = "core"
total = len(limit['cpuset_cpu'])
used = len(limit['cpuset_cpu_pinned'])
requested = self.cpu

return self._test(type_, unit, total, used, requested,
len(limit['cpuset_cpu']))
else:
return

def _test_cpuset_mem(self, resources, limit):
if limit:
type_ = _("cpuset_mem")
unit = "M"
total = resources.numa_topology.nodes[limit['node']].mem_total
used = 0
requested = self.memory

return self._test(type_, unit, total, used, requested,
limit['cpuset_mem'])
else:
return

def _test_disk(self, resources, limit):
type_ = _("disk")
unit = "GB"
Expand Down
22 changes: 20 additions & 2 deletions zun/compute/compute_node_tracker.py
Expand Up @@ -226,8 +226,14 @@ def _update_usage(self, usage, sign=1):
mem_usage = usage['memory']
cpus_usage = usage.get('cpu', 0)
disk_usage = usage['disk']
cpuset_cpus_usage = None
numa_node_id = 0
if 'cpuset_cpus' in usage.keys():
cpuset_cpus_usage = usage['cpuset_cpus']
numa_node_id = usage['node']

cn = self.compute_node
numa_topology = cn.numa_topology.nodes
cn.mem_used += sign * mem_usage
cn.cpu_used += sign * cpus_usage
cn.disk_used += sign * disk_usage
Expand All @@ -237,7 +243,17 @@ def _update_usage(self, usage, sign=1):

cn.running_containers += sign * 1

# TODO(Shunli): Calculate the numa usage here
if cpuset_cpus_usage:
for numa_node in numa_topology:
if numa_node.id == numa_node_id:
numa_node.mem_available = (numa_node.mem_available -
mem_usage * sign)
if sign > 0:
numa_node.pin_cpus(cpuset_cpus_usage)
cn._changed_fields.add('numa_topology')
else:
numa_node.unpin_cpus(cpuset_cpus_usage)
cn._changed_fields.add('numa_topology')

def _update(self, compute_node):
if not self._resource_change(compute_node):
Expand Down Expand Up @@ -301,7 +317,9 @@ def _get_usage_dict(self, container, **updates):
usage = {'memory': memory,
'cpu': container.cpu or 0,
'disk': container.disk or 0}
# update numa usage here
if container.cpuset.cpuset_cpus:
usage['cpuset_cpus'] = container.cpuset.cpuset_cpus
usage['node'] = int(container.cpuset.cpuset_mems)

return usage

Expand Down
17 changes: 17 additions & 0 deletions zun/compute/manager.py
Expand Up @@ -850,6 +850,9 @@ def container_update(self, context, container, patch):
# FIXME(hongbin): rt.compute_node could be None
limits = {'cpu': rt.compute_node.cpus,
'memory': rt.compute_node.mem_total}
if container.cpu_policy == 'dedicated':
limits['cpuset'] = self._get_cpuset_limits(rt.compute_node,
container)
with rt.container_update_claim(context, container, old_container,
limits):
self.driver.update(context, container)
Expand Down Expand Up @@ -1103,6 +1106,20 @@ def inventory_host(self, context):
rt = self._get_resource_tracker()
rt.update_available_resources(context)

def _get_cpuset_limits(self, compute_node, container):
for numa_node in compute_node.numa_topology.nodes:
if len(numa_node.cpuset) - len(
numa_node.pinned_cpus) >= container.cpu and \
numa_node.mem_available >= container.memory:
return {
'node': numa_node.id,
'cpuset_cpu': numa_node.cpuset,
'cpuset_cpu_pinned': numa_node.pinned_cpus,
'cpuset_mem': numa_node.mem_available
}
msg = _("There may be not enough numa resources.")
raise exception.NoValidHost(reason=msg)

def _get_resource_tracker(self):
if not self._resource_tracker:
rt = compute_node_tracker.ComputeNodeTracker(self.host,
Expand Down
4 changes: 4 additions & 0 deletions zun/conf/compute.py
Expand Up @@ -25,6 +25,10 @@
'reserve_disk_for_image',
default=0.2,
help='reserve disk for docker images'),
cfg.BoolOpt(
'enable_cpu_pinning',
default=False,
help='allow the container with cpu_policy is dedicated'),
]

service_opts = [
Expand Down
3 changes: 3 additions & 0 deletions zun/container/docker/driver.py
Expand Up @@ -311,6 +311,9 @@ def create(self, context, container, image, requested_networks,
if container.disk:
disk_size = str(container.disk) + 'G'
host_config['storage_opt'] = {'size': disk_size}
if container.cpu_policy == 'dedicated':
host_config['cpuset_cpus'] = container.cpuset.cpuset_cpus
host_config['cpuset_mems'] = str(container.cpuset.cpuset_mems)
# The time unit in docker of heath checking is us, and the unit
# of interval and timeout is seconds.
if container.healthcheck:
Expand Down
6 changes: 5 additions & 1 deletion zun/container/os_capability/host_capability.py
Expand Up @@ -38,9 +38,11 @@ def get_host_numa_topology(self, numa_topo_obj):
# Replace this call with a more generic call when we obtain other
# NUMA related data like memory etc.
cpu_info = self.get_cpu_numa_info()
mem_info = self.get_mem_numa_info()
floating_cpus = utils.get_floating_cpu_set()
numa_node_obj = []
for node, cpuset in cpu_info.items():
for cpu, mem_total in zip(cpu_info.items(), mem_info):
node, cpuset = cpu
numa_node = objects.NUMANode()
if floating_cpus:
allowed_cpus = set(cpuset) - (floating_cpus & set(cpuset))
Expand All @@ -52,6 +54,8 @@ def get_host_numa_topology(self, numa_topo_obj):
# in nature.
numa_node.cpuset = allowed_cpus
numa_node.pinned_cpus = set([])
numa_node.mem_total = mem_total
numa_node.mem_available = mem_total
numa_node_obj.append(numa_node)
numa_topo_obj.nodes = numa_node_obj

Expand Down
14 changes: 14 additions & 0 deletions zun/container/os_capability/linux/os_capability_linux.py
Expand Up @@ -52,3 +52,17 @@ def get_cpu_numa_info(self):
elif len(val) == 2 and old_lscpu:
sock_map[val[0]].append(int(val[1]))
return sock_map

def get_mem_numa_info(self):
    """Return the per-node memory sizes reported by ``numactl -H``.

    :returns: a list of integers, one for every ``size: <number>`` entry
              found in the command output, in output order; an empty list
              if the command fails. The values are taken verbatim from
              numactl (presumably MB -- confirm against the numactl
              version in use).
    """
    try:
        output = utils.execute('numactl', '-H')
    except exception.CommandError:
        # No retry is attempted; callers simply get no memory information.
        LOG.info("There was a problem while executing numactl -H, "
                 "NUMA memory information is not available.")
        return []

    # Raw string: "\d" in a plain literal is an invalid escape sequence.
    # "\d+" rather than "\d*" guarantees every match carries digits, so
    # int() is never handed an empty string.
    return [int(size) for size in re.findall(r"size: (\d+)", str(output))]
@@ -0,0 +1,38 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.


"""support cpuset
Revision ID: 2b129060baff
Revises: 33cdd98bb9b2
Create Date: 2018-11-10 10:08:40.547664
"""

# revision identifiers, used by Alembic.
revision = '2b129060baff'
down_revision = '33cdd98bb9b2'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa

from zun.db.sqlalchemy import models


def upgrade():
    """Add the ``cpu_policy`` and ``cpuset`` columns to ``container``."""
    new_columns = (
        sa.Column('cpu_policy', sa.String(length=255)),
        sa.Column('cpuset', models.JSONEncodedDict, nullable=True),
    )
    for column in new_columns:
        op.add_column('container', column)
2 changes: 2 additions & 0 deletions zun/db/sqlalchemy/models.py
Expand Up @@ -142,6 +142,8 @@ class Container(Base):
name = Column(String(255))
image = Column(String(255))
cpu = Column(Float)
cpu_policy = Column(String(255), default='shared')
cpuset = Column(JSONEncodedDict, nullable=True)
command = Column(JSONEncodedList)
memory = Column(String(255))
status = Column(String(20))
Expand Down

0 comments on commit c9d7639

Please sign in to comment.