Skip to content

Commit

Permalink
Ensure Pool Manager Cache is Cleared
Browse files Browse the repository at this point in the history
When actions (create, delete, update) are completed for a domain, remove
the statuses from the pool manager cache.

Change-Id: Icb452e3c83dca5bbcbf38ab44394b61a26867a3b
Closes-Bug: #1408202
  • Loading branch information
rjrjr committed Jan 22, 2015
1 parent 8f59a26 commit 565e502
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 165 deletions.
237 changes: 132 additions & 105 deletions designate/pool_manager/service.py
Expand Up @@ -19,6 +19,7 @@
from oslo.config import cfg
from oslo import messaging
from oslo_log import log as logging
from oslo_concurrency import lockutils

from designate import backend
from designate import exceptions
Expand All @@ -40,6 +41,7 @@
CREATE_ACTION = 'CREATE'
DELETE_ACTION = 'DELETE'
UPDATE_ACTION = 'UPDATE'
MAXIMUM_THRESHOLD = 100


@contextmanager
Expand Down Expand Up @@ -161,7 +163,7 @@ def create_domain(self, context, domain):

for server_backend in self.server_backends:
server = server_backend['server']
create_status = self._create_create_status(server, domain)
create_status = self._build_create_status(server, domain)
self._create_domain_on_server(
context, create_status, domain, server_backend)

Expand All @@ -175,11 +177,12 @@ def delete_domain(self, context, domain):

for server_backend in self.server_backends:
server = server_backend['server']
delete_status = self._create_delete_status(server, domain)
delete_status = self._build_delete_status(server, domain)
self._delete_domain_on_server(
context, delete_status, domain, server_backend)

