Skip to content

Commit

Permalink
Merge "Re-factored Heartbeat implementation"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and openstack-gerrit committed Mar 15, 2020
2 parents 2a05203 + 3fccc25 commit 46dc595
Show file tree
Hide file tree
Showing 15 changed files with 176 additions and 246 deletions.
4 changes: 2 additions & 2 deletions designate/api/wsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from paste import deploy

from designate import conf
from designate import heartbeat_emitter
from designate import policy
from designate import rpc
from designate import service
from designate.common import config

CONF = conf.CONF
Expand All @@ -47,7 +47,7 @@ def init_application():
if not rpc.initialized():
rpc.init(CONF)

heartbeat = service.Heartbeat('api')
heartbeat = heartbeat_emitter.get_heartbeat_emitter('api')
heartbeat.start()

conf = conf_files[0]
Expand Down
3 changes: 2 additions & 1 deletion designate/cmd/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from oslo_reports import guru_meditation_report as gmr

import designate.conf
from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
Expand All @@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()

server = agent_service.Service()
heartbeat = service.Heartbeat(server.service_name)
heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:agent'].workers)
heartbeat.start()
service.wait()
3 changes: 2 additions & 1 deletion designate/cmd/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from oslo_reports import guru_meditation_report as gmr

import designate.conf
from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
Expand All @@ -40,7 +41,7 @@ def main():
hookpoints.log_hook_setup()

server = api_service.Service()
heartbeat = service.Heartbeat(server.service_name)
heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:api'].workers)
heartbeat.start()
service.wait()
4 changes: 3 additions & 1 deletion designate/cmd/central.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from oslo_reports import guru_meditation_report as gmr

import designate.conf
from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
Expand All @@ -38,7 +39,8 @@ def main():
hookpoints.log_hook_setup()

server = central_service.Service()
heartbeat = service.Heartbeat(server.service_name, rpc_api=server)
heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name,
rpc_api=server)
service.serve(server, workers=CONF['service:central'].workers)
heartbeat.start()
service.wait()
3 changes: 2 additions & 1 deletion designate/cmd/mdns.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from oslo_reports import guru_meditation_report as gmr

import designate.conf
from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
Expand All @@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()

server = mdns_service.Service()
heartbeat = service.Heartbeat(server.service_name)
heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:mdns'].workers)
heartbeat.start()
service.wait()
3 changes: 2 additions & 1 deletion designate/cmd/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from oslo_reports import guru_meditation_report as gmr

import designate.conf
from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
Expand All @@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()

server = producer_service.Service()
heartbeat = service.Heartbeat(server.service_name)
heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:producer'].workers)
heartbeat.start()
service.wait()
3 changes: 2 additions & 1 deletion designate/cmd/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from oslo_reports import guru_meditation_report as gmr

import designate.conf
from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
Expand All @@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()

server = sink_service.Service()
heartbeat = service.Heartbeat(server.service_name)
heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:sink'].workers)
heartbeat.start()
service.wait()
3 changes: 2 additions & 1 deletion designate/cmd/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from oslo_reports import guru_meditation_report as gmr

import designate.conf
from designate import heartbeat_emitter
from designate import hookpoints
from designate import service
from designate import utils
Expand All @@ -38,7 +39,7 @@ def main():
hookpoints.log_hook_setup()

server = worker_service.Service()
heartbeat = service.Heartbeat(server.service_name)
heartbeat = heartbeat_emitter.get_heartbeat_emitter(server.service_name)
service.serve(server, workers=CONF['service:worker'].workers)
heartbeat.start()
service.wait()
73 changes: 40 additions & 33 deletions designate/service_status.py → designate/heartbeat_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,75 +28,82 @@
LOG = logging.getLogger(__name__)


class HeartBeatEmitter(plugin.DriverPlugin):
def get_heartbeat_emitter(service_name, **kwargs):
cls = HeartbeatEmitter.get_driver(
CONF.heartbeat_emitter.emitter_type
)
return cls(service_name, **kwargs)


class HeartbeatEmitter(plugin.DriverPlugin):
__plugin_ns__ = 'designate.heartbeat_emitter'
__plugin_type__ = 'heartbeat_emitter'

