Skip to content

Commit

Permalink
CNI split - introducing CNI daemon
Browse files Browse the repository at this point in the history
This commit implements basic CNI daemon service. The aim of this new
entity is to increase scalability of CNI operations by moving watching
for VIF to a separate process.

This commit:
* Introduces kuryr-daemon service
* Implements communication between CNI driver and CNI daemon using HTTP
* Consolidates watching for VIF on CNI side to a single Watcher that
  looks for all the pods on the node it is running on.
* Solves bug 1731485 when running with CNI daemon.
* Enables new service in DevStack plugin
* Provides unit tests for new code.

Follow up patches will include:
- Documentation.
- Support for running in containerized mode.

To test the patch add `enable_service kuryr-daemon` to your DevStack's
local.conf file.

Partial-Bug: 1731485
Co-Authored-By: Janonymous <janonymous.codevulture@gmail.com>
Implements: blueprint cni-split-exec-daemon
Change-Id: I1bd6406dacab0735a94474e146645c63d933be16
  • Loading branch information
dulek and codevulture committed Nov 21, 2017
1 parent 1c2320e commit 2f65d99
Show file tree
Hide file tree
Showing 17 changed files with 983 additions and 74 deletions.
14 changes: 14 additions & 0 deletions devstack/local.conf.sample
Expand Up @@ -176,6 +176,20 @@ enable_service kubelet
# resource events and convert them to Neutron actions
enable_service kuryr-kubernetes


# Kuryr Daemon
# ============
#
# Kuryr can run CNI plugin in daemonized way - i.e. kubelet will run kuryr CNI
# driver and the driver will pass requests to Kuryr daemon running on the node,
# instead of processing them on its own. This limits the number of Kubernetes
# API requests (as only Kuryr Daemon will watch for new pod events) and should
# increase scalability in environments that often delete and create pods.
# Please note that kuryr-daemon is not yet supported in containerized
# deployment. To enable kuryr-daemon uncomment next line.
# enable_service kuryr-daemon


