Skip to content

Commit

Permalink
Refactored service layer
Browse files Browse the repository at this point in the history
Service layer has been simplified by removing abstraction
and making the implementation more in line with other
OpenStack projects.

Moved Heartbeat code out of Service class and
into the console scripts. We only need one instance
of the Heartbeat Emitter.

Cleaned up the WSGI code by making use of the
reusable oslo_service.wsgi code.

* Added Heartbeat to designate-sink.
* Cleaned up and refactored Service layers.
* Fixed various bugs e.g. errors on shutdown.
* Removed deprecated options host, port etc.
* Simplified Heartbeat implementation.

Closes-Bug: #1442141
Change-Id: I536b92407bf6ca5bddf4c048909cd13d4e094d26
  • Loading branch information
eandersson committed Sep 20, 2019
1 parent 23f6a79 commit a09064a
Show file tree
Hide file tree
Showing 39 changed files with 564 additions and 474 deletions.
1 change: 0 additions & 1 deletion designate/__init__.py
Expand Up @@ -24,7 +24,6 @@
import oslo_messaging as messaging

_EXTRA_DEFAULT_LOG_LEVELS = [
'eventlet.wsgi.server=WARN',
'kazoo.client=WARN',
'keystone=INFO',
'oslo_service.loopingcall=WARN',
Expand Down
36 changes: 23 additions & 13 deletions designate/agent/service.py
Expand Up @@ -37,35 +37,45 @@
CONF = cfg.CONF


class Service(service.DNSService, service.Service):
class Service(service.Service):
_dns_default_port = DEFAULT_AGENT_PORT

def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
def __init__(self):
super(Service, self).__init__(
self.service_name, threads=cfg.CONF['service:agent'].threads
)

self.dns_service = service.DNSService(
self.dns_application, self.tg,
cfg.CONF['service:agent'].listen,
cfg.CONF['service:agent'].tcp_backlog,
cfg.CONF['service:agent'].tcp_recv_timeout,
)

backend_driver = cfg.CONF['service:agent'].backend_driver
self.backend = agent_backend.get_backend(backend_driver, self)

def start(self):
super(Service, self).start()
self.dns_service.start()
self.backend.start()

def stop(self, graceful=False):
self.dns_service.stop()
self.backend.stop()
super(Service, self).stop(graceful)

@property
def service_name(self):
return 'agent'

@property
@utils.cache_result
def _dns_application(self):
def dns_application(self):
# Create an instance of the RequestHandler class
application = handler.RequestHandler()
if cfg.CONF['service:agent'].notify_delay > 0.0:
application = dnsutils.LimitNotifyMiddleware(application)
application = dnsutils.SerializationMiddleware(application)

return application

def start(self):
super(Service, self).start()
self.backend.start()

def stop(self):
super(Service, self).stop()
# TODO(kiall): Shouldn't we be stppping the backend here too? To fix
# in another review.
29 changes: 10 additions & 19 deletions designate/api/service.py
Expand Up @@ -18,41 +18,32 @@
from paste import deploy

from designate import exceptions
from designate import utils
from designate import service
from designate import service_status

from designate import utils

LOG = logging.getLogger(__name__)


class Service(service.WSGIService, service.Service):
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)

