Lifecycle hook implementation
* Added lifecycle hook properties to deletion policy
* Check if deletion policy has lifecycle hook properties during
deletion and perform lifecycle specific actions if it does
* Added complete lifecycle API endpoint
* Bumped microversion to 1.9
* Added unit-test cases for lifecycle hooks
* Added lifecycle hook notification
* Modified node_action to process policy preop if it was a
derived action with lifecycle hook properties
* Modified LB policy to defer deletion of node until node_deletion
action is executed if lifecycle hook properties are defined in
deletion policy
* Added zaqar test driver
* Added documentation for lifecycle hook

Depends-On: Id7804abc351c063563b58fb69a66074d0d854ecf

Change-Id: I888a01c4f26959649121d6f82430017858a4c481
dkt26111 committed Jan 30, 2018
1 parent a783494 commit 3dd9f3e
Showing 34 changed files with 1,092 additions and 43 deletions.
4 changes: 4 additions & 0 deletions doc/source/contributor/action.rst
@@ -190,6 +190,10 @@ An action can be in one of the following statuses during its lifetime:
execution;
- ``WAITING``: Action object has dependencies on other actions, it may
become ``READY`` only when the dependents are all completed with successes;
- ``WAITING_LIFECYCLE_COMPLETION``: Action object is a node deletion that is
awaiting lifecycle completion. It will become ``READY`` when the complete
lifecycle API is called or when the lifecycle hook timeout in the deletion
policy is reached;
- ``RUNNING``: Action object is being executed by a worker thread;
- ``SUSPENDED``: Action object is suspended during execution, so the only way
to put it back to ``RUNNING`` status is to send it a ``RESUME`` signal;
12 changes: 12 additions & 0 deletions doc/source/contributor/cluster.rst
@@ -376,6 +376,18 @@ the ``CLUSTER_DEL_NODES`` action will save the list of UUIDs of the deleted
nodes into the action's ``data`` field so that those policies could update the
associated resources.

If a deletion policy with the ``hooks`` property is attached to the cluster,
the ``CLUSTER_DEL_NODES`` action creates the ``CLUSTER_DEL_NODES`` actions
in ``WAITING_LIFECYCLE_COMPLETION`` status, so they are not executed
immediately. It also sends the lifecycle hook message to the target specified
in the deletion policy. If the complete lifecycle API is called for a
``CLUSTER_DEL_NODES`` action, that action is executed. If not all of the
``CLUSTER_DEL_NODES`` actions have been executed before the hook timeout
specified in the deletion policy is reached, the remaining
``CLUSTER_DEL_NODES`` actions are moved into ``READY`` status and scheduled
for execution. When all actions complete, the ``CLUSTER_DEL_NODES`` action
returns with a success.
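
The deferred-deletion flow described above can be sketched as a small in-memory model. This is an illustrative sketch only, not Senlin's engine code: the ``PendingDeletion`` class, the ``complete_lifecycle`` and ``expire`` helpers, and the use of the action ID as the completion token are all assumptions made for the example.

```python
import time
from dataclasses import dataclass, field

WAITING_LIFECYCLE_COMPLETION = 'WAITING_LIFECYCLE_COMPLETION'
READY = 'READY'


@dataclass
class PendingDeletion:
    """Hypothetical stand-in for a deferred deletion action."""
    action_id: str
    created_at: float = field(default_factory=time.time)
    status: str = WAITING_LIFECYCLE_COMPLETION


def complete_lifecycle(actions, token):
    """Release the action whose ID matches the token, if still waiting."""
    for action in actions:
        if (action.action_id == token
                and action.status == WAITING_LIFECYCLE_COMPLETION):
            action.status = READY
            return True
    return False


def expire(actions, timeout, now=None):
    """Move every action that waited at least `timeout` seconds to READY."""
    now = time.time() if now is None else now
    released = []
    for action in actions:
        if (action.status == WAITING_LIFECYCLE_COMPLETION
                and now - action.created_at >= timeout):
            action.status = READY
            released.append(action.action_id)
    return released


actions = [PendingDeletion('a1', created_at=0.0),
           PendingDeletion('a2', created_at=0.0)]
complete_lifecycle(actions, 'a1')          # consumer finished early
expire(actions, timeout=120, now=120.0)    # hook timeout reached for the rest
```

In this model an action leaves the waiting status exactly once, either through an explicit completion call or through the timeout sweep, which mirrors the two paths to ``READY`` described in the paragraph.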

Note also that by default Senlin won't destroy the nodes that are deleted
from the cluster. It simply removes the nodes from the cluster so that they
become orphan nodes.
37 changes: 36 additions & 1 deletion doc/source/user/policy_types/deletion.rst
@@ -32,7 +32,7 @@ Below is a typical spec for a deletion policy:
.. code-block:: yaml
type: senlin.policy.deletion
version: 1.0
version: 1.1
properties:
criteria: OLDEST_FIRST
destroy_after_deletion: false
@@ -121,6 +121,41 @@ in the policy spec. The cluster's desired capacity won't be changed after
cluster membership is modified.


Lifecycle Hook
~~~~~~~~~~~~~~

If there is a need to receive notification of a node deletion, you can
specify a lifecycle hook in the deletion policy:

.. code-block:: yaml

  type: senlin.policy.deletion
  version: 1.1
  properties:
    hooks:
      type: 'zaqar'
      timeout: 120
      params:
        queue: 'my_queue'

The valid values for ``type`` are:

- ``zaqar``: send a message to a zaqar queue. The name of the zaqar queue
  must be specified in the ``queue`` property.

- ``webhook``: send a message to a webhook URL. The URL of the webhook must
  be specified in the ``url`` property.

The ``timeout`` property specifies the number of seconds to wait before the
actual node deletion happens. This timeout can be preempted by calling the
complete lifecycle hook API.

.. NOTE::

  Hooks of type ``webhook`` will be supported in a future version. Currently
  only hooks of type ``zaqar`` are supported.
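
As a rough illustration of the rules above, the following sketch checks a ``hooks`` mapping before it is placed in a policy spec. The ``validate_hooks`` function is hypothetical and is not part of Senlin; treating ``timeout`` as a required, non-negative integer is also an assumption of this example.

```python
# Hook types that are usable today, per the note above.
SUPPORTED_HOOK_TYPES = ('zaqar',)
# Parameter each documented hook type expects under ``params``.
REQUIRED_PARAM = {'zaqar': 'queue', 'webhook': 'url'}


def validate_hooks(hooks):
    """Return a list of problems found in a ``hooks`` mapping (hypothetical)."""
    errors = []
    hook_type = hooks.get('type')
    if hook_type not in REQUIRED_PARAM:
        errors.append("unknown hook type: %r" % (hook_type,))
        return errors
    if hook_type not in SUPPORTED_HOOK_TYPES:
        errors.append("hook type %r is not supported yet" % hook_type)
    timeout = hooks.get('timeout')
    # Assumption: timeout must be given as a non-negative number of seconds.
    if isinstance(timeout, bool) or not isinstance(timeout, int) or timeout < 0:
        errors.append("timeout must be a non-negative integer (seconds)")
    key = REQUIRED_PARAM[hook_type]
    if not (hooks.get('params') or {}).get(key):
        errors.append("params.%s is required for type %r" % (key, hook_type))
    return errors


print(validate_hooks({'type': 'zaqar', 'timeout': 120,
                      'params': {'queue': 'my_queue'}}))
```

An empty list means the mapping satisfies the documented constraints; a ``webhook`` hook is reported as not yet supported even when its ``url`` is present.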


Deleting Nodes Across Regions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

14 changes: 14 additions & 0 deletions examples/policies/deletion_policy_lifecycle_hook.yaml
@@ -0,0 +1,14 @@
# Sample deletion policy that can be attached to a cluster.
type: senlin.policy.deletion
version: 1.1
description: A policy for choosing victim node(s) from a cluster for deletion.
properties:
  hooks:
    # type of lifecycle hook
    type: zaqar
    params:
      # Name of zaqar queue to receive lifecycle hook message
      queue: zaqar_queue_name
    # Length in number of seconds before the actual deletion happens
    timeout: 180

7 changes: 7 additions & 0 deletions senlin/api/openstack/history.rst 100644 → 100755
@@ -98,3 +98,10 @@ it can be used by both users and developers.
---
- Added ``force`` parameter to cluster delete request.
- Added ``force`` parameter to node delete request.

1.9
---
- Added ``cluster_complete_lifecycle`` API. This API enables users to
  trigger the immediate deletion of the nodes identified for deferred
  deletion during a scale-in operation.
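
A request for this action might be assembled as in the sketch below. The ``build_complete_lifecycle_request`` helper is hypothetical; the path and the microversion header follow the general form of Senlin cluster action requests and should be treated as assumptions, as should the placeholder cluster ID and token. The body shape follows the ``complete_lifecycle`` action name and the ``lifecycle_action_token`` field read by the controller in this commit.

```python
import json


def build_complete_lifecycle_request(cluster_id, token):
    """Build (path, headers, body) for the complete lifecycle action."""
    # Assumed endpoint shape for cluster actions.
    path = '/v1/clusters/%s/actions' % cluster_id
    headers = {
        'Content-Type': 'application/json',
        # The action is gated on API microversion 1.9.
        'OpenStack-API-Version': 'clustering 1.9',
    }
    body = json.dumps({
        'complete_lifecycle': {'lifecycle_action_token': token},
    })
    return path, headers, body


path, headers, body = build_complete_lifecycle_request(
    'my-cluster-id', 'token-from-hook-message')
print(path)
print(body)
```

No network call is made here; the tuple can be handed to whichever HTTP client is in use, together with the usual authentication token.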

15 changes: 13 additions & 2 deletions senlin/api/openstack/v1/clusters.py 100644 → 100755
@@ -33,11 +33,11 @@ class ClusterController(wsgi.Controller):
SUPPORTED_ACTIONS = (
ADD_NODES, DEL_NODES, SCALE_OUT, SCALE_IN, RESIZE,
POLICY_ATTACH, POLICY_DETACH, POLICY_UPDATE,
CHECK, RECOVER, REPLACE_NODES
CHECK, RECOVER, REPLACE_NODES, COMPLETE_LIFECYCLE
) = (
'add_nodes', 'del_nodes', 'scale_out', 'scale_in', 'resize',
'policy_attach', 'policy_detach', 'policy_update',
'check', 'recover', 'replace_nodes'
'check', 'recover', 'replace_nodes', 'complete_lifecycle'
)

@util.policy_enforce
@@ -228,6 +228,17 @@ def _do_recover(self, req, cid, data):
        obj = util.parse_request('ClusterRecoverRequest', req, params)
        return self.rpc_client.call(req.context, 'cluster_recover', obj)

    @wsgi.Controller.api_version('1.9')
    def _do_complete_lifecycle(self, req, cid, data):
        lifecycle_action_token = data.get('lifecycle_action_token', None)

        params = {'identity': cid,
                  'lifecycle_action_token': lifecycle_action_token}
        obj = util.parse_request('ClusterCompleteLifecycleRequest', req,
                                 params)
        return self.rpc_client.call(req.context, 'cluster_complete_lifecycle',
                                    obj)

    @util.policy_enforce
    def action(self, req, cluster_id, body=None):
        """Perform specified action on a cluster."""
2 changes: 1 addition & 1 deletion senlin/api/openstack/v1/version.py 100644 → 100755
@@ -24,7 +24,7 @@ class VersionController(object):
# This includes any semantic changes which may not affect the input or
# output formats or even originate in the API code layer.
_MIN_API_VERSION = "1.0"
_MAX_API_VERSION = "1.8"
_MAX_API_VERSION = "1.9"

DEFAULT_API_VERSION = _MIN_API_VERSION

13 changes: 13 additions & 0 deletions senlin/common/config.py 100644 → 100755
@@ -193,6 +193,19 @@
cfg.CONF.register_group(receiver_group)
cfg.CONF.register_opts(receiver_opts, group=receiver_group)

# Notification group
notification_group = cfg.OptGroup('notification')
notification_opts = [
    cfg.IntOpt('max_message_size', default=65535,
               help=_('The max size(bytes) of message can be posted to '
                      'notification queue.')),
    cfg.IntOpt('ttl', default=300,
               help=_('The ttl in seconds of a message posted to '
                      'notification queue.'))
]
cfg.CONF.register_group(notification_group)
cfg.CONF.register_opts(notification_opts, group=notification_group)

# Zaqar group
zaqar_group = cfg.OptGroup(
'zaqar', title='Zaqar Options',
14 changes: 11 additions & 3 deletions senlin/common/consts.py 100644 → 100755
@@ -41,10 +41,11 @@
)

ACTION_CAUSES = (
CAUSE_RPC, CAUSE_DERIVED,
CAUSE_RPC, CAUSE_DERIVED, CAUSE_DERIVED_LCH
) = (
'RPC Request',
'Derived Action',
'Derived Action with Lifecycle Hook'
)

CLUSTER_ACTION_NAMES = (
@@ -257,10 +258,11 @@
# senlin.engine.actions.base module.
ACTION_STATUSES = (
ACTION_INIT, ACTION_WAITING, ACTION_READY, ACTION_RUNNING,
ACTION_SUCCEEDED, ACTION_FAILED, ACTION_CANCELLED
ACTION_SUCCEEDED, ACTION_FAILED, ACTION_CANCELLED,
ACTION_WAITING_LIFECYCLE_COMPLETION,
) = (
'INIT', 'WAITING', 'READY', 'RUNNING',
'SUCCEEDED', 'FAILED', 'CANCELLED',
'SUCCEEDED', 'FAILED', 'CANCELLED', 'WAITING_LIFECYCLE_COMPLETION'
)

EVENT_LEVELS = {
@@ -297,3 +299,9 @@
) = (
    'start', 'end', 'error',
)

LIFECYCLE_TRANSITION_TYPE = (
    LIFECYCLE_NODE_TERMINATION,
) = (
    'termination',
)
4 changes: 4 additions & 0 deletions senlin/db/api.py 100644 → 100755
@@ -368,6 +368,10 @@ def action_mark_succeeded(context, action_id, timestamp):
    return IMPL.action_mark_succeeded(context, action_id, timestamp)


def action_mark_ready(context, action_id, timestamp):
    return IMPL.action_mark_ready(context, action_id, timestamp)


def action_mark_failed(context, action_id, timestamp, reason=None):
    return IMPL.action_mark_failed(context, action_id, timestamp, reason)

15 changes: 15 additions & 0 deletions senlin/db/sqlalchemy/api.py 100644 → 100755
@@ -1115,6 +1115,21 @@ def action_mark_succeeded(context, action_id, timestamp):
subquery.delete(synchronize_session='fetch')


@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
                           retry_interval=0.5, inc_retry_interval=True)
def action_mark_ready(context, action_id, timestamp):
    with session_for_write() as session:

        query = session.query(models.Action).filter_by(id=action_id)
        values = {
            'owner': None,
            'status': consts.ACTION_READY,
            'status_reason': 'Lifecycle timeout.',
            'end_time': timestamp,
        }
        query.update(values, synchronize_session=False)


def _mark_failed(session, action_id, timestamp, reason=None):
    # mark myself as failed
    query = session.query(models.Action).filter_by(id=action_id)
14 changes: 14 additions & 0 deletions senlin/drivers/openstack/zaqar_v2.py 100644 → 100755
@@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.

from openstack import exceptions as sdk_exc

from senlin.drivers import base
from senlin.drivers.openstack import sdk

@@ -26,6 +28,14 @@ def __init__(self, params):
    def queue_create(self, **attrs):
        return self.conn.message.create_queue(**attrs)

    @sdk.translate_exception
    def queue_exists(self, queue_name):
        try:
            self.conn.message.get_queue(queue_name)
            return True
        except sdk_exc.ResourceNotFound:
            return False

    @sdk.translate_exception
    def queue_delete(self, queue, ignore_missing=True):
        return self.conn.message.delete_queue(queue, ignore_missing)
@@ -54,3 +64,7 @@ def message_delete(self, queue_name, message, claim_id=None,
                       ignore_missing=True):
        return self.conn.message.delete_message(queue_name, message,
                                                claim_id, ignore_missing)

    @sdk.translate_exception
    def message_post(self, queue_name, message):
        return self.conn.message.post_message(queue_name, message)
3 changes: 2 additions & 1 deletion senlin/drivers/openstack_test/__init__.py
@@ -12,14 +12,15 @@

from senlin.drivers.openstack import ceilometer_v2
from senlin.drivers.openstack import keystone_v3
from senlin.drivers.openstack import zaqar_v2

from senlin.drivers.openstack_test import cinder_v2
from senlin.drivers.openstack_test import heat_v1
from senlin.drivers.openstack_test import lbaas
from senlin.drivers.openstack_test import mistral_v2
from senlin.drivers.openstack_test import neutron_v2
from senlin.drivers.openstack_test import nova_v2
from senlin.drivers.openstack_test import octavia_v2
from senlin.drivers.openstack_test import zaqar_v2


block_storage = cinder_v2.CinderClient
74 changes: 74 additions & 0 deletions senlin/drivers/openstack_test/zaqar_v2.py
@@ -0,0 +1,74 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from senlin.drivers import base
from senlin.drivers.openstack_test import sdk

FAKE_SUBSCRIPTION_ID = "0d8dbb71-1538-42ac-99fb-bb52d0ad1b6f"
FAKE_MESSAGE_ID = "51db6f78c508f17ddc924357"
FAKE_CLAIM_ID = "51db7067821e727dc24df754"


class ZaqarClient(base.DriverBase):
    '''Fake zaqar V2 driver for test.'''

    def __init__(self, ctx):
        self.fake_subscription = {
            "subscription_id": FAKE_SUBSCRIPTION_ID
        }
        self.fake_claim = {
            "messages": [
                {
                    "body": {
                        "event": "BackupStarted"
                    },
                    "age": 239,
                    "href": "/v2/queues/demoqueue/messages/" +
                            FAKE_MESSAGE_ID + "?claim_id=" + FAKE_CLAIM_ID,
                    "ttl": 300
                }
            ]
        }
        self.fake_message = {
            "resources": [
                "/v2/queues/demoqueue/messages/" + FAKE_MESSAGE_ID
            ]
        }

    def queue_create(self, **attrs):
        return

    def queue_exists(self, queue_name):
        return True

    def queue_delete(self, queue, ignore_missing=True):
        return

    def subscription_create(self, queue_name, **attrs):
        return sdk.FakeResourceObject(self.fake_subscription)

    def subscription_delete(self, queue_name, subscription,
                            ignore_missing=True):
        return

    def claim_create(self, queue_name, **attrs):
        return sdk.FakeResourceObject(self.fake_claim)

    def claim_delete(self, queue_name, claim, ignore_missing=True):
        return

    def message_delete(self, queue_name, message, claim_id=None,
                       ignore_missing=True):
        return

    def message_post(self, queue_name, message):
        return sdk.FakeResourceObject(self.fake_message)
