Skip to content

Commit

Permalink
Allow DB retries on controller_worker creates
Browse files Browse the repository at this point in the history
Temporary workaround for commit ordering issues with the amphora driver,
until the driver can be rewritten against the real driver interface.

Story: 2002127
Task: 19809
Change-Id: Idfaca392b278a6efad36e51adaedc6c80372a006
  • Loading branch information
rm-you committed Jun 1, 2018
1 parent 190f682 commit 48e8556
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 12 deletions.
4 changes: 0 additions & 4 deletions octavia/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,6 @@ class GlanceNoTaggedImages(OctaviaException):
message = _("No Glance images are tagged with %(tag)s tag.")


class NoSuitableAmphoraException(OctaviaException):
message = _('Unable to allocate an amphora due to: %(msg)s')


# This is an internal use exception for the taskflow work flow
# and will not be exposed to the customer. This means it is a
# normal part of operation while waiting for compute to go active
Expand Down
87 changes: 80 additions & 7 deletions octavia/controller/worker/controller_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

from oslo_config import cfg
from oslo_utils import excutils
from sqlalchemy.orm import exc as db_exceptions
from taskflow.listeners import logging as tf_logging
import tenacity

from octavia.common import base_taskflow
from octavia.common import constants
Expand All @@ -35,6 +37,11 @@
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

RETRY_ATTEMPTS = 15
RETRY_INITIAL_DELAY = 1
RETRY_BACKOFF = 1
RETRY_MAX = 5


class ControllerWorker(base_taskflow.BaseTaskFlowEngine):

Expand Down Expand Up @@ -109,15 +116,24 @@ def delete_amphora(self, amphora_id):
log=LOG):
delete_amp_tf.run()

@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_health_monitor(self, health_monitor_id):
"""Creates a health monitor.
:param pool_id: ID of the pool to create a health monitor on
:returns: None
:raises NoSuitablePool: Unable to find the node pool
:raises NoResultFound: Unable to find the object
"""
health_mon = self._health_mon_repo.get(db_apis.get_session(),
id=health_monitor_id)
if not health_mon:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'health_monitor', health_monitor_id)
raise db_exceptions.NoResultFound

pool = health_mon.pool
listeners = pool.listeners
Expand Down Expand Up @@ -185,15 +201,25 @@ def update_health_monitor(self, health_monitor_id, health_monitor_updates):
log=LOG):
update_hm_tf.run()

@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_listener(self, listener_id):
"""Creates a listener.
:param listener_id: ID of the listener to create
:returns: None
:raises NoSuitableLB: Unable to find the load balancer
:raises NoResultFound: Unable to find the object
"""
listener = self._listener_repo.get(db_apis.get_session(),
id=listener_id)
if not listener:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'listener', listener_id)
raise db_exceptions.NoResultFound

load_balancer = listener.load_balancer

create_listener_tf = self._taskflow_load(self._listener_flows.
Expand Down Expand Up @@ -251,6 +277,11 @@ def update_listener(self, listener_id, listener_updates):
with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG):
update_listener_tf.run()

@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_load_balancer(self, load_balancer_id):
"""Creates a load balancer by allocating Amphorae.
Expand All @@ -260,8 +291,13 @@ def create_load_balancer(self, load_balancer_id):
:param load_balancer_id: ID of the load balancer to create
:returns: None
:raises NoSuitableAmphoraException: Unable to allocate an Amphora.
:raises NoResultFound: Unable to find the object
"""
lb = self._lb_repo.get(db_apis.get_session(), id=load_balancer_id)
if not lb:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'load_balancer', load_balancer_id)
raise db_exceptions.NoResultFound

store = {constants.LOADBALANCER_ID: load_balancer_id,
constants.BUILD_TYPE_PRIORITY:
Expand All @@ -273,7 +309,6 @@ def create_load_balancer(self, load_balancer_id):
constants.LOADBALANCER_TOPOLOGY: topology
}

lb = self._lb_repo.get(db_apis.get_session(), id=load_balancer_id)
create_lb_flow = self._lb_flows.get_create_load_balancer_flow(
topology=topology, listeners=lb.listeners)

Expand Down Expand Up @@ -330,6 +365,11 @@ def update_load_balancer(self, load_balancer_id, load_balancer_updates):
log=LOG):
update_lb_tf.run()

@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_member(self, member_id):
"""Creates a pool member.
Expand All @@ -339,6 +379,11 @@ def create_member(self, member_id):
"""
member = self._member_repo.get(db_apis.get_session(),
id=member_id)
if not member:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'member', member_id)
raise db_exceptions.NoResultFound

pool = member.pool
listeners = pool.listeners
load_balancer = pool.load_balancer
Expand Down Expand Up @@ -434,15 +479,24 @@ def update_member(self, member_id, member_updates):
log=LOG):
update_member_tf.run()

@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_pool(self, pool_id):
"""Creates a node pool.
:param pool_id: ID of the pool to create
:returns: None
:raises NoSuitableLB: Unable to find the load balancer
:raises NoResultFound: Unable to find the object
"""
pool = self._pool_repo.get(db_apis.get_session(),
id=pool_id)
if not pool:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'pool', pool_id)
raise db_exceptions.NoResultFound

listeners = pool.listeners
load_balancer = pool.load_balancer
Expand Down Expand Up @@ -506,15 +560,24 @@ def update_pool(self, pool_id, pool_updates):
log=LOG):
update_pool_tf.run()

@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_l7policy(self, l7policy_id):
"""Creates an L7 Policy.
:param l7policy_id: ID of the l7policy to create
:returns: None
:raises NoSuitableLB: Unable to find the load balancer
:raises NoResultFound: Unable to find the object
"""
l7policy = self._l7policy_repo.get(db_apis.get_session(),
id=l7policy_id)
if not l7policy:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'l7policy', l7policy_id)
raise db_exceptions.NoResultFound

listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer
Expand Down Expand Up @@ -574,15 +637,25 @@ def update_l7policy(self, l7policy_id, l7policy_updates):
log=LOG):
update_l7policy_tf.run()

@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_l7rule(self, l7rule_id):
"""Creates an L7 Rule.
:param l7rule_id: ID of the l7rule to create
:returns: None
:raises NoSuitableLB: Unable to find the load balancer
:raises NoResultFound: Unable to find the object
"""
l7rule = self._l7rule_repo.get(db_apis.get_session(),
id=l7rule_id)
if not l7rule:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'l7rule', l7rule_id)
raise db_exceptions.NoResultFound

l7policy = l7rule.l7policy
listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer
Expand Down
17 changes: 16 additions & 1 deletion octavia/tests/unit/controller/worker/test_controller_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def test_create_health_monitor(self,
mock_amp_repo_get):

_flow_mock.reset_mock()
mock_health_mon_repo_get.side_effect = [None, _health_mon_mock]

cw = controller_worker.ControllerWorker()
cw.create_health_monitor(_health_mon_mock)
Expand All @@ -206,6 +207,7 @@ def test_create_health_monitor(self,
_pool_mock}))

_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_health_mon_repo_get.call_count)

@mock.patch('octavia.controller.worker.flows.'
'health_monitor_flows.HealthMonitorFlows.'
Expand Down Expand Up @@ -300,6 +302,7 @@ def test_create_listener(self,
mock_amp_repo_get):

_flow_mock.reset_mock()
mock_listener_repo_get.side_effect = [None, _listener_mock]

cw = controller_worker.ControllerWorker()
cw.create_listener(LB_ID)
Expand All @@ -312,6 +315,7 @@ def test_create_listener(self,
[_listener_mock]}))

_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_listener_repo_get.call_count)

@mock.patch('octavia.controller.worker.flows.'
'listener_flows.ListenerFlows.get_delete_listener_flow',
Expand Down Expand Up @@ -406,7 +410,9 @@ def test_create_load_balancer_single(
'update_dict': {'topology': constants.TOPOLOGY_SINGLE},
constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY
}
setattr(mock_lb_repo_get.return_value, 'listeners', [])
lb_mock = mock.MagicMock()
lb_mock.listeners = []
mock_lb_repo_get.side_effect = [None, None, None, lb_mock]

cw = controller_worker.ControllerWorker()
cw.create_load_balancer(LB_ID)
Expand All @@ -416,6 +422,7 @@ def test_create_load_balancer_single(
mock_taskflow_load.assert_called_with(
mock_get_create_load_balancer_flow.return_value, store=store)
mock_eng.run.assert_any_call()
self.assertEqual(4, mock_lb_repo_get.call_count)

@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
'LoadBalancerFlows.get_create_load_balancer_flow',
Expand Down Expand Up @@ -687,6 +694,7 @@ def test_create_member(self,
mock_amp_repo_get):

_flow_mock.reset_mock()
mock_member_repo_get.side_effect = [None, _member_mock]

cw = controller_worker.ControllerWorker()
cw.create_member(MEMBER_ID)
Expand All @@ -702,6 +710,7 @@ def test_create_member(self,
_pool_mock}))

_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_member_repo_get.call_count)

@mock.patch('octavia.controller.worker.flows.'
'member_flows.MemberFlows.get_delete_member_flow',
Expand Down Expand Up @@ -823,6 +832,7 @@ def test_create_pool(self,
mock_amp_repo_get):

_flow_mock.reset_mock()
mock_pool_repo_get.side_effect = [None, _pool_mock]

cw = controller_worker.ControllerWorker()
cw.create_pool(POOL_ID)
Expand All @@ -836,6 +846,7 @@ def test_create_pool(self,
_load_balancer_mock}))

_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_pool_repo_get.call_count)

@mock.patch('octavia.controller.worker.flows.'
'pool_flows.PoolFlows.get_delete_pool_flow',
Expand Down Expand Up @@ -921,6 +932,7 @@ def test_create_l7policy(self,
mock_amp_repo_get):

_flow_mock.reset_mock()
mock_l7policy_repo_get.side_effect = [None, _l7policy_mock]

cw = controller_worker.ControllerWorker()
cw.create_l7policy(L7POLICY_ID)
Expand All @@ -934,6 +946,7 @@ def test_create_l7policy(self,
_load_balancer_mock}))

_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_l7policy_repo_get.call_count)

@mock.patch('octavia.controller.worker.flows.'
'l7policy_flows.L7PolicyFlows.get_delete_l7policy_flow',
Expand Down Expand Up @@ -1019,6 +1032,7 @@ def test_create_l7rule(self,
mock_amp_repo_get):

_flow_mock.reset_mock()
mock_l7rule_repo_get.side_effect = [None, _l7rule_mock]

cw = controller_worker.ControllerWorker()
cw.create_l7rule(L7RULE_ID)
Expand All @@ -1033,6 +1047,7 @@ def test_create_l7rule(self,
_load_balancer_mock}))

_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_l7rule_repo_get.call_count)

@mock.patch('octavia.controller.worker.flows.'
'l7rule_flows.L7RuleFlows.get_delete_l7rule_flow',
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ taskflow>=2.16.0 # Apache-2.0
diskimage-builder!=1.6.0,!=1.7.0,!=1.7.1,>=1.1.2 # Apache-2.0
futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # BSD
castellan>=0.16.0 # Apache-2.0
tenacity # Apache-2.0

#for the amphora api
Flask!=0.11,<1.0,>=0.10 # BSD
Expand Down

0 comments on commit 48e8556

Please sign in to comment.