Skip to content
This repository has been archived by the owner on Mar 22, 2018. It is now read-only.

ContainerManagerService #35

Merged
merged 4 commits into from
Dec 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions conf/containermgr.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"container_handlers": [{
"name": "OpenshiftA1",
"handler": "commissaire.containermgr.kubernetes",
"server_url": "http://192.168.152.102:8080/"
}]
}
12 changes: 12 additions & 0 deletions conf/systemd/commissaire-containermgr.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[Unit]
Description=Commissaire Container Manager Service
Documentation=https://commissaire.readthedocs.io/
After=network.target

[Service]
ExecStart=/usr/bin/commissaire-containermgr-service -c /etc/commisasire/containermgr.conf
PIDFile=/var/run/commissaire-containermgr-service.pid
Type=simple

[Install]
WantedBy=multi-user.target
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def extract_requirements(filename):
'commissaire_service.storage:main'),
('commissaire-clusterexec-service = '
'commissaire_service.clusterexec:main'),
('commissaire-containermgr-service = '
'commissaire_service.containermgr:main'),
('commissaire-investigator-service = '
'commissaire_service.investigator:main'),
('commissaire-watcher-service = '
Expand Down
256 changes: 256 additions & 0 deletions src/commissaire_service/containermgr/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
# Copyright (C) 2016 Red Hat, Inc
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import importlib

from commissaire.containermgr import ContainerManagerError
from commissaire.util.config import ConfigurationError, read_config_file

from commissaire_service.service import CommissaireService
from commissaire_service.containermgr.containerhandlermanager import (
ContainerHandlerManager)


class ContainerManagerService(CommissaireService):
"""
Provides access to Container Managers.
"""

def __init__(self, exchange_name, connection_url, config_file=None):
"""
Creates a new ContainerManagerService and sets up ContainerHandler
instances according to the config_file. If config_file is omitted,
it will try the default location (/etc/commissaire/commissaire.conf).

:param exchange_name: Name of the topic exchange
:type exchange_name: str
:param connection_url: Kombu connection URL
:type connection_url: str
:param config_file: Optional configuration file path
:type config_file: str or None
"""
queue_kwargs = [{
'name': 'containermgr',
'routing_key': 'container.*',
'exclusive': False,
}]
super().__init__(exchange_name, connection_url, queue_kwargs)
self._manager = ContainerHandlerManager()

config_data = read_config_file(config_file)
container_handlers = config_data.get('container_handlers', [])

if len(container_handlers) == 0:
self.logger.info('No ContainerManagerHandlers were provided.')
for config in container_handlers:
self.register(config)

def register(self, config):
"""
Registers a new container handler type after extracting and validating
information required for registration from the configuration data.

:param config: A configuration dictionary
:type config: dict
"""
if type(config) is not dict:
raise ConfigurationError(
'Store handler format must be a JSON object, got a '
'{} instead: {}'.format(type(config).__name__, config))

# Import the handler class.
try:
module_name = config.pop('handler')
except KeyError as error:
raise ConfigurationError(
'Container handler configuration missing "{}" key: '
'{}'.format(error, config))
try:
module = importlib.import_module(module_name)
handler_type = getattr(module, 'ContainerHandler')
except ImportError:
raise ConfigurationError(
'Invalid container handler module name: {}'.format(
module_name))

self._manager.register(handler_type, config)

def on_list_handlers(self, message):
"""
Handler for the "container.list_handlers" routing key.

Returns a list of registered container handlers as dictionaries.
Each dictionary contains the following:

'name' : The name of the container handler
'handler_type' : Type type of the container handler
'config' : Dictionary of configuration values

:param message: A message instance
:type message: kombu.message.Message
"""
result = []
for name, handler in self._manager.handlers.items():
result.append({
'name': name,
'handler_type': handler.__class__.__name__,
})
return result

def on_node_registered(self, message, container_handler_name, address):
"""
Checks if a node is registered to a specific container manager.

:param message: A message instance
:type message: kombu.message.Message
:param container_handler_name: Name of the ContainerHandler to use.
:type container_handler_name: str
:param address: Address of the node
:type address: str
:returns: Whether the node is registered
:rtype: bool
"""
return self._node_operation(
container_handler_name, 'node_registered', address)

def on_register_node(self, message, container_handler_name, address):
"""
Registers a node to a container manager.

:param message: A message instance
:type message: kombu.message.Message
:param container_handler_name: Name of the ContainerHandler to use.
:type container_handler_name: str
:param address: Address of the node
:type address: str
:returns: Whether the node is registered
:rtype: bool
"""
return self._node_operation(
container_handler_name, 'register_node', address)

def on_remove_node(self, message, container_handler_name, address):
"""
Removes a node from a container manager.

:param message: A message instance
:type message: kombu.message.Message
:param container_handler_name: Name of the ContainerHandler to use.
:type container_handler_name: str
:param address: Address of the node
:type address: str
:returns: Whether the node is registered
:rtype: bool
"""
return self._node_operation(
container_handler_name, 'remove_node', address)

def _node_operation(self, container_handler_name, method, address):
"""
Common code for getting node information.

:param container_handler_name: Name of the ContainerHandler to use.
:type container_handler_name: str
:param method: The containermgr method to call.
:type method: str
:param address: Address of the node
:type address: str
:returns: Whether the node is registered
:rtype: bool
"""
try:
container_handler = self._manager.handlers[container_handler_name]
result = getattr(container_handler, method).__call__(address)

self.logger.info(
'{} called for {} via the container manager {}'.format(
method, address, container_handler_name))
self.logger.debug('Result: {}'.format(result))

if bool(result):
return result

except ContainerManagerError as error:
self.logger.info('{} raised ContainerManagerError: {}'.format(
error))
except KeyError:
self.logger.error('ContainerHandler {} does not exist.'.format(
container_handler_name))
except Exception as error:
self.logger.error(
'Unexpected error while attempting {} for node "{}" with '
'containermgr "{}". {}: {}'.format(
method, address, container_handler_name,
error.__class__.__name__, error))

return False

def on_get_node_status(self, message, container_handler_name, address):
"""
Gets a nodes status from the container manager.

:param message: A message instance
:type message: kombu.message.Message
:param container_handler_name: Name of the ContainerHandler to use.
:type container_handler_name: str
:param address: Address of the node
:type address: str
:returns: Status of the node according to the container manager.
:rtype: dict
"""
result = self._node_operation(
container_handler_name, 'get_node_status', address)
if result is False:
error = 'No status available for node {}'.format(address)
self.logger.error(result)
raise Exception(error)
return result


def main(): # pragma: no cover
"""
Main entry point.
"""
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
'-c', '--config', type=str,
help='Configuration file to use.')
parser.add_argument(
'--bus-exchange', type=str, default='commissaire',
help='Message bus exchange name.')
parser.add_argument(
'--bus-uri', type=str, metavar='BUS_URI',
default='redis://127.0.0.1:6379/', # FIXME: Remove before release
help=(
'Message bus connection URI. See:'
'http://kombu.readthedocs.io/en/latest/userguide/connections.html')
)