if not self._is_delete_consensus(context, domain):
if self._is_in_cache(context, domain, DELETE_ACTION) \
and not self._is_delete_consensus(context, domain):
status = ERROR_STATUS
LOG.warn(_LW('Consensus not reached '
'for deleting domain %(domain)s') %
Expand Down Expand Up @@ -211,40 +214,49 @@ def update_status(self, context, domain, server,
"""
LOG.debug("Calling update_status for %s" % domain.name)

update_status = self._retrieve_from_cache(
context, server, domain, UPDATE_ACTION)
cache_serial = update_status.serial_number

LOG.debug('For domain %s on server %s the cache serial is %s '
'and the actual serial is %s.' %
(domain.name, self._get_destination(server),
cache_serial, actual_serial))
if actual_serial and cache_serial < actual_serial:
update_status.status = status
update_status.serial_number = actual_serial
self._store_in_cache(context, update_status)

consensus_serial = self._get_consensus_serial(context, domain)

if cache_serial < consensus_serial:
LOG.info(_LI('For domain %(domain)s '
'the consensus serial is %(consensus_serial)s.') %
{'domain': domain.name,
'consensus_serial': consensus_serial})

self.central_api.update_status(
context, domain.id, SUCCESS_STATUS, consensus_serial)

if status == ERROR_STATUS:
error_serial = self._get_error_serial(
context, domain, consensus_serial)
if error_serial > consensus_serial or error_serial == 0:
LOG.warn(_LW('For domain %(domain)s '
'the error serial is %(error_serial)s.') %
with lockutils.lock('update-status-%s' % domain.id):
try:
update_status = self._retrieve_one_from_cache(
context, server, domain, UPDATE_ACTION)
except exceptions.PoolManagerStatusNotFound:
update_status = self._build_update_status(server, domain)
self._store_in_cache(context, update_status)
cache_serial = update_status.serial_number

LOG.debug('For domain %s on server %s the cache serial is %s '
'and the actual serial is %s.' %
(domain.name, self._get_destination(server),
cache_serial, actual_serial))
if actual_serial and cache_serial < actual_serial:
update_status.status = status
update_status.serial_number = actual_serial
self._store_in_cache(context, update_status)

consensus_serial = self._get_consensus_serial(context, domain)

if cache_serial < consensus_serial:
LOG.info(_LI('For domain %(domain)s '
'the consensus serial is %(consensus_serial)s.') %
{'domain': domain.name,
'error_serial': error_serial})
'consensus_serial': consensus_serial})
self.central_api.update_status(
context, domain.id, ERROR_STATUS, error_serial)
context, domain.id, SUCCESS_STATUS, consensus_serial)

if status == ERROR_STATUS:
error_serial = self._get_error_serial(
context, domain, consensus_serial)
if error_serial > consensus_serial or error_serial == 0:
LOG.warn(_LW('For domain %(domain)s '
'the error serial is %(error_serial)s.') %
{'domain': domain.name,
'error_serial': error_serial})
self.central_api.update_status(
context, domain.id, ERROR_STATUS, error_serial)

if consensus_serial == domain.serial \
and self._is_update_consensus(context, domain,
MAXIMUM_THRESHOLD):
self._clear_cache(context, domain, UPDATE_ACTION)

def periodic_recovery(self):
"""
Expand All @@ -255,8 +267,8 @@ def periodic_recovery(self):
context = DesignateContext.get_admin_context(all_tenants=True)

try:
self._periodic_create_domains_that_failed(context)
self._periodic_delete_domains_that_failed(context)
self._periodic_create_domains_that_failed(context)
self._periodic_update_domains_that_failed(context)
except Exception:
LOG.exception(_LE('An unhandled exception in periodic recovery '
Expand Down Expand Up @@ -301,18 +313,13 @@ def _create_domain_on_server(self, context, create_status, domain,
{'domain': domain.name,
'server': self._get_destination(server)})

update_status = self._create_update_status(server, domain)
update_status.serial_number = 0
# Setting the update status to ERROR ensures the periodic
# recovery is run if there is a problem.
update_status.status = ERROR_STATUS
self._store_in_cache(context, update_status)
if self._is_create_consensus(context, domain, MAXIMUM_THRESHOLD):
self._clear_cache(context, domain, CREATE_ACTION)

# PowerDNS needs to explicitly send a NOTIFY for the AXFR to
# happen whereas BIND9 does an AXFR implicitly after the domain
# is created. Sending a NOTIFY for all cases.
self._notify_zone_changed(context, domain, server)
self._poll_for_serial_number(context, domain, server)
self._update_domain_on_server(context, domain, server_backend)
except exceptions.Backend:
create_status.status = ERROR_STATUS
self._store_in_cache(context, create_status)
Expand All @@ -323,8 +330,8 @@ def _create_domain_on_server(self, context, create_status, domain,

def _periodic_create_domains_that_failed(self, context):

create_statuses = self._find_pool_manager_statuses(
context, CREATE_ACTION, status=ERROR_STATUS)
create_statuses = self._retrieve_from_cache(
context, action=CREATE_ACTION, status=ERROR_STATUS)

for create_status in create_statuses:
domain = self.central_api.get_domain(
Expand All @@ -349,13 +356,18 @@ def _delete_domain_on_server(self, context, delete_status, domain,
'from server %(server)s.') %
{'domain': domain.name,
'server': self._get_destination(server)})

if not consensus_existed \
and self._is_delete_consensus(context, domain):
LOG.info(_LI('Consensus reached '
'for deleting domain %(domain)s') %
{'domain': domain.name})
self.central_api.update_status(
context, domain.id, SUCCESS_STATUS, domain.serial)

if self._is_delete_consensus(context, domain,
MAXIMUM_THRESHOLD):
self._clear_cache(context, domain)
except exceptions.Backend:
delete_status.status = ERROR_STATUS
self._store_in_cache(context, delete_status)
Expand All @@ -366,8 +378,8 @@ def _delete_domain_on_server(self, context, delete_status, domain,

def _periodic_delete_domains_that_failed(self, context):

delete_statuses = self._find_pool_manager_statuses(
context, DELETE_ACTION, status=ERROR_STATUS)
delete_statuses = self._retrieve_from_cache(
context, action=DELETE_ACTION, status=ERROR_STATUS)

# Used to retrieve a domain from Central that may have already been
# "deleted".
Expand All @@ -384,30 +396,17 @@ def _update_domain_on_server(self, context, domain, server_backend):

server = server_backend['server']

try:
update_status = self._retrieve_from_cache(
context, server, domain, UPDATE_ACTION)
if update_status.status == ERROR_STATUS \
or update_status.serial_number < domain.serial:
self._notify_zone_changed(context, domain, server)
self._poll_for_serial_number(context, domain, server)
LOG.info(_LI('Updating domain %(domain)s '
'on server %(server)s.') %
{'domain': domain.name,
'server': self._get_destination(server)})
else:
# TODO(Ron): Do not log this warning on a periodic_sync.
LOG.warn(_LW('No need to update domain %(domain)s '
'on server %(server)s.') %
{'domain': domain.name,
'server': self._get_destination(server)})
except exceptions.PoolManagerStatusNotFound:
pass
self._notify_zone_changed(context, domain, server)
self._poll_for_serial_number(context, domain, server)
LOG.info(_LI('Updating domain %(domain)s '
'on server %(server)s.') %
{'domain': domain.name,
'server': self._get_destination(server)})

def _periodic_update_domains_that_failed(self, context):

update_statuses = self._find_pool_manager_statuses(
context, UPDATE_ACTION, status=ERROR_STATUS)
update_statuses = self._retrieve_from_cache(
context, action=UPDATE_ACTION, status=ERROR_STATUS)

for update_status in update_statuses:
domain = self.central_api.get_domain(
Expand Down Expand Up @@ -439,22 +438,9 @@ def _get_destination(server):
def _percentage(count, total_count):
return (Decimal(count) / Decimal(total_count)) * Decimal(100)

def _exceed_or_meet_threshold(self, count):
def _exceed_or_meet_threshold(self, count, threshold):
return self._percentage(
count, len(self.server_backends)) >= Decimal(self.threshold)

def _find_pool_manager_statuses(self, context, action,
domain=None, status=None):
criterion = {
'action': action
}
if domain:
criterion['domain_id'] = domain.id
if status:
criterion['status'] = status

return self.cache.find_pool_manager_statuses(
context, criterion=criterion)
count, len(self.server_backends)) >= Decimal(threshold)

@staticmethod
def _get_sorted_serials(pool_manager_statuses, descending=False):
Expand All @@ -470,64 +456,105 @@ def _get_serials_ascending(self, pool_manager_statuses):
def _get_serials_descending(self, pool_manager_statuses):
return self._get_sorted_serials(pool_manager_statuses, descending=True)

def _is_success_consensus(self, context, domain, action):
def _is_success_consensus(self, context, domain, action, threshold=None):
success_count = 0
pool_manager_statuses = self._find_pool_manager_statuses(
context, action, domain=domain)
pool_manager_statuses = self._retrieve_from_cache(
context, domain=domain, action=action)
for pool_manager_status in pool_manager_statuses:
if pool_manager_status.status == SUCCESS_STATUS:
success_count += 1
return self._exceed_or_meet_threshold(success_count)
if threshold is None:
threshold = self.threshold
return self._exceed_or_meet_threshold(success_count, threshold)

def _is_create_consensus(self, context, domain, threshold=None):
return self._is_success_consensus(
context, domain, CREATE_ACTION, threshold)

def _is_delete_consensus(self, context, domain):
return self._is_success_consensus(context, domain, DELETE_ACTION)
def _is_delete_consensus(self, context, domain, threshold=None):
return self._is_success_consensus(
context, domain, DELETE_ACTION, threshold)

def _is_update_consensus(self, context, domain, threshold=None):
return self._is_success_consensus(
context, domain, UPDATE_ACTION, threshold)

def _get_consensus_serial(self, context, domain):
consensus_serial = 0
update_statuses = self._find_pool_manager_statuses(
context, UPDATE_ACTION, domain=domain)
update_statuses = self._retrieve_from_cache(
context, domain=domain, action=UPDATE_ACTION)
for serial in self._get_serials_descending(update_statuses):
serial_count = 0
for update_status in update_statuses:
if update_status.serial_number >= serial:
serial_count += 1
if self._exceed_or_meet_threshold(serial_count):
if self._exceed_or_meet_threshold(serial_count, self.threshold):
consensus_serial = serial
break
return consensus_serial

def _get_error_serial(self, context, domain, consensus_serial):
error_serial = 0
if not self._is_success_consensus(context, domain, UPDATE_ACTION):
update_statuses = self._find_pool_manager_statuses(
context, UPDATE_ACTION, domain=domain)
update_statuses = self._retrieve_from_cache(
context, domain=domain, action=UPDATE_ACTION)
for serial in self._get_serials_ascending(update_statuses):
if serial > consensus_serial:
error_serial = serial
break
return error_serial

@staticmethod
def _create_pool_manager_status(server, domain, action):
def _build_status_object(server, domain, action, serial_number,
status=None):
values = {
'server_id': server.id,
'domain_id': domain.id,
'status': None,
'serial_number': domain.serial,
'status': status,
'serial_number': serial_number,
'action': action
}
return objects.PoolManagerStatus(**values)

def _create_create_status(self, server, domain):
return self._create_pool_manager_status(server, domain, CREATE_ACTION)
def _build_create_status(self, server, domain):
return self._build_status_object(
server, domain, CREATE_ACTION, domain.serial)

def _build_delete_status(self, server, domain):
return self._build_status_object(
server, domain, DELETE_ACTION, domain.serial)

def _create_delete_status(self, server, domain):
return self._create_pool_manager_status(server, domain, DELETE_ACTION)
def _build_update_status(self, server, domain):
# Setting the update status to ERROR ensures the periodic
# recovery is run if there is a problem.
return self._build_status_object(
server, domain, UPDATE_ACTION, 0, status='ERROR')

def _create_update_status(self, server, domain):
return self._create_pool_manager_status(server, domain, UPDATE_ACTION)
# Methods for manipulating the cache.
def _clear_cache(self, context, domain, action=None):
pool_manager_statuses = self._retrieve_from_cache(
context, domain=domain, action=action)
for pool_manager_status in pool_manager_statuses:
self.cache.delete_pool_manager_status(
context, pool_manager_status.id)

def _is_in_cache(self, context, domain, action):
return len(self._retrieve_from_cache(
context, domain=domain, action=action)) > 0

def _retrieve_from_cache(self, context,
domain=None, action=None, status=None):
criterion = {}
if domain:
criterion['domain_id'] = domain.id
if action:
criterion['action'] = action
if status:
criterion['status'] = status
return self.cache.find_pool_manager_statuses(
context, criterion=criterion)

def _retrieve_from_cache(self, context, server, domain, action):
def _retrieve_one_from_cache(self, context, server, domain, action):
criterion = {
'server_id': server.id,
'domain_id': domain.id,
Expand Down

0 comments on commit 565e502

Please sign in to comment.