def __init__(self, service, status_factory=None, *args, **kwargs):
super(HeartBeatEmitter, self).__init__()
def __init__(self, service_name, **kwargs):
super(HeartbeatEmitter, self).__init__()

self._service = service
self._status = 'UP'
self._stats = {}
self._capabilities = {}

self._service_name = service_name
self._hostname = CONF.host

self._timer = loopingcall.FixedIntervalLoopingCall(
self._emit_heartbeat
)
self._status_factory = status_factory

def _get_status(self):
if self._status_factory is not None:
return self._status_factory()
def start(self):
self._timer.start(
CONF.heartbeat_emitter.heartbeat_interval,
stop_on_exception=False
)

def stop(self):
self._timer.stop()

def get_status(self):
return self._status, self._stats, self._capabilities

return True, {}, {}
@abc.abstractmethod
def transmit(self, status):
pass

def _emit_heartbeat(self):
"""
Returns Status, Stats, Capabilities
"""
status, stats, capabilities = self._get_status()
status, stats, capabilities = self.get_status()

service_status = objects.ServiceStatus(
service_name=self._service,
service_name=self._service_name,
hostname=self._hostname,
status=status,
stats=stats,
capabilities=capabilities,
heartbeated_at=timeutils.utcnow()
)

LOG.trace("Emitting %s", service_status)

self._transmit(service_status)
LOG.trace('Emitting %s', service_status)

@abc.abstractmethod
def _transmit(self, status):
pass

def start(self):
self._timer.start(
CONF.heartbeat_emitter.heartbeat_interval,
stop_on_exception=False
)

def stop(self):
self._timer.stop()
self.transmit(service_status)


class NoopEmitter(HeartBeatEmitter):
class NoopEmitter(HeartbeatEmitter):
__plugin_name__ = 'noop'

def _transmit(self, status):
LOG.debug(status)
def transmit(self, status):
LOG.info(status)


class RpcEmitter(HeartBeatEmitter):
class RpcEmitter(HeartbeatEmitter):
__plugin_name__ = 'rpc'

def __init__(self, service, rpc_api=None, *args, **kwargs):
super(RpcEmitter, self).__init__(service, *args, **kwargs)
def __init__(self, service_name, rpc_api=None, **kwargs):
super(RpcEmitter, self).__init__(service_name, **kwargs)
self.rpc_api = rpc_api

def _transmit(self, status):
def transmit(self, status):
admin_context = context.DesignateContext.get_admin_context()
api = self.rpc_api or central_rpcapi.CentralAPI.get_instance()
api.update_service_status(admin_context, status)
27 changes: 0 additions & 27 deletions designate/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

from designate import policy
from designate import rpc
from designate import service_status
from designate import utils
from designate import version
import designate.conf
Expand Down Expand Up @@ -68,32 +67,6 @@ def stop(self, graceful=True):
super(Service, self).stop(graceful)


class Heartbeat(object):
def __init__(self, name, rpc_api=None):
self.name = name

self._status = 'UP'
self._stats = {}
self._capabilities = {}

emitter_cls = service_status.HeartBeatEmitter.get_driver(
CONF.heartbeat_emitter.emitter_type
)
self.heartbeat_emitter = emitter_cls(
self.name,
status_factory=self.get_status, rpc_api=rpc_api
)

def get_status(self):
return self._status, self._stats, self._capabilities

def start(self):
self.heartbeat_emitter.start()

def stop(self):
self.heartbeat_emitter.stop()


class RPCService(Service):
def __init__(self, name, rpc_topic, threads=None):
super(RPCService, self).__init__(name, threads)
Expand Down
2 changes: 1 addition & 1 deletion designate/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
group='service:api')
CONF.import_opt('connection', 'designate.storage.impl_sqlalchemy',
group='storage:sqlalchemy')
CONF.import_opt('emitter_type', 'designate.service_status',
CONF.import_opt('emitter_type', 'designate.heartbeat_emitter',
group="heartbeat_emitter")
CONF.import_opt('scheduler_filters', 'designate.scheduler',
group="service:central")
Expand Down
55 changes: 0 additions & 55 deletions designate/tests/unit/test_heartbeat.py

This file was deleted.

0 comments on commit 46dc595

Please sign in to comment.