args = parser.parse_args()

try:
service = ContainerManagerService(
exchange_name=args.bus_exchange,
connection_url=args.bus_uri,
config_file=args.config)
service.run()
except KeyboardInterrupt:
pass


if __name__ == '__main__': # pragma: no cover
main()
57 changes: 57 additions & 0 deletions src/commissaire_service/containermgr/containerhandlermanager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (C) 2016 Red Hat, Inc
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging


class ContainerHandlerManager(object):
"""
Configures ContainerHandler instances and routes storage requests to
the appropriate handler.
"""

def __init__(self):
"""
Creates a new ContainerHandlerManager instance.
"""
self._handlers = {}
self.logger = logging.getLogger('containermgr')
self.logger.setLevel(logging.DEBUG)

def register(self, handler_type, config):
"""
Registers a ContainerHandler for use in remote calls.

:param handler_type: A class derived from ContainerHandler
:type handler_type: type
:param config: Configuration parameters for the handler
:type config: dict
"""
handler_type.check_config(config)
self._handlers[config['name']] = handler_type(config)
self.logger.info('Registered container handler {}'.format(
config['name']))
self.logger.debug('{}: {}'.format(
self._handlers[config['name']], config))

@property
def handlers(self):
"""
Returns all configured container manager instances.

:returns: dict of container managers
:rtype: dict
"""
return self._handlers
37 changes: 0 additions & 37 deletions src/commissaire_service/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import importlib
import json

from time import sleep

import commissaire.models as models

from commissaire import constants as C
Expand Down Expand Up @@ -230,41 +228,6 @@ def on_list_store_handlers(self, message):
})
return result

def on_node_registered(self, message, cluster_type, address):
"""
Checks if a cluster node at the given address is registered on a
cluster of the given type. This method may take several seconds
to complete if the cluster node is unresponsive, as it retries a
few times with a sleep delay.

:param message: A message instance
:type message: kombu.message.Message
:param cluster_type: A cluster type constant
:type cluster_type: str
:param address: Address of the cluster node
:type address: str
:returns: Whether the node is registered
:rtype: bool
"""
for con_mgr in self._manager.list_container_managers(cluster_type):
# Try 3 times waiting 5 seconds each time before giving up.
for attempt in range(3):
if con_mgr.node_registered(address):
self.logger.info(
'{} has been registered with the '
'container manager'.format(address))
return True
if attempt == 2:
self.logger.warn(
'Could not register with the container manager')
return False
self.logger.debug(
'{} has not been registered with the container '
'manager. Checking again in 5 seconds...'.format(
address))
sleep(5)
return False


def main(): # pragma: no cover
"""
Expand Down