Fix health manager performance regression
When running stress tests against the Octavia Health Manager, it was
observed that the scalability and performance of the health manager had
degraded.
The ORM layer was forming poorly optimized queries, putting excessive
load on the database engine, and unnecessary code paths were being
executed for each heartbeat message.
This patch speeds up the health manager's processing of amphora-agent
heartbeat messages by optimizing the database requests, pool processing,
and event streamer code paths.

Story: 2001896
Task: 14381
(cherry picked from commit f13a2e6)

Change-Id: If2f81129ca94882b42b04ddf5652ff03e8a48edf
johnsom committed Sep 25, 2018
1 parent 67ca77a commit 8c54a22
Showing 4 changed files with 569 additions and 235 deletions.
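Before the diff, a minimal illustrative sketch of the central pattern the commit message describes: the operating status cached with the load balancer tree is compared against the newly reported status, and the database and event streamer are only touched when the status has actually changed, with the provisioning_status lookup deferred until an event will really be emitted. The StatusUpdater class and the repo/streamer objects below are simplified assumptions for illustration, not the actual Octavia classes.

class StatusUpdater(object):
    def __init__(self, repo, streamer, streamer_enabled=True,
                 sync_prv_status=False):
        self.repo = repo                  # assumed minimal DB facade
        self.streamer = streamer          # assumed minimal event emitter
        self.streamer_enabled = streamer_enabled
        self.sync_prv_status = sync_prv_status

    def update_if_changed(self, session, entity_type, entity_id,
                          new_status, cached_status):
        # Compare against the status already loaded with the load
        # balancer tree; return early when nothing changed so no extra
        # query or event is generated for this heartbeat.
        if new_status.lower() == cached_status.lower():
            return
        self.repo.update(session, entity_id, operating_status=new_status)

        if not self.streamer_enabled:
            return
        message = {'operating_status': new_status}
        if self.sync_prv_status:
            # Query provisioning_status only when an event will really
            # be sent, instead of on every heartbeat.
            prov = self.repo.get(
                session, id=entity_id).provisioning_status
            message['provisioning_status'] = prov
        self.streamer.emit(entity_type, entity_id, message)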
212 changes: 124 additions & 88 deletions octavia/controller/healthmanager/health_drivers/update_db.py
@@ -47,43 +47,83 @@ def __init__(self):
self.loadbalancer_repo = repo.LoadBalancerRepository()
self.member_repo = repo.MemberRepository()
self.pool_repo = repo.PoolRepository()
self.sync_prv_status = CONF.health_manager.sync_provisioning_status

def emit(self, info_type, info_id, info_obj):
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
self.event_streamer.emit(cnt)

def _update_status_and_emit_event(self, session, repo, entity_type,
entity_id, new_op_status, old_op_status,
current_prov_status):
entity_id, new_op_status, old_op_status):
message = {}
if old_op_status.lower() != new_op_status.lower():
LOG.debug("%s %s status has changed from %s to "
"%s. Updating db and sending event.",
"%s, updating db.",
entity_type, entity_id, old_op_status,
new_op_status)
repo.update(session, entity_id, operating_status=new_op_status)
# Map the status for neutron-lbaas
if new_op_status == constants.DRAINING:
new_op_status = constants.ONLINE
message.update({constants.OPERATING_STATUS: new_op_status})
if self.sync_prv_status:
LOG.debug("%s %s provisioning_status %s. "
"Sending event.",
entity_type, entity_id, current_prov_status)
message.update(
{constants.PROVISIONING_STATUS: current_prov_status})
if message:
self.emit(entity_type, entity_id, message)
if (CONF.health_manager.event_streamer_driver !=
constants.NOOP_EVENT_STREAMER):
if CONF.health_manager.sync_provisioning_status:
current_prov_status = repo.get(
session, id=entity_id).provisioning_status
LOG.debug("%s %s provisioning_status %s. "
"Sending event.",
entity_type, entity_id, current_prov_status)
message.update(
{constants.PROVISIONING_STATUS: current_prov_status})
if message:
self.emit(entity_type, entity_id, message)

def update_health(self, health, srcaddr):
# The executor will eat any exceptions from the update_health code
# so we need to wrap it and log the unhandled exception
start_time = timeit.default_timer()
try:
self._update_health(health, srcaddr)
except Exception:
LOG.exception('update_health encountered an unknown error '
'processing health message for amphora {0} with IP '
'{1}'.format(health['id'], srcaddr))
except Exception as e:
LOG.exception('Health update for amphora %(amp)s encountered '
'error %(err)s. Skipping health update.',
{'amp': health['id'], 'err': str(e)})
# TODO(johnsom) We need to set a warning threshold here
LOG.debug('Health Update finished in: {0} seconds'.format(
timeit.default_timer() - start_time))

# Health heartbeat messages pre-versioning with UDP listeners
# need to adjust the expected listener count
# This is for backward compatibility with Rocky pre-versioning
# heartbeat amphora.
def _update_listener_count_for_UDP(self, session, lb_id,
expected_listener_count):
lb_db_obj = self.loadbalancer_repo.get(session, id=lb_id)

# For a UDP listener, the amphora agent will not send out health
# information until the default_pool of the UDP listener has its
# first enabled member. Until then, we need to identify these UDP
# listeners and ignore them by adjusting expected_listener_count.
for listener in lb_db_obj.listeners:
need_remove = False
if listener.protocol == constants.PROTOCOL_UDP:
enabled_members = ([member
for member in
listener.default_pool.members
if member.enabled]
if listener.default_pool else [])
if listener.default_pool:
if not listener.default_pool.members:
need_remove = True
elif not enabled_members:
need_remove = True
else:
need_remove = True

if need_remove:
expected_listener_count = expected_listener_count - 1
return expected_listener_count

def _update_health(self, health, srcaddr):
"""This function is to update db info based on amphora status
@@ -108,43 +148,25 @@ def _update_health(self, health, srcaddr):
}
"""
start_time = timeit.default_timer()
session = db_api.get_session()

# We need to see if all of the listeners are reporting in
db_lb = self.amphora_repo.get_lb_for_amphora(session, health['id'])
db_lb = self.amphora_repo.get_lb_for_health_update(session,
health['id'])
ignore_listener_count = False
listeners = health['listeners']

if db_lb:
expected_listener_count = len(db_lb.listeners)

# For udp listener, the udp health won't send out by amp agent.
# Once the default_pool of udp listener have the first enabled
# member, then the health will be sent out. So during this period,
# need to figure out the udp listener and ignore them by changing
# expected_listener_count.
for listener in db_lb.listeners:
need_remove = False
if listener.protocol == constants.PROTOCOL_UDP:
enabled_members = ([member
for member in
listener.default_pool.members
if member.enabled]
if listener.default_pool else [])
if listener.default_pool:
if not listener.default_pool.members:
need_remove = True
elif not enabled_members:
need_remove = True
else:
need_remove = True

if need_remove:
expected_listener_count = expected_listener_count - 1

if 'PENDING' in db_lb.provisioning_status:
expected_listener_count = len(db_lb.get('listeners', {}))
if 'PENDING' in db_lb['provisioning_status']:
ignore_listener_count = True
else:

# If this is a heartbeat older than versioning, handle
# UDP special for backward compatibility.
if 'ver' not in health:
expected_listener_count = (
self._update_listener_count_for_UDP(
session, db_lb['id'], expected_listener_count))
else:
# If this is not a spare amp, log and skip it.
amp = self.amphora_repo.get(session, id=health['id'])
@@ -160,6 +182,8 @@ def _update_health(self, health, srcaddr):
return
expected_listener_count = 0

listeners = health['listeners']

# Do not update amphora health if the reporting listener count
# does not match the expected listener count
if len(listeners) == expected_listener_count or ignore_listener_count:
@@ -169,6 +193,9 @@ def _update_health(self, health, srcaddr):
# if we're running too far behind, warn and bail
proc_delay = time.time() - health['recv_time']
hb_interval = CONF.health_manager.heartbeat_interval
# TODO(johnsom) We need to set a warning threshold here, and
# escalate to critical when it reaches the
# heartbeat_interval
if proc_delay >= hb_interval:
LOG.warning('Amphora %(id)s health message was processed too '
'slowly: %(delay)ss! The system may be overloaded '
@@ -198,16 +225,17 @@ def _update_health(self, health, srcaddr):
return

processed_pools = []
potential_offline_pools = {}

# We got a heartbeat so lb is healthy until proven otherwise
if db_lb.enabled is False:
if db_lb['enabled'] is False:
lb_status = constants.OFFLINE
else:
lb_status = constants.ONLINE

for db_listener in db_lb.listeners:
for listener_id in db_lb.get('listeners', {}):
db_op_status = db_lb['listeners'][listener_id]['operating_status']
listener_status = None
listener_id = db_listener.id
listener = None

if listener_id not in listeners:
@@ -230,13 +258,11 @@ def _update_health(self, health, srcaddr):
'status': listener.get('status')})

try:
if listener_status is not None:
if (listener_status is not None and
listener_status != db_op_status):
self._update_status_and_emit_event(
session, self.listener_repo, constants.LISTENER,
listener_id, listener_status,
db_listener.operating_status,
db_listener.provisioning_status
)
listener_id, listener_status, db_op_status)
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Listener %s is not in DB", listener_id)

@@ -245,38 +271,52 @@ def _update_health(self, health, srcaddr):

pools = listener['pools']

# Process pools bound to listeners
for db_pool in db_listener.pools:
for db_pool_id in db_lb.get('pools', {}):
# If we saw this pool already on another listener
# skip it.
if db_pool_id in processed_pools:
continue
db_pool_dict = db_lb['pools'][db_pool_id]
lb_status = self._process_pool_status(
session, db_pool, pools, lb_status, processed_pools)
session, db_pool_id, db_pool_dict, pools,
lb_status, processed_pools, potential_offline_pools)

# Process pools bound to the load balancer
for db_pool in db_lb.pools:
# Don't re-process pools shared with listeners
if db_pool.id in processed_pools:
for pool_id in potential_offline_pools:
# Skip if we eventually found a status for this pool
if pool_id in processed_pools:
continue
lb_status = self._process_pool_status(
session, db_pool, [], lb_status, processed_pools)
try:
# If the database doesn't already show the pool offline, update
if potential_offline_pools[pool_id] != constants.OFFLINE:
self._update_status_and_emit_event(
session, self.pool_repo, constants.POOL,
pool_id, constants.OFFLINE,
potential_offline_pools[pool_id])
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Pool %s is not in DB", pool_id)

# Update the load balancer status last
try:
self._update_status_and_emit_event(
session, self.loadbalancer_repo,
constants.LOADBALANCER, db_lb.id, lb_status,
db_lb.operating_status, db_lb.provisioning_status
)
if lb_status != db_lb['operating_status']:
self._update_status_and_emit_event(
session, self.loadbalancer_repo,
constants.LOADBALANCER, db_lb['id'], lb_status,
db_lb[constants.OPERATING_STATUS])
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Load balancer %s is not in DB", db_lb.id)
LOG.debug('Health Update finished in: {0} seconds'.format(
timeit.default_timer() - start_time))

def _process_pool_status(self, session, db_pool, pools, lb_status,
processed_pools):
def _process_pool_status(
self, session, pool_id, db_pool_dict, pools, lb_status,
processed_pools, potential_offline_pools):
pool_status = None
pool_id = db_pool.id

if pool_id not in pools:
pool_status = constants.OFFLINE
# If we don't have a status update for this pool_id
# add it to the list of potential offline pools and continue.
# We will check the potential offline pool list after we
# finish processing the status updates from all of the listeners.
potential_offline_pools[pool_id] = db_pool_dict['operating_status']
return lb_status
else:
pool = pools[pool_id]

@@ -298,10 +338,10 @@ def _process_pool_status(self, session, db_pool, pools, lb_status,
# Deal with the members that are reporting from
# the Amphora
members = pool['members']
for db_member in db_pool.members:
for member_id in db_pool_dict.get('members', {}):
member_status = None
member_db_status = db_member.operating_status
member_id = db_member.id
member_db_status = (
db_pool_dict['members'][member_id]['operating_status'])

if member_id not in members:
if member_db_status != constants.NO_MONITOR:
@@ -334,25 +374,21 @@ def _process_pool_status(self, session, db_pool, pools, lb_status,
'status': status})

try:
if member_status is not None:
if (member_status is not None and
member_status != member_db_status):
self._update_status_and_emit_event(
session, self.member_repo,
constants.MEMBER,
member_id, member_status,
db_member.operating_status,
db_member.provisioning_status
)
session, self.member_repo, constants.MEMBER,
member_id, member_status, member_db_status)
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Member %s is not able to update "
"in DB", member_id)

try:
if pool_status is not None:
if (pool_status is not None and
pool_status != db_pool_dict['operating_status']):
self._update_status_and_emit_event(
session, self.pool_repo, constants.POOL,
pool_id, pool_status, db_pool.operating_status,
db_pool.provisioning_status
)
pool_id, pool_status, db_pool_dict['operating_status'])
except sqlalchemy.orm.exc.NoResultFound:
LOG.error("Pool %s is not in DB", pool_id)


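For reference, the reworked pool handling above follows a two-pass pattern: the first pass processes the pools reported by any listener in the heartbeat and only remembers the ones with no status yet, and the second pass marks the leftovers OFFLINE without touching the database when it already shows them OFFLINE. A simplified sketch, using plain dicts and a hypothetical update_status callable instead of the Octavia repositories and session:

def reconcile_pools(db_pools, listener_reports, update_status):
    # db_pools: {pool_id: {'operating_status': ...}} cached from the DB
    # listener_reports: one {pool_id: reported_status} dict per listener
    processed_pools = set()
    potential_offline_pools = {}

    # Pass 1: a pool shared by several listeners is processed once; a
    # pool missing from one listener's report may still be reported by
    # another listener later in the same heartbeat.
    for reported_pools in listener_reports:
        for pool_id, db_pool in db_pools.items():
            if pool_id in processed_pools:
                continue
            if pool_id in reported_pools:
                processed_pools.add(pool_id)
                update_status(pool_id, reported_pools[pool_id])
            else:
                potential_offline_pools[pool_id] = (
                    db_pool['operating_status'])

    # Pass 2: pools that never got a status are OFFLINE, but the
    # database is only updated when it does not already say OFFLINE.
    for pool_id, cached_status in potential_offline_pools.items():
        if pool_id in processed_pools:
            continue
        if cached_status != 'OFFLINE':
            update_status(pool_id, 'OFFLINE')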