# Containerized Kuryr
# ===================
#
Expand Down
22 changes: 21 additions & 1 deletion devstack/plugin.sh
Expand Up @@ -63,6 +63,10 @@ function configure_kuryr {

iniset "$KURYR_CONFIG" kubernetes port_debug "$KURYR_PORT_DEBUG"

if is_service_enabled kuryr-daemon; then
iniset "$KURYR_CONFIG" cni_daemon daemon_enabled True
fi

create_kuryr_cache_dir

# Neutron API server & Neutron plugin
Expand Down Expand Up @@ -518,10 +522,23 @@ function run_kuryr_kubernetes {
}


function run_kuryr_daemon {
local daemon_bin=$(which kuryr-daemon)
run_process kuryr-daemon "$daemon_bin --config-file $KURYR_CONFIG" root root
}


source $DEST/kuryr-kubernetes/devstack/lib/kuryr_kubernetes

# main loop
if [[ "$1" == "stack" && "$2" == "install" ]]; then
if [[ "$1" == "stack" && "$2" == "pre-install" ]]; then
KURYR_K8S_CONTAINERIZED_DEPLOYMENT=$(trueorfalse False KURYR_K8S_CONTAINERIZED_DEPLOYMENT)
if is_service_enabled kuryr-daemon && [[ "$KURYR_K8S_CONTAINERIZED_DEPLOYMENT" == "True" ]]; then
die $LINENO "Cannot enable kuryr-daemon with KURYR_K8S_CONTAINERIZED_DEPLOYMENT."
fi


elif [[ "$1" == "stack" && "$2" == "install" ]]; then
setup_develop "$KURYR_HOME"
if is_service_enabled kubelet; then
KURYR_K8S_CONTAINERIZED_DEPLOYMENT=$(trueorfalse False KURYR_K8S_CONTAINERIZED_DEPLOYMENT)
Expand Down Expand Up @@ -580,6 +597,8 @@ if [[ "$1" == "stack" && "$2" == "extra" ]]; then
run_k8s_scheduler
fi

run_kuryr_daemon

if is_service_enabled kubelet; then
prepare_kubelet
extract_hyperkube
Expand Down Expand Up @@ -620,6 +639,7 @@ if [[ "$1" == "unstack" ]]; then
elif is_service_enabled kubelet; then
$KURYR_HYPERKUBE_BINARY kubectl delete nodes ${HOSTNAME}
fi
stop_process kuryr-daemon
docker kill devstack-k8s-setup-files
docker rm devstack-k8s-setup-files

Expand Down
22 changes: 22 additions & 0 deletions kuryr_kubernetes/cmd/daemon.py
@@ -0,0 +1,22 @@
# Copyright 2017 NEC Technologies India Pvt. Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from kuryr_kubernetes.cni.daemon import service


start = service.start

if __name__ == '__main__':
start()
151 changes: 96 additions & 55 deletions kuryr_kubernetes/cni/api.py
Expand Up @@ -13,49 +13,25 @@
# License for the specific language governing permissions and limitations
# under the License.


import abc
import six
from six.moves import http_client as httplib
import traceback

import requests

from kuryr.lib._i18n import _
from os_vif.objects import base
from oslo_log import log as logging
from oslo_serialization import jsonutils

from kuryr_kubernetes.cni import utils
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import exceptions as k_exc

LOG = logging.getLogger(__name__)
_CNI_TIMEOUT = 60


class CNIConfig(dict):
def __init__(self, cfg):
super(CNIConfig, self).__init__(cfg)

for k, v in self.items():
if not k.startswith('_'):
setattr(self, k, v)


class CNIArgs(object):
def __init__(self, value):
for item in value.split(';'):
k, v = item.split('=', 1)
if not k.startswith('_'):
setattr(self, k, v)


class CNIParameters(object):
def __init__(self, env, cfg):
for k, v in env.items():
if k.startswith('CNI_'):
setattr(self, k, v)
self.config = CNIConfig(cfg)
self.args = CNIArgs(self.CNI_ARGS)

def __repr__(self):
return repr({key: value for key, value in self.__dict__.items() if
key.startswith('CNI_')})


@six.add_metaclass(abc.ABCMeta)
Expand All @@ -70,35 +46,20 @@ def delete(self, params):
raise NotImplementedError()


@six.add_metaclass(abc.ABCMeta)
class CNIRunner(object):

# TODO(ivc): extend SUPPORTED_VERSIONS and format output based on
# requested params.CNI_VERSION and/or params.config.cniVersion
VERSION = '0.3.0'
SUPPORTED_VERSIONS = ['0.3.0']

def __init__(self, plugin):
self._plugin = plugin
@abc.abstractmethod
def _add(self, params):
raise NotImplementedError()

def run(self, env, fin, fout):
try:
params = CNIParameters(env, jsonutils.load(fin))

if params.CNI_COMMAND == 'ADD':
vif = self._plugin.add(params)
self._write_vif(fout, vif)
elif params.CNI_COMMAND == 'DEL':
self._plugin.delete(params)
elif params.CNI_COMMAND == 'VERSION':
self._write_version(fout)
else:
raise k_exc.CNIError(_("unknown CNI_COMMAND: %s")
% params.CNI_COMMAND)
return 0
except Exception as ex:
# LOG.exception
self._write_exception(fout, str(ex))
return 1
@abc.abstractmethod
def _delete(self, params):
raise NotImplementedError()

def _write_dict(self, fout, dct):
output = {'cniVersion': self.VERSION}
Expand All @@ -116,7 +77,31 @@ def _write_exception(self, fout, msg):
def _write_version(self, fout):
self._write_dict(fout, {'supportedVersions': self.SUPPORTED_VERSIONS})

def _write_vif(self, fout, vif):
@abc.abstractmethod
def prepare_env(self, env, stdin):
raise NotImplementedError()

def run(self, env, fin, fout):
try:
# Prepare params according to calling Object
params = self.prepare_env(env, fin)
if env.get('CNI_COMMAND') == 'ADD':
vif = self._add(params)
self._write_dict(fout, vif)
elif env.get('CNI_COMMAND') == 'DEL':
self._delete(params)
elif env.get('CNI_COMMAND') == 'VERSION':
self._write_version(fout)
else:
raise k_exc.CNIError(_("unknown CNI_COMMAND: %s")
% env['CNI_COMMAND'])
return 0
except Exception as ex:
# LOG.exception
self._write_exception(fout, str(ex))
return 1

def _vif_data(self, vif):
result = {}
nameservers = []

Expand All @@ -137,5 +122,61 @@ def _write_vif(self, fout, vif):

if nameservers:
result['dns'] = {'nameservers': nameservers}
return result


self._write_dict(fout, result)
class CNIStandaloneRunner(CNIRunner):

def __init__(self, plugin):
self._plugin = plugin

def _add(self, params):
vif = self._plugin.add(params)
return self._vif_data(vif)

def _delete(self, params):
self._plugin.delete(params)

def prepare_env(self, env, stdin):
return utils.CNIParameters(env, stdin)


class CNIDaemonizedRunner(CNIRunner):

def _add(self, params):
resp = self._make_request('addNetwork', params, httplib.ACCEPTED)
vif = base.VersionedObject.obj_from_primitive(resp.json())
return self._vif_data(vif)

def _delete(self, params):
self._make_request('delNetwork', params, httplib.NO_CONTENT)

def prepare_env(self, env, stdin):
cni_envs = {}
cni_envs.update(
{k: v for k, v in env.items() if k.startswith('CNI_')})
cni_envs['config_kuryr'] = dict(stdin)
return cni_envs

def _make_request(self, path, cni_envs, expected_status=None):
method = 'POST'

address = config.CONF.cni_daemon.bind_address
url = 'http://%s/%s' % (address, path)
try:
LOG.debug('Making request to CNI Daemon. %(method)s %(path)s\n'
'%(body)s',
{'method': method, 'path': url, 'body': cni_envs})
resp = requests.post(url, json=cni_envs,
headers={'Connection': 'close'})
except requests.ConnectionError:
LOG.exception('Looks like %s cannot be reached. Is kuryr-daemon '
'running?', address)
raise
LOG.debug('CNI Daemon returned "%(status)d %(reason)s".',
{'status': resp.status_code, 'reason': resp.reason})
if expected_status and resp.status_code != expected_status:
LOG.error('CNI daemon returned error "%(status)d %(reason)s".',
{'status': resp.status_code, 'reason': resp.reason})
raise k_exc.CNIError('Got invalid status code from CNI daemon.')
return resp
Empty file.

0 comments on commit 2f65d99

Please sign in to comment.