emitter_cls = service_status.HeartBeatEmitter.get_driver(
cfg.CONF.heartbeat_emitter.emitter_type
)
self.heartbeat_emitter = emitter_cls(
self.service_name, self.tg, status_factory=self._get_status
class Service(service.WSGIService):
def __init__(self):
super(Service, self).__init__(
self.wsgi_application,
self.service_name,
cfg.CONF['service:api'].listen,
)

def start(self):
super(Service, self).start()
self.heartbeat_emitter.start()

def _get_status(self):
status = "UP"
stats = {}
capabilities = {}
return status, stats, capabilities
def stop(self, graceful=True):
super(Service, self).stop(graceful)

@property
def service_name(self):
return 'api'

@property
def _wsgi_application(self):
def wsgi_application(self):
api_paste_config = cfg.CONF['service:api'].api_paste_config
config_paths = utils.find_config(api_paste_config)

Expand Down
29 changes: 16 additions & 13 deletions designate/central/service.py
Expand Up @@ -184,37 +184,40 @@ def notification_wrapper(self, *args, **kwargs):
return outer


class Service(service.RPCService, service.Service):
class Service(service.RPCService):
RPC_API_VERSION = '6.2'

target = messaging.Target(version=RPC_API_VERSION)

def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
def __init__(self):
self._scheduler = None
self._storage = None
self._quota = None

self.network_api = network_api.get_network_api(cfg.CONF.network_api)
super(Service, self).__init__(
self.service_name, cfg.CONF['service:central'].topic,
threads=cfg.CONF['service:central'].threads,
)

# update_service_status needs is called by the emitter so we pass
# ourselves as the rpc_api.
self.heartbeat_emitter.rpc_api = self
self.network_api = network_api.get_network_api(cfg.CONF.network_api)

@property
def scheduler(self):
if not hasattr(self, '_scheduler'):
if not self._scheduler:
# Get a scheduler instance
self._scheduler = scheduler.get_scheduler(storage=self.storage)
return self._scheduler

@property
def quota(self):
if not hasattr(self, '_quota'):
if not self._quota:
# Get a quota manager instance
self._quota = quota.get_quota()
return self._quota

@property
def storage(self):
if not hasattr(self, '_storage'):
if not self._storage:
# Get a storage connection
storage_driver = cfg.CONF['service:central'].storage_driver
self._storage = storage.get_storage(storage_driver)
Expand All @@ -232,8 +235,8 @@ def start(self):

super(Service, self).start()

def stop(self):
super(Service, self).stop()
def stop(self, graceful=True):
super(Service, self).stop(graceful)

@property
def mdns_api(self):
Expand All @@ -251,7 +254,7 @@ def worker_api(self):
def zone_api(self):
# TODO(timsim): Remove this when pool_manager_api is gone
if cfg.CONF['service:worker'].enabled:
return self.worker_api
return self.worker_api
return self.pool_manager_api

def _is_valid_zone_name(self, context, zone_name):
Expand Down
5 changes: 3 additions & 2 deletions designate/cmd/agent.py
Expand Up @@ -28,7 +28,6 @@

CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.agent', group='service:agent')
CONF.import_opt('threads', 'designate.agent', group='service:agent')


def main():
Expand All @@ -38,6 +37,8 @@ def main():

hookpoints.log_hook_setup()

server = agent_service.Service(threads=CONF['service:agent'].threads)
server = agent_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:agent'].workers)
heartbeat.start()
service.wait()
6 changes: 3 additions & 3 deletions designate/cmd/api.py
Expand Up @@ -29,7 +29,6 @@

CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.api', group='service:api')
CONF.import_opt('threads', 'designate.api', group='service:api')
cfg.CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token')


Expand All @@ -40,7 +39,8 @@ def main():

hookpoints.log_hook_setup()

server = api_service.Service(threads=CONF['service:api'].threads)
server = api_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:api'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()
9 changes: 5 additions & 4 deletions designate/cmd/central.py
Expand Up @@ -23,12 +23,11 @@
from designate import service
from designate import utils
from designate import version
from designate.central import service as central
from designate.central import service as central_service


CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.central', group='service:central')
CONF.import_opt('threads', 'designate.central', group='service:central')


def main():
Expand All @@ -38,7 +37,9 @@ def main():

hookpoints.log_hook_setup()

server = central.Service(threads=CONF['service:central'].threads)
server = central_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg,
rpc_api=server)
service.serve(server, workers=CONF['service:central'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()
6 changes: 3 additions & 3 deletions designate/cmd/mdns.py
Expand Up @@ -28,7 +28,6 @@

CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.mdns', group='service:mdns')
CONF.import_opt('threads', 'designate.mdns', group='service:mdns')


def main():
Expand All @@ -38,7 +37,8 @@ def main():

hookpoints.log_hook_setup()

server = mdns_service.Service(threads=CONF['service:mdns'].threads)
server = mdns_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:mdns'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()
9 changes: 3 additions & 6 deletions designate/cmd/pool_manager.py
Expand Up @@ -30,8 +30,6 @@
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.pool_manager',
group='service:pool_manager')
CONF.import_opt('threads', 'designate.pool_manager',
group='service:pool_manager')


def main():
Expand All @@ -53,12 +51,11 @@ def main():
'designate-worker', version='newton',
removal_version='rocky')

server = pool_manager_service.Service(
threads=CONF['service:pool_manager'].threads
)
server = pool_manager_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)

hookpoints.log_hook_setup()

service.serve(server, workers=CONF['service:pool_manager'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()
6 changes: 3 additions & 3 deletions designate/cmd/producer.py
Expand Up @@ -28,7 +28,6 @@
LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.producer', group='service:producer')
CONF.import_opt('threads', 'designate.producer', group='service:producer')


def main():
Expand All @@ -46,7 +45,8 @@ def main():

hookpoints.log_hook_setup()

server = producer_service.Service(threads=CONF['service:producer'].threads)
server = producer_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:producer'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()
5 changes: 3 additions & 2 deletions designate/cmd/sink.py
Expand Up @@ -28,7 +28,6 @@

CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.sink', group='service:sink')
CONF.import_opt('threads', 'designate.sink', group='service:sink')


def main():
Expand All @@ -38,6 +37,8 @@ def main():

hookpoints.log_hook_setup()

server = sink_service.Service(threads=CONF['service:sink'].threads)
server = sink_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:sink'].workers)
heartbeat.start()
service.wait()
6 changes: 3 additions & 3 deletions designate/cmd/worker.py
Expand Up @@ -28,7 +28,6 @@
LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.worker', group='service:worker')
CONF.import_opt('threads', 'designate.worker', group='service:worker')


def main():
Expand All @@ -46,7 +45,8 @@ def main():

hookpoints.log_hook_setup()

server = worker_service.Service(threads=CONF['service:worker'].threads)
server = worker_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:worker'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()
8 changes: 3 additions & 5 deletions designate/cmd/zone_manager.py
Expand Up @@ -30,8 +30,6 @@
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.producer',
group='service:zone_manager')
CONF.import_opt('threads', 'designate.producer',
group='service:zone_manager')


def main():
Expand All @@ -56,8 +54,8 @@ def main():

LOG.warning('Starting designate-producer under the zone-manager name')

server = producer_service.Service(
threads=CONF['service:zone_manager'].threads)
server = producer_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:zone_manager'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()
8 changes: 0 additions & 8 deletions designate/conf/agent.py
Expand Up @@ -27,14 +27,6 @@
help='Number of agent worker processes to spawn'),
cfg.IntOpt('threads', default=1000,
help='Number of agent greenthreads to spawn'),
cfg.IPOpt('host',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='Agent Bind Host'),
cfg.PortOpt('port',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='Agent Port Number'),
cfg.ListOpt('listen',
default=['0.0.0.0:%d' % DEFAULT_AGENT_PORT],
help='Agent host:port pairs to listen on'),
Expand Down

0 comments on commit a09064a

Please sign in